You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2021/12/02 18:53:41 UTC

[pulsar] branch branch-2.9 updated: [Broker] Fix LeaderElectionService.getCurrentLeader and add support for empheralOwner in MockZooKeeper (#13066)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 3d0ce3a  [Broker] Fix LeaderElectionService.getCurrentLeader and add support for empheralOwner in MockZooKeeper (#13066)
3d0ce3a is described below

commit 3d0ce3ac19abb4810a3fa410d292a49a9420d21d
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Thu Dec 2 20:34:35 2021 +0200

    [Broker] Fix LeaderElectionService.getCurrentLeader and add support for empheralOwner in MockZooKeeper (#13066)
    
    * Add leader election unit test that uses multiple brokers
    
    * Support ephemeralOwner in MockZooKeeper
    
    * Use unique sessions for all brokers
    
    * Add failing test case for leader broker information not available on other brokers
    
    * Add test for reading the current leader
    
    * Fix issue in leader election
    
    * Address review feedback: make methods static
    
    * Use unique-session only in MultiBrokerBaseTests
    
    * Move tenant and namespace creation to its own method
    
    * Improve cleanup
    
    * Add alwaysRun to BeforeClass
    
    * Fix leaks in locking in MockZooKeeper
    
    * Reduce code duplication
    
    * Fix NPE when CreateMode is null
    
    (cherry picked from commit 36a45ee89639bf66a12de23e415d80451deaa4e0)
---
 .../apache/pulsar/broker/MultiBrokerBaseTest.java  | 153 ++++
 .../broker/auth/MockedPulsarServiceBaseTest.java   |  12 +-
 .../loadbalance/MultiBrokerLeaderElectionTest.java |  71 ++
 .../coordination/impl/LeaderElectionImpl.java      |   5 +-
 .../java/org/apache/zookeeper/MockZooKeeper.java   | 860 ++++++++++++---------
 .../org/apache/zookeeper/MockZooKeeperSession.java |  16 +-
 6 files changed, 728 insertions(+), 389 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
new file mode 100644
index 0000000..f4d106d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.zookeeper.MockZooKeeperSession;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+
+public abstract class MultiBrokerBaseTest extends MockedPulsarServiceBaseTest {
+    protected List<PulsarService> additionalBrokers;
+    protected List<PulsarAdmin> additionalBrokerAdmins;
+    protected List<PulsarClient> additionalBrokerClients;
+
+    protected int numberOfAdditionalBrokers() {
+        return 2;
+    }
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    public final void setup() throws Exception {
+        super.internalSetup();
+        additionalBrokersSetup();
+        pulsarResourcesSetup();
+    }
+
+    protected void pulsarResourcesSetup() throws PulsarAdminException {
+        admin.tenants().createTenant("public", createDefaultTenantInfo());
+        admin.namespaces()
+                .createNamespace("public/default", getPulsar().getConfiguration().getDefaultNumberOfNamespaceBundles());
+    }
+
+    protected void additionalBrokersSetup() throws Exception {
+        int numberOfAdditionalBrokers = numberOfAdditionalBrokers();
+        additionalBrokers = new ArrayList<>(numberOfAdditionalBrokers);
+        additionalBrokerAdmins = new ArrayList<>(numberOfAdditionalBrokers);
+        additionalBrokerClients = new ArrayList<>(numberOfAdditionalBrokers);
+        for (int i = 0; i < numberOfAdditionalBrokers; i++) {
+            PulsarService pulsarService = createAdditionalBroker(i);
+            additionalBrokers.add(i, pulsarService);
+            PulsarAdminBuilder pulsarAdminBuilder =
+                    PulsarAdmin.builder().serviceHttpUrl(pulsarService.getWebServiceAddress() != null
+                            ? pulsarService.getWebServiceAddress()
+                            : pulsarService.getWebServiceAddressTls());
+            customizeNewPulsarAdminBuilder(pulsarAdminBuilder);
+            additionalBrokerAdmins.add(i, pulsarAdminBuilder.build());
+            additionalBrokerClients.add(i, newPulsarClient(pulsarService.getBrokerServiceUrl(), 0));
+        }
+    }
+
+    protected ServiceConfiguration createConfForAdditionalBroker(int additionalBrokerIndex) {
+        return getDefaultConf();
+    }
+
+    protected PulsarService createAdditionalBroker(int additionalBrokerIndex) throws Exception {
+        return startBroker(createConfForAdditionalBroker(additionalBrokerIndex));
+    }
+
+    @Override
+    protected ZKMetadataStore createLocalMetadataStore() {
+        // use MockZooKeeperSession to provide a unique session id for each instance
+        return new ZKMetadataStore(MockZooKeeperSession.newInstance(mockZooKeeper));
+    }
+
+    @Override
+    protected ZKMetadataStore createConfigurationMetadataStore() {
+        // use MockZooKeeperSession to provide a unique session id for each instance
+        return new ZKMetadataStore(MockZooKeeperSession.newInstance(mockZooKeeperGlobal));
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    public final void cleanup() throws Exception {
+        additionalBrokersCleanup();
+        super.internalCleanup();
+    }
+
+    protected void additionalBrokersCleanup() {
+        if (additionalBrokerAdmins != null) {
+            for (PulsarAdmin additionalBrokerAdmin : additionalBrokerAdmins) {
+                additionalBrokerAdmin.close();
+            }
+            additionalBrokerAdmins = null;
+        }
+        if (additionalBrokerClients != null) {
+            for (PulsarClient additionalBrokerClient : additionalBrokerClients) {
+                try {
+                    additionalBrokerClient.shutdown();
+                } catch (PulsarClientException e) {
+                    // ignore
+                }
+            }
+            additionalBrokerClients = null;
+        }
+        if (additionalBrokers != null) {
+            for (PulsarService pulsarService : additionalBrokers) {
+                try {
+                    pulsarService.getConfiguration().setBrokerShutdownTimeoutMs(0L);
+                    pulsarService.close();
+                } catch (PulsarServerException e) {
+                    // ignore
+                }
+            }
+            additionalBrokers = null;
+        }
+    }
+
+    public final List<PulsarService> getAllBrokers() {
+        List<PulsarService> brokers = new ArrayList<>(numberOfAdditionalBrokers() + 1);
+        brokers.add(getPulsar());
+        brokers.addAll(additionalBrokers);
+        return Collections.unmodifiableList(brokers);
+    }
+
+    public final List<PulsarAdmin> getAllAdmins() {
+        List<PulsarAdmin> admins = new ArrayList<>(numberOfAdditionalBrokers() + 1);
+        admins.add(admin);
+        admins.addAll(additionalBrokerAdmins);
+        return Collections.unmodifiableList(admins);
+    }
+
+    public final List<PulsarClient> getAllClients() {
+        List<PulsarClient> clients = new ArrayList<>(numberOfAdditionalBrokers() + 1);
+        clients.add(pulsarClient);
+        clients.addAll(additionalBrokerClients);
+        return Collections.unmodifiableList(clients);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 49e7ef3..a3f7166 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -317,8 +317,8 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
         // Override default providers with mocked ones
         doReturn(mockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory();
         doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory();
-        doReturn(new ZKMetadataStore(mockZooKeeper)).when(pulsar).createLocalMetadataStore();
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(pulsar).createConfigurationMetadataStore();
+        doReturn(createLocalMetadataStore()).when(pulsar).createLocalMetadataStore();
+        doReturn(createConfigurationMetadataStore()).when(pulsar).createConfigurationMetadataStore();
 
         Supplier<NamespaceService> namespaceServiceSupplier = () -> spy(new NamespaceService(pulsar));
         doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider();
@@ -332,6 +332,14 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
         }
     }
 
+    protected ZKMetadataStore createLocalMetadataStore() {
+        return new ZKMetadataStore(mockZooKeeper);
+    }
+
+    protected ZKMetadataStore createConfigurationMetadataStore() {
+        return new ZKMetadataStore(mockZooKeeperGlobal);
+    }
+
     private void mockConfigBrokerInterceptors(PulsarService pulsarService) {
         ServiceConfiguration configuration = spy(conf);
         Set<String> mockBrokerInterceptors = mock(Set.class);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java
new file mode 100644
index 0000000..0045ddd
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.MultiBrokerBaseTest;
+import org.apache.pulsar.broker.PulsarService;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class MultiBrokerLeaderElectionTest extends MultiBrokerBaseTest {
+
+    @Test
+    public void shouldElectOneLeader() {
+        int leaders = 0;
+        for (PulsarService broker : getAllBrokers()) {
+            if (broker.getLeaderElectionService().isLeader()) {
+                leaders++;
+            }
+        }
+        assertEquals(leaders, 1);
+    }
+
+    @Test
+    public void shouldAllBrokersKnowTheLeader() {
+        Awaitility.await().untilAsserted(() -> {
+            for (PulsarService broker : getAllBrokers()) {
+                Optional<LeaderBroker> currentLeader = broker.getLeaderElectionService().getCurrentLeader();
+                assertTrue(currentLeader.isPresent(), "Leader wasn't known on broker " + broker.getBrokerServiceUrl());
+            }
+        });
+    }
+
+    @Test
+    public void shouldAllBrokersBeAbleToGetTheLeader() {
+        Awaitility.await().untilAsserted(() -> {
+            LeaderBroker leader = null;
+            for (PulsarService broker : getAllBrokers()) {
+                Optional<LeaderBroker> currentLeader =
+                        broker.getLeaderElectionService().readCurrentLeader().get(1, TimeUnit.SECONDS);
+                assertTrue(currentLeader.isPresent(), "Leader wasn't known on broker " + broker.getBrokerServiceUrl());
+                if (leader != null) {
+                    assertEquals(currentLeader.get(), leader,
+                            "Different leader on broker " + broker.getBrokerServiceUrl());
+                } else {
+                    leader = currentLeader.get();
+                }
+            }
+        });
+    }
+}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
index 29cb4f9..6599c62 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
@@ -111,7 +111,10 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
             } else {
                 return tryToBecomeLeader();
             }
-        });
+        }).thenCompose(leaderElectionState ->
+                // make sure that the cache contains the current leader
+                // so that getLeaderValueIfPresent works on all brokers
+                cache.get(path).thenApply(__ -> leaderElectionState));
     }
 
     private synchronized CompletableFuture<LeaderElectionState> handleExistingLeaderValue(GetResult res) {
diff --git a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
index c7f677d..cd0c60c 100644
--- a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
+++ b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
@@ -19,32 +19,33 @@
 package org.apache.zookeeper;
 
 import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimaps;
 import com.google.common.collect.SetMultimap;
 import com.google.common.collect.Sets;
 import io.netty.util.concurrent.DefaultThreadFactory;
-
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiPredicate;
-
 import lombok.AllArgsConstructor;
 import lombok.Data;
-import org.apache.commons.lang3.tuple.Pair;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.zookeeper.AsyncCallback.Children2Callback;
 import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
 import org.apache.zookeeper.AsyncCallback.DataCallback;
@@ -53,7 +54,6 @@ import org.apache.zookeeper.AsyncCallback.StringCallback;
 import org.apache.zookeeper.AsyncCallback.VoidCallback;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.client.HostProvider;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import org.objenesis.Objenesis;
@@ -64,7 +64,19 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class MockZooKeeper extends ZooKeeper {
-    private TreeMap<String, Pair<byte[], Integer>> tree;
+    @Data
+    @AllArgsConstructor
+    private static class MockZNode {
+        byte[] content;
+        int version;
+        long ephemeralOwner;
+
+        static MockZNode of(byte[] content, int version, long ephemeralOwner) {
+            return new MockZNode(content, version, ephemeralOwner);
+        }
+    }
+
+    private TreeMap<String, MockZNode> tree;
     private SetMultimap<String, Watcher> watchers;
     private volatile boolean stopped;
     private AtomicReference<KeeperException.Code> alwaysFail;
@@ -78,6 +90,7 @@ public class MockZooKeeper extends ZooKeeper {
     private ReentrantLock mutex;
 
     private AtomicLong sequentialIdGenerator;
+    private ThreadLocal<Long> epheralOwnerThreadLocal;
 
     //see details of Objenesis caching - http://objenesis.org/details.html
     //see supported jvms - https://github.com/easymock/objenesis/blob/master/SupportedJVMs.md
@@ -85,9 +98,9 @@ public class MockZooKeeper extends ZooKeeper {
 
     public enum Op {
         CREATE, GET, SET, GET_CHILDREN, DELETE, EXISTS, SYNC,
-    };
+    }
 
-    private class Failure {
+    private static class Failure {
         final KeeperException.Code failReturnCode;
         final BiPredicate<Op, String> predicate;
 
@@ -121,14 +134,7 @@ public class MockZooKeeper extends ZooKeeper {
 
     public static MockZooKeeper newInstanceForGlobalZK(ExecutorService executor, int readOpDelayMs) {
         try {
-            ObjectInstantiator<MockZooKeeper> mockZooKeeperInstantiator =
-                    new ObjenesisStd().getInstantiatorOf(MockZooKeeper.class);
-            MockZooKeeper zk = (MockZooKeeper) mockZooKeeperInstantiator.newInstance();
-            zk.init(executor);
-            zk.readOpDelayMs = readOpDelayMs;
-            zk.mutex = new ReentrantLock();
-            zk.sequentialIdGenerator =  new AtomicLong();
-            return zk;
+            return createMockZooKeeperInstance(executor, readOpDelayMs);
         } catch (RuntimeException e) {
             throw e;
         } catch (Exception e) {
@@ -138,14 +144,9 @@ public class MockZooKeeper extends ZooKeeper {
 
     public static MockZooKeeper newInstance(ExecutorService executor, int readOpDelayMs) {
         try {
-            ObjectInstantiator<MockZooKeeper> mockZooKeeperInstantiator = objenesis.getInstantiatorOf(MockZooKeeper.class);
-            MockZooKeeper zk = (MockZooKeeper) mockZooKeeperInstantiator.newInstance();
-            zk.init(executor);
-            zk.readOpDelayMs = readOpDelayMs;
-            zk.mutex = new ReentrantLock();
+            MockZooKeeper zk = createMockZooKeeperInstance(executor, readOpDelayMs);
             ObjectInstantiator<ClientCnxn> clientCnxnObjectInstantiator = objenesis.getInstantiatorOf(ClientCnxn.class);
             Whitebox.setInternalState(zk, "cnxn", clientCnxnObjectInstantiator.newInstance());
-            zk.sequentialIdGenerator =  new AtomicLong();
             return zk;
         } catch (RuntimeException e) {
             throw e;
@@ -154,6 +155,19 @@ public class MockZooKeeper extends ZooKeeper {
         }
     }
 
+    private static MockZooKeeper createMockZooKeeperInstance(ExecutorService executor, int readOpDelayMs) {
+        ObjectInstantiator<MockZooKeeper> mockZooKeeperInstantiator =
+                objenesis.getInstantiatorOf(MockZooKeeper.class);
+        MockZooKeeper zk = mockZooKeeperInstantiator.newInstance();
+        zk.epheralOwnerThreadLocal = new ThreadLocal<>();
+        zk.init(executor);
+        zk.readOpDelayMs = readOpDelayMs;
+        zk.mutex = new ReentrantLock();
+        zk.lockInstance = ThreadLocal.withInitial(zk::createLock);
+        zk.sequentialIdGenerator = new AtomicLong();
+        return zk;
+    }
+
     private void init(ExecutorService executor) {
         tree = Maps.newTreeMap();
         if (executor != null) {
@@ -176,7 +190,8 @@ public class MockZooKeeper extends ZooKeeper {
 
     private MockZooKeeper(String quorum) throws Exception {
         // This constructor is never called
-        super(quorum, 1, event -> {});
+        super(quorum, 1, event -> {
+        });
         assert false;
     }
 
@@ -185,27 +200,68 @@ public class MockZooKeeper extends ZooKeeper {
         return States.CONNECTED;
     }
 
+
+    @Slf4j
+    private static class SingleAcquireAndReleaseLock {
+        private final AtomicBoolean acquired = new AtomicBoolean(false);
+        private final Lock lock;
+
+        SingleAcquireAndReleaseLock(Lock lock) {
+            this.lock = lock;
+        }
+
+        public void lock() {
+            if (acquired.compareAndSet(false, true)) {
+                lock.lock();
+            } else {
+                throw new IllegalStateException("Lock was already acquired!");
+            }
+        }
+
+        public void unlockIfNeeded() {
+            if (acquired.compareAndSet(true, false)) {
+                lock.unlock();
+            }
+        }
+    }
+
+    private ThreadLocal<SingleAcquireAndReleaseLock> lockInstance;
+
+    private SingleAcquireAndReleaseLock createLock() {
+        return new SingleAcquireAndReleaseLock(mutex);
+    }
+
+    private void lock() {
+        lockInstance.get().lock();
+    }
+
+    private void unlockIfLocked() {
+        lockInstance.get().unlockIfNeeded();
+    }
+
     @Override
     public void register(Watcher watcher) {
-        mutex.lock();
+        lock();
         sessionWatcher = watcher;
-        mutex.unlock();
+        unlockIfLocked();
     }
 
     @Override
     public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
             throws KeeperException, InterruptedException {
-        mutex.lock();
-
         final Set<Watcher> toNotifyCreate = Sets.newHashSet();
         final Set<Watcher> toNotifyParent = Sets.newHashSet();
         final String parent = path.substring(0, path.lastIndexOf("/"));
 
+        lock();
         try {
+
+
             maybeThrowProgrammedFailure(Op.CREATE, path);
 
-            if (stopped)
+            if (stopped) {
                 throw new KeeperException.ConnectionLossException();
+            }
 
             if (tree.containsKey(path)) {
                 throw new KeeperException.NodeExistsException(path);
@@ -215,16 +271,17 @@ public class MockZooKeeper extends ZooKeeper {
                 throw new KeeperException.NoNodeException();
             }
 
-            if (createMode == CreateMode.EPHEMERAL_SEQUENTIAL || createMode == CreateMode.PERSISTENT_SEQUENTIAL) {
-                byte[] parentData = tree.get(parent).getLeft();
-                int parentVersion = tree.get(parent).getRight();
+            if (createMode.isSequential()) {
+                MockZNode parentNode = tree.get(parent);
+                int parentVersion = tree.get(parent).getVersion();
                 path = path + parentVersion;
 
                 // Update parent version
-                tree.put(parent, Pair.of(parentData, parentVersion + 1));
+                tree.put(parent,
+                        MockZNode.of(parentNode.getContent(), parentVersion + 1, parentNode.getEphemeralOwner()));
             }
 
-            tree.put(path, Pair.of(data, 0));
+            tree.put(path, MockZNode.of(data, 0, createMode.isEphemeral() ? getEphemeralOwner() : -1L));
 
             toNotifyCreate.addAll(watchers.get(path));
 
@@ -233,8 +290,7 @@ public class MockZooKeeper extends ZooKeeper {
             }
             watchers.removeAll(path);
         } finally {
-
-            mutex.unlock();
+            unlockIfLocked();
         }
 
         final String finalPath = path;
@@ -256,68 +312,90 @@ public class MockZooKeeper extends ZooKeeper {
         return path;
     }
 
+    protected long getEphemeralOwner() {
+        Long epheralOwner = epheralOwnerThreadLocal.get();
+        if (epheralOwner != null) {
+            return epheralOwner;
+        }
+        return getSessionId();
+    }
+
+    public void overrideEpheralOwner(long epheralOwner) {
+        epheralOwnerThreadLocal.set(epheralOwner);
+    }
+
+    public void removeEpheralOwnerOverride() {
+        epheralOwnerThreadLocal.remove();
+    }
+
     @Override
     public void create(final String path, final byte[] data, final List<ACL> acl, CreateMode createMode,
-            final StringCallback cb, final Object ctx) {
+                       final StringCallback cb, final Object ctx) {
 
 
         executor.execute(() -> {
-            mutex.lock();
+            lock();
+            try {
 
-            if (stopped) {
-                cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
-                return;
-            }
+                if (stopped) {
+                    cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
+                    return;
+                }
 
-            final Set<Watcher> toNotifyCreate = Sets.newHashSet();
-            toNotifyCreate.addAll(watchers.get(path));
+                final Set<Watcher> toNotifyCreate = Sets.newHashSet();
+                toNotifyCreate.addAll(watchers.get(path));
 
-            final Set<Watcher> toNotifyParent = Sets.newHashSet();
-            final String parent = path.substring(0, path.lastIndexOf("/"));
-            if (!parent.isEmpty()) {
-                toNotifyParent.addAll(watchers.get(parent));
-            }
+                final Set<Watcher> toNotifyParent = Sets.newHashSet();
+                final String parent = path.substring(0, path.lastIndexOf("/"));
+                if (!parent.isEmpty()) {
+                    toNotifyParent.addAll(watchers.get(parent));
+                }
 
-            final String name;
-            if (createMode != null && createMode.isSequential()) {
-                name = path + Long.toString(sequentialIdGenerator.getAndIncrement());
-            } else {
-                name = path;
-            }
+                final String name;
+                if (createMode != null && createMode.isSequential()) {
+                    name = path + sequentialIdGenerator.getAndIncrement();
+                } else {
+                    name = path;
+                }
 
-            Optional<KeeperException.Code> failure = programmedFailure(Op.CREATE, path);
-            if (failure.isPresent()) {
-                mutex.unlock();
-                cb.processResult(failure.get().intValue(), path, ctx, null);
-            } else if (stopped) {
-                mutex.unlock();
-                cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
-            } else if (tree.containsKey(path)) {
-                mutex.unlock();
-                cb.processResult(KeeperException.Code.NODEEXISTS.intValue(), path, ctx, null);
-            } else if (!parent.isEmpty() && !tree.containsKey(parent)) {
-                mutex.unlock();
-                toNotifyParent.forEach(watcher -> watcher
-                        .process(new WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected, parent)));
-                cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
-            } else {
-                tree.put(name, Pair.of(data, 0));
-                watchers.removeAll(name);
-                mutex.unlock();
-                cb.processResult(0, path, ctx, name);
-
-                triggerPersistentWatches(path, parent, EventType.NodeCreated);
-
-                toNotifyCreate.forEach(
-                        watcher -> watcher.process(
-                                new WatchedEvent(EventType.NodeCreated,
-                                                 KeeperState.SyncConnected,
-                                                 name)));
-                toNotifyParent.forEach(
-                        watcher -> watcher.process(
-                                new WatchedEvent(EventType.NodeChildrenChanged,
-                                                 KeeperState.SyncConnected,
-                                                 parent)));
+                Optional<KeeperException.Code> failure = programmedFailure(Op.CREATE, path);
+                if (failure.isPresent()) {
+                    unlockIfLocked();
+                    cb.processResult(failure.get().intValue(), path, ctx, null);
+                } else if (stopped) {
+                    unlockIfLocked();
+                    cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
+                } else if (tree.containsKey(path)) {
+                    unlockIfLocked();
+                    cb.processResult(KeeperException.Code.NODEEXISTS.intValue(), path, ctx, null);
+                } else if (!parent.isEmpty() && !tree.containsKey(parent)) {
+                    unlockIfLocked();
+                    toNotifyParent.forEach(watcher -> watcher
+                            .process(new WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected,
+                                    parent)));
+                    cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
+                } else {
+                    tree.put(name, MockZNode.of(data, 0,
+                            createMode != null && createMode.isEphemeral() ? getEphemeralOwner() : -1L));
+                    watchers.removeAll(name);
+                    unlockIfLocked();
+                    cb.processResult(0, path, ctx, name);
+
+                    triggerPersistentWatches(path, parent, EventType.NodeCreated);
+
+                    toNotifyCreate.forEach(
+                            watcher -> watcher.process(
+                                    new WatchedEvent(EventType.NodeCreated,
+                                            KeeperState.SyncConnected,
+                                            name)));
+                    toNotifyParent.forEach(
+                            watcher -> watcher.process(
+                                    new WatchedEvent(EventType.NodeChildrenChanged,
+                                            KeeperState.SyncConnected,
+                                            parent)));
+                }
+            } finally {
+                unlockIfLocked();
             }
         });
 
@@ -325,10 +403,10 @@ public class MockZooKeeper extends ZooKeeper {
 
     @Override
     public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException {
-        mutex.lock();
+        lock();
         try {
             maybeThrowProgrammedFailure(Op.GET, path);
-            Pair<byte[], Integer> value = tree.get(path);
+            MockZNode value = tree.get(path);
             if (value == null) {
                 throw new KeeperException.NoNodeException(path);
             } else {
@@ -336,12 +414,12 @@ public class MockZooKeeper extends ZooKeeper {
                     watchers.put(path, watcher);
                 }
                 if (stat != null) {
-                    stat.setVersion(value.getRight());
+                    applyToStat(value, stat);
                 }
-                return value.getLeft();
+                return value.getContent();
             }
         } finally {
-            mutex.unlock();
+            unlockIfLocked();
         }
     }
 
@@ -358,20 +436,18 @@ public class MockZooKeeper extends ZooKeeper {
                 return;
             }
 
-            Pair<byte[], Integer> value;
-            mutex.lock();
+            MockZNode value;
+            lock();
             try {
                 value = tree.get(path);
             } finally {
-                mutex.unlock();
+                unlockIfLocked();
             }
 
             if (value == null) {
                 cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null, null);
             } else {
-                Stat stat = new Stat();
-                stat.setVersion(value.getRight());
-                cb.processResult(0, path, ctx, value.getLeft(), stat);
+                cb.processResult(0, path, ctx, value.getContent(), createStatForZNode(value));
             }
         });
     }
@@ -380,31 +456,34 @@ public class MockZooKeeper extends ZooKeeper {
     public void getData(final String path, final Watcher watcher, final DataCallback cb, final Object ctx) {
         executor.execute(() -> {
             checkReadOpDelay();
-            mutex.lock();
-            Optional<KeeperException.Code> failure = programmedFailure(Op.GET, path);
-            if (failure.isPresent()) {
-                mutex.unlock();
-                cb.processResult(failure.get().intValue(), path, ctx, null, null);
-                return;
-            } else if (stopped) {
-                mutex.unlock();
-                cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null, null);
-                return;
-            }
-
-            Pair<byte[], Integer> value = tree.get(path);
-            if (value == null) {
-                mutex.unlock();
-                cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null, null);
-            } else {
-                if (watcher != null) {
-                    watchers.put(path, watcher);
+            lock(); // unlockIfLocked() is called before every callback and again in finally
+            try {
+                Optional<KeeperException.Code> failure = programmedFailure(Op.GET, path);
+                if (failure.isPresent()) {
+                    unlockIfLocked(); // release before running the user callback
+                    cb.processResult(failure.get().intValue(), path, ctx, null, null);
+                    return;
+                } else if (stopped) {
+                    unlockIfLocked();
+                    cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null, null);
+                    return;
                 }
 
-                Stat stat = new Stat();
-                stat.setVersion(value.getRight());
-                mutex.unlock();
-                cb.processResult(0, path, ctx, value.getLeft(), stat);
+                MockZNode value = tree.get(path);
+                if (value == null) {
+                    unlockIfLocked();
+                    cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null, null);
+                } else {
+                    if (watcher != null) {
+                        watchers.put(path, watcher); // watcher is registered only when the node exists
+                    }
+
+                    Stat stat = createStatForZNode(value); // Stat is built before the lock is released
+                    unlockIfLocked();
+                    cb.processResult(0, path, ctx, value.getContent(), stat);
+                }
+            } finally {
+                unlockIfLocked(); // covers exits from the try that did not unlock, e.g. an exception
             }
         });
     }
@@ -412,44 +491,47 @@ public class MockZooKeeper extends ZooKeeper {
     @Override
     public void getChildren(final String path, final Watcher watcher, final ChildrenCallback cb, final Object ctx) {
         executor.execute(() -> {
-            mutex.lock();
-            Optional<KeeperException.Code> failure = programmedFailure(Op.GET_CHILDREN, path);
-            if (failure.isPresent()) {
-                mutex.unlock();
-                cb.processResult(failure.get().intValue(), path, ctx, null);
-                return;
-            } else if (stopped) {
-                mutex.unlock();
-                cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
-                return;
-            }
+            lock();
+            List<String> children = Lists.newArrayList();
+            try {
+                Optional<KeeperException.Code> failure = programmedFailure(Op.GET_CHILDREN, path);
+                if (failure.isPresent()) {
+                    unlockIfLocked();
+                    cb.processResult(failure.get().intValue(), path, ctx, null);
+                    return;
+                } else if (stopped) {
+                    unlockIfLocked();
+                    cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
+                    return;
+                }
 
-            if (!tree.containsKey(path)) {
-                mutex.unlock();
-                cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
-                return;
-            }
+                if (!tree.containsKey(path)) {
+                    unlockIfLocked();
+                    cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
+                    return;
+                }
 
-            List<String> children = Lists.newArrayList();
-            for (String item : tree.tailMap(path).keySet()) {
-                if (!item.startsWith(path)) {
-                    break;
-                } else {
-                    if (path.length() >= item.length()) {
-                        continue;
-                    }
+                for (String item : tree.tailMap(path).keySet()) {
+                    if (!item.startsWith(path)) {
+                        break;
+                    } else {
+                        if (path.length() >= item.length()) {
+                            continue;
+                        }
 
-                    String child = item.substring(path.length() + 1);
-                    if (item.charAt(path.length()) == '/' && !child.contains("/")) {
-                        children.add(child);
+                        String child = item.substring(path.length() + 1);
+                        if (item.charAt(path.length()) == '/' && !child.contains("/")) {
+                            children.add(child);
+                        }
                     }
                 }
-            }
 
-            if (watcher != null) {
-                watchers.put(path, watcher);
+                if (watcher != null) {
+                    watchers.put(path, watcher);
+                }
+            } finally {
+                unlockIfLocked();
             }
-            mutex.unlock();
 
             cb.processResult(0, path, ctx, children);
         });
@@ -457,7 +539,7 @@ public class MockZooKeeper extends ZooKeeper {
 
     @Override
     public List<String> getChildren(String path, Watcher watcher) throws KeeperException {
-        mutex.lock();
+        lock();
         try {
             maybeThrowProgrammedFailure(Op.GET_CHILDREN, path);
 
@@ -465,35 +547,31 @@ public class MockZooKeeper extends ZooKeeper {
                 throw new KeeperException.NoNodeException();
             }
 
-            List<String> children = Lists.newArrayList();
-            for (String item : tree.tailMap(path).keySet()) {
-                if (!item.startsWith(path)) {
-                    break;
-                } else {
-                    if (path.length() >= item.length()) {
-                        continue;
-                    }
+            String firstKey = path.equals("/") ? path : path + "/";
+            String lastKey = path.equals("/") ? "0" : path + "0"; // '0' is lexicographically just after '/'
 
-                    String child = item.substring(path.length() + 1);
-                    if (!child.contains("/")) {
-                        children.add(child);
-                    }
-                }
-            }
+            Set<String> children = new TreeSet<>();
+            tree.subMap(firstKey, false, lastKey, false).forEach((key, value) -> {
+                String relativePath = key.replace(firstKey, "");
+
+                // Only return first-level children
+                String child = relativePath.split("/", 2)[0];
+                children.add(child);
+            });
 
             if (watcher != null) {
                 watchers.put(path, watcher);
             }
 
-            return children;
+            return new ArrayList<>(children);
         } finally {
-            mutex.unlock();
+            unlockIfLocked();
         }
     }
 
     @Override
     public List<String> getChildren(String path, boolean watch) throws KeeperException, InterruptedException {
-        mutex.lock();
+        lock();
         try {
             maybeThrowProgrammedFailure(Op.GET_CHILDREN, path);
 
@@ -503,173 +581,153 @@ public class MockZooKeeper extends ZooKeeper {
                 throw new KeeperException.NoNodeException();
             }
 
-            List<String> children = Lists.newArrayList();
-            for (String item : tree.tailMap(path).keySet()) {
-                if (!item.startsWith(path)) {
-                    break;
-                } else {
-                    if (path.length() >= item.length()) {
-                        continue;
-                    }
-                    String child = item.substring(path.length());
-                    if (child.indexOf("/") == 0) {
-                        child = child.substring(1);
-                        log.debug("child: '{}'", child);
-                        if (!child.contains("/")) {
-                            children.add(child);
-                        }
-                    }
-                }
-            }
-            return children;
+            String firstKey = path.equals("/") ? path : path + "/";
+            String lastKey = path.equals("/") ? "0" : path + "0"; // '0' is lexicographically just after '/'
+
+            Set<String> children = new TreeSet<>();
+            tree.subMap(firstKey, false, lastKey, false).forEach((key, value) -> {
+                String relativePath = key.replace(firstKey, "");
+
+                // Only return first-level children
+                String child = relativePath.split("/", 2)[0];
+                children.add(child);
+            });
+
+            return new ArrayList<>(children);
         } finally {
-            mutex.unlock();
+            unlockIfLocked();
         }
     }
 
     @Override
     public void getChildren(final String path, boolean watcher, final Children2Callback cb, final Object ctx) {
         executor.execute(() -> {
-            mutex.lock();
+            Set<String> children = new TreeSet<>(); // sorted, de-duplicated first-level child names
+            lock();
+            try {
+                Optional<KeeperException.Code> failure = programmedFailure(Op.GET_CHILDREN, path);
+                if (failure.isPresent()) {
+                    unlockIfLocked(); // release before running the user callback
+                    cb.processResult(failure.get().intValue(), path, ctx, null, null);
+                    return;
+                } else if (stopped) {
+                    unlockIfLocked();
+                    cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null, null);
+                    return;
+                } else if (!tree.containsKey(path)) {
+                    unlockIfLocked();
+                    cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null, null);
+                    return;
+                }
 
-            Optional<KeeperException.Code> failure = programmedFailure(Op.GET_CHILDREN, path);
-            if (failure.isPresent()) {
-                mutex.unlock();
-                cb.processResult(failure.get().intValue(), path, ctx, null, null);
-                return;
-            } else if (stopped) {
-                mutex.unlock();
-                cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null, null);
-                return;
-            } else if (!tree.containsKey(path)) {
-                mutex.unlock();
-                cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null, null);
-                return;
-            }
+                String firstKey = path.equals("/") ? path : path + "/";
+                String lastKey = path.equals("/") ? "0" : path + "0"; // '0' is lexicographically just after '/'
 
-            log.debug("getChildren path={}", path);
-            List<String> children = Lists.newArrayList();
-            for (String item : tree.tailMap(path).keySet()) {
-                log.debug("Checking path {}", item);
-                if (!item.startsWith(path)) {
-                    break;
-                } else if (item.equals(path)) {
-                    continue;
-                } else {
-                    String child = item.substring(path.length());
-                    if (child.indexOf("/") == 0) {
-                        child = child.substring(1);
-                        log.debug("child: '{}'", child);
-                        if (!child.contains("/")) {
-                            children.add(child);
-                        }
-                    }
-                }
-            }
+                tree.subMap(firstKey, false, lastKey, false).forEach((key, value) -> {
+                    String relativePath = key.substring(firstKey.length()); // prefix only, not every match
 
-            log.debug("getChildren done path={} result={}", path, children);
-            mutex.unlock();
-            cb.processResult(0, path, ctx, children, new Stat());
+                    // Only return first-level children
+                    String child = relativePath.split("/", 2)[0];
+                    children.add(child);
+                });
+            } finally {
+                unlockIfLocked(); // success path unlocks here; earlier exits have already unlocked
+            }
+            cb.processResult(0, path, ctx, new ArrayList<>(children), new Stat());
         });
 
     }
 
     @Override
     public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException {
-        mutex.lock();
+        lock(); // the watch flag is not consulted in this overload
         try {
             maybeThrowProgrammedFailure(Op.EXISTS, path);
 
-            if (stopped)
+            if (stopped) {
                 throw new KeeperException.ConnectionLossException();
+            }
 
             if (tree.containsKey(path)) {
-                Stat stat = new Stat();
-                stat.setVersion(tree.get(path).getRight());
-                return stat;
+                return createStatForZNode(tree.get(path)); // Stat is derived from the stored node
             } else {
                 return null;
             }
         } finally {
-            mutex.unlock();
+            unlockIfLocked();
+        }
+    }
+
+    private static Stat createStatForZNode(MockZNode zNode) { // returns a new Stat filled from zNode
+        return applyToStat(zNode, new Stat());
+    }
+
+    private static Stat applyToStat(MockZNode zNode, Stat stat) { // copies node metadata into stat, returns it
+        stat.setVersion(zNode.getVersion());
+        if (zNode.getEphemeralOwner() != -1L) { // -1 is what create stores for non-ephemeral nodes
+            stat.setEphemeralOwner(zNode.getEphemeralOwner());
         }
+        return stat;
     }
 
     @Override
     public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException {
-        mutex.lock();
+        lock();
         try {
             maybeThrowProgrammedFailure(Op.EXISTS, path);
 
-            if (stopped)
+            if (stopped) {
                 throw new KeeperException.ConnectionLossException();
+            }
 
             if (watcher != null) {
                 watchers.put(path, watcher);
             }
 
             if (tree.containsKey(path)) {
-                Stat stat = new Stat();
-                stat.setVersion(tree.get(path).getRight());
-                return stat;
+                return createStatForZNode(tree.get(path)); // watcher above is registered even if the node is absent
             } else {
                 return null;
             }
         } finally {
-            mutex.unlock();
+            unlockIfLocked();
         }
     }
 
     @Override
     public void exists(String path, boolean watch, StatCallback cb, Object ctx) {
-        executor.execute(() -> {
-            mutex.lock();
-            Optional<KeeperException.Code> failure = programmedFailure(Op.EXISTS, path);
-            if (failure.isPresent()) {
-                mutex.unlock();
-                cb.processResult(failure.get().intValue(), path, ctx, null);
-                return;
-            } else if (stopped) {
-                mutex.unlock();
-                cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
-                return;
-            }
-
-            if (tree.containsKey(path)) {
-                mutex.unlock();
-                cb.processResult(0, path, ctx, new Stat());
-            } else {
-                mutex.unlock();
-                cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
-            }
-        });
+        exists(path, null, cb, ctx); // delegates to the Watcher overload with no watcher; watch is unused
     }
 
     @Override
     public void exists(String path, Watcher watcher, StatCallback cb, Object ctx) {
         executor.execute(() -> {
-            mutex.lock();
-            Optional<KeeperException.Code> failure = programmedFailure(Op.EXISTS, path);
-            if (failure.isPresent()) {
-                mutex.unlock();
-                cb.processResult(failure.get().intValue(), path, ctx, null);
-                return;
-            } else if (stopped) {
-                mutex.unlock();
-                cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
-                return;
-            }
+            lock();
+            try {
+                Optional<KeeperException.Code> failure = programmedFailure(Op.EXISTS, path);
+                if (failure.isPresent()) {
+                    unlockIfLocked(); // release before running the user callback
+                    cb.processResult(failure.get().intValue(), path, ctx, null);
+                    return;
+                } else if (stopped) {
+                    unlockIfLocked();
+                    cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
+                    return;
+                }
 
-            if (watcher != null) {
-                watchers.put(path, watcher);
-            }
+                if (watcher != null) {
+                    watchers.put(path, watcher); // registered whether or not the node currently exists
+                }
 
-            if (tree.containsKey(path)) {
-                mutex.unlock();
-                cb.processResult(0, path, ctx, new Stat());
-            } else {
-                mutex.unlock();
-                cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
+                // Snapshot the node under the lock so the Stat carries its real version/ephemeralOwner.
+                MockZNode zNode = tree.get(path);
+                Stat stat = zNode != null ? createStatForZNode(zNode) : null;
+                unlockIfLocked();
+                int rc = zNode != null
+                        ? KeeperException.Code.OK.intValue() : KeeperException.Code.NONODE.intValue();
+                cb.processResult(rc, path, ctx, stat);
+            } finally {
+                unlockIfLocked(); // covers exits from the try that did not unlock, e.g. an exception
             }
         });
     }
@@ -693,11 +751,10 @@ public class MockZooKeeper extends ZooKeeper {
 
     @Override
     public Stat setData(final String path, byte[] data, int version) throws KeeperException, InterruptedException {
-        mutex.lock();
-
         final Set<Watcher> toNotify = Sets.newHashSet();
-        int newVersion;
+        MockZNode newZNode;
 
+        lock();
         try {
             maybeThrowProgrammedFailure(Op.SET, path);
 
@@ -709,21 +766,22 @@ public class MockZooKeeper extends ZooKeeper {
                 throw new KeeperException.NoNodeException();
             }
 
-            int currentVersion = tree.get(path).getRight();
+            MockZNode mockZNode = tree.get(path);
+            int currentVersion = mockZNode.getVersion();
 
             // Check version
             if (version != -1 && version != currentVersion) {
                 throw new KeeperException.BadVersionException(path);
             }
 
-            newVersion = currentVersion + 1;
             log.debug("[{}] Updating -- current version: {}", path, currentVersion);
-            tree.put(path, Pair.of(data, newVersion));
+            newZNode = MockZNode.of(data, currentVersion + 1, mockZNode.getEphemeralOwner());
+            tree.put(path, newZNode);
 
             toNotify.addAll(watchers.get(path));
             watchers.removeAll(path);
         } finally {
-            mutex.unlock();
+            unlockIfLocked();
         }
 
         executor.execute(() -> {
@@ -733,9 +791,7 @@ public class MockZooKeeper extends ZooKeeper {
                     .process(new WatchedEvent(EventType.NodeDataChanged, KeeperState.SyncConnected, path)));
         });
 
-        Stat stat = new Stat();
-        stat.setVersion(newVersion);
-        return stat;
+        return createStatForZNode(newZNode);
     }
 
     @Override
@@ -747,43 +803,45 @@ public class MockZooKeeper extends ZooKeeper {
 
         executor.execute(() -> {
             final Set<Watcher> toNotify = Sets.newHashSet();
+            Stat stat;
+            lock();
+            try {
 
-            mutex.lock();
+                Optional<KeeperException.Code> failure = programmedFailure(Op.SET, path);
+                if (failure.isPresent()) {
+                    unlockIfLocked();
+                    cb.processResult(failure.get().intValue(), path, ctx, null);
+                    return;
+                } else if (stopped) {
+                    unlockIfLocked();
+                    cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
+                    return;
+                }
 
-            Optional<KeeperException.Code> failure = programmedFailure(Op.SET, path);
-            if (failure.isPresent()) {
-                mutex.unlock();
-                cb.processResult(failure.get().intValue(), path, ctx, null);
-                return;
-            } else if (stopped) {
-                mutex.unlock();
-                cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
-                return;
-            }
+                if (!tree.containsKey(path)) {
+                    unlockIfLocked();
+                    cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
+                    return;
+                }
 
-            if (!tree.containsKey(path)) {
-                mutex.unlock();
-                cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
-                return;
-            }
+                MockZNode mockZNode = tree.get(path);
+                int currentVersion = mockZNode.getVersion();
 
-            int currentVersion = tree.get(path).getRight();
+                // Check version
+                if (version != -1 && version != currentVersion) {
+                    log.debug("[{}] Current version: {} -- Expected: {}", path, currentVersion, version);
+                    unlockIfLocked();
+                    cb.processResult(KeeperException.Code.BADVERSION.intValue(), path, ctx, null);
+                    return;
+                }
 
-            // Check version
-            if (version != -1 && version != currentVersion) {
-                log.debug("[{}] Current version: {} -- Expected: {}", path, currentVersion, version);
-                mutex.unlock();
-                cb.processResult(KeeperException.Code.BADVERSION.intValue(), path, ctx, null);
-                return;
+                log.debug("[{}] Updating -- current version: {}", path, currentVersion);
+                MockZNode newZNode = MockZNode.of(data, currentVersion + 1, mockZNode.getEphemeralOwner());
+                tree.put(path, newZNode);
+                stat = createStatForZNode(newZNode);
+            } finally {
+                unlockIfLocked();
             }
-
-            int newVersion = currentVersion + 1;
-            log.debug("[{}] Updating -- current version: {}", path, currentVersion);
-            tree.put(path, Pair.of(data, newVersion));
-            Stat stat = new Stat();
-            stat.setVersion(newVersion);
-
-            mutex.unlock();
             cb.processResult(0, path, ctx, stat);
 
             toNotify.addAll(watchers.get(path));
@@ -805,7 +863,7 @@ public class MockZooKeeper extends ZooKeeper {
         final Set<Watcher> toNotifyParent;
         final String parent;
 
-        mutex.lock();
+        lock();
         try {
             if (stopped) {
                 throw new KeeperException.ConnectionLossException();
@@ -816,7 +874,7 @@ public class MockZooKeeper extends ZooKeeper {
             }
 
             if (version != -1) {
-                int currentVersion = tree.get(path).getRight();
+                int currentVersion = tree.get(path).getVersion();
                 if (version != currentVersion) {
                     throw new KeeperException.BadVersionException(path);
                 }
@@ -835,7 +893,7 @@ public class MockZooKeeper extends ZooKeeper {
 
             watchers.removeAll(path);
         } finally {
-            mutex.unlock();
+            unlockIfLocked();
         }
 
         executor.execute(() -> {
@@ -857,50 +915,55 @@ public class MockZooKeeper extends ZooKeeper {
     @Override
     public void delete(final String path, int version, final VoidCallback cb, final Object ctx) {
         Runnable r = () -> {
-            mutex.lock();
-            final Set<Watcher> toNotifyDelete = Sets.newHashSet();
-            toNotifyDelete.addAll(watchers.get(path));
-
-            final Set<Watcher> toNotifyParent = Sets.newHashSet();
-            final String parent = path.substring(0, path.lastIndexOf("/"));
-            if (!parent.isEmpty()) {
-                toNotifyParent.addAll(watchers.get(parent));
-            }
-            watchers.removeAll(path);
+            lock();
+            try {
+                final Set<Watcher> toNotifyDelete = Sets.newHashSet();
+                toNotifyDelete.addAll(watchers.get(path));
 
-            Optional<KeeperException.Code> failure = programmedFailure(Op.DELETE, path);
-            if (failure.isPresent()) {
-                mutex.unlock();
-                cb.processResult(failure.get().intValue(), path, ctx);
-            } else if (stopped) {
-                mutex.unlock();
-                cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx);
-            } else if (!tree.containsKey(path)) {
-                mutex.unlock();
-                cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx);
-            } else if (hasChildren(path)) {
-                mutex.unlock();
-                cb.processResult(KeeperException.Code.NOTEMPTY.intValue(), path, ctx);
-            } else {
-                if (version != -1) {
-                    int currentVersion = tree.get(path).getRight();
-                    if (version != currentVersion) {
-                        mutex.unlock();
-                        cb.processResult(KeeperException.Code.BADVERSION.intValue(), path, ctx);
-                        return;
-                    }
+                final Set<Watcher> toNotifyParent = Sets.newHashSet();
+                final String parent = path.substring(0, path.lastIndexOf("/"));
+                if (!parent.isEmpty()) {
+                    toNotifyParent.addAll(watchers.get(parent));
                 }
+                watchers.removeAll(path);
+
+                Optional<KeeperException.Code> failure = programmedFailure(Op.DELETE, path);
+                if (failure.isPresent()) {
+                    unlockIfLocked();
+                    cb.processResult(failure.get().intValue(), path, ctx);
+                } else if (stopped) {
+                    unlockIfLocked();
+                    cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx);
+                } else if (!tree.containsKey(path)) {
+                    unlockIfLocked();
+                    cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx);
+                } else if (hasChildren(path)) {
+                    unlockIfLocked();
+                    cb.processResult(KeeperException.Code.NOTEMPTY.intValue(), path, ctx);
+                } else {
+                    if (version != -1) {
+                        int currentVersion = tree.get(path).getVersion();
+                        if (version != currentVersion) {
+                            unlockIfLocked();
+                            cb.processResult(KeeperException.Code.BADVERSION.intValue(), path, ctx);
+                            return;
+                        }
+                    }
 
-                tree.remove(path);
+                    tree.remove(path);
 
-                mutex.unlock();
-                cb.processResult(0, path, ctx);
+                    unlockIfLocked();
+                    cb.processResult(0, path, ctx);
 
-                toNotifyDelete.forEach(watcher -> watcher
-                        .process(new WatchedEvent(EventType.NodeDeleted, KeeperState.SyncConnected, path)));
-                toNotifyParent.forEach(watcher -> watcher
-                        .process(new WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected, parent)));
-                triggerPersistentWatches(path, parent, EventType.NodeDeleted);
+                    toNotifyDelete.forEach(watcher -> watcher
+                            .process(new WatchedEvent(EventType.NodeDeleted, KeeperState.SyncConnected, path)));
+                    toNotifyParent.forEach(watcher -> watcher
+                            .process(new WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected,
+                                    parent)));
+                    triggerPersistentWatches(path, parent, EventType.NodeDeleted);
+                }
+            } finally {
+                unlockIfLocked();
             }
         };
 
@@ -908,7 +971,6 @@ public class MockZooKeeper extends ZooKeeper {
             executor.execute(r);
         } catch (RejectedExecutionException ree) {
             cb.processResult(KeeperException.Code.SESSIONEXPIRED.intValue(), path, ctx);
-            return;
         }
 
     }
@@ -917,30 +979,62 @@ public class MockZooKeeper extends ZooKeeper {
     public void multi(Iterable<org.apache.zookeeper.Op> ops, AsyncCallback.MultiCallback cb, Object ctx) {
         try {
             List<OpResult> res = multi(ops);
-            cb.processResult(KeeperException.Code.OK.intValue(), (String)null, ctx, res);
+            cb.processResult(KeeperException.Code.OK.intValue(), null, ctx, res);
         } catch (Exception e) {
-            cb.processResult(KeeperException.Code.APIERROR.intValue(), (String)null, ctx, null);
+            cb.processResult(KeeperException.Code.APIERROR.intValue(), null, ctx, null);
         }
     }
 
     @Override
     public List<OpResult> multi(Iterable<org.apache.zookeeper.Op> ops) throws InterruptedException, KeeperException {
         List<OpResult> res = new ArrayList<>();
-        for (org.apache.zookeeper.Op op : ops) {
-            switch (op.getType()) {
-                case ZooDefs.OpCode.create:
-                    this.create(op.getPath(), ((org.apache.zookeeper.Op.Create)op).data, null, null);
-                    res.add(new OpResult.CreateResult(op.getPath()));
-                    break;
-                case ZooDefs.OpCode.delete:
-                    this.delete(op.getPath(), -1);
-                    res.add(new OpResult.DeleteResult());
-                    break;
-                case ZooDefs.OpCode.setData:
-                    this.create(op.getPath(), ((org.apache.zookeeper.Op.Create)op).data, null, null);
-                    res.add(new OpResult.SetDataResult(null));
-                    break;
-                default:
+        try {
+            for (org.apache.zookeeper.Op op : ops) {
+                switch (op.getType()) {
+                    case ZooDefs.OpCode.create: {
+                        org.apache.zookeeper.Op.Create opc = ((org.apache.zookeeper.Op.Create) op);
+                        CreateMode cm = CreateMode.fromFlag(opc.flags);
+                        String path = this.create(op.getPath(), opc.data, null, cm);
+                        res.add(new OpResult.CreateResult(path));
+                        break;
+                    }
+                    case ZooDefs.OpCode.delete:
+                        this.delete(op.getPath(), Whitebox.getInternalState(op, "version"));
+                        res.add(new OpResult.DeleteResult());
+                        break;
+                    case ZooDefs.OpCode.setData: {
+                        Stat stat = this.setData(op.getPath(), Whitebox.getInternalState(op, "data"),
+                                Whitebox.getInternalState(op, "version"));
+                        res.add(new OpResult.SetDataResult(stat));
+                        break;
+                    }
+                    case ZooDefs.OpCode.getChildren: {
+                        try {
+                            List<String> children = this.getChildren(op.getPath(), null);
+                            res.add(new OpResult.GetChildrenResult(children));
+                        } catch (KeeperException e) {
+                            res.add(new OpResult.ErrorResult(e.code().intValue()));
+                        }
+                        break;
+                    }
+                    case ZooDefs.OpCode.getData: {
+                        Stat stat = new Stat();
+                        try {
+                            byte[] payload = this.getData(op.getPath(), null, stat);
+                            res.add(new OpResult.GetDataResult(payload, stat));
+                        } catch (KeeperException e) {
+                            res.add(new OpResult.ErrorResult(e.code().intValue()));
+                        }
+                        break;
+                    }
+                    default:
+                }
+            }
+        } catch (KeeperException e) {
+            res.add(new OpResult.ErrorResult(e.code().intValue()));
+            int total = Iterables.size(ops);
+            for (int i = res.size(); i < total; i++) {
+                res.add(new OpResult.ErrorResult(KeeperException.Code.RUNTIMEINCONSISTENCY.intValue()));
             }
         }
         return res;
@@ -972,7 +1066,7 @@ public class MockZooKeeper extends ZooKeeper {
     }
 
     public void shutdown() throws InterruptedException {
-        mutex.lock();
+        lock();
         try {
             stopped = true;
             tree.clear();
@@ -984,7 +1078,7 @@ public class MockZooKeeper extends ZooKeeper {
                 log.error("MockZooKeeper shutdown had error", ex);
             }
         } finally {
-            mutex.unlock();
+            unlockIfLocked();
         }
     }
 
diff --git a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java
index 499da0e..a33d448 100644
--- a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java
+++ b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java
@@ -44,7 +44,7 @@ public class MockZooKeeperSession extends ZooKeeper {
 
     private static final Objenesis objenesis = new ObjenesisStd();
 
-    private static final AtomicInteger sessionIdGenerator = new AtomicInteger(0);
+    private static final AtomicInteger sessionIdGenerator = new AtomicInteger(1000);
 
     public static MockZooKeeperSession newInstance(MockZooKeeper mockZooKeeper) {
         ObjectInstantiator<MockZooKeeperSession> instantiator = objenesis.getInstantiatorOf(MockZooKeeperSession.class);
@@ -80,13 +80,23 @@ public class MockZooKeeperSession extends ZooKeeper {
     @Override
     public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
             throws KeeperException, InterruptedException {
-        return mockZooKeeper.create(path, data, acl, createMode);
+        try {
+            mockZooKeeper.overrideEpheralOwner(getSessionId());
+            return mockZooKeeper.create(path, data, acl, createMode);
+        } finally {
+            mockZooKeeper.removeEpheralOwnerOverride();
+        }
     }
 
     @Override
     public void create(final String path, final byte[] data, final List<ACL> acl, CreateMode createMode,
                        final AsyncCallback.StringCallback cb, final Object ctx) {
-        mockZooKeeper.create(path, data, acl, createMode, cb, ctx);
+        try {
+            mockZooKeeper.overrideEpheralOwner(getSessionId());
+            mockZooKeeper.create(path, data, acl, createMode, cb, ctx);
+        } finally {
+            mockZooKeeper.removeEpheralOwnerOverride();
+        }
     }
 
     @Override