You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by eo...@apache.org on 2019/09/11 08:27:41 UTC

[zookeeper] branch master updated: ZOOKEEPER-3145: Fix potential watch missing issue due to stale pzxid when replaying CloseSession txn with fuzzy snapshot

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 42ea26b  ZOOKEEPER-3145: Fix potential watch missing issue due to stale pzxid when replaying CloseSession txn with fuzzy snapshot
42ea26b is described below

commit 42ea26b75105484ef0504396332c276952224158
Author: Fangmin Lyu <al...@fb.com>
AuthorDate: Wed Sep 11 10:27:28 2019 +0200

    ZOOKEEPER-3145: Fix potential watch missing issue due to stale pzxid when replaying CloseSession txn with fuzzy snapshot
    
    Currently, the CloseSession txn is not idempotent: executing CloseSession twice does not produce the same result, which can leave pzxid inconsistent, which in turn can cause watches to be missed.
    
    For more details, please check the description in ZOOKEEPER-3145.
    
    Author: Fangmin Lyu <al...@fb.com>
    
    Reviewers: Enrico Olivelli <eo...@apache.org>, Andor Molnár <an...@apache.org>
    
    Closes #622 from lvfangmin/ZOOKEEPER-3145
---
 zookeeper-jute/src/main/resources/zookeeper.jute   |  11 ++-
 .../java/org/apache/zookeeper/server/DataTree.java |  61 +++++++++---
 .../zookeeper/server/PrepRequestProcessor.java     |  11 ++-
 .../apache/zookeeper/server/ZooKeeperServer.java   |  19 ++++
 .../zookeeper/server/util/SerializeUtils.java      |  10 +-
 .../zookeeper/server/PrepRequestProcessorTest.java |  53 ++++++++++-
 .../server/quorum/CloseSessionTxnTest.java         | 102 +++++++++++++++++++++
 .../server/quorum/FuzzySnapshotRelatedTest.java    |  60 +++++++-----
 8 files changed, 283 insertions(+), 44 deletions(-)

diff --git a/zookeeper-jute/src/main/resources/zookeeper.jute b/zookeeper-jute/src/main/resources/zookeeper.jute
index 2bb0492..8310664 100644
--- a/zookeeper-jute/src/main/resources/zookeeper.jute
+++ b/zookeeper-jute/src/main/resources/zookeeper.jute
@@ -72,7 +72,7 @@ module org.apache.zookeeper.proto {
         vector<ustring>dataWatches;
         vector<ustring>existWatches;
         vector<ustring>childWatches;
-    }        
+    }
     class RequestHeader {
         int xid;
         int type;
@@ -92,12 +92,12 @@ module org.apache.zookeeper.proto {
         long zxid;
         int err;
     }
-    
-    class GetDataRequest {       
+
+    class GetDataRequest {
         ustring path;
         boolean watch;
     }
-    
+
     class SetDataRequest {
         ustring path;
         buffer data;
@@ -323,6 +323,9 @@ module org.apache.zookeeper.txn {
     class CreateSessionTxn {
         int timeOut;
     }
+    class CloseSessionTxn {
+        vector<ustring> paths2Delete;
+    }
     class ErrorTxn {
         int err;
     }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
index 9d552f0..4657f8e 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
@@ -63,6 +63,7 @@ import org.apache.zookeeper.server.watch.WatchesPathReport;
 import org.apache.zookeeper.server.watch.WatchesReport;
 import org.apache.zookeeper.server.watch.WatchesSummary;
 import org.apache.zookeeper.txn.CheckVersionTxn;
+import org.apache.zookeeper.txn.CloseSessionTxn;
 import org.apache.zookeeper.txn.CreateContainerTxn;
 import org.apache.zookeeper.txn.CreateTTLTxn;
 import org.apache.zookeeper.txn.CreateTxn;
@@ -947,7 +948,14 @@ public class DataTree {
                 rc.stat = setACL(setACLTxn.getPath(), setACLTxn.getAcl(), setACLTxn.getVersion());
                 break;
             case OpCode.closeSession:
-                killSession(header.getClientId(), header.getZxid());
+                long sessionId = header.getClientId();
+                if (txn != null) {
+                    killSession(sessionId, header.getZxid(),
+                            ephemerals.remove(sessionId),
+                            ((CloseSessionTxn) txn).getPaths2Delete());
+                } else {
+                    killSession(sessionId, header.getZxid());
+                }
                 break;
             case OpCode.error:
                 ErrorTxn errTxn = (ErrorTxn) txn;
@@ -1119,20 +1127,45 @@ public class DataTree {
         // so there is no need for synchronization. The list is not
         // changed here. Only create and delete change the list which
         // are again called from FinalRequestProcessor in sequence.
-        Set<String> list = ephemerals.remove(session);
-        if (list != null) {
-            for (String path : list) {
-                try {
-                    deleteNode(path, zxid);
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Deleting ephemeral node " + path + " for session 0x" + Long.toHexString(session));
-                    }
-                } catch (NoNodeException e) {
-                    LOG.warn("Ignoring NoNodeException for path "
-                             + path
-                             + " while removing ephemeral for dead session 0x"
-                             + Long.toHexString(session));
+        killSession(session, zxid, ephemerals.remove(session), null);
+    }
+
+    void killSession(long session, long zxid, Set<String> paths2DeleteLocal,
+            List<String> paths2DeleteInTxn) {
+        if (paths2DeleteInTxn != null) {
+            deleteNodes(session, zxid, paths2DeleteInTxn);
+        }
+
+        if (paths2DeleteLocal == null) {
+            return;
+        }
+
+        if (paths2DeleteInTxn != null) {
+            // explicitly check and remove to avoid potential performance
+            // issue when using removeAll
+            for (String path: paths2DeleteInTxn) {
+                paths2DeleteLocal.remove(path);
+            }
+            if (!paths2DeleteLocal.isEmpty()) {
+                LOG.warn("Unexpected extra paths under session {} which "
+                        + "are not in txn 0x{}", paths2DeleteLocal,
+                        Long.toHexString(zxid));
+            }
+        }
+
+        deleteNodes(session, zxid, paths2DeleteLocal);
+    }
+
+    void deleteNodes(long session, long zxid, Iterable<String> paths2Delete) {
+        for (String path : paths2Delete) {
+            try {
+                deleteNode(path, zxid);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Deleting ephemeral node {} for session 0x{}", path, Long.toHexString(session));
                 }
+            } catch (NoNodeException e) {
+                LOG.warn("Ignoring NoNodeException for path {} while removing ephemeral for dead session 0x{}",
+                        path, Long.toHexString(session));
             }
         }
     }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
index cccecbf..8b2bab0 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
@@ -66,6 +66,7 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
 import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.txn.CheckVersionTxn;
+import org.apache.zookeeper.txn.CloseSessionTxn;
 import org.apache.zookeeper.txn.CreateContainerTxn;
 import org.apache.zookeeper.txn.CreateSessionTxn;
 import org.apache.zookeeper.txn.CreateTTLTxn;
@@ -532,8 +533,12 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
             // this request is the last of the session so it should be ok
             //zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
             long startTime = Time.currentElapsedTime();
-            Set<String> es = zks.getZKDatabase().getEphemerals(request.sessionId);
             synchronized (zks.outstandingChanges) {
+                // getEphemerals must be called inside the zks.outstandingChanges
+                // synchronized block; otherwise there will be a race
+                // condition with an in-flight deleteNode txn, and we'll
+                // delete the node again here, which is not correct
+                Set<String> es = zks.getZKDatabase().getEphemerals(request.sessionId);
                 for (ChangeRecord c : zks.outstandingChanges) {
                     if (c.stat == null) {
                         // Doing a delete
@@ -545,7 +550,9 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
                 for (String path2Delete : es) {
                     addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path2Delete, null, 0, null));
                 }
-
+                if (ZooKeeperServer.isCloseSessionTxnEnabled()) {
+                    request.setTxn(new CloseSessionTxn(new ArrayList<String>(es)));
+                }
                 zks.sessionTracker.setSessionClosing(request.sessionId);
             }
             ServerMetrics.getMetrics().CLOSE_SESSION_PREP_TIME.add(Time.currentElapsedTime() - startTime);
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index b8e0bd2..e982cd1 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -110,6 +110,11 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     public static final String ZOOKEEPER_DIGEST_ENABLED = "zookeeper.digest.enabled";
     private static boolean digestEnabled;
 
+    // Add an enable/disable option for now; we should remove it once
+    // this feature is confirmed to be stable
+    public static final String CLOSE_SESSION_TXN_ENABLED = "zookeeper.closeSessionTxn.enabled";
+    private static boolean closeSessionTxnEnabled = true;
+
     static {
         LOG = LoggerFactory.getLogger(ZooKeeperServer.class);
 
@@ -127,6 +132,20 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
 
         digestEnabled = Boolean.parseBoolean(System.getProperty(ZOOKEEPER_DIGEST_ENABLED, "true"));
         LOG.info("{} = {}", ZOOKEEPER_DIGEST_ENABLED, digestEnabled);
+
+        closeSessionTxnEnabled = Boolean.parseBoolean(
+                System.getProperty(CLOSE_SESSION_TXN_ENABLED, "true"));
+        LOG.info("{} = {}", CLOSE_SESSION_TXN_ENABLED, closeSessionTxnEnabled);
+    }
+
+    public static boolean isCloseSessionTxnEnabled() {
+        return closeSessionTxnEnabled;
+    }
+
+    public static void setCloseSessionTxnEnabled(boolean enabled) {
+        ZooKeeperServer.closeSessionTxnEnabled = enabled;
+        LOG.info("Update {} to {}", CLOSE_SESSION_TXN_ENABLED,
+                ZooKeeperServer.closeSessionTxnEnabled);
     }
 
     protected ZooKeeperServerBean jmxServerBean;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/SerializeUtils.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/SerializeUtils.java
index 25e86a0..2454c43 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/SerializeUtils.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/SerializeUtils.java
@@ -34,7 +34,9 @@ import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.common.IOUtils;
 import org.apache.zookeeper.server.DataTree;
 import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.ZooTrace;
+import org.apache.zookeeper.txn.CloseSessionTxn;
 import org.apache.zookeeper.txn.CreateContainerTxn;
 import org.apache.zookeeper.txn.CreateSessionTxn;
 import org.apache.zookeeper.txn.CreateTTLTxn;
@@ -67,7 +69,9 @@ public class SerializeUtils {
             txn = new CreateSessionTxn();
             break;
         case OpCode.closeSession:
-            return null;
+            txn = ZooKeeperServer.isCloseSessionTxnEnabled()
+                    ?  new CloseSessionTxn() : null;
+            break;
         case OpCode.create:
         case OpCode.create2:
             txn = new CreateTxn();
@@ -115,6 +119,10 @@ public class SerializeUtils {
                     create.setAcl(createv0.getAcl());
                     create.setEphemeral(createv0.getEphemeral());
                     create.setParentCVersion(-1);
+                } else if (hdr.getType() == OpCode.closeSession) {
+                    // perhaps this is before CloseSessionTxn was added,
+                    // ignore it and reset txn to null
+                    txn = null;
                 } else {
                     throw e;
                 }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java
index 9724423..264601d 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java
@@ -45,6 +45,7 @@ import org.apache.zookeeper.PortAssignment;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.proto.RequestHeader;
 import org.apache.zookeeper.proto.SetDataRequest;
 import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
 import org.apache.zookeeper.test.ClientBase;
@@ -103,6 +104,10 @@ public class PrepRequestProcessorTest extends ClientBase {
     }
 
     private Request createRequest(Record record, int opCode) throws IOException {
+        return createRequest(record, opCode, 1L);
+    }
+
+    private Request createRequest(Record record, int opCode, long sessionId) throws IOException {
         // encoding
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
@@ -110,7 +115,7 @@ public class PrepRequestProcessorTest extends ClientBase {
         baos.close();
         // Id
         List<Id> ids = Arrays.asList(Ids.ANYONE_ID_UNSAFE);
-        return new Request(null, 1L, 0, opCode, ByteBuffer.wrap(baos.toByteArray()), ids);
+        return new Request(null, sessionId, 0, opCode, ByteBuffer.wrap(baos.toByteArray()), ids);
     }
 
     private void process(List<Op> ops) throws Exception {
@@ -174,6 +179,52 @@ public class PrepRequestProcessorTest extends ClientBase {
     }
 
     /**
+     * Test ephemerals are deleted when the session is closed with
+     * the newly added CloseSessionTxn in ZOOKEEPER-3145.
+     */
+    @Test
+    public void testCloseSessionTxn() throws Exception {
+        boolean before = ZooKeeperServer.isCloseSessionTxnEnabled();
+
+        ZooKeeperServer.setCloseSessionTxnEnabled(true);
+        try {
+            // create a few ephemerals
+            long ephemeralOwner = 1;
+            DataTree dt = zks.getZKDatabase().dataTree;
+            dt.createNode("/foo", new byte[0], Ids.OPEN_ACL_UNSAFE, ephemeralOwner, 0, 0, 0);
+            dt.createNode("/bar", new byte[0], Ids.OPEN_ACL_UNSAFE, ephemeralOwner, 0, 0, 0);
+
+            // close session
+            RequestHeader header = new RequestHeader();
+            header.setType(OpCode.closeSession);
+
+            final FinalRequestProcessor frq = new FinalRequestProcessor(zks);
+            final CountDownLatch latch = new CountDownLatch(1);
+            processor = new PrepRequestProcessor(zks, new RequestProcessor() {
+                @Override
+                public void processRequest(Request request) {
+                    frq.processRequest(request);
+                    latch.countDown();
+                }
+
+                @Override
+                public void shutdown() {
+                    // TODO Auto-generated method stub
+                }
+            });
+            processor.pRequest(createRequest(header, OpCode.closeSession, ephemeralOwner));
+
+            assertTrue(latch.await(3, TimeUnit.SECONDS));
+
+            // assert ephemerals are deleted
+            assertEquals(null, dt.getNode("/foo"));
+            assertEquals(null, dt.getNode("/bar"));
+        } finally {
+            ZooKeeperServer.setCloseSessionTxnEnabled(before);
+        }
+    }
+
+    /**
      * It tests that PrepRequestProcessor will return BadArgument KeeperException
      * if the request path (if it exists) is not valid, e.g. empty string.
      */
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CloseSessionTxnTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CloseSessionTxnTest.java
new file mode 100644
index 0000000..ac9665b
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CloseSessionTxnTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CloseSessionTxnTest extends QuorumPeerTestBase {
+
+    /**
+     * Test leader/follower compatibility with/without CloseSessionTxn, so that
+     * we can gradually roll out this code and roll back if there is a problem.
+     */
+    @Test
+    public void testCloseSessionTxnCompatile() throws Exception {
+        // Test 4 cases:
+        // 1. leader disabled, follower disabled
+        testCloseSessionWithDifferentConfig(false, false);
+
+        // 2. leader disabled, follower enabled
+        testCloseSessionWithDifferentConfig(false, true);
+
+        // 3. leader enabled, follower disabled
+        testCloseSessionWithDifferentConfig(true, false);
+
+        // 4. leader enabled, follower enabled
+        testCloseSessionWithDifferentConfig(true, true);
+    }
+
+    private void testCloseSessionWithDifferentConfig(
+            boolean closeSessionEnabledOnLeader,
+            boolean closeSessionEnabledOnFollower) throws Exception {
+        // 1. set up an ensemble with 3 servers
+        final int numServers = 3;
+        servers = LaunchServers(numServers);
+        int leaderId = servers.findLeader();
+        ZooKeeperServer.setCloseSessionTxnEnabled(closeSessionEnabledOnLeader);
+
+        // 2. shutdown one of the follower, start it later to pick up the
+        // CloseSessionTxnEnabled config change
+        //
+        // We cannot use different static config in the same JVM, so we have to
+        // use this trick
+        int followerA = (leaderId + 1) % numServers;
+        servers.mt[followerA].shutdown();
+        waitForOne(servers.zk[followerA], States.CONNECTING);
+
+        // 3. create an ephemeral node
+        String path = "/testCloseSessionTxnCompatile";
+        servers.zk[leaderId].create(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                CreateMode.EPHEMERAL);
+
+        // 3. close the client
+        servers.restartClient(leaderId, this);
+        waitForOne(servers.zk[leaderId], States.CONNECTED);
+
+        // 4. update the CloseSessionTxnEnabled config before follower A
+        // started
+        System.setProperty("zookeeper.retainZKDatabase", "true");
+        ZooKeeperServer.setCloseSessionTxnEnabled(closeSessionEnabledOnFollower);
+
+        // 5. restart follower A
+        servers.mt[followerA].start();
+        waitForOne(servers.zk[followerA], States.CONNECTED);
+
+        // 4. verify the ephemeral node is gone
+        for (int i = 0; i < numServers; i++) {
+            final CountDownLatch syncedLatch = new CountDownLatch(1);
+            servers.zk[i].sync(path, new AsyncCallback.VoidCallback() {
+                @Override
+                public void processResult(int rc, String path, Object ctx) {
+                    syncedLatch.countDown();
+                }
+            }, null);
+            Assert.assertTrue(syncedLatch.await(3, TimeUnit.SECONDS));
+            Assert.assertNull(servers.zk[i].exists(path, false));
+        }
+    }
+ }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java
index 5cd8259..9003b3d 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java
@@ -125,7 +125,7 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
     public void testMultiOpConsistency() throws Exception {
         LOG.info("Create a parent node");
         final String path = "/testMultiOpConsistency";
-        createEmptyNode(zk[followerA], path);
+        createEmptyNode(zk[followerA], path, CreateMode.PERSISTENT);
 
         LOG.info("Hook to catch the 2nd sub create node txn in multi-op");
         CustomDataTree dt = (CustomDataTree) mt[followerA].main.quorumPeer.getZkDb().getDataTree();
@@ -175,8 +175,10 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
 
         final String parent = "/testPZxidUpdatedWhenDeletingNonExistNode";
         final String child = parent + "/child";
-        createEmptyNode(zk[leaderId], parent);
-        createEmptyNode(zk[leaderId], child);
+        createEmptyNode(zk[leaderId], parent, CreateMode.PERSISTENT);
+        createEmptyNode(zk[leaderId], child, CreateMode.EPHEMERAL);
+        // create another child to test closeSession
+        createEmptyNode(zk[leaderId], child + "1", CreateMode.EPHEMERAL);
 
         LOG.info("shutdown follower {}", followerA);
         mt[followerA].shutdown();
@@ -205,8 +207,10 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
 
         final String parent = "/testPZxidUpdatedDuringTakingSnapshot";
         final String child = parent + "/child";
-        createEmptyNode(zk[followerA], parent);
-        createEmptyNode(zk[followerA], child);
+        createEmptyNode(zk[followerA], parent, CreateMode.PERSISTENT);
+        createEmptyNode(zk[followerA], child, CreateMode.EPHEMERAL);
+        // create another child to test closeSession
+        createEmptyNode(zk[leaderId], child + "1", CreateMode.EPHEMERAL);
 
         LOG.info("Set up ZKDatabase to catch the node serializing in DataTree");
         addSerializeListener(followerA, parent, child);
@@ -217,8 +221,12 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
 
         LOG.info("Restarting follower A to load snapshot");
         mt[followerA].shutdown();
-        QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTING);
+        QuorumPeerMainTest.waitForOne(zk[followerA], States.CLOSED);
         mt[followerA].start();
+        // zk[followerA] will be closed in addSerializeListener, re-create it
+        zk[followerA] = new ZooKeeper("127.0.0.1:" + clientPorts[followerA],
+                ClientBase.CONNECTION_TIMEOUT, this);
+
         QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTED);
 
         LOG.info("Check and make sure the pzxid of the parent is the same " + "on leader and follower A");
@@ -226,13 +234,14 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
     }
 
     private void addSerializeListener(int sid, String parent, String child) {
-        final ZooKeeper zkClient = zk[followerA];
+        final ZooKeeper zkClient = zk[sid];
         CustomDataTree dt = (CustomDataTree) mt[sid].main.quorumPeer.getZkDb().getDataTree();
         dt.addListener(parent, new NodeSerializeListener() {
             @Override
             public void nodeSerialized(String path) {
                 try {
                     zkClient.delete(child, -1);
+                    zkClient.close();
                     LOG.info("Deleted the child node after the parent is serialized");
                 } catch (Exception e) {
                     LOG.error("Error when deleting node {}", e);
@@ -242,13 +251,26 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
     }
 
     private void compareStat(String path, int sid, int compareWithSid) throws Exception {
-        Stat stat1 = new Stat();
-        zk[sid].getData(path, null, stat1);
-
-        Stat stat2 = new Stat();
-        zk[compareWithSid].getData(path, null, stat2);
-
-        assertEquals(stat1, stat2);
+        ZooKeeper[] compareZk = new ZooKeeper[2];
+        compareZk[0] = new ZooKeeper("127.0.0.1:" + clientPorts[sid],
+                ClientBase.CONNECTION_TIMEOUT, this);
+        compareZk[1] = new ZooKeeper("127.0.0.1:" + clientPorts[compareWithSid],
+                ClientBase.CONNECTION_TIMEOUT, this);
+        QuorumPeerMainTest.waitForAll(compareZk, States.CONNECTED);
+
+        try {
+            Stat stat1 = new Stat();
+            compareZk[0].getData(path, null, stat1);
+
+            Stat stat2 = new Stat();
+            compareZk[1].getData(path, null, stat2);
+
+            assertEquals(stat1, stat2);
+        } finally {
+            for (ZooKeeper z: compareZk) {
+                z.close();
+            }
+        }
     }
 
     @Test
@@ -286,19 +308,13 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
         LOG.info("Make sure the global sessions are consistent with leader");
 
         Map<Long, Integer> globalSessionsOnLeader = mt[leaderId].main.quorumPeer.getZkDb().getSessionWithTimeOuts();
-        if (mt[followerA].main.quorumPeer == null) {
-            LOG.info("quorumPeer is null");
-        }
-        if (mt[followerA].main.quorumPeer.getZkDb() == null) {
-            LOG.info("zkDb is null");
-        }
         Map<Long, Integer> globalSessionsOnFollowerA = mt[followerA].main.quorumPeer.getZkDb().getSessionWithTimeOuts();
         LOG.info("sessions are {}, {}", globalSessionsOnLeader.keySet(), globalSessionsOnFollowerA.keySet());
         assertTrue(globalSessionsOnFollowerA.keySet().containsAll(globalSessionsOnLeader.keySet()));
     }
 
-    private void createEmptyNode(ZooKeeper zk, String path) throws Exception {
-        zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+    private void createEmptyNode(ZooKeeper zk, String path, CreateMode mode) throws Exception {
+        zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, mode);
     }
 
     interface NodeCreateListener {