You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ha...@apache.org on 2018/09/14 22:08:32 UTC

zookeeper git commit: ZOOKEEPER-3144: Fix potential ephemeral nodes inconsistent due to global session inconsistent with fuzzy snapshot

Repository: zookeeper
Updated Branches:
  refs/heads/master 716300812 -> b58791016


ZOOKEEPER-3144: Fix potential ephemeral nodes inconsistent due to global session inconsistent with fuzzy snapshot

There is a race condition between updating the lastProcessedZxid and the actual session change in DataTree, which could cause the global sessions to become inconsistent, which in turn could cause the ephemeral nodes to become inconsistent.

For more details, please check the description in JIRA ZOOKEEPER-3144.

Author: Fangmin Lyu <al...@fb.com>

Reviewers: Michael Han <ha...@apache.org>

Closes #621 from lvfangmin/ZOOKEEPER-3144


Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/b5879101
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/b5879101
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/b5879101

Branch: refs/heads/master
Commit: b58791016424e662c816e2253de96f3771f5d301
Parents: 7163008
Author: Fangmin Lyu <al...@fb.com>
Authored: Fri Sep 14 15:08:19 2018 -0700
Committer: Michael Han <ha...@apache.org>
Committed: Fri Sep 14 15:08:19 2018 -0700

----------------------------------------------------------------------
 .../zookeeper/server/ZooKeeperServer.java       | 28 +++---
 .../server/quorum/FuzzySnapshotRelatedTest.java | 89 +++++++++++++++++++-
 2 files changed, 103 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b5879101/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
index 09c6a8a..70cb75b 100644
--- a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -268,21 +268,21 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
      */
     public void loadData() throws IOException, InterruptedException {
         /*
-         * When a new leader starts executing Leader#lead, it 
+         * When a new leader starts executing Leader#lead, it
          * invokes this method. The database, however, has been
          * initialized before running leader election so that
          * the server could pick its zxid for its initial vote.
          * It does it by invoking QuorumPeer#getLastLoggedZxid.
          * Consequently, we don't need to initialize it once more
-         * and avoid the penalty of loading it a second time. Not 
+         * and avoid the penalty of loading it a second time. Not
          * reloading it is particularly important for applications
          * that host a large database.
-         * 
+         *
          * The following if block checks whether the database has
          * been initialized or not. Note that this method is
-         * invoked by at least one other method: 
+         * invoked by at least one other method:
          * ZooKeeperServer#startdata.
-         *  
+         *
          * See ZOOKEEPER-1642 for more detail.
          */
         if(zkDb.isInitialized()){
@@ -291,7 +291,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         else {
             setZxid(zkDb.loadDataBase());
         }
-        
+
         // Clean up dead sessions
         List<Long> deadSessions = new LinkedList<Long>();
         for (Long session : zkDb.getSessions()) {
@@ -364,7 +364,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     public SessionTracker getSessionTracker() {
         return sessionTracker;
     }
-    
+
     long getNextZxid() {
         return hzxid.incrementAndGet();
     }
@@ -1181,7 +1181,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
                     String authorizationID = saslServer.getAuthorizationID();
                     LOG.info("adding SASL authorization for authorizationID: " + authorizationID);
                     cnxn.addAuthInfo(new Id("sasl",authorizationID));
-                    if (System.getProperty("zookeeper.superUser") != null && 
+                    if (System.getProperty("zookeeper.superUser") != null &&
                         authorizationID.equals(System.getProperty("zookeeper.superUser"))) {
                         cnxn.addAuthInfo(new Id("super", ""));
                     }
@@ -1224,11 +1224,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         ProcessTxnResult rc;
         int opCode = request != null ? request.type : hdr.getType();
         long sessionId = request != null ? request.sessionId : hdr.getClientId();
-        if (hdr != null) {
-            rc = getZKDatabase().processTxn(hdr, txn);
-        } else {
-            rc = new ProcessTxnResult();
-        }
+
         if (opCode == OpCode.createSession) {
             if (hdr != null && txn instanceof CreateSessionTxn) {
                 CreateSessionTxn cst = (CreateSessionTxn) txn;
@@ -1241,6 +1237,12 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         } else if (opCode == OpCode.closeSession) {
             sessionTracker.removeSession(sessionId);
         }
+
+        if (hdr != null) {
+            rc = getZKDatabase().processTxn(hdr, txn);
+        } else {
+            rc = new ProcessTxnResult();
+        }
         return rc;
     }
 

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b5879101/src/java/test/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java b/src/java/test/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java
index 8abab52..c4ca8a8 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 import javax.security.sasl.SaslException;
 
 import org.apache.jute.OutputArchive;
@@ -42,6 +43,7 @@ import org.apache.zookeeper.server.DataNode;
 import org.apache.zookeeper.server.DataTree;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.test.ClientBase;
 
 import org.junit.Assert;
@@ -60,6 +62,7 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
 
     MainThread[] mt = null;
     ZooKeeper[] zk = null;
+    int[] clientPorts = null;
     int leaderId;
     int followerA;
 
@@ -67,7 +70,7 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
     public void setup() throws Exception {
         LOG.info("Start up a 3 server quorum");
         final int ENSEMBLE_SERVERS = 3;
-        final int clientPorts[] = new int[ENSEMBLE_SERVERS];
+        clientPorts = new int[ENSEMBLE_SERVERS];
         StringBuilder sb = new StringBuilder();
         String server;
 
@@ -259,6 +262,55 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
         Assert.assertEquals(stat1, stat2);
     }
 
+    @Test
+    public void testGlobalSessionConsistency() throws Exception {
+        LOG.info("Hook to catch the commitSession event on followerA");
+        CustomizedQPMain followerAMain = (CustomizedQPMain) mt[followerA].main;
+        final ZooKeeperServer zkServer = followerAMain.quorumPeer.getActiveServer();
+
+        // only take snapshot for the next global session we're going to create
+        final AtomicBoolean shouldTakeSnapshot = new AtomicBoolean(true);
+        followerAMain.setCommitSessionListener(new CommitSessionListener() {
+            @Override
+            public void process(long sessionId) {
+                LOG.info("Take snapshot");
+                if (shouldTakeSnapshot.getAndSet(false)) {
+                    zkServer.takeSnapshot(true);
+                }
+            }
+        });
+
+        LOG.info("Create a global session");
+        ZooKeeper globalClient = new ZooKeeper(
+                "127.0.0.1:" + clientPorts[followerA],
+                ClientBase.CONNECTION_TIMEOUT, this);
+        QuorumPeerMainTest.waitForOne(globalClient, States.CONNECTED);
+
+        LOG.info("Restart followerA to load the data from disk");
+        mt[followerA].shutdown();
+        QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTING);
+
+        mt[followerA].start();
+        QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTED);
+
+        LOG.info("Make sure the global sessions are consistent with leader");
+
+        Map<Long, Integer> globalSessionsOnLeader =
+                mt[leaderId].main.quorumPeer.getZkDb().getSessionWithTimeOuts();
+        if (mt[followerA].main.quorumPeer == null) {
+            LOG.info("quorumPeer is null");
+        }
+        if (mt[followerA].main.quorumPeer.getZkDb() == null) {
+            LOG.info("zkDb is null");
+        }
+        Map<Long, Integer> globalSessionsOnFollowerA =
+                mt[followerA].main.quorumPeer.getZkDb().getSessionWithTimeOuts();
+        LOG.info("sessions are {}, {}", globalSessionsOnLeader.keySet(),
+                globalSessionsOnFollowerA.keySet());
+        Assert.assertTrue(globalSessionsOnFollowerA.keySet().containsAll(
+                  globalSessionsOnLeader.keySet()));
+    }
+
     private void createEmptyNode(ZooKeeper zk, String path) throws Exception {
         zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     }
@@ -310,7 +362,17 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
         public void nodeSerialized(String path);
     }
 
+    static interface CommitSessionListener {
+        public void process(long sessionId);
+    }
+
     static class CustomizedQPMain extends TestQPMain {
+        CommitSessionListener commitSessionListener;
+
+        public void setCommitSessionListener(CommitSessionListener listener) {
+            this.commitSessionListener = listener;
+        }
+
         @Override
         protected QuorumPeer getQuorumPeer() throws SaslException {
             return new QuorumPeer() {
@@ -323,6 +385,31 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
                         }
                     });
                 }
+
+                @Override
+                protected Follower makeFollower(FileTxnSnapLog logFactory)
+                        throws IOException {
+                    return new Follower(this, new FollowerZooKeeperServer(
+                            logFactory, this, this.getZkDb()) {
+                        @Override
+                        public void createSessionTracker() {
+                            sessionTracker = new LearnerSessionTracker(
+                                    this, getZKDatabase().getSessionWithTimeOuts(),
+                                    this.tickTime, self.getId(),
+                                    self.areLocalSessionsEnabled(),
+                                    getZooKeeperServerListener()) {
+
+                                public synchronized boolean commitSession(
+                                        long sessionId, int sessionTimeout) {
+                                    if (commitSessionListener != null) {
+                                        commitSessionListener.process(sessionId);
+                                    }
+                                    return super.commitSession(sessionId, sessionTimeout);
+                                }
+                            };
+                        }
+                    });
+                }
             };
         }
     }