You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink target was lost in this plain-text copy).
Posted to commits@zookeeper.apache.org by an...@apache.org on 2018/11/06 18:33:02 UTC
zookeeper git commit: ZOOKEEPER-3104: Fix potential data inconsistency due to NEWLEADER packet being sent too early during SNAP sync
Repository: zookeeper
Updated Branches:
refs/heads/branch-3.5 5701ddf19 -> 427f13678
ZOOKEEPER-3104: Fix potential data inconsistency due to NEWLEADER packet being sent too early during SNAP sync
Port this fix from master to 3.5.
Author: Fangmin Lyu <al...@fb.com>
Reviewers: andor@apache.org
Closes #685 from lvfangmin/ZOOKEEPER-3104-3.5
Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/427f1367
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/427f1367
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/427f1367
Branch: refs/heads/branch-3.5
Commit: 427f136783c4a855cc370eb4c3861a3979ea968c
Parents: 5701ddf
Author: Fangmin Lyu <al...@fb.com>
Authored: Tue Nov 6 10:32:58 2018 -0800
Committer: Andor Molnar <an...@apache.org>
Committed: Tue Nov 6 10:32:58 2018 -0800
----------------------------------------------------------------------
.../apache/zookeeper/server/quorum/Leader.java | 11 +-
.../zookeeper/server/quorum/LearnerHandler.java | 37 ++-
.../server/quorum/QuorumPeerMainTest.java | 298 +++++++++++++++++++
3 files changed, 328 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/427f1367/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
index a56c0f5..4712580 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
@@ -89,8 +89,7 @@ public class Leader {
LOG.info(MAX_CONCURRENT_SNAPSHOT_TIMEOUT + " = " + maxConcurrentSnapshotTimeout);
}
- private final LearnerSnapshotThrottler learnerSnapshotThrottler =
- new LearnerSnapshotThrottler(maxConcurrentSnapshots, maxConcurrentSnapshotTimeout);
+ private final LearnerSnapshotThrottler learnerSnapshotThrottler;
final LeaderZooKeeperServer zk;
@@ -112,6 +111,12 @@ public class Leader {
return proposalStats;
}
+ public LearnerSnapshotThrottler createLearnerSnapshotThrottler( // overridable factory; the Leader constructor calls it, so tests can inject a throttler
+ int maxConcurrentSnapshots, long maxConcurrentSnapshotTimeout) {
+ final LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(maxConcurrentSnapshots, maxConcurrentSnapshotTimeout);
+ return throttler;
+ }
+
/**
* Returns a copy of the current learner snapshot
*/
@@ -266,6 +271,8 @@ public class Leader {
throw e;
}
this.zk = zk;
+ this.learnerSnapshotThrottler = createLearnerSnapshotThrottler(
+ maxConcurrentSnapshots, maxConcurrentSnapshotTimeout);
}
/**
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/427f1367/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
index f6c68b0..83d7aad 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
@@ -188,6 +188,11 @@ public class LearnerHandler extends ZooKeeperThread {
this.leader = leader;
this.bufferedInput = bufferedInput;
+ if (Boolean.getBoolean(FORCE_SNAP_SYNC)) {
+ forceSnapSync = true;
+ LOG.info("Forcing snapshot sync is enabled");
+ }
+
try {
if (leader.self != null) {
leader.self.authServer.authenticate(sock,
@@ -445,22 +450,6 @@ public class LearnerHandler extends ZooKeeperThread {
// startForwarding() will be called in all cases
boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);
- LOG.debug("Sending NEWLEADER message to " + sid);
- // the version of this quorumVerifier will be set by leader.lead() in case
- // the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
- // we got here, so the version was set
- if (getVersion() < 0x10000) {
- QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
- newLeaderZxid, null, null);
- oa.writeRecord(newLeaderQP, "packet");
- } else {
- QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
- newLeaderZxid, leader.self.getLastSeenQuorumVerifier()
- .toString().getBytes(), null);
- queuedPackets.add(newLeaderQP);
- }
- bufferedOutput.flush();
-
/* if we are not truncating or sending a diff just send a snapshot */
if (needSnap) {
boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
@@ -488,6 +477,22 @@ public class LearnerHandler extends ZooKeeperThread {
}
}
+ LOG.debug("Sending NEWLEADER message to " + sid);
+ // the version of this quorumVerifier will be set by leader.lead() in case
+ // the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
+ // we got here, so the version was set
+ if (getVersion() < 0x10000) {
+ QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
+ newLeaderZxid, null, null);
+ oa.writeRecord(newLeaderQP, "packet");
+ } else {
+ QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
+ newLeaderZxid, leader.self.getLastSeenQuorumVerifier()
+ .toString().getBytes(), null);
+ queuedPackets.add(newLeaderQP);
+ }
+ bufferedOutput.flush();
+
// Start thread that blast packets in the queue to learner
startSendingPackets();
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/427f1367/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
index 5a54bc1..57d3df8 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
@@ -36,14 +36,18 @@ import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
+import javax.security.sasl.SaslException;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import org.apache.log4j.WriterAppender;
+import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.PortAssignment;
@@ -53,6 +57,8 @@ import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.common.Time;
+import org.apache.zookeeper.common.X509Exception;
+import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.Leader.Proposal;
import org.apache.zookeeper.test.ClientBase;
@@ -1195,4 +1201,296 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
}
return null;
}
+
+ /**
+ * Before this fix, in SNAP sync the leader queued the
+ * proposals/commits and the NEWLEADER packet before sending
+ * the snapshot over the wire. So the zxid associated with the
+ * snapshot could be higher than the zxid of every packet
+ * queued before NEWLEADER.
+ *
+ * When the follower received the snapshot, it applied only
+ * the txns queued before NEWLEADER, which might not cover all
+ * the txns up to the zxid of the snapshot. After that, it
+ * wrote the snapshot out to disk tagged with the zxid of
+ * the snapshot. If the server crashed after writing it,
+ * then on restart it would use the zxid of the snapshot
+ * file to sync with the leader, which could leave the data
+ * inconsistent, because only part of the history had been
+ * replayed during the previous sync.
+ *
+ * This test simulates that scenario and verifies that the
+ * restarted follower ends up with the same node value as the leader.
+ */
+ @Test
+ public void testInconsistentDueToNewLeaderOrder() throws Exception {
+
+ // 1. set up an ensemble with 3 servers
+ final int ENSEMBLE_SERVERS = 3;
+ final int[] clientPorts = new int[ENSEMBLE_SERVERS];
+ StringBuilder sb = new StringBuilder();
+ String server;
+
+ for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+ clientPorts[i] = PortAssignment.unique();
+ server = "server." + i + "=127.0.0.1:" + PortAssignment.unique()
+ + ":" + PortAssignment.unique() + ":participant;127.0.0.1:"
+ + clientPorts[i];
+ sb.append(server + "\n");
+ }
+ String currentQuorumCfgSection = sb.toString();
+
+ // start servers
+ MainThread[] mt = new MainThread[ENSEMBLE_SERVERS];
+ ZooKeeper[] zk = new ZooKeeper[ENSEMBLE_SERVERS];
+ Context[] contexts = new Context[ENSEMBLE_SERVERS];
+ for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+ final Context context = new Context();
+ contexts[i] = context;
+ mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection,
+ false) {
+ @Override
+ public TestQPMain getTestQPMain() {
+ return new CustomizedQPMain(context);
+ }
+ };
+ mt[i].start();
+ zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i],
+ ClientBase.CONNECTION_TIMEOUT, this);
+ }
+ waitForAll(zk, States.CONNECTED);
+ LOG.info("all servers started");
+
+ String nodePath = "/testInconsistentDueToNewLeader";
+
+ int leaderId = -1;
+ int followerA = -1;
+ for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+ if (mt[i].main.quorumPeer.leader != null) {
+ leaderId = i;
+ } else if (followerA == -1) {
+ followerA = i;
+ }
+ }
+ LOG.info("shutdown follower {}", followerA);
+ mt[followerA].shutdown();
+ waitForOne(zk[followerA], States.CONNECTING);
+
+ try {
+ // 2. set force snapshot to be true
+ LOG.info("force snapshot sync");
+ System.setProperty(LearnerHandler.FORCE_SNAP_SYNC, "true");
+
+ // 3. create a node
+ String initialValue = "1";
+ final ZooKeeper leaderZk = zk[leaderId];
+ leaderZk.create(nodePath, initialValue.getBytes(), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ LOG.info("created node {} with value {}", nodePath, initialValue);
+
+ CustomQuorumPeer leaderQuorumPeer =
+ (CustomQuorumPeer) mt[leaderId].main.quorumPeer;
+
+ // 4. on the customized leader catch the startForwarding call
+ // (without synchronized), set the node to value "2", then
+ // call the super.startForwarding to generate the ongoing
+ // txn proposal and commit for that value update
+ leaderQuorumPeer.setStartForwardingListener(
+ new StartForwardingListener() {
+ @Override
+ public void start() {
+ if (!Boolean.getBoolean(LearnerHandler.FORCE_SNAP_SYNC)) {
+ return;
+ }
+ final String value = "2";
+ LOG.info("start forwarding, set {} to {}", nodePath, value);
+ // use async, otherwise it will block the logLock in
+ // ZKDatabase and the setData request will timeout
+ try {
+ leaderZk.setData(nodePath, value.getBytes(), -1,
+ new AsyncCallback.StatCallback() {
+ public void processResult(int rc, String path,
+ Object ctx, Stat stat) {}
+ }, null);
+ // wait for the setData txn being populated
+ Thread.sleep(1000);
+ } catch (Exception e) {
+ LOG.error("error when set {} to {}", nodePath, value, e);
+ }
+ }
+ });
+
+ // 5. on the customized leader catch the beginSnapshot call in
+ // LearnerSnapshotThrottler to set the node to value "3",
+ // and wait for it to hit the data tree
+ leaderQuorumPeer.setBeginSnapshotListener(new BeginSnapshotListener() {
+ @Override
+ public void start() {
+ String value = "3";
+ LOG.info("before sending snapshot, set {} to {}",
+ nodePath, value);
+ try {
+ leaderZk.setData(nodePath, value.getBytes(), -1);
+ LOG.info("successfully set {} to {}", nodePath, value);
+ } catch (Exception e) {
+ LOG.error("error when set {} to {}", nodePath, value, e);
+ }
+ }
+ });
+
+ // 6. shut down follower A once it ACKs NEWLEADER (after taking the snapshot)
+ LOG.info("set exit when ack new leader packet on {}", followerA);
+ contexts[followerA].exitWhenAckNewLeader = true;
+ final CountDownLatch latch = new CountDownLatch(1);
+ final MainThread followerAMT = mt[followerA];
+ contexts[followerA].newLeaderAckCallback = new NewLeaderAckCallback() {
+ @Override
+ public void start() {
+ try {
+ latch.countDown();
+ followerAMT.shutdown();
+ } catch (Exception e) {
+ LOG.error("error when shutting down follower on NEWLEADER ack", e);
+ }
+ }
+ };
+
+ // 7. start follower A to do snapshot sync
+ LOG.info("starting follower {}", followerA);
+ mt[followerA].start();
+ Assert.assertTrue(latch.await(30, TimeUnit.SECONDS));
+
+ // 8. reload follower A from the data it persisted during that sync and verify it
+ LOG.info("disable exit when ack new leader packet on {}", followerA);
+ System.setProperty(LearnerHandler.FORCE_SNAP_SYNC, "false");
+ contexts[followerA].exitWhenAckNewLeader = false;
+ contexts[followerA].newLeaderAckCallback = null;
+
+ LOG.info("restarting follower {}", followerA);
+ mt[followerA].start();
+ zk[followerA].close();
+
+ zk[followerA] = new ZooKeeper("127.0.0.1:" + clientPorts[followerA],
+ ClientBase.CONNECTION_TIMEOUT, this);
+
+ // 9. start follower A, after it's in broadcast state, make sure
+ // the node value is same as what we have on leader
+ waitForOne(zk[followerA], States.CONNECTED);
+ Assert.assertEquals(
+ new String(zk[leaderId].getData(nodePath, null, null)),
+ new String(zk[followerA].getData(nodePath, null, null))
+ );
+ } finally {
+ System.clearProperty(LearnerHandler.FORCE_SNAP_SYNC);
+ for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+ mt[i].shutdown();
+ zk[i].close();
+ }
+ }
+ }
+
+ static class Context { // per-server switches the test flips to steer a CustomQuorumPeer
+ boolean quitFollowing; // NOTE(review): not read by any code visible in this diff
+ boolean exitWhenAckNewLeader; // when true, the follower runs newLeaderAckCallback on each ACK it writes
+ NewLeaderAckCallback newLeaderAckCallback; // null means nothing runs on ACK
+ }
+
+ interface NewLeaderAckCallback { // invoked by the customized follower before it writes an ACK packet
+ void start();
+ }
+
+ interface StartForwardingListener { // invoked by the customized leader before super.startForwarding
+ void start();
+ }
+
+ interface BeginSnapshotListener { // invoked before LearnerSnapshotThrottler.beginSnapshot proceeds
+ void start();
+ }
+
+ static class CustomizedQPMain extends TestQPMain { // boots a CustomQuorumPeer wired to a test Context
+
+ private final Context context; // handed to every peer this main creates
+
+ public CustomizedQPMain(Context context) {
+ this.context = context;
+ }
+
+ @Override // swap in the instrumented peer
+ protected QuorumPeer getQuorumPeer() throws SaslException {
+ return new CustomQuorumPeer(context);
+ }
+ }
+
+ static class CustomQuorumPeer extends QuorumPeer { // QuorumPeer whose Follower/Leader expose test hooks
+ private final Context context; // switches consulted by the follower's writePacket override
+
+ private volatile StartForwardingListener startForwardingListener; // written by the test thread, read on the leader's sync path
+ private volatile BeginSnapshotListener beginSnapshotListener; // written by the test thread, read on the leader's sync path
+
+ public CustomQuorumPeer(Context context)
+ throws SaslException {
+ this.context = context;
+ }
+
+ public void setStartForwardingListener(
+ StartForwardingListener startForwardingListener) {
+ this.startForwardingListener = startForwardingListener;
+ }
+
+ public void setBeginSnapshotListener(
+ BeginSnapshotListener beginSnapshotListener) {
+ this.beginSnapshotListener = beginSnapshotListener;
+ }
+
+ @Override
+ protected Follower makeFollower(FileTxnSnapLog logFactory)
+ throws IOException {
+ return new Follower(this, new FollowerZooKeeperServer(logFactory,
+ this, this.getZkDb())) {
+
+ @Override
+ void writePacket(QuorumPacket pp, boolean flush) throws IOException {
+ if (pp != null && pp.getType() == Leader.ACK // fires on every ACK while the flag is set
+ && context.exitWhenAckNewLeader) {
+ if (context.newLeaderAckCallback != null) {
+ context.newLeaderAckCallback.start();
+ }
+ }
+ super.writePacket(pp, flush);
+ }
+ };
+ }
+
+ @Override
+ protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {
+ return new Leader(this, new LeaderZooKeeperServer(logFactory,
+ this, this.getZkDb())) {
+ @Override
+ public long startForwarding(LearnerHandler handler,
+ long lastSeenZxid) {
+ if (startForwardingListener != null) {
+ startForwardingListener.start();
+ }
+ return super.startForwarding(handler, lastSeenZxid);
+ }
+
+ @Override
+ public LearnerSnapshotThrottler createLearnerSnapshotThrottler(
+ int maxConcurrentSnapshots, long maxConcurrentSnapshotTimeout) {
+ return new LearnerSnapshotThrottler(
+ maxConcurrentSnapshots, maxConcurrentSnapshotTimeout) {
+
+ @Override
+ public LearnerSnapshot beginSnapshot(boolean essential)
+ throws SnapshotThrottleException, InterruptedException {
+ if (beginSnapshotListener != null) {
+ beginSnapshotListener.start();
+ }
+ return super.beginSnapshot(essential);
+ }
+ };
+ }
+ };
+ }
+ }
}