You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by an...@apache.org on 2018/11/06 18:33:02 UTC

zookeeper git commit: ZOOKEEPER-3104: Fix potential data inconsistency due to NEWLEADER packet being sent too early during SNAP sync

Repository: zookeeper
Updated Branches:
  refs/heads/branch-3.5 5701ddf19 -> 427f13678


ZOOKEEPER-3104: Fix potential data inconsistency due to NEWLEADER packet being sent too early during SNAP sync

Port this fix from master to 3.5.

Author: Fangmin Lyu <al...@fb.com>

Reviewers: andor@apache.org

Closes #685 from lvfangmin/ZOOKEEPER-3104-3.5


Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/427f1367
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/427f1367
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/427f1367

Branch: refs/heads/branch-3.5
Commit: 427f136783c4a855cc370eb4c3861a3979ea968c
Parents: 5701ddf
Author: Fangmin Lyu <al...@fb.com>
Authored: Tue Nov 6 10:32:58 2018 -0800
Committer: Andor Molnar <an...@apache.org>
Committed: Tue Nov 6 10:32:58 2018 -0800

----------------------------------------------------------------------
 .../apache/zookeeper/server/quorum/Leader.java  |  11 +-
 .../zookeeper/server/quorum/LearnerHandler.java |  37 ++-
 .../server/quorum/QuorumPeerMainTest.java       | 298 +++++++++++++++++++
 3 files changed, 328 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zookeeper/blob/427f1367/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
index a56c0f5..4712580 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
@@ -89,8 +89,7 @@ public class Leader {
         LOG.info(MAX_CONCURRENT_SNAPSHOT_TIMEOUT + " = " + maxConcurrentSnapshotTimeout);
     }
 
-    private final LearnerSnapshotThrottler learnerSnapshotThrottler = 
-        new LearnerSnapshotThrottler(maxConcurrentSnapshots, maxConcurrentSnapshotTimeout);
+    private final LearnerSnapshotThrottler learnerSnapshotThrottler;
 
     final LeaderZooKeeperServer zk;
 
@@ -112,6 +111,12 @@ public class Leader {
         return proposalStats;
     }
 
+    public LearnerSnapshotThrottler createLearnerSnapshotThrottler(
+            int maxConcurrentSnapshots, long maxConcurrentSnapshotTimeout) {
+        // overridable factory hook; NOTE: it is invoked from the Leader constructor
+        return new LearnerSnapshotThrottler(maxConcurrentSnapshots, maxConcurrentSnapshotTimeout);
+    }
+
     /**
      * Returns a copy of the current learner snapshot
      */
@@ -266,6 +271,8 @@ public class Leader {
             throw e;
         }
         this.zk = zk;
+        this.learnerSnapshotThrottler = createLearnerSnapshotThrottler(
+                maxConcurrentSnapshots, maxConcurrentSnapshotTimeout);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/427f1367/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
index f6c68b0..83d7aad 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
@@ -188,6 +188,11 @@ public class LearnerHandler extends ZooKeeperThread {
         this.leader = leader;
         this.bufferedInput = bufferedInput;
 
+        if (Boolean.getBoolean(FORCE_SNAP_SYNC)) {
+            forceSnapSync = true;
+            LOG.info("Forcing snapshot sync is enabled");
+        }
+
         try {
             if (leader.self != null) {
                 leader.self.authServer.authenticate(sock,
@@ -445,22 +450,6 @@ public class LearnerHandler extends ZooKeeperThread {
             // startForwarding() will be called in all cases
             boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);
             
-            LOG.debug("Sending NEWLEADER message to " + sid);
-            // the version of this quorumVerifier will be set by leader.lead() in case
-            // the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
-            // we got here, so the version was set
-            if (getVersion() < 0x10000) {
-                QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
-                        newLeaderZxid, null, null);
-                oa.writeRecord(newLeaderQP, "packet");
-            } else {
-                QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
-                        newLeaderZxid, leader.self.getLastSeenQuorumVerifier()
-                                .toString().getBytes(), null);
-                queuedPackets.add(newLeaderQP);
-            }
-            bufferedOutput.flush();
-
             /* if we are not truncating or sending a diff just send a snapshot */
             if (needSnap) {
                 boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
@@ -488,6 +477,22 @@ public class LearnerHandler extends ZooKeeperThread {
                 }
             }
 
+            LOG.debug("Sending NEWLEADER message to " + sid);
+            // the version of this quorumVerifier will be set by leader.lead() in case
+            // the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
+            // we got here, so the version was set
+            if (getVersion() < 0x10000) {
+                QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
+                        newLeaderZxid, null, null);
+                oa.writeRecord(newLeaderQP, "packet");
+            } else {
+                QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
+                        newLeaderZxid, leader.self.getLastSeenQuorumVerifier()
+                                .toString().getBytes(), null);
+                queuedPackets.add(newLeaderQP);
+            }
+            bufferedOutput.flush();
+
             // Start thread that blast packets in the queue to learner
             startSendingPackets();
             

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/427f1367/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
index 5a54bc1..57d3df8 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
@@ -36,14 +36,18 @@ import java.nio.channels.SocketChannel;
 import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.regex.Pattern;
+import javax.security.sasl.SaslException;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PatternLayout;
 import org.apache.log4j.WriterAppender;
+import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.PortAssignment;
@@ -53,6 +57,8 @@ import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper.States;
 import org.apache.zookeeper.common.Time;
+import org.apache.zookeeper.common.X509Exception;
+import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.quorum.Leader.Proposal;
 import org.apache.zookeeper.test.ClientBase;
@@ -1195,4 +1201,296 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
         }
         return null;
     }
+
+    /**
+     * Before this fix, in SNAP sync, the leader queued the
+     * proposals/commits and the NEWLEADER packet before sending
+     * the snapshot over the wire. So it was possible for the zxid
+     * associated with the snapshot to be higher than that of all
+     * the packets queued before NEWLEADER.
+     *
+     * When the follower receives the snapshot, it applies all
+     * the txns queued before NEWLEADER, which may not cover all
+     * the txns up to the zxid in the snapshot. After that, it
+     * writes the snapshot out to disk with the zxid associated
+     * with the snapshot. If the server crashes after writing
+     * it out, it will load the data from disk and use the zxid
+     * of the snapshot file to sync with the leader, which can cause
+     * data inconsistency, because only part of the historical
+     * txns were replayed during the previous sync.
+     *
+     * This test simulates that scenario and verifies that the
+     * follower's data matches the leader's after the fix.
+     */
+    @Test
+    public void testInconsistentDueToNewLeaderOrder() throws Exception {
+
+        // 1. set up an ensemble with 3 servers
+        final int ENSEMBLE_SERVERS = 3;
+        final int[] clientPorts = new int[ENSEMBLE_SERVERS];
+        StringBuilder sb = new StringBuilder();
+        String server;
+
+        for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+            clientPorts[i] = PortAssignment.unique();
+            server = "server." + i + "=127.0.0.1:" + PortAssignment.unique()
+                    + ":" + PortAssignment.unique() + ":participant;127.0.0.1:"
+                    + clientPorts[i];
+            sb.append(server + "\n");
+        }
+        String currentQuorumCfgSection = sb.toString();
+
+        // start servers
+        MainThread[] mt = new MainThread[ENSEMBLE_SERVERS];
+        ZooKeeper[] zk = new ZooKeeper[ENSEMBLE_SERVERS];
+        Context[] contexts = new Context[ENSEMBLE_SERVERS];
+        for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+            final Context context = new Context();
+            contexts[i] = context;
+            mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection,
+                    false) {
+                @Override
+                public TestQPMain getTestQPMain() {
+                    return new CustomizedQPMain(context);
+                }
+            };
+            mt[i].start();
+            zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i],
+                    ClientBase.CONNECTION_TIMEOUT, this);
+        }
+        waitForAll(zk, States.CONNECTED);
+        LOG.info("all servers started");
+
+        final String nodePath = "/testInconsistentDueToNewLeader";
+
+        int leaderId = -1;
+        int followerA = -1;
+        for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+            if (mt[i].main.quorumPeer.leader != null) {
+                leaderId = i;
+            } else if (followerA == -1) {
+                followerA = i;
+            }
+        }
+        LOG.info("shutdown follower {}", followerA);
+        mt[followerA].shutdown();
+        waitForOne(zk[followerA], States.CONNECTING);
+
+        try {
+            // 2. set force snapshot to be true
+            LOG.info("force snapshot sync");
+            System.setProperty(LearnerHandler.FORCE_SNAP_SYNC, "true");
+
+            // 3. create a node
+            String initialValue = "1";
+            final ZooKeeper leaderZk = zk[leaderId];
+            leaderZk.create(nodePath, initialValue.getBytes(), Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+            LOG.info("created node {} with value {}", nodePath, initialValue);
+
+            CustomQuorumPeer leaderQuorumPeer =
+                    (CustomQuorumPeer) mt[leaderId].main.quorumPeer;
+
+            // 4. on the customized leader catch the startForwarding call
+            //    (without synchronized), set the node to value v1, then
+            //    call the super.startForwarding to generate the ongoing
+            //    txn proposal and commit for v1 value update
+            leaderQuorumPeer.setStartForwardingListener(
+                    new StartForwardingListener() {
+                @Override
+                public void start() {
+                    if (!Boolean.getBoolean(LearnerHandler.FORCE_SNAP_SYNC)) {
+                        return;
+                    }
+                    final String value = "2";
+                    LOG.info("start forwarding, set {} to {}", nodePath, value);
+                    // use async, otherwise it will block the logLock in
+                    // ZKDatabase and the setData request will timeout
+                    try {
+                        leaderZk.setData(nodePath, value.getBytes(), -1,
+                                new AsyncCallback.StatCallback() {
+                            public void processResult(int rc, String path,
+                                   Object ctx, Stat stat) {}
+                        }, null);
+                        // wait for the setData txn being populated
+                        Thread.sleep(1000);
+                    } catch (Exception e) {
+                        LOG.error("error when set {} to {}", nodePath, value, e);
+                    }
+                }
+            });
+
+            // 5. on the customized leader catch the beginSnapshot call in
+            //    LearnerSnapshotThrottler to set the node to value v2,
+            //    wait it hit data tree
+            leaderQuorumPeer.setBeginSnapshotListener(new BeginSnapshotListener() {
+                @Override
+                public void start() {
+                    String value = "3";
+                    LOG.info("before sending snapshot, set {} to {}",
+                            nodePath, value);
+                    try {
+                        leaderZk.setData(nodePath, value.getBytes(), -1);
+                        LOG.info("successfully set {} to {}", nodePath, value);
+                    } catch (Exception e) {
+                        LOG.error("error when set {} to {}", nodePath, value, e);
+                    }
+                }
+            });
+
+            // 6. shut down follower A when it ACKs the NEWLEADER packet
+            CustomQuorumPeer followerAQuorumPeer =
+                    ((CustomQuorumPeer) mt[followerA].main.quorumPeer);
+            LOG.info("set exit when ack new leader packet on {}", followerA);
+            contexts[followerA].exitWhenAckNewLeader = true;
+            final CountDownLatch latch = new CountDownLatch(1);
+            final MainThread followerAMT = mt[followerA];
+            contexts[followerA].newLeaderAckCallback = new NewLeaderAckCallback() {
+                @Override
+                public void start() {
+                    try {
+                        latch.countDown();
+                        followerAMT.shutdown();
+                    } catch (Exception e) { LOG.warn("error when shutting down follower", e); }
+                }
+            };
+
+            // 7. start follower A to do snapshot sync
+            LOG.info("starting follower {}", followerA);
+            mt[followerA].start();
+            Assert.assertTrue(latch.await(30, TimeUnit.SECONDS));
+
+            // 8. without the fix the data on disk is now invalid; load it and verify
+            LOG.info("disable exit when ack new leader packet on {}", followerA);
+            System.setProperty(LearnerHandler.FORCE_SNAP_SYNC, "false");
+            contexts[followerA].exitWhenAckNewLeader = false;
+            contexts[followerA].newLeaderAckCallback = null;
+
+            LOG.info("restarting follower {}", followerA);
+            mt[followerA].start();
+            zk[followerA].close();
+
+            zk[followerA] = new ZooKeeper("127.0.0.1:" + clientPorts[followerA],
+                    ClientBase.CONNECTION_TIMEOUT, this);
+
+            // 9. start follower A, after it's in broadcast state, make sure
+            //    the node value is same as what we have on leader
+            waitForOne(zk[followerA], States.CONNECTED);
+            Assert.assertEquals(
+                new String(zk[followerA].getData(nodePath, null, null)),
+                new String(zk[leaderId].getData(nodePath, null, null))
+            );
+        } finally {
+            System.clearProperty(LearnerHandler.FORCE_SNAP_SYNC);
+            for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+                mt[i].shutdown();
+                zk[i].close();
+            }
+        }
+    }
+
+    static class Context {
+        boolean quitFollowing;
+        boolean exitWhenAckNewLeader;
+        NewLeaderAckCallback newLeaderAckCallback;
+    }
+
+    interface NewLeaderAckCallback {
+        void start();
+    }
+
+    interface StartForwardingListener {
+        void start();
+    }
+
+    interface BeginSnapshotListener {
+        void start();
+    }
+
+    static class CustomizedQPMain extends TestQPMain {
+        // shared with the test thread, which mutates it between follower restarts
+        private final Context context;
+
+        public CustomizedQPMain(Context context) {
+            this.context = context;
+        }
+
+        @Override
+        protected QuorumPeer getQuorumPeer() throws SaslException {
+            return new CustomQuorumPeer(context);
+        }
+    }
+
+    static class CustomQuorumPeer extends QuorumPeer {
+        private final Context context;
+        // volatile: the test installs these after the leader's threads are running
+        private volatile StartForwardingListener startForwardingListener;
+        private volatile BeginSnapshotListener beginSnapshotListener;
+
+        public CustomQuorumPeer(Context context)
+                throws SaslException {
+            this.context = context;
+        }
+
+        public void setStartForwardingListener(
+                StartForwardingListener startForwardingListener) {
+            this.startForwardingListener = startForwardingListener;
+        }
+
+        public void setBeginSnapshotListener(
+                BeginSnapshotListener beginSnapshotListener) {
+            this.beginSnapshotListener = beginSnapshotListener;
+        }
+
+        @Override
+        protected Follower makeFollower(FileTxnSnapLog logFactory)
+                throws IOException {
+            return new Follower(this, new FollowerZooKeeperServer(logFactory,
+                    this, this.getZkDb())) {
+                // while exitWhenAckNewLeader is set, run the callback before an ACK is written
+                @Override
+                void writePacket(QuorumPacket pp, boolean flush) throws IOException {
+                    if (pp != null && pp.getType() == Leader.ACK
+                            && context.exitWhenAckNewLeader) {
+                        if (context.newLeaderAckCallback != null) {
+                            context.newLeaderAckCallback.start();
+                        }
+                    }
+                    super.writePacket(pp, flush);
+                }
+            };
+        }
+
+        @Override
+        protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {
+            return new Leader(this, new LeaderZooKeeperServer(logFactory,
+                    this, this.getZkDb())) {
+                @Override
+                public long startForwarding(LearnerHandler handler,
+                        long lastSeenZxid) {
+                    if (startForwardingListener != null) {
+                        startForwardingListener.start();
+                    }
+                    return super.startForwarding(handler, lastSeenZxid);
+                }
+
+                @Override
+                public LearnerSnapshotThrottler createLearnerSnapshotThrottler(
+                        int maxConcurrentSnapshots, long maxConcurrentSnapshotTimeout) {
+                    return new LearnerSnapshotThrottler(
+                            maxConcurrentSnapshots, maxConcurrentSnapshotTimeout) {
+                        // run the test listener before delegating to the real throttler
+                        @Override
+                        public LearnerSnapshot beginSnapshot(boolean essential)
+                                throws SnapshotThrottleException, InterruptedException {
+                            if (beginSnapshotListener != null) {
+                                beginSnapshotListener.start();
+                            }
+                            return super.beginSnapshot(essential);
+                        }
+                    };
+                }
+            };
+        }
+    }
 }