You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by sy...@apache.org on 2022/05/17 08:05:49 UTC

[zookeeper] 02/02: ZOOKEEPER-3642: Fix potential data inconsistency due to DIFF sync after partial SNAP sync.

This is an automated email from the ASF dual-hosted git repository.

symat pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/zookeeper.git

commit 161e50574baf6725dec00c547a5005f19588587d
Author: Mukti Krishnan <mu...@gmail.com>
AuthorDate: Tue Mar 9 21:46:52 2021 +0530

    ZOOKEEPER-3642: Fix potential data inconsistency due to DIFF sync after partial SNAP sync.
    
    Based on https://github.com/apache/zookeeper/pull/1224 ; fixed unit test build issue.
    
    Author: Fangmin Lyu <fa...@apache.org>
    Author: Michael Han <ha...@apache.org>
    
    Reviewers: Enrico Olivelli <eo...@apache.org>, Originally developed by Fangmin Lyu <fa...@apache.org>
    
    Closes #1515 from hanm/ZOOKEEPER-3642
    
    (cherry picked from commit a53cfeb26e1e1b9b6b1e29fe7bd9f0277b8fff9a)
---
 .../apache/zookeeper/server/ZooKeeperServer.java   |   3 +
 .../apache/zookeeper/server/quorum/Learner.java    |   4 +-
 .../server/quorum/QuorumPeerMainTest.java          | 147 ++++++++++++++++++++-
 3 files changed, 150 insertions(+), 4 deletions(-)

diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index a12c1117d..130861354 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -612,6 +612,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
      */
     public synchronized void shutdown(boolean fullyShutDown) {
         if (!canShutdown()) {
+            if (fullyShutDown && zkDb != null) {
+                zkDb.clear();
+            }
             LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
             return;
         }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index 74f9f4ff1..7b22c060e 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -687,7 +687,9 @@ public class Learner {
         closeSocket();
         // shutdown previous zookeeper
         if (zk != null) {
-            zk.shutdown();
+            // If we haven't finished SNAP sync, force fully shutdown
+            // to avoid potential inconsistency
+            zk.shutdown(self.getSyncMode().equals(QuorumPeer.SyncMode.SNAP));
         }
     }
 
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
index ff42204db..a1eeaa3ac 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
@@ -20,6 +20,7 @@ package org.apache.zookeeper.server.quorum;
 
 import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
 import static org.apache.zookeeper.test.ClientBase.createEmptyTestDir;
+import static org.junit.Assert.assertNotNull;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.mock;
@@ -1412,21 +1413,153 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
         }
     }
 
+    /**
+     * If a learner fails a SNAP sync with the leader before the snapshot
+     * has been written to disk, it may afterwards do a DIFF sync with a
+     * new leader, or be elected leader itself.
+     *
+     * This test verifies that no data inconsistency results from that
+     * sequence of events.
+     */
+    @Test
+    public void testDiffSyncAfterSnap() throws Exception {
+        final int ENSEMBLE_SERVERS = 3;
+        MainThread[] mt = new MainThread[ENSEMBLE_SERVERS];
+        ZooKeeper[] zk = new ZooKeeper[ENSEMBLE_SERVERS];
+
+        try {
+            // 1. start a quorum
+            final int[] clientPorts = new int[ENSEMBLE_SERVERS];
+            StringBuilder sb = new StringBuilder();
+            String server;
+
+            for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+                clientPorts[i] = PortAssignment.unique();
+                server = "server." + i + "=127.0.0.1:" + PortAssignment.unique()
+                        + ":" + PortAssignment.unique()
+                        + ":participant;127.0.0.1:" + clientPorts[i];
+                sb.append(server + "\n");
+            }
+            String currentQuorumCfgSection = sb.toString();
+
+            // start the servers, each with its own Context used for fault injection
+            Context[] contexts = new Context[ENSEMBLE_SERVERS];
+            for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+                final Context context = new Context();
+                contexts[i] = context;
+                mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false) {
+                    @Override
+                    public TestQPMain getTestQPMain() {
+                        return new CustomizedQPMain(context);
+                    }
+                };
+                mt[i].start();
+                zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+            }
+            waitForAll(zk, States.CONNECTED);
+            LOG.info("all servers started");
+
+            final String nodePath = "/testDiffSyncAfterSnap";
+
+            // 2. find the leader and one follower (referred to as follower A below)
+            int leaderId = -1;
+            int followerA = -1;
+            for (int i = ENSEMBLE_SERVERS - 1; i >= 0; i--) {
+                if (mt[i].main.quorumPeer.leader != null) {
+                    leaderId = i;
+                } else if (followerA == -1) {
+                    followerA = i;
+                }
+            }
+
+            // 3. stop follower A
+            LOG.info("shutdown follower {}", followerA);
+            mt[followerA].shutdown();
+            waitForOne(zk[followerA], States.CONNECTING);
+
+            // 4. issue some traffic through the leader while follower A is down
+            int index = 0;
+            int numOfRequests = 10;
+            for (int i = 0; i < numOfRequests; i++) {
+                zk[leaderId].create(nodePath + index++,
+                        new byte[1], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            }
+
+            CustomQuorumPeer leaderQuorumPeer = (CustomQuorumPeer) mt[leaderId].main.quorumPeer; // NOTE(review): never read below
+
+            // 5. inject a one-shot fault: follower A fails with an IOException the first time NEWLEADER is received
+            contexts[followerA].newLeaderReceivedCallback = new NewLeaderReceivedCallback() {
+                boolean processed = false;
+                @Override
+                public void process() throws IOException {
+                    if (processed) {
+                        return;
+                    }
+                    processed = true;
+                    System.setProperty(LearnerHandler.FORCE_SNAP_SYNC, "false");
+                    throw new IOException("read timedout");
+                }
+            };
+
+            // 6. force a snapshot sync once (the callback above switches the flag back off)
+            LOG.info("force snapshot sync");
+            System.setProperty(LearnerHandler.FORCE_SNAP_SYNC, "true");
+
+            // 7. start follower A and check that it can see every node created in step 4
+            mt[followerA].start();
+            waitForOne(zk[followerA], States.CONNECTED);
+            LOG.info("verify the nodes are exist in memory");
+            for (int i = 0; i < index; i++) {
+                assertNotNull(zk[followerA].exists(nodePath + i, false));
+            }
+
+            // 8. issue one more request, which will be persisted on disk
+            zk[leaderId].create(nodePath + index++,
+                    new byte[1], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+            // wait a short time to let the request be written to disk
+            Thread.sleep(500);
+
+            // 9. restart follower A so it reloads from disk, then check the data is still consistent
+            LOG.info("restarting follower {}", followerA);
+            mt[followerA].shutdown();
+            waitForOne(zk[followerA], States.CONNECTING);
+            mt[followerA].start();
+            waitForOne(zk[followerA], States.CONNECTED);
+
+            for (int i = 0; i < index; i++) {
+                assertNotNull( "node " + i + " should exist", zk[followerA].exists(nodePath + i, false));
+            }
+
+        } finally {
+            System.clearProperty(LearnerHandler.FORCE_SNAP_SYNC);
+            for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+                mt[i].shutdown();
+                zk[i].close();
+            }
+        }
+    }
+
     static class Context {
         boolean quitFollowing = false;
         boolean exitWhenAckNewLeader = false;
         NewLeaderAckCallback newLeaderAckCallback = null;
+        NewLeaderReceivedCallback newLeaderReceivedCallback = null; // optional hook, run after a NEWLEADER packet has been read
     }
 
-    static interface NewLeaderAckCallback {
+    interface NewLeaderAckCallback {
         public void start();
     }
 
-    static interface StartForwardingListener {
+    interface NewLeaderReceivedCallback {
+        void process() throws IOException;
+    }
+
+    interface StartForwardingListener {
         public void start();
     }
 
-    static interface BeginSnapshotListener {
+    interface BeginSnapshotListener {
         public void start();
     }
 
@@ -1481,6 +1614,14 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
                     }
                     super.writePacket(pp, flush);
                 }
+
+                @Override
+                void readPacket(QuorumPacket qp) throws IOException {
+                    super.readPacket(qp); // perform the real read first, then fire the test hook
+                    if (qp.getType() == Leader.NEWLEADER && context.newLeaderReceivedCallback != null) {
+                        context.newLeaderReceivedCallback.process(); // hook is declared to throw IOException
+                    }
+                }
             };
         }