You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by sy...@apache.org on 2022/05/17 08:05:47 UTC

[zookeeper] branch branch-3.5 updated (d1ec2f346 -> 161e50574)

This is an automated email from the ASF dual-hosted git repository.

symat pushed a change to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


    from d1ec2f346 ZOOKEEPER-3781: Create snapshots on followers when snapshot.trust.empty is true
     new f68019ab9 ZOOKEEPER-3459: Add admin command to display synced state of peer
     new 161e50574 ZOOKEEPER-3642: Fix potential data inconsistency due to DIFF sync after partial SNAP sync.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/zookeeper/server/ZooKeeperServer.java   |   3 +
 .../apache/zookeeper/server/admin/Commands.java    |  40 +++++-
 .../apache/zookeeper/server/quorum/Follower.java   |  15 ++-
 .../org/apache/zookeeper/server/quorum/Leader.java |   3 +
 .../apache/zookeeper/server/quorum/Learner.java    |  14 +-
 .../apache/zookeeper/server/quorum/Observer.java   |   5 +-
 .../apache/zookeeper/server/quorum/QuorumPeer.java |  50 +++++++
 .../server/quorum/QuorumPeerMainTest.java          | 147 ++++++++++++++++++++-
 8 files changed, 260 insertions(+), 17 deletions(-)


[zookeeper] 01/02: ZOOKEEPER-3459: Add admin command to display synced state of peer

Posted by sy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

symat pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/zookeeper.git

commit f68019ab9c2e0ee362b890e1a7a0a449afda3a7c
Author: Brian Nixon <ni...@fb.com>
AuthorDate: Mon Jul 15 14:15:03 2019 +0200

    ZOOKEEPER-3459: Add admin command to display synced state of peer
    
    Author: Brian Nixon <ni...@fb.com>
    
    Reviewers: Enrico Olivelli <eo...@apache.org>, Norbert Kalmar <nk...@apache.org>
    
    Closes #1012 from enixon/cmd-sync-state
    
    (cherry picked from commit cc900a3b05bc31a237753680c8b00dc5866df4b2)
---
 .../apache/zookeeper/server/admin/Commands.java    | 40 +++++++++++++++--
 .../apache/zookeeper/server/quorum/Follower.java   | 15 ++++---
 .../org/apache/zookeeper/server/quorum/Leader.java |  3 ++
 .../apache/zookeeper/server/quorum/Learner.java    | 10 +++--
 .../apache/zookeeper/server/quorum/Observer.java   |  5 ++-
 .../apache/zookeeper/server/quorum/QuorumPeer.java | 50 ++++++++++++++++++++++
 6 files changed, 110 insertions(+), 13 deletions(-)

diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
index 7f338091c..fc10f9da8 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
@@ -37,9 +37,8 @@ import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.ZooTrace;
 import org.apache.zookeeper.server.persistence.SnapshotInfo;
-import org.apache.zookeeper.server.quorum.Leader;
-import org.apache.zookeeper.server.quorum.LeaderZooKeeperServer;
-import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer;
+import org.apache.zookeeper.server.quorum.*;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.util.OSMXBean;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -130,6 +129,7 @@ public class Commands {
         registerCommand(new WatchCommand());
         registerCommand(new WatchesByPathCommand());
         registerCommand(new WatchSummaryCommand());
+        registerCommand(new ZabStateCommand());
     }
 
     /**
@@ -576,5 +576,39 @@ public class Commands {
         }
     }
 
+
+    /**
+     * Returns the current phase of the Zab protocol that the peer is running.
+     * It can be in one of these phases: ELECTION, DISCOVERY, SYNCHRONIZATION, BROADCAST.
+     */
+    public static class ZabStateCommand extends CommandBase {
+        public ZabStateCommand() {
+            super(Arrays.asList("zabstate"));
+        }
+
+        @Override
+        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+            CommandResponse response = initializeResponse();
+            if (zkServer instanceof QuorumZooKeeperServer) {
+                QuorumPeer peer = ((QuorumZooKeeperServer) zkServer).self;
+                QuorumPeer.ZabState zabState = peer.getZabState();
+                QuorumVerifier qv = peer.getQuorumVerifier();
+
+                QuorumPeer.QuorumServer voter = qv.getVotingMembers().get(peer.getId());
+                boolean voting = (
+                        voter != null
+                                && voter.addr.equals(peer.getQuorumAddress())
+                                && voter.electionAddr.equals(peer.getElectionAddress())
+                );
+                response.put("voting", voting);
+                response.put("zabstate", zabState.name().toLowerCase());
+            } else {
+                response.put("voting", false);
+                response.put("zabstate", "");
+            }
+            return response ;
+        }
+    }
+
     private Commands() {}
 }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
index b79f5702c..80b41ef5c 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
@@ -40,7 +40,7 @@ public class Follower extends Learner{
     private long lastQueued;
     // This is the same object as this.zk, but we cache the downcast op
     final FollowerZooKeeperServer fzk;
-    
+
     Follower(QuorumPeer self,FollowerZooKeeperServer zk) {
         this.self = self;
         this.zk=zk;
@@ -72,7 +72,8 @@ public class Follower extends Learner{
         self.end_fle = 0;
         fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
         try {
-            QuorumServer leaderServer = findLeader();            
+            self.setZabState(QuorumPeer.ZabState.DISCOVERY);
+            QuorumServer leaderServer = findLeader();
             try {
                 connectToLeader(leaderServer.addr, leaderServer.hostname);
                 long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
@@ -86,7 +87,9 @@ public class Follower extends Learner{
                             + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
                     throw new IOException("Error: Epoch of leader is lower");
                 }
-                syncWithLeader(newEpochZxid);                
+                self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
+                syncWithLeader(newEpochZxid);
+                self.setZabState(QuorumPeer.ZabState.BROADCAST);
                 QuorumPacket qp = new QuorumPacket();
                 while (this.isRunning()) {
                     readPacket(qp);
@@ -114,7 +117,7 @@ public class Follower extends Learner{
         case Leader.PING:            
             ping(qp);            
             break;
-        case Leader.PROPOSAL:           
+        case Leader.PROPOSAL:
             TxnHeader hdr = new TxnHeader();
             Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
             if (hdr.getZxid() != lastQueued + 1) {
@@ -146,9 +149,9 @@ public class Follower extends Learner{
            // get new designated leader from (current) leader's message
            ByteBuffer buffer = ByteBuffer.wrap(qp.getData());    
            long suggestedLeaderId = buffer.getLong();
-            boolean majorChange = 
+            boolean majorChange =
                    self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
-           // commit (writes the new config to ZK tree (/zookeeper/config)                     
+           // commit (writes the new config to ZK tree (/zookeeper/config)
            fzk.commit(qp.getZxid());
             if (majorChange) {
                throw new Exception("changes proposed in reconfig");
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
index b3c4b6c5d..11039bfd9 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
@@ -469,6 +469,7 @@ public class Leader {
         zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);
 
         try {
+            self.setZabState(QuorumPeer.ZabState.DISCOVERY);
             self.tick.set(0);
             zk.loadData();
 
@@ -539,6 +540,7 @@ public class Leader {
 
              waitForEpochAck(self.getId(), leaderStateSummary);
              self.setCurrentEpoch(epoch);
+             self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
 
              try {
                  waitForNewLeaderAck(self.getId(), zk.getZxid());
@@ -590,6 +592,7 @@ public class Leader {
                 self.setZooKeeperServer(zk);
             }
 
+            self.setZabState(QuorumPeer.ZabState.BROADCAST);
             self.adminServer.setZooKeeperServer(zk);
 
             // Everything is a go, simply start counting the ticks
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index a8d89b28d..74f9f4ff1 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -235,7 +235,7 @@ public class Learner {
 
     /**
      * Establish a connection with the Leader found by findLeader. Retries
-     * until either initLimit time has elapsed or 5 tries have happened. 
+     * until either initLimit time has elapsed or 5 tries have happened.
      * @param addr - the address of the Leader to connect to.
      * @throws IOException - if the socket connection fails on the 5th attempt
      * <li>if there is an authentication failure while connecting to leader</li>
@@ -309,7 +309,7 @@ public class Learner {
 
     /**
      * Once connected to the leader, perform the handshake protocol to
-     * establish a following / observing connection. 
+     * establish a following / observing connection.
      * @param pktType
      * @return the zxid the Leader sends for synchronization purposes.
      * @throws IOException
@@ -368,7 +368,7 @@ public class Learner {
     } 
     
     /**
-     * Finally, synchronize our history with the Leader. 
+     * Finally, synchronize our history with the Leader.
      * @param newLeaderZxid
      * @throws IOException
      * @throws InterruptedException
@@ -390,6 +390,7 @@ public class Learner {
         synchronized (zk) {
             if (qp.getType() == Leader.DIFF) {
                 LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
+                self.setSyncMode(QuorumPeer.SyncMode.DIFF);
                 if (zk.shouldForceWriteInitialSnapshotAfterLeaderElection()) {
                     LOG.info("Forcing a snapshot write as part of upgrading from an older Zookeeper. This should only happen while upgrading.");
                     snapshotNeeded = true;
@@ -399,6 +400,7 @@ public class Learner {
                 }
             }
             else if (qp.getType() == Leader.SNAP) {
+                self.setSyncMode(QuorumPeer.SyncMode.SNAP);
                 LOG.info("Getting a snapshot from leader 0x" + Long.toHexString(qp.getZxid()));
                 // The leader is going to dump the database
                 // db is clear as part of deserializeSnapshot()
@@ -421,6 +423,7 @@ public class Learner {
                 syncSnapshot = true;
             } else if (qp.getType() == Leader.TRUNC) {
                 //we need to truncate the log to the lastzxid of the leader
+                self.setSyncMode(QuorumPeer.SyncMode.TRUNC);
                 LOG.warn("Truncating log to get in sync with the leader 0x"
                         + Long.toHexString(qp.getZxid()));
                 boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
@@ -591,6 +594,7 @@ public class Learner {
         }
         ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
         writePacket(ack, true);
+        self.setSyncMode(QuorumPeer.SyncMode.NONE);
         zk.startServing();
         /*
          * Update the election vote here to ensure that all members of the
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java
index 050582d62..6e1d9c150 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java
@@ -63,6 +63,7 @@ public class Observer extends Learner{
         zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean);
 
         try {
+            self.setZabState(QuorumPeer.ZabState.DISCOVERY);
             QuorumServer leaderServer = findLeader();
             LOG.info("Observing " + leaderServer.addr);
             try {
@@ -70,8 +71,10 @@ public class Observer extends Learner{
                 long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
                 if (self.isReconfigStateChange())
                    throw new Exception("learned about role change");
- 
+
+                self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
                 syncWithLeader(newLeaderZxid);
+                self.setZabState(QuorumPeer.ZabState.BROADCAST);
                 QuorumPacket qp = new QuorumPacket();
                 while (this.isRunning()) {
                     readPacket(qp);
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
index 474f70c44..daf605cab 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
@@ -400,6 +400,22 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
         LOOKING, FOLLOWING, LEADING, OBSERVING;
     }
 
+    /**
+     * (Used for monitoring) Shows the current phase of the
+     * Zab protocol that the peer is running.
+     */
+    public enum ZabState {
+        ELECTION, DISCOVERY, SYNCHRONIZATION, BROADCAST;
+    }
+
+    /**
+     * (Used for monitoring) When the peer is in the synchronization phase,
+     * this shows which synchronization mechanism is being used.
+     */
+    public enum SyncMode {
+        NONE, DIFF, SNAP, TRUNC;
+    }
+
     /*
      * A peer can either be participating, which implies that it is willing to
      * both vote in instances of consensus and to elect or become a Leader, or
@@ -715,11 +731,45 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
 
     private ServerState state = ServerState.LOOKING;
 
+    private AtomicReference<ZabState> zabState = new AtomicReference<>(ZabState.ELECTION);
+    private AtomicReference<SyncMode> syncMode = new AtomicReference<>(SyncMode.NONE);
+
     private boolean reconfigFlag = false; // indicates that a reconfig just committed
 
     public synchronized void setPeerState(ServerState newState){
         state=newState;
     }
+
+    public void setZabState(ZabState zabState) {
+        this.zabState.set(zabState);
+        LOG.info("Peer state changed: {}", getDetailedPeerState());
+    }
+
+    public void setSyncMode(SyncMode syncMode) {
+        this.syncMode.set(syncMode);
+        LOG.info("Peer state changed: {}", getDetailedPeerState());
+    }
+
+    public ZabState getZabState() {
+        return zabState.get();
+    }
+
+    public SyncMode getSyncMode() {
+        return syncMode.get();
+    }
+    public String getDetailedPeerState() {
+        final StringBuilder sb = new StringBuilder(getPeerState().toString().toLowerCase());
+        final ZabState zabState = getZabState();
+        if (!ZabState.ELECTION.equals(zabState)) {
+            sb.append(" - ").append(zabState.toString().toLowerCase());
+        }
+        final SyncMode syncMode = getSyncMode();
+        if (!SyncMode.NONE.equals(syncMode)) {
+            sb.append(" - ").append(syncMode.toString().toLowerCase());
+        }
+        return sb.toString();
+    }
+
     public synchronized void reconfigFlagSet(){
        reconfigFlag = true;
     }


[zookeeper] 02/02: ZOOKEEPER-3642: Fix potential data inconsistency due to DIFF sync after partial SNAP sync.

Posted by sy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

symat pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/zookeeper.git

commit 161e50574baf6725dec00c547a5005f19588587d
Author: Mukti Krishnan <mu...@gmail.com>
AuthorDate: Tue Mar 9 21:46:52 2021 +0530

    ZOOKEEPER-3642: Fix potential data inconsistency due to DIFF sync after partial SNAP sync.
    
    Based on https://github.com/apache/zookeeper/pull/1224 ; fixed unit test build issue.
    
    Author: Fangmin Lyu <fa...@apache.org>
    Author: Michael Han <ha...@apache.org>
    
    Reviewers: Enrico Olivelli <eo...@apache.org>, Originally developed by Fangmin Lyu <fa...@apache.org>
    
    Closes #1515 from hanm/ZOOKEEPER-3642
    
    (cherry picked from commit a53cfeb26e1e1b9b6b1e29fe7bd9f0277b8fff9a)
---
 .../apache/zookeeper/server/ZooKeeperServer.java   |   3 +
 .../apache/zookeeper/server/quorum/Learner.java    |   4 +-
 .../server/quorum/QuorumPeerMainTest.java          | 147 ++++++++++++++++++++-
 3 files changed, 150 insertions(+), 4 deletions(-)

diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index a12c1117d..130861354 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -612,6 +612,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
      */
     public synchronized void shutdown(boolean fullyShutDown) {
         if (!canShutdown()) {
+            if (fullyShutDown && zkDb != null) {
+                zkDb.clear();
+            }
             LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
             return;
         }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index 74f9f4ff1..7b22c060e 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -687,7 +687,9 @@ public class Learner {
         closeSocket();
         // shutdown previous zookeeper
         if (zk != null) {
-            zk.shutdown();
+            // If we haven't finished SNAP sync, force a full shutdown
+            // to avoid potential inconsistency
+            zk.shutdown(self.getSyncMode().equals(QuorumPeer.SyncMode.SNAP));
         }
     }
 
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
index ff42204db..a1eeaa3ac 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
@@ -20,6 +20,7 @@ package org.apache.zookeeper.server.quorum;
 
 import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
 import static org.apache.zookeeper.test.ClientBase.createEmptyTestDir;
+import static org.junit.Assert.assertNotNull;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.mock;
@@ -1412,21 +1413,153 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
         }
     }
 
+    /**
+     * If a learner fails its SNAP sync with the leader before it has written
+     * the snapshot to disk, it may subsequently do a DIFF sync with a new
+     * leader, or be elected leader itself.
+     *
+     * This test verifies that there is no data inconsistency in
+     * this case.
+     */
+    @Test
+    public void testDiffSyncAfterSnap() throws Exception {
+        final int ENSEMBLE_SERVERS = 3;
+        MainThread[] mt = new MainThread[ENSEMBLE_SERVERS];
+        ZooKeeper[] zk = new ZooKeeper[ENSEMBLE_SERVERS];
+
+        try {
+            // 1. start a quorum
+            final int[] clientPorts = new int[ENSEMBLE_SERVERS];
+            StringBuilder sb = new StringBuilder();
+            String server;
+
+            for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+                clientPorts[i] = PortAssignment.unique();
+                server = "server." + i + "=127.0.0.1:" + PortAssignment.unique()
+                        + ":" + PortAssignment.unique()
+                        + ":participant;127.0.0.1:" + clientPorts[i];
+                sb.append(server + "\n");
+            }
+            String currentQuorumCfgSection = sb.toString();
+
+            // start servers
+            Context[] contexts = new Context[ENSEMBLE_SERVERS];
+            for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+                final Context context = new Context();
+                contexts[i] = context;
+                mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false) {
+                    @Override
+                    public TestQPMain getTestQPMain() {
+                        return new CustomizedQPMain(context);
+                    }
+                };
+                mt[i].start();
+                zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+            }
+            waitForAll(zk, States.CONNECTED);
+            LOG.info("all servers started");
+
+            final String nodePath = "/testDiffSyncAfterSnap";
+
+            // 2. find leader and a follower
+            int leaderId = -1;
+            int followerA = -1;
+            for (int i = ENSEMBLE_SERVERS - 1; i >= 0; i--) {
+                if (mt[i].main.quorumPeer.leader != null) {
+                    leaderId = i;
+                } else if (followerA == -1) {
+                    followerA = i;
+                }
+            }
+
+            // 3. stop follower A
+            LOG.info("shutdown follower {}", followerA);
+            mt[followerA].shutdown();
+            waitForOne(zk[followerA], States.CONNECTING);
+
+            // 4. issue some traffic
+            int index = 0;
+            int numOfRequests = 10;
+            for (int i = 0; i < numOfRequests; i++) {
+                zk[leaderId].create(nodePath + index++,
+                        new byte[1], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            }
+
+            CustomQuorumPeer leaderQuorumPeer = (CustomQuorumPeer) mt[leaderId].main.quorumPeer;
+
+            // 5. inject a fault that makes the follower exit when it receives NEWLEADER
+            contexts[followerA].newLeaderReceivedCallback = new NewLeaderReceivedCallback() {
+                boolean processed = false;
+                @Override
+                public void process() throws IOException {
+                    if (processed) {
+                        return;
+                    }
+                    processed = true;
+                    System.setProperty(LearnerHandler.FORCE_SNAP_SYNC, "false");
+                    throw new IOException("read timedout");
+                }
+            };
+
+            // 6. force snap sync once
+            LOG.info("force snapshot sync");
+            System.setProperty(LearnerHandler.FORCE_SNAP_SYNC, "true");
+
+            // 7. start follower A
+            mt[followerA].start();
+            waitForOne(zk[followerA], States.CONNECTED);
+            LOG.info("verify the nodes are exist in memory");
+            for (int i = 0; i < index; i++) {
+                assertNotNull(zk[followerA].exists(nodePath + i, false));
+            }
+
+            // 8. issue another request which will be persisted on disk
+            zk[leaderId].create(nodePath + index++,
+                    new byte[1], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+            // wait some time to let this get written to disk
+            Thread.sleep(500);
+
+            // 9. reload data from disk and make sure it's still consistent
+            LOG.info("restarting follower {}", followerA);
+            mt[followerA].shutdown();
+            waitForOne(zk[followerA], States.CONNECTING);
+            mt[followerA].start();
+            waitForOne(zk[followerA], States.CONNECTED);
+
+            for (int i = 0; i < index; i++) {
+                assertNotNull( "node " + i + " should exist", zk[followerA].exists(nodePath + i, false));
+            }
+
+        } finally {
+            System.clearProperty(LearnerHandler.FORCE_SNAP_SYNC);
+            for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+                mt[i].shutdown();
+                zk[i].close();
+            }
+        }
+    }
+
     static class Context {
         boolean quitFollowing = false;
         boolean exitWhenAckNewLeader = false;
         NewLeaderAckCallback newLeaderAckCallback = null;
+        NewLeaderReceivedCallback newLeaderReceivedCallback = null;
     }
 
-    static interface NewLeaderAckCallback {
+    interface NewLeaderAckCallback {
         public void start();
     }
 
-    static interface StartForwardingListener {
+    interface NewLeaderReceivedCallback {
+        void process() throws IOException;
+    }
+
+    interface StartForwardingListener {
         public void start();
     }
 
-    static interface BeginSnapshotListener {
+    interface BeginSnapshotListener {
         public void start();
     }
 
@@ -1481,6 +1614,14 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
                     }
                     super.writePacket(pp, flush);
                 }
+
+                @Override
+                void readPacket(QuorumPacket qp) throws IOException {
+                    super.readPacket(qp);
+                    if (qp.getType() == Leader.NEWLEADER && context.newLeaderReceivedCallback != null) {
+                        context.newLeaderReceivedCallback.process();
+                    }
+                }
             };
         }