You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by th...@apache.org on 2013/06/28 23:22:36 UTC

svn commit: r1497929 [1/2] - in /zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/server/ src/java/main/org/apache/zookeeper/server/persistence/ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/test/

Author: thawan
Date: Fri Jun 28 21:22:35 2013
New Revision: 1497929

URL: http://svn.apache.org/r1497929
Log:
ZOOKEEPER-1413. Use on-disk transaction log for learner sync up (thawan)

Added:
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TxnLogProposalIterator.java   (with props)
    zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java   (with props)
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/GetProposalFromTxnTest.java   (with props)
Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/ivy.xml
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/TxnLog.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LoadFromLogTest.java

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1497929&r1=1497928&r2=1497929&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Fri Jun 28 21:22:35 2013
@@ -536,7 +536,8 @@ IMPROVEMENTS:
   
   ZOOKEEPER-1719. zkCli.sh, zkServer.sh and zkEnv.sh regression caused by ZOOKEEPER-1663
   (Marshall McMullen via camille)
-  
+
+  ZOOKEEPER-1413. Use on-disk transaction log for learner sync up (thawan)
 
 Release 3.4.0 - 
 

Modified: zookeeper/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/zookeeper/trunk/ivy.xml?rev=1497929&r1=1497928&r2=1497929&view=diff
==============================================================================
--- zookeeper/trunk/ivy.xml (original)
+++ zookeeper/trunk/ivy.xml Fri Jun 28 21:22:35 2013
@@ -39,8 +39,8 @@
   </publications>
 
   <dependencies>
-    <dependency org="org.slf4j" name="slf4j-api" rev="1.6.2"/>
-    <dependency org="org.slf4j" name="slf4j-log4j12" rev="1.6.2" transitive="false"/>
+    <dependency org="org.slf4j" name="slf4j-api" rev="1.7.5"/>
+    <dependency org="org.slf4j" name="slf4j-log4j12" rev="1.7.5" transitive="false"/>
     <dependency org="commons-cli" name="commons-cli" rev="1.2" />
   
     <!-- transitive false turns off dependency checking, log4j deps seem borked -->

Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TxnLogProposalIterator.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TxnLogProposalIterator.java?rev=1497929&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TxnLogProposalIterator.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TxnLogProposalIterator.java Fri Jun 28 21:22:35 2013
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.Record;
+import org.apache.zookeeper.server.persistence.TxnLog.TxnIterator;
+import org.apache.zookeeper.server.quorum.Leader;
+import org.apache.zookeeper.server.quorum.Leader.Proposal;
+import org.apache.zookeeper.server.quorum.QuorumPacket;
+import org.apache.zookeeper.txn.TxnHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class provides an iterator interface to access Proposals deserialized
+ * from the on-disk txnlog. The iterator deserializes one proposal at a time
+ * to reduce memory footprint. Note that the request part of each proposal
+ * is not initialized (it is set to null) since we don't need it during
+ * follower sync-up.
+ *
+ */
+public class TxnLogProposalIterator implements Iterator<Proposal> {
+    private static final Logger LOG = LoggerFactory
+            .getLogger(TxnLogProposalIterator.class);
+
+    public static final TxnLogProposalIterator EMPTY_ITERATOR = new TxnLogProposalIterator();
+
+    private boolean hasNext = false;
+
+    private TxnIterator itr;
+
+    @Override
+    public boolean hasNext() {
+        return hasNext;
+    }
+
+    /**
+     * Proposal returned by this iterator has request part set to null, since
+     * it is not used for follower sync-up.
+     */
+    @Override
+    public Proposal next() {
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+        Proposal p = new Proposal();
+        try {
+            TxnHeader hdr = itr.getHeader();
+            Record txn = itr.getTxn();
+            hdr.serialize(boa, "hdr");
+            if (txn != null) {
+                txn.serialize(boa, "txn");
+            }
+            baos.close();
+
+            QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, itr.getHeader()
+                    .getZxid(), baos.toByteArray(), null);
+            p.packet = pp;
+            p.request = null;
+
+            // This is the only place that can throw IO exception
+            hasNext = itr.next();
+
+        } catch (IOException e) {
+            LOG.error("Unable to read txnlog from disk", e);
+            hasNext = false;
+        }
+
+        return p;
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+
+    private TxnLogProposalIterator() {
+    }
+
+    public TxnLogProposalIterator(TxnIterator itr) {
+        if (itr != null) {
+            this.itr = itr;
+            hasNext = (itr.getHeader() != null);
+        }
+    }
+
+}

Propchange: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TxnLogProposalIterator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java?rev=1497929&r1=1497928&r2=1497929&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java Fri Jun 28 21:22:35 2013
@@ -22,6 +22,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
@@ -34,8 +35,6 @@ import org.apache.jute.BinaryOutputArchi
 import org.apache.jute.InputArchive;
 import org.apache.jute.OutputArchive;
 import org.apache.jute.Record;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.Watcher;
@@ -45,12 +44,15 @@ import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog.PlayBackListener;
+import org.apache.zookeeper.server.persistence.TxnLog.TxnIterator;
 import org.apache.zookeeper.server.quorum.Leader;
 import org.apache.zookeeper.server.quorum.Leader.Proposal;
 import org.apache.zookeeper.server.quorum.QuorumPacket;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.txn.TxnHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class maintains the in memory database of zookeeper
@@ -70,6 +72,13 @@ public class ZKDatabase {
     protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
     protected FileTxnSnapLog snapLog;
     protected long minCommittedLog, maxCommittedLog;
+    
+    /**
+     * Default value is to use snapshot if txnlog size exceeds 1/3 the size of snapshot
+     */
+    public static final String SNAPSHOT_SIZE_FACTOR = "zookeeper.snapshotSizeFactor";
+    private double snapshotSizeFactor = 0.33;
+    
     public static final int commitLogCount = 500;
     protected static int commitLogBuffer = 700;
     protected LinkedList<Proposal> committedLog = new LinkedList<Proposal>();
@@ -256,9 +265,66 @@ public class ZKDatabase {
             wl.unlock();
         }
     }
+    
+    public double getSnapshotSizeFactor() {
+        return snapshotSizeFactor;
+    }
 
+    public long calculateTxnLogSizeLimit() {
+        long snapSize = 0;
+        try {
+            snapSize = snapLog.findMostRecentSnapshot().length();
+        } catch (IOException e) {
+            LOG.error("Unable to get size of most recent snapshot");
+        }
+        return (long) (snapSize * snapshotSizeFactor);
+    }
 
     /**
+     * Get proposals from txnlog. Only packet part of proposal is populated.
+     *
+     * @param startZxid the starting zxid of the proposal
+     * @param sizeLimit maximum on-disk size of txnlog to fetch
+     *                  0 is unlimited, negative value means disable.
+     * @return iterator over proposals (request part of each proposal is null)
+     */
+    public Iterator<Proposal> getProposalsFromTxnLog(long startZxid,
+                                                     long sizeLimit) {
+        if (sizeLimit < 0) {
+            LOG.debug("Negative size limit - retrieving proposal via txnlog is disabled");
+            return TxnLogProposalIterator.EMPTY_ITERATOR;
+        }
+
+        TxnIterator itr = null;
+        try {
+
+            itr = snapLog.readTxnLog(startZxid, false);
+
+            // If we cannot guarantee that this is strictly the starting txn
+            // after a given zxid, we should fail.
+            if ((itr.getHeader() != null)
+                    && (itr.getHeader().getZxid() > startZxid)) {
+                LOG.warn("Unable to find proposals from txnlog for zxid: "
+                        + startZxid);
+                return TxnLogProposalIterator.EMPTY_ITERATOR;
+            }
+
+            if (sizeLimit > 0) {
+                long txnSize = itr.getStorageSize();
+                if (txnSize > sizeLimit) {
+                    LOG.info("Txnlog size: " + txnSize + " exceeds sizeLimit: "
+                            + sizeLimit);
+                    return TxnLogProposalIterator.EMPTY_ITERATOR;
+                }
+            }
+        } catch (IOException e) {
+            LOG.error("Unable to read txnlog from disk", e);
+            return TxnLogProposalIterator.EMPTY_ITERATOR;
+        }
+        return new TxnLogProposalIterator(itr);
+    }
+    
+    /**
      * remove a cnxn from the datatree
      * @param cnxn the cnxn to remove from the datatree
      */
@@ -503,4 +569,11 @@ public class ZKDatabase {
         }
     }
  
+    /**
+     * Use for unit testing, so we can turn this feature on/off
+     * @param snapshotSizeFactor Set to a negative value to turn this off.
+     */
+    public void setSnapshotSizeFactor(double snapshotSizeFactor) {
+        this.snapshotSizeFactor = snapshotSizeFactor;
+    }
 }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java?rev=1497929&r1=1497928&r2=1497929&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java Fri Jun 28 21:22:35 2013
@@ -40,6 +40,7 @@ import org.apache.jute.BinaryOutputArchi
 import org.apache.jute.InputArchive;
 import org.apache.jute.OutputArchive;
 import org.apache.jute.Record;
+import org.apache.zookeeper.server.persistence.TxnLog.TxnIterator;
 import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.txn.TxnHeader;
 import org.slf4j.Logger;
@@ -338,7 +339,20 @@ public class FileTxnLog implements TxnLo
      * logs
      */
     public TxnIterator read(long zxid) throws IOException {
-        return new FileTxnIterator(logDir, zxid);
+        return read(zxid, true);
+    }
+
+    /**
+     * start reading all the transactions from the given zxid.
+     *
+     * @param zxid the zxid to start reading transactions from
+     * @param fastForward true if the iterator should be fast forwarded to point
+     *        to the txn of a given zxid, else the iterator will point to the
+     *        starting txn of a txnlog that may contain txn of a given zxid
+     * @return returns an iterator to iterate through the transaction logs
+     */
+    public TxnIterator read(long zxid, boolean fastForward) throws IOException {
+        return new FileTxnIterator(logDir, zxid, fastForward);
     }
 
     /**
@@ -496,12 +510,34 @@ public class FileTxnLog implements TxnLo
          * create an iterator over a transaction database directory
          * @param logDir the transaction database directory
          * @param zxid the zxid to start reading from
+         * @param fastForward   true if the iterator should be fast forwarded to
+         *        point to the txn of a given zxid, else the iterator will
+         *        point to the starting txn of a txnlog that may contain txn of
+         *        a given zxid
+         * @throws IOException
+         */
+        public FileTxnIterator(File logDir, long zxid, boolean fastForward)
+                throws IOException {
+            this.logDir = logDir;
+            this.zxid = zxid;
+            init();
+
+            if (fastForward && hdr != null) {
+                while (hdr.getZxid() < zxid) {
+                    if (!next())
+                        break;
+                }
+            }
+        }
+        
+        /**
+         * create an iterator over a transaction database directory
+         * @param logDir the transaction database directory
+         * @param zxid the zxid to start reading from
          * @throws IOException
          */
         public FileTxnIterator(File logDir, long zxid) throws IOException {
-          this.logDir = logDir;
-          this.zxid = zxid;
-          init();
+            this(logDir, zxid, true);
         }
 
         /**
@@ -525,10 +561,17 @@ public class FileTxnLog implements TxnLo
             goToNextLog();
             if (!next())
                 return;
-            while (hdr.getZxid() < zxid) {
-                if (!next())
-                    return;
+        }
+        
+        /**
+         * Return total storage size of the txnlog files that will be returned by this iterator.
+         */
+        public long getStorageSize() {
+            long sum = 0;
+            for (File f : storedFiles) {
+                sum += f.length();
             }
+            return sum;
         }
 
         /**

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java?rev=1497929&r1=1497928&r2=1497929&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java Fri Jun 28 21:22:35 2013
@@ -191,6 +191,33 @@ public class FileTxnSnapLog {
     }
 
     /**
+     * Get TxnIterator for iterating through txnlog starting at a given zxid
+     *
+     * @param zxid starting zxid
+     * @return TxnIterator
+     * @throws IOException
+     */
+    public TxnIterator readTxnLog(long zxid) throws IOException {
+        return readTxnLog(zxid, true);
+    }
+
+    /**
+     * Get TxnIterator for iterating through txnlog starting at a given zxid
+     *
+     * @param zxid starting zxid
+     * @param fastForward true if the iterator should be fast forwarded to point
+     *        to the txn of a given zxid, else the iterator will point to the
+     *        starting txn of a txnlog that may contain txn of a given zxid
+     * @return TxnIterator
+     * @throws IOException
+     */
+    public TxnIterator readTxnLog(long zxid, boolean fastForward)
+            throws IOException {
+        FileTxnLog txnLog = new FileTxnLog(dataDir);
+        return txnLog.read(zxid, fastForward);
+    }
+    
+    /**
      * process the transaction on the datatree
      * @param hdr the hdr of the transaction
      * @param dt the datatree to apply transaction to

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/TxnLog.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/TxnLog.java?rev=1497929&r1=1497928&r2=1497929&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/TxnLog.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/TxnLog.java Fri Jun 28 21:22:35 2013
@@ -116,6 +116,13 @@ public interface TxnLog {
          * @throws IOException
          */
         void close() throws IOException;
+        
+        /**
+         * Get an estimate of the storage space used to store the transaction
+         * records that will be returned by this iterator
+         * @throws IOException
+         */
+        long getStorageSize() throws IOException;
     }
 }
 

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=1497929&r1=1497928&r2=1497929&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Fri Jun 28 21:22:35 2013
@@ -27,7 +27,8 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.Socket;
 import java.nio.ByteBuffer;
-import java.util.List;
+import java.util.Iterator;
+import java.util.Queue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
@@ -37,8 +38,8 @@ import org.apache.jute.BinaryOutputArchi
 import org.apache.jute.Record;
 import org.apache.zookeeper.KeeperException.SessionExpiredException;
 import org.apache.zookeeper.ZooDefs.OpCode;
-import org.apache.zookeeper.server.ByteBufferInputStream;
 import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooTrace;
 import org.apache.zookeeper.server.quorum.Leader.Proposal;
 import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
@@ -96,6 +97,28 @@ public class LearnerHandler extends Thre
     private BinaryOutputArchive oa;
 
     private BufferedOutputStream bufferedOutput;
+    
+    /**
+     * Keep track of whether we have started the thread that sends packets
+     */
+    private volatile boolean sendingThreadStarted = false;
+
+    /**
+     * For testing purposes, force leader to use snapshot to sync with followers
+     */
+    public static final String FORCE_SNAP_SYNC = "zookeeper.forceSnapshotSync";
+    private boolean forceSnapSync = false;
+
+    /**
+     * Keep track of whether we need to queue TRUNC or DIFF into the packet
+     * queue that we are going to blast to the learner
+     */
+    private boolean needOpPacket = true;
+    
+    /**
+     * Last zxid sent to the learner as part of synchronization
+     */
+    private long leaderLastZxid;
 
     LearnerHandler(Socket sock, Leader leader) throws IOException {
         super("LearnerHandler-" + sock.getRemoteSocketAddress());
@@ -315,155 +338,48 @@ public class LearnerHandler extends Thre
                 leader.waitForEpochAck(this.getSid(), ss);
             }
             peerLastZxid = ss.getLastZxid();
-
-            /* the default to send to the follower */
-            int packetToSend = Leader.SNAP;
-            long zxidToSend = 0;
-            long leaderLastZxid = 0;
-            /** the packets that the follower needs to get updates from **/
-            long updates = peerLastZxid;
-
-            /* we are sending the diff check if we have proposals in memory to be able to
-             * send a diff to the
-             */
-            ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock();
-            ReadLock rl = lock.readLock();
-            try {
-                rl.lock();
-                final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
-                final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();
-                LOG.info("Synchronizing with Follower sid: " + sid
-                        +" maxCommittedLog=0x"+Long.toHexString(maxCommittedLog)
-                        +" minCommittedLog=0x"+Long.toHexString(minCommittedLog)
-                        +" peerLastZxid=0x"+Long.toHexString(peerLastZxid));
-
-                List<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();
-
-                if (proposals.size() != 0) {
-                    LOG.debug("proposal size is {}", proposals.size());
-                    if ((maxCommittedLog >= peerLastZxid)
-                            && (minCommittedLog <= peerLastZxid)) {
-                        LOG.debug("Sending proposals to follower");
-
-                        // as we look through proposals, this variable keeps track of previous
-                        // proposal Id.
-                        long prevProposalZxid = minCommittedLog;
-
-                        // Keep track of whether we are about to send the first packet.
-                        // Before sending the first packet, we have to tell the learner
-                        // whether to expect a trunc or a diff
-                        boolean firstPacket=true;
-
-                        // If we are here, we can use committedLog to sync with
-                        // follower. Then we only need to decide whether to
-                        // send trunc or not
-                        packetToSend = Leader.DIFF;
-                        zxidToSend = maxCommittedLog;
-
-                        for (Proposal propose: proposals) {
-                            // skip the proposals the peer already has
-                            if (propose.packet.getZxid() <= peerLastZxid) {
-                                prevProposalZxid = propose.packet.getZxid();
-                                continue;
-                            } else {
-                                // If we are sending the first packet, figure out whether to trunc
-                                // in case the follower has some proposals that the leader doesn't
-                                if (firstPacket) {
-                                    firstPacket = false;
-                                    // Does the peer have some proposals that the leader hasn't seen yet
-                                    if (prevProposalZxid < peerLastZxid) {
-                                        // send a trunc message before sending the diff
-                                        packetToSend = Leader.TRUNC;
-                                        zxidToSend = prevProposalZxid;
-                                        updates = zxidToSend;
-                                    }
-                                }
-                                queuePacket(propose.packet);
-                                QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
-                                        null, null);
-                                queuePacket(qcommit);
-                            }
-                        }
-                    } else if (peerLastZxid > maxCommittedLog) {
-                        LOG.debug("Sending TRUNC to follower zxidToSend=0x{} updates=0x{}",
-                                Long.toHexString(maxCommittedLog),
-                                Long.toHexString(updates));
-
-                        packetToSend = Leader.TRUNC;
-                        zxidToSend = maxCommittedLog;
-                        updates = zxidToSend;
-                    } else {
-                        LOG.warn("Unhandled proposal scenario");
-                    }
-                } else if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
-                    // The leader may recently take a snapshot, so the committedLog
-                    // is empty. We don't need to send snapshot if the follow
-                    // is already sync with in-memory db.
-                    LOG.debug("committedLog is empty but leader and follower "
-                            + "are in sync, zxid=0x{}",
-                            Long.toHexString(peerLastZxid));
-                    packetToSend = Leader.DIFF;
-                    zxidToSend = peerLastZxid;
-                } else {
-                    // just let the state transfer happen
-                    LOG.debug("proposals is empty");
-                }               
-                LOG.info("Sending " + Leader.getPacketType(packetToSend));
-                leaderLastZxid = leader.startForwarding(this, updates);
-
-            } finally {
-                rl.unlock();
-            }
-          
+           
+            // Take any necessary action if we need to send TRUNC or DIFF
+            // startForwarding() will be called in all cases
+            boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);
+            
             LOG.debug("Sending NEWLEADER message to " + sid);
             // the version of this quorumVerifier will be set by leader.lead() in case
             // the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
             // we got here, so the version was set
-            
-             if (getVersion() < 0x10000) {
-            	 QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
-                 		newLeaderZxid, null, null);
+            if (getVersion() < 0x10000) {
+                QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
+                        newLeaderZxid, null, null);
                 oa.writeRecord(newLeaderQP, "packet");
             } else {
-            	QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
-                		newLeaderZxid, leader.self.getLastSeenQuorumVerifier().toString().getBytes(), null);
+                QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
+                        newLeaderZxid, leader.self.getLastSeenQuorumVerifier()
+                                .toString().getBytes(), null);
                 queuedPackets.add(newLeaderQP);
             }
             bufferedOutput.flush();
-            //Need to set the zxidToSend to the latest zxid
-            if (packetToSend == Leader.SNAP) {
-                zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
-            }
-            oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
-            bufferedOutput.flush();
 
             /* if we are not truncating or sending a diff just send a snapshot */
-            if (packetToSend == Leader.SNAP) {
+            if (needSnap) {
+                long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
+                oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
+                bufferedOutput.flush();
+
                 LOG.info("Sending snapshot last zxid of peer is 0x"
                         + Long.toHexString(peerLastZxid) + " "
-                        + " zxid of leader is 0x"
-                        + Long.toHexString(leaderLastZxid)
+                        + "zxid of leader is 0x"
+                        + Long.toHexString(leaderLastZxid) + " "
                         + "sent zxid of db as 0x"
                         + Long.toHexString(zxidToSend));
                 // Dump data to peer
                 leader.zk.getZKDatabase().serializeSnapshot(oa);
                 oa.writeString("BenWasHere", "signature");
+                bufferedOutput.flush();
             }
-            bufferedOutput.flush();
-
-            // Start sending packets
-            new Thread() {
-                public void run() {
-                    Thread.currentThread().setName(
-                            "Sender-" + sock.getRemoteSocketAddress());
-                    try {
-                        sendPackets();
-                    } catch (InterruptedException e) {
-                        LOG.warn("Unexpected interruption",e);
-                    }
-                }
-            }.start();
 
+            // Start thread that blasts packets in the queue to the learner
+            startSendingPackets();
+            
             /*
              * Have to wait for the first ACK, wait until
              * the leader is ready, and only then we can
@@ -607,6 +523,273 @@ public class LearnerHandler extends Thre
         }
     }
 
+    /**
+     * Start the thread that forwards packets in the queue to the learner by
+     * running {@link #sendPackets()}. The thread is named after the remote
+     * socket address. Starting it more than once is a programming error:
+     * later calls only log an error and return.
+     */
+    protected void startSendingPackets() {
+        if (sendingThreadStarted) {
+            LOG.error("Attempting to start sending thread after it already started");
+            return;
+        }
+        // Start sending packets
+        new Thread() {
+            public void run() {
+                Thread.currentThread().setName(
+                        "Sender-" + sock.getRemoteSocketAddress());
+                try {
+                    sendPackets();
+                } catch (InterruptedException e) {
+                    // Log the exception itself so the stack trace is kept,
+                    // then restore the interrupt status for anything upstream
+                    LOG.warn("Unexpected interruption", e);
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }.start();
+        sendingThreadStarted = true;
+    }
+
+    /**
+     * Determine if we need to sync with follower using DIFF/TRUNC/SNAP
+     * and setup follower to receive packets from commit processor
+     *
+     * While holding the database's log read lock, this queues the op packet
+     * (DIFF or TRUNC) and any committed proposals the learner is missing.
+     * It then calls {@code leader.startForwarding} with the zxid of the last
+     * packet queued, and stores the returned value in {@code leaderLastZxid}.
+     * If no usable op packet could be queued, it falls back to a snapshot.
+     *
+     * @param peerLastZxid last zxid reported by the learner
+     * @param db leader database supplying committedLog, txnlog and log lock
+     * @param leader leader on which forwarding to this handler is started
+     * @return true if snapshot transfer is needed.
+     */
+    public boolean syncFollower(long peerLastZxid, ZKDatabase db, Leader leader) {
+        /*
+         * When leader election is completed, the leader will set its
+         * lastProcessedZxid to be (epoch << 32). There will be no txn associated
+         * with this zxid.
+         *
+         * The learner will set its lastProcessedZxid to the same value if
+         * it gets DIFF or SNAP from the leader. If the same learner comes
+         * back to sync with leader using this zxid, we will never find this
+         * zxid in our history. In this case, we will ignore TRUNC logic and
+         * always send DIFF if we have old enough history
+         */
+        boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;
+        // Keep track of the latest zxid which already queued
+        long currentZxid = peerLastZxid;
+        boolean needSnap = true;
+        // A negative snapshot size factor disables syncing from the txnlog
+        boolean txnLogSyncEnabled = (db.getSnapshotSizeFactor() >= 0);
+        ReentrantReadWriteLock lock = db.getLogLock();
+        ReadLock rl = lock.readLock();
+        // Acquire outside the try block: if lock() failed inside it, the
+        // finally clause would release a lock this thread never held
+        rl.lock();
+        try {
+            long maxCommittedLog = db.getmaxCommittedLog();
+            long minCommittedLog = db.getminCommittedLog();
+            long lastProcessedZxid = db.getDataTreeLastProcessedZxid();
+
+            LOG.info("Synchronizing with Follower sid: {} maxCommittedLog=0x{}"
+                    + " minCommittedLog=0x{} lastProcessedZxid=0x{}"
+                    + " peerLastZxid=0x{}", getSid(),
+                    Long.toHexString(maxCommittedLog),
+                    Long.toHexString(minCommittedLog),
+                    Long.toHexString(lastProcessedZxid),
+                    Long.toHexString(peerLastZxid));
+
+            if (db.getCommittedLog().isEmpty()) {
+                /*
+                 * It is possible that committedLog is empty. In that case
+                 * setting these values to the latest txn in leader db
+                 * reduces the number of cases we need to handle.
+                 *
+                 * Here is how each case is handled by the if block below
+                 * 1. lastProcessZxid == peerZxid -> Handled by (2)
+                 * 2. lastProcessZxid < peerZxid -> Handled by (3)
+                 * 3. lastProcessZxid > peerZxid -> Handled by (5)
+                 */
+                minCommittedLog = lastProcessedZxid;
+                maxCommittedLog = lastProcessedZxid;
+            }
+
+            /*
+             * Here are the cases that we want to handle
+             *
+             * 1. Force sending snapshot (for testing purpose)
+             * 2. Peer and leader is already sync, send empty diff
+             * 3. Follower has txn that we haven't seen. This may be old leader
+             *    so we need to send TRUNC. However, if peer has newEpochZxid,
+             *    we cannot send TRUNC since the follower has no txnlog
+             * 4. Follower is within committedLog range or already in-sync.
+             *    We may need to send DIFF or TRUNC depending on follower's zxid
+             *    We always send empty DIFF if follower is already in-sync
+             * 5. Follower missed the committedLog. We will try to use on-disk
+             *    txnlog + committedLog to sync with follower. If that fail,
+             *    we will send snapshot
+             */
+
+            if (forceSnapSync) {
+                // Force leader to use snapshot to sync with follower
+                LOG.warn("Forcing snapshot sync - should not see this in production");
+            } else if (lastProcessedZxid == peerLastZxid) {
+                // Follower is already sync with us, send empty diff
+                LOG.info("Sending DIFF zxid=0x" + Long.toHexString(peerLastZxid) +
+                         " for peer sid: " +  getSid());
+                queueOpPacket(Leader.DIFF, peerLastZxid);
+                needOpPacket = false;
+                needSnap = false;
+            } else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
+                // Newer than committedLog, send trunc and done
+                LOG.debug("Sending TRUNC to follower zxidToSend=0x" +
+                          Long.toHexString(maxCommittedLog) +
+                          " for peer sid:" +  getSid());
+                queueOpPacket(Leader.TRUNC, maxCommittedLog);
+                currentZxid = maxCommittedLog;
+                needOpPacket = false;
+                needSnap = false;
+            } else if ((maxCommittedLog >= peerLastZxid)
+                    && (minCommittedLog <= peerLastZxid)) {
+                // Follower is within commitLog range
+                LOG.info("Using committedLog for peer sid: " +  getSid());
+                Iterator<Proposal> itr = db.getCommittedLog().iterator();
+                currentZxid = queueCommittedProposals(itr, peerLastZxid,
+                                                     null, maxCommittedLog);
+                needSnap = false;
+            } else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
+                // Use txnlog and committedLog to sync
+
+                // Calculate sizeLimit that we allow to retrieve txnlog from disk
+                long sizeLimit = db.calculateTxnLogSizeLimit();
+                // This method can return empty iterator if the requested zxid
+                // is older than on-disk txnlog
+                Iterator<Proposal> txnLogItr = db.getProposalsFromTxnLog(
+                        peerLastZxid, sizeLimit);
+                if (txnLogItr.hasNext()) {
+                    LOG.info("Use txnlog and committedLog for peer sid: " +  getSid());
+                    // txnlog covers the range up to minCommittedLog ...
+                    currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid,
+                                                         minCommittedLog, maxCommittedLog);
+
+                    // ... and committedLog continues from where it stopped
+                    LOG.debug("Queueing committedLog 0x" + Long.toHexString(currentZxid));
+                    Iterator<Proposal> committedLogItr = db.getCommittedLog().iterator();
+                    currentZxid = queueCommittedProposals(committedLogItr, currentZxid,
+                                                         null, maxCommittedLog);
+                    needSnap = false;
+                }
+            } else {
+                LOG.warn("Unhandled scenario for peer sid: " +  getSid());
+            }
+            LOG.debug("Start forwarding 0x" + Long.toHexString(currentZxid) +
+                      " for peer sid: " +  getSid());
+            leaderLastZxid = leader.startForwarding(this, currentZxid);
+        } finally {
+            rl.unlock();
+        }
+
+        if (needOpPacket && !needSnap) {
+            // This should never happen, but we should fall back to sending
+            // snapshot just in case.
+            LOG.error("Unhandled scenario for peer sid: " +  getSid() +
+                     " fall back to use snapshot");
+            needSnap = true;
+        }
+
+        return needSnap;
+    }
+
+    /**
+     * Queue committed proposals into packet queue. The range of packets which
+     * is going to be queued are (peerLastZxid, maxZxid]
+     *
+     * If {@code needOpPacket} is still set, an op packet is queued before any
+     * proposal. DIFF(lastCommitedZxid) is used when peerLastZxid itself
+     * appears in the iterator or is a new-epoch zxid (low 32 bits zero).
+     * Otherwise the op packet is TRUNC to the zxid of the proposal just
+     * before the first one newer than peerLastZxid. needOpPacket is cleared
+     * once an op packet has been queued. Every proposal queued is followed
+     * by a COMMIT packet.
+     *
+     * If a TRUNC would have to cross an epoch boundary, nothing further is
+     * queued and the method returns with needOpPacket still set.
+     *
+     * @param itr  iterator point to the proposals; assumed to be in
+     *        ascending zxid order (the early break on maxZxid relies on it)
+     * @param peerLastZxid  last zxid seen by the follower
+     * @param maxZxid  max zxid of the proposal to queue, null if no limit
+     * @param lastCommitedZxid when sending diff, we need to send lastCommitedZxid
+     *        on the leader to follow Zab 1.0 protocol.
+     * @return zxid of the last proposal queued, or peerLastZxid if none was
+     *         queued
+     */
+    protected long queueCommittedProposals(Iterator<Proposal> itr,
+            long peerLastZxid, Long maxZxid, Long lastCommitedZxid) {
+        boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;
+        long queuedZxid = peerLastZxid;
+        // as we look through proposals, this variable keeps track of previous
+        // proposal Id.
+        // NOTE(review): stays -1 if the very first proposal is already newer
+        // than peerLastZxid, which would yield TRUNC(-1) for a non-new-epoch
+        // peer; the callers in syncFollower appear to only pass iterators that
+        // start at or before peerLastZxid -- confirm.
+        long prevProposalZxid = -1;
+        while (itr.hasNext()) {
+            Proposal propose = itr.next();
+
+            long packetZxid = propose.packet.getZxid();
+            // abort if we hit the limit
+            if ((maxZxid != null) && (packetZxid > maxZxid)) {
+                break;
+            }
+
+            // skip the proposals the peer already has
+            if (packetZxid < peerLastZxid) {
+                prevProposalZxid = packetZxid;
+                continue;
+            }
+
+            // If we are sending the first packet, figure out whether to trunc
+            // or diff
+            if (needOpPacket) {
+
+                // Send diff when we see the follower's zxid in our history
+                if (packetZxid == peerLastZxid) {
+                    LOG.info("Sending DIFF zxid=0x" +
+                             Long.toHexString(lastCommitedZxid) +
+                             " for peer sid: " + getSid());
+                    queueOpPacket(Leader.DIFF, lastCommitedZxid);
+                    needOpPacket = false;
+                    // The peer already has this proposal; don't queue it
+                    continue;
+                }
+
+                if (isPeerNewEpochZxid) {
+                   // Send diff and fall through if zxid is of a new-epoch
+                   LOG.info("Sending DIFF zxid=0x" +
+                            Long.toHexString(lastCommitedZxid) +
+                            " for peer sid: " + getSid());
+                   queueOpPacket(Leader.DIFF, lastCommitedZxid);
+                   needOpPacket = false;
+                } else if (packetZxid > peerLastZxid  ) {
+                    // Peer have some proposals that the leader hasn't seen yet
+                    // it may used to be a leader
+                    if (ZxidUtils.getEpochFromZxid(packetZxid) !=
+                            ZxidUtils.getEpochFromZxid(peerLastZxid)) {
+                        // We cannot send TRUNC that cross epoch boundary.
+                        // The learner will crash if it is asked to do so.
+                        // We will send snapshot in those cases; needOpPacket
+                        // is left set so the caller can detect this.
+                        LOG.warn("Cannot send TRUNC to peer sid: " + getSid() +
+                                 " peer zxid is from different epoch" );
+                        return queuedZxid;
+                    }
+
+                    // Truncate the peer back to the last zxid we both have
+                    LOG.info("Sending TRUNC zxid=0x" +
+                            Long.toHexString(prevProposalZxid) +
+                            " for peer sid: " + getSid());
+                    queueOpPacket(Leader.TRUNC, prevProposalZxid);
+                    needOpPacket = false;
+                }
+            }
+
+            if (packetZxid <= queuedZxid) {
+                // We can get here, if we don't have op packet to queue
+                // or there is a duplicate txn in a given iterator
+                continue;
+            }
+
+            // Since this is already a committed proposal, we need to follow
+            // it by a commit packet
+            queuePacket(propose.packet);
+            queueOpPacket(Leader.COMMIT, packetZxid);
+            queuedZxid = packetZxid;
+
+        }
+
+        if (needOpPacket && isPeerNewEpochZxid) {
+            // We will send DIFF for this kind of zxid in any case. This if-block
+            // is the catch when our history older than learner and there is
+            // no new txn since then. So we need an empty diff
+            LOG.info("Sending DIFF zxid=0x" +
+                     Long.toHexString(lastCommitedZxid) +
+                     " for peer sid: " + getSid());
+            queueOpPacket(Leader.DIFF, lastCommitedZxid);
+            needOpPacket = false;
+        }
+
+        return queuedZxid;
+    }    
+    
     public void shutdown() {
         // Send the packet of death
         try {
@@ -633,6 +816,11 @@ public class LearnerHandler extends Thre
      * ping calls from the leader to the peers
      */
     public void ping() {
+        // If learner hasn't sync properly yet, don't send ping packet
+        // otherwise, the learner will crash
+        if (!sendingThreadStarted) {
+            return;
+        }
         long id;
         synchronized(leader) {
             id = leader.lastProposed;
@@ -642,6 +830,16 @@ public class LearnerHandler extends Thre
         queuePacket(ping);
     }
 
+    /**
+     * Queue a leader op packet (DIFF, TRUNC, COMMIT, ...) that carries only
+     * a type and a zxid, with no data or auth info.
+     *
+     * @param type packet type constant from {@code Leader}
+     * @param zxid zxid stored in the packet
+     */
+    private void queueOpPacket(int type, long zxid) {
+        queuePacket(new QuorumPacket(type, zxid, null, null));
+    }
+    
     /**
      * Append {@code p} to {@code queuedPackets}, the queue whose packets are
      * forwarded to the learner.
      */
     void queuePacket(QuorumPacket p) {
         queuedPackets.add(p);
     }
@@ -650,4 +848,19 @@ public class LearnerHandler extends Thre
         return isAlive()
         && leader.self.tick <= tickOfNextAckDeadline;
     }
+    
+    /**
+     * Expose the outgoing packet queue; intended for tests only.
+     *
+     * @return the live queue itself (not a copy), so callers may inspect
+     *         or clear it
+     */
+    public Queue<QuorumPacket> getQueuedPackets() {
+        return this.queuedPackets;
+    }
+
+    /**
+     * For testing: reset whether the next sync must still queue its leading
+     * op packet (DIFF/TRUNC). Despite the method name, this assigns
+     * {@code needOpPacket}.
+     *
+     * @param value new value for {@code needOpPacket}
+     */
+    public void setFirstPacket(boolean value) {
+        this.needOpPacket = value;
+    }
 }

Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java?rev=1497929&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java Fri Jun 28 21:22:35 2013
@@ -0,0 +1,563 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.Leader.Proposal;
+import org.apache.zookeeper.server.util.ZxidUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LearnerHandlerTest extends ZKTestCase {
+    // Logger for this test class
+    protected static final Logger LOG = LoggerFactory
+            .getLogger(LearnerHandlerTest.class);
+
+    /**
+     * LearnerHandler that never starts a real sender thread; it only records
+     * that startSendingPackets() was invoked.
+     */
+    class MockLearnerHandler extends LearnerHandler {
+        // true once startSendingPackets() has been called
+        boolean threadStarted = false;
+
+        MockLearnerHandler(Socket sock, Leader leader) throws IOException {
+            super(sock, leader);
+        }
+
+        // @Override turns a renamed/changed hook in LearnerHandler into a
+        // compile error instead of a silently non-overriding method
+        @Override
+        protected void startSendingPackets() {
+            threadStarted = true;
+        }
+    }
+
+    /**
+     * ZKDatabase whose lastProcessedZxid, committedLog and on-disk txnlog are
+     * plain in-memory fields that each test sets directly.
+     */
+    class MockZKDatabase extends ZKDatabase {
+        long lastProcessedZxid;
+        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+        LinkedList<Proposal> committedLog = new LinkedList<Leader.Proposal>();
+        LinkedList<Proposal> txnLog = new LinkedList<Leader.Proposal>();
+
+        public MockZKDatabase(FileTxnSnapLog snapLog) {
+            super(snapLog);
+        }
+
+        @Override
+        public long getDataTreeLastProcessedZxid() {
+            return lastProcessedZxid;
+        }
+
+        /** @return zxid of the last committedLog entry, or 0 when empty */
+        @Override
+        public long getmaxCommittedLog() {
+            if (!committedLog.isEmpty()) {
+                return committedLog.getLast().packet.getZxid();
+            }
+            return 0;
+        }
+
+        /** @return zxid of the first committedLog entry, or 0 when empty */
+        @Override
+        public long getminCommittedLog() {
+            if (!committedLog.isEmpty()) {
+                return committedLog.getFirst().packet.getZxid();
+            }
+            return 0;
+        }
+
+        @Override
+        public LinkedList<Proposal> getCommittedLog() {
+            return committedLog;
+        }
+
+        @Override
+        public ReentrantReadWriteLock getLogLock() {
+            return lock;
+        }
+
+        /**
+         * Return the whole txnLog when peerZxid is not older than its first
+         * entry; otherwise (peer older than the log, or the log is empty)
+         * return an empty iterator. The size limit is ignored.
+         */
+        @Override
+        public Iterator<Proposal> getProposalsFromTxnLog(long peerZxid,
+                long limit) {
+            // Guard against peekFirst() returning null on an empty txnLog
+            if (!txnLog.isEmpty()
+                    && peerZxid >= txnLog.peekFirst().packet.getZxid()) {
+                return txnLog.iterator();
+            }
+            return new LinkedList<Proposal>().iterator();
+        }
+
+        @Override
+        public long calculateTxnLogSizeLimit() {
+            return 1;
+        }
+    }
+
+    // Handler under test; recreated in setUp() for every test
+    private MockLearnerHandler learnerHandler;
+    // Mocked socket passed to the handler's constructor
+    private Socket sock;
+
+    // Member variables for mocking Leader
+    private Leader leader;
+    // zxid most recently passed to leader.startForwarding(), captured by the
+    // mock Answer installed in setUp()
+    private long currentZxid;
+
+    // Member variables for mocking ZkDatabase
+    private MockZKDatabase db;
+
+    /**
+     * Create a mocked Leader whose startForwarding() records the zxid it is
+     * given into {@code currentZxid} and returns 0. Also create a mocked
+     * socket, a fresh MockZKDatabase and the handler under test.
+     */
+    @Before
+    public void setUp() throws Exception {
+        // Intercept when startForwarding is called
+        leader = mock(Leader.class);
+        // Typed Answer returning a Long to match startForwarding's long
+        // result (the raw Answer returned an Integer)
+        when(
+                leader.startForwarding(Matchers.any(LearnerHandler.class),
+                        Matchers.anyLong())).thenAnswer(new Answer<Long>() {
+            public Long answer(InvocationOnMock invocation) {
+                currentZxid = (Long) invocation.getArguments()[1];
+                return 0L;
+            }
+        });
+
+        sock = mock(Socket.class);
+
+        db = new MockZKDatabase(null);
+        learnerHandler = new MockLearnerHandler(sock, leader);
+    }
+
+    /**
+     * Build a Proposal wrapping a PROPOSAL packet that carries only the
+     * given zxid (no data, no auth info).
+     */
+    Proposal createProposal(long zxid) {
+        Proposal proposal = new Proposal();
+        proposal.packet = new QuorumPacket(Leader.PROPOSAL, zxid, null, null);
+        return proposal;
+    }
+
+    /**
+     * Validate that queued packets contains proposal in the following orders as
+     * a given array of zxids
+     *
+     * @param zxids
+     */
+    public void queuedPacketMatches(long[] zxids) {
+        int index = 0;
+        for (QuorumPacket qp : learnerHandler.getQueuedPackets()) {
+            if (qp.getType() == Leader.PROPOSAL) {
+                assertZxidEquals(zxids[index++], qp.getZxid());
+            }
+        }
+    }
+
+    /**
+     * Return the handler to a pre-sync state between scenarios of one test:
+     * op packet required again, sender not started, packet queue empty.
+     */
+    void reset() {
+        learnerHandler.setFirstPacket(true);
+        learnerHandler.threadStarted = false;
+        learnerHandler.getQueuedPackets().clear();
+    }
+
+    /**
+     * Check if op packet (first packet in the queue) match the expected value
+     * @param type - type of packet
+     * @param zxid - zxid in the op packet
+     * @param currentZxid - last packet queued by syncFollower,
+     *                      before invoking startForwarding()
+     */
+    public void assertOpType(int type, long zxid, long currentZxid) {
+        Queue<QuorumPacket> packets = learnerHandler.getQueuedPackets();
+        assertTrue(packets.size() > 0);
+        assertEquals(type, packets.peek().getType());
+        assertZxidEquals(zxid, packets.peek().getZxid());
+        assertZxidEquals(currentZxid, this.currentZxid);
+    }
+
+    /**
+     * assertEquals for zxids, with both values shown in hex on failure.
+     */
+    void assertZxidEquals(long expected, long value) {
+        String message = "Expected 0x" + Long.toHexString(expected)
+                + " but was 0x" + Long.toHexString(value);
+        assertEquals(message, expected, value);
+    }
+
+    /**
+     * Test cases when leader has empty committedLog. syncFollower then uses
+     * lastProcessedZxid as both the min and max committed zxid.
+     */
+    @Test
+    public void testEmptyCommittedLog() throws Exception {
+        long peerZxid;
+
+        // Peer has newer zxid
+        peerZxid = 3;
+        db.lastProcessedZxid = 1;
+        db.committedLog.clear();
+        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send TRUNC and forward any packet starting lastProcessedZxid
+        assertOpType(Leader.TRUNC, db.lastProcessedZxid, db.lastProcessedZxid);
+        reset();
+
+        // Peer is already sync
+        peerZxid = 1;
+        db.lastProcessedZxid = 1;
+        db.committedLog.clear();
+        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send DIFF and forward any packet starting lastProcessedZxid
+        assertOpType(Leader.DIFF, db.lastProcessedZxid, db.lastProcessedZxid);
+        // Empty DIFF only, no proposals
+        assertEquals(1, learnerHandler.getQueuedPackets().size());
+        reset();
+
+        // Peer has 0 zxid (new machine turn up), txnlog
+        // is disabled (negative snapshot size factor)
+        peerZxid = 0;
+        db.setSnapshotSizeFactor(-1);
+        db.lastProcessedZxid = 1;
+        db.committedLog.clear();
+        // We send SNAP, so nothing is queued
+        assertTrue(learnerHandler.syncFollower(peerZxid, db, leader));
+        assertEquals(0, learnerHandler.getQueuedPackets().size());
+        reset();
+
+    }
+
+    /**
+     * Test cases when leader has committedLog (txnlog is not consulted).
+     */
+    @Test
+    public void testCommittedLog() throws Exception {
+        long peerZxid;
+
+        // Commit proposal may lag behind data tree, but it shouldn't affect
+        // us in any case
+        db.lastProcessedZxid = 6;
+        db.committedLog.add(createProposal(2));
+        db.committedLog.add(createProposal(3));
+        db.committedLog.add(createProposal(5));
+
+        // Peer has zxid that we have never seen
+        peerZxid = 4;
+        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send TRUNC to 3; startForwarding is called with 5, the last
+        // queued zxid
+        assertOpType(Leader.TRUNC, 3, 5);
+        // TRUNC + 1 proposal + 1 commit
+        assertEquals(3, learnerHandler.getQueuedPackets().size());
+        queuedPacketMatches(new long[] { 5 });
+        reset();
+
+        // Peer is within committedLog range
+        peerZxid = 2;
+        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send DIFF to maxCommittedLog and forward any packet starting at
+        // maxCommittedLog
+        assertOpType(Leader.DIFF, db.getmaxCommittedLog(),
+                db.getmaxCommittedLog());
+        // DIFF + 2 proposals + 2 commit
+        assertEquals(5, learnerHandler.getQueuedPackets().size());
+        queuedPacketMatches(new long[] { 3, 5 });
+        reset();
+
+        // Peer miss the committedLog and txnlog is disabled
+        peerZxid = 1;
+        db.setSnapshotSizeFactor(-1);
+        // We send SNAP
+        assertTrue(learnerHandler.syncFollower(peerZxid, db, leader));
+        assertEquals(0, learnerHandler.getQueuedPackets().size());
+        reset();
+    }
+
+    /**
+     * Test cases when txnlog is enabled: the peer is older than committedLog
+     * but still covered by the txnlog, so both are used to catch it up.
+     */
+    @Test
+    public void testTxnLog() throws Exception {
+        long peerZxid;
+        db.txnLog.add(createProposal(2));
+        db.txnLog.add(createProposal(3));
+        db.txnLog.add(createProposal(5));
+        db.txnLog.add(createProposal(6));
+        db.txnLog.add(createProposal(7));
+        db.txnLog.add(createProposal(8));
+        db.txnLog.add(createProposal(9));
+
+        db.lastProcessedZxid = 9;
+        db.committedLog.add(createProposal(6));
+        db.committedLog.add(createProposal(7));
+        db.committedLog.add(createProposal(8));
+
+        // Peer has zxid that we have never seen
+        peerZxid = 4;
+        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send TRUNC to 3 and forward any packet starting at maxCommittedLog
+        assertOpType(Leader.TRUNC, 3, db.getmaxCommittedLog());
+        // TRUNC + 4 proposals + 4 commit
+        assertEquals(9, learnerHandler.getQueuedPackets().size());
+        queuedPacketMatches(new long[] { 5, 6, 7, 8 });
+        reset();
+
+        // Peer zxid is in txnlog range
+        peerZxid = 3;
+        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send DIFF and forward any packet starting at maxCommittedLog
+        assertOpType(Leader.DIFF, db.getmaxCommittedLog(),
+                db.getmaxCommittedLog());
+        // DIFF + 4 proposals + 4 commit
+        assertEquals(9, learnerHandler.getQueuedPackets().size());
+        queuedPacketMatches(new long[] { 5, 6, 7, 8 });
+        reset();
+
+    }
+
+    /**
+     * Test cases when txnlog is enabled and committedLog is empty, so min and
+     * max committed zxid both fall back to lastProcessedZxid.
+     */
+    @Test
+    public void testTxnLogOnly() throws Exception {
+        long peerZxid;
+
+        // CommittedLog is empty, we will use txnlog up to lastProcessZxid
+        db.lastProcessedZxid = 7;
+        db.txnLog.add(createProposal(2));
+        db.txnLog.add(createProposal(3));
+        db.txnLog.add(createProposal(5));
+        db.txnLog.add(createProposal(6));
+        db.txnLog.add(createProposal(7));
+        db.txnLog.add(createProposal(8));
+
+        // Peer has zxid that we have never seen
+        peerZxid = 4;
+        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send TRUNC to 3 and forward any packet starting at
+        // lastProcessedZxid
+        assertOpType(Leader.TRUNC, 3, db.lastProcessedZxid);
+        // TRUNC + 3 proposals + 3 commit (8 is beyond lastProcessedZxid)
+        assertEquals(7, learnerHandler.getQueuedPackets().size());
+        queuedPacketMatches(new long[] { 5, 6, 7 });
+        reset();
+
+        // Peer has zxid in txnlog range
+        peerZxid = 2;
+        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send DIFF and forward any packet starting at lastProcessedZxid
+        assertOpType(Leader.DIFF, db.lastProcessedZxid, db.lastProcessedZxid);
+        // DIFF + 4 proposals + 4 commit
+        assertEquals(9, learnerHandler.getQueuedPackets().size());
+        queuedPacketMatches(new long[] { 3, 5, 6, 7 });
+        reset();
+
+        // Peer miss the txnlog
+        peerZxid = 1;
+        assertTrue(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send snap
+        assertEquals(0, learnerHandler.getQueuedPackets().size());
+        reset();
+    }
+
+    long getZxid(long epoch, long counter){
+        return ZxidUtils.makeZxid(epoch, counter);
+    }
+
+    /**
+     * Test cases with zxids that are negative long
+     */
+    @Test
+    public void testTxnLogWithNegativeZxid() throws Exception {
+        long peerZxid;
+        db.txnLog.add(createProposal(getZxid(0xf, 2)));
+        db.txnLog.add(createProposal(getZxid(0xf, 3)));
+        db.txnLog.add(createProposal(getZxid(0xf, 5)));
+        db.txnLog.add(createProposal(getZxid(0xf, 6)));
+        db.txnLog.add(createProposal(getZxid(0xf, 7)));
+        db.txnLog.add(createProposal(getZxid(0xf, 8)));
+        db.txnLog.add(createProposal(getZxid(0xf, 9)));
+
+        db.lastProcessedZxid = getZxid(0xf, 9);
+        db.committedLog.add(createProposal(getZxid(0xf, 6)));
+        db.committedLog.add(createProposal(getZxid(0xf, 7)));
+        db.committedLog.add(createProposal(getZxid(0xf, 8)));
+
+        // Peer has zxid that we have never seen
+        peerZxid = getZxid(0xf, 4);
+        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send TRUNC to 3 and forward any packet starting at maxCommittedLog
+        assertOpType(Leader.TRUNC, getZxid(0xf, 3), db.getmaxCommittedLog());
+        // DIFF + 4 proposals + 4 commit
+        assertEquals(9, learnerHandler.getQueuedPackets().size());
+        queuedPacketMatches(new long[] { getZxid(0xf, 5),
+                getZxid(0xf, 6), getZxid(0xf, 7), getZxid(0xf, 8) });
+        reset();
+
+        // Peer zxid is in txnlog range
+        peerZxid = getZxid(0xf, 3);
+        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send DIFF and forward any packet starting at maxCommittedLog
+        assertOpType(Leader.DIFF, db.getmaxCommittedLog(),
+                db.getmaxCommittedLog());
+        // DIFF + 4 proposals + 4 commit
+        assertEquals(9, learnerHandler.getQueuedPackets().size());
+        queuedPacketMatches(new long[] { getZxid(0xf, 5),
+                getZxid(0xf, 6), getZxid(0xf, 7), getZxid(0xf, 8) });
+        reset();
+    }
+
+    /**
+     * Test cases when peer has new-epoch zxid, i.e. a zxid whose low 32 bits
+     * (the counter) are zero and which has no txn of its own.
+     */
+    @Test
+    public void testNewEpochZxid() throws Exception {
+        long peerZxid;
+        db.txnLog.add(createProposal(getZxid(0, 1)));
+        db.txnLog.add(createProposal(getZxid(1, 1)));
+        db.txnLog.add(createProposal(getZxid(1, 2)));
+
+        // After leader election, lastProcessedZxid will point to new epoch
+        db.lastProcessedZxid = getZxid(2, 0);
+        db.committedLog.add(createProposal(getZxid(1, 1)));
+        db.committedLog.add(createProposal(getZxid(1, 2)));
+
+        // Peer has zxid of epoch 0
+        peerZxid = getZxid(0, 0);
+        // We should get snap, we can do better here, but the main logic is
+        // that we should never send diff if we have never seen any txn older
+        // than peer zxid
+        assertTrue(learnerHandler.syncFollower(peerZxid, db, leader));
+        assertEquals(0, learnerHandler.getQueuedPackets().size());
+        reset();
+
+        // Peer has zxid of epoch 1
+        peerZxid = getZxid(1, 0);
+        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send DIFF to (1, 2) and forward any packet starting at (1, 2)
+        assertOpType(Leader.DIFF, getZxid(1, 2), getZxid(1, 2));
+        // DIFF + 2 proposals + 2 commit
+        assertEquals(5, learnerHandler.getQueuedPackets().size());
+        queuedPacketMatches(new long[] { getZxid(1, 1), getZxid(1, 2)});
+        reset();
+
+        // Peer has zxid of epoch 2, so it is already sync
+        peerZxid = getZxid(2, 0);
+        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send DIFF to (2, 0) and forward any packet starting at (2, 0)
+        assertOpType(Leader.DIFF, getZxid(2, 0), getZxid(2, 0));
+        // DIFF only
+        assertEquals(1, learnerHandler.getQueuedPackets().size());
+        reset();
+
+    }
+
+    /**
+     * Test cases when learner has new-epoch zxid
+     * (zxid & 0xffffffffL) == 0, and only the txnlog (no committedLog) is populated.
+     */
+    @Test
+    public void testNewEpochZxidWithTxnlogOnly() throws Exception {
+        long peerZxid;
+        db.txnLog.add(createProposal(getZxid(1, 1)));
+        db.txnLog.add(createProposal(getZxid(2, 1)));
+        db.txnLog.add(createProposal(getZxid(2, 2)));
+        db.txnLog.add(createProposal(getZxid(4, 1)));
+
+        // After leader election, lastProcessedZxid will point to new epoch
+        db.lastProcessedZxid = getZxid(6, 0);
+
+        // Peer has zxid of epoch 3, an epoch that does not appear in the txnlog
+        peerZxid = getZxid(3, 0);
+        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send DIFF to (6,0) and forward any packet starting at (4,1)
+        assertOpType(Leader.DIFF, getZxid(6, 0), getZxid(4, 1));
+        // DIFF + 1 proposal + 1 commit
+        assertEquals(3, learnerHandler.getQueuedPackets().size());
+        queuedPacketMatches(new long[] { getZxid(4, 1)});
+        reset();
+
+        // Peer has zxid of epoch 4
+        peerZxid = getZxid(4, 0);
+        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send DIFF to (6,0) and forward any packet starting at (4,1)
+        assertOpType(Leader.DIFF, getZxid(6, 0), getZxid(4, 1));
+        // DIFF + 1 proposal + 1 commit
+        assertEquals(3, learnerHandler.getQueuedPackets().size());
+        queuedPacketMatches(new long[] { getZxid(4, 1)});
+        reset();
+
+        // Peer has zxid of epoch 5, newer than every txn in the txnlog
+        peerZxid = getZxid(5, 0);
+        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send DIFF to (6,0) and forward any packet starting at (5,0)
+        assertOpType(Leader.DIFF, getZxid(6, 0), getZxid(5, 0));
+        // DIFF only
+        assertEquals(1, learnerHandler.getQueuedPackets().size());
+        reset();
+
+        // Peer has zxid of epoch 6 (equal to lastProcessedZxid), already in sync
+        peerZxid = getZxid(6, 0);
+        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send DIFF to (6,0) and forward any packet starting at (6, 0)
+        assertOpType(Leader.DIFF, getZxid(6, 0), getZxid(6, 0));
+        // DIFF only
+        assertEquals(1, learnerHandler.getQueuedPackets().size());
+        reset();
+    }
+
+    /**
+     * Test the case where the txnlog and committedLog contain duplicate txns.
+     * This should never happen unless there is a bug in initialization code,
+     * but even then the learner should never see duplicate packets.
+     */
+    @Test
+    public void testDuplicatedTxn() throws Exception {
+        long peerZxid;
+        db.txnLog.add(createProposal(getZxid(0, 1)));
+        db.txnLog.add(createProposal(getZxid(1, 1)));
+        db.txnLog.add(createProposal(getZxid(1, 2)));
+        db.txnLog.add(createProposal(getZxid(1, 1))); // duplicate
+        db.txnLog.add(createProposal(getZxid(1, 2))); // duplicate
+
+        // After leader election, lastProcessedZxid will point to new epoch
+        db.lastProcessedZxid = getZxid(2, 0);
+        db.committedLog.add(createProposal(getZxid(1, 1)));
+        db.committedLog.add(createProposal(getZxid(1, 2)));
+        db.committedLog.add(createProposal(getZxid(1, 1))); // duplicate
+        db.committedLog.add(createProposal(getZxid(1, 2))); // duplicate
+
+        // Peer has zxid of epoch 1
+        peerZxid = getZxid(1, 0);
+        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send DIFF to (1, 2) and forward any packet starting at (1, 2)
+        assertOpType(Leader.DIFF, getZxid(1, 2), getZxid(1, 2));
+        // DIFF + 2 proposals + 2 commits; each duplicated txn is queued only once
+        assertEquals(5, learnerHandler.getQueuedPackets().size());
+        queuedPacketMatches(new long[] { getZxid(1, 1), getZxid(1, 2)});
+        reset();
+
+    }
+
+    /**
+     * Test the case where we would have to TRUNC the learner, but the TRUNC
+     * would cross an epoch boundary, so we need to send a SNAP instead.
+     */
+    @Test
+    public void testCrossEpochTrunc() throws Exception {
+        long peerZxid;
+        db.txnLog.add(createProposal(getZxid(1, 1)));
+        db.txnLog.add(createProposal(getZxid(2, 1)));
+        db.txnLog.add(createProposal(getZxid(2, 2)));
+        db.txnLog.add(createProposal(getZxid(4, 1)));
+
+        // After leader election, lastProcessedZxid will point to new epoch
+        db.lastProcessedZxid = getZxid(6, 0);
+
+        // Peer has zxid (3, 1); the txnlog holds no txn from epoch 3
+        peerZxid = getZxid(3, 1);
+        assertTrue(learnerHandler.syncFollower(peerZxid, db, leader)); // SNAP
+        assertEquals(0, learnerHandler.getQueuedPackets().size());
+        reset();
+    }
+}

Propchange: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java?rev=1497929&r1=1497928&r2=1497929&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java Fri Jun 28 21:22:35 2013
@@ -33,8 +33,8 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.log4j.Logger;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -47,15 +47,37 @@ import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.quorum.Leader;
 import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class FollowerResyncConcurrencyTest extends ZKTestCase {
-    private static final Logger LOG = Logger.getLogger(FollowerResyncConcurrencyTest.class);
+    private static final Logger LOG = LoggerFactory.getLogger(FollowerResyncConcurrencyTest.class);
     public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT;
 
-    private volatile int counter = 0;
+    private AtomicInteger counter = new AtomicInteger(0);
+    private AtomicInteger errors = new AtomicInteger(0);
+    /**
+     * Keep track of pending async operations, we shouldn't start verifying
+     * the state until pending operation is 0
+     */
+    private AtomicInteger pending = new AtomicInteger(0);
+
+    @Before
+    public void setUp() throws Exception {
+        pending.set(0);
+        errors.set(0);
+        counter.set(0);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        LOG.info("Error count {}" , errors.get());
+    }
 
     /**
      * See ZOOKEEPER-1319 - verify that a lagging follwer resyncs correctly
@@ -149,7 +171,28 @@ public class FollowerResyncConcurrencyTe
      */
     @Test
     public void testResyncBySnapThenDiffAfterFollowerCrashes()
-      throws IOException, InterruptedException, KeeperException,  Throwable
+            throws IOException, InterruptedException, KeeperException,  Throwable
+    {
+        followerResyncCrashTest(false); // false: txnlog-based DIFF disabled (snapshot size factor -1)
+    }
+    
+    /**
+     * Variant of testResyncBySnapThenDiffAfterFollowerCrashes() in which the
+     * leader's snapshot size factor is raised (to 1000) so that the restarted
+     * follower is always resynced using the txnlog.
+     *
+     * @throws Throwable if any step of the shared crash/resync scenario fails
+     */
+    @Test
+    public void testResyncByTxnlogThenDiffAfterFollowerCrashes()
+            throws IOException, InterruptedException, KeeperException,  Throwable
+    {
+        final boolean resyncUsingTxnLog = true;
+        followerResyncCrashTest(resyncUsingTxnLog);
+    }
+    
+    public void followerResyncCrashTest(boolean useTxnLogResync)
+            throws IOException, InterruptedException, KeeperException,  Throwable
     {
         final Semaphore sem = new Semaphore(0);
 
@@ -166,6 +209,18 @@ public class FollowerResyncConcurrencyTe
 
         Leader leader = qu.getPeer(index).peer.leader;
         assertNotNull(leader);
+        
+        if (useTxnLogResync) {
+            // Set the factor to a high value so that this test case always
+            // resyncs using the txnlog
+            qu.getPeer(index).peer.getActiveServer().getZKDatabase()
+                    .setSnapshotSizeFactor(1000);
+        } else {
+            // Disable sending DIFF using the txnlog, so that this test still
+            // exercises the ZOOKEEPER-962 bug
+            qu.getPeer(index).peer.getActiveServer().getZKDatabase()
+            .setSnapshotSizeFactor(-1);
+        }
 
         /* Reusing the index variable to select a follower to connect to */
         index = (index == 1) ? 2 : 1;
@@ -190,20 +245,28 @@ public class FollowerResyncConcurrencyTe
         LOG.info("zk2 has session id 0x" + Long.toHexString(zk2.getSessionId()));
 
         zk1.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        
+        // Prepare a thread that will create znodes.
         Thread mytestfooThread = new Thread(new Runnable() {
             @Override
             public void run() {
                 for(int i = 0; i < 3000; i++) {
+                    // Here we create 3000 znodes
                     zk3.create("/mytestfoo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
 
                         @Override
                         public void processResult(int rc, String path, Object ctx, String name) {
-                            counter++;
-                            if(counter == 16200){
+                            pending.decrementAndGet();
+                            counter.incrementAndGet();
+                            if (rc != 0) {
+                                errors.incrementAndGet();
+                            }
+                            if(counter.get() == 16200){
                                 sem.release();
                             }
                         }
                     }, null);
+                    pending.incrementAndGet();
                     if(i%10==0){
                         try {
                             Thread.sleep(100);
@@ -216,29 +279,43 @@ public class FollowerResyncConcurrencyTe
             }
         });
 
+        // Here we start populating the server and shutdown the follower after
+        // initial data is written.
         for(int i = 0; i < 13000; i++) {
+            // Here we create 13000 znodes
             zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
 
                 @Override
                 public void processResult(int rc, String path, Object ctx, String name) {
-                    counter++;
-                    if(counter == 16200){
+                    pending.decrementAndGet();
+                    counter.incrementAndGet();
+                    if (rc != 0) {
+                        errors.incrementAndGet();
+                    }
+                    if(counter.get() == 16200){
                         sem.release();
                     }
                 }
             }, null);
+            pending.incrementAndGet();
 
             if(i == 5000){
                 qu.shutdown(index);
                 LOG.info("Shutting down s1");
             }
             if(i == 12000){
-                //Restart off of snap, then get some txns for a log, then shut down
+                // Start the prepared thread so that it is writing znodes while
+                // the follower is restarting. On the first restart, the follower
+                // should use the txnlog to catch up. For subsequent restarts, the
+                // follower should use a diff to catch up.
                 mytestfooThread.start();
+                LOG.info("Restarting follower " + index);
                 qu.restart(index);
-                Thread.sleep(300);                
-                qu.shutdown(index);                
                 Thread.sleep(300);
+                LOG.info("Shutdown follower " + index);
+                qu.shutdown(index);
+                Thread.sleep(300);
+                LOG.info("Restarting follower " + index);
                 qu.restart(index);
                 LOG.info("Setting up server: " + index);
             }
@@ -250,12 +327,17 @@ public class FollowerResyncConcurrencyTe
                 zk2.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
                     @Override
                     public void processResult(int rc, String path, Object ctx, String name) {
-                        counter++;
-                        if(counter == 16200){
+                        pending.decrementAndGet();
+                        counter.incrementAndGet();
+                        if (rc != 0) {
+                            errors.incrementAndGet();
+                        }
+                        if(counter.get() == 16200){
                             sem.release();
                         }
                     }
                 }, null);
+                pending.incrementAndGet();
             }
         }
 
@@ -267,7 +349,8 @@ public class FollowerResyncConcurrencyTe
         if (mytestfooThread.isAlive()) {
             LOG.error("mytestfooThread is still alive");
         }
-        Thread.sleep(1000);
+        assertTrue(waitForPendingRequests(60));
+        assertTrue(waitForSync(qu, index, 10));
 
         verifyState(qu, index, leader);
         
@@ -353,13 +436,17 @@ public class FollowerResyncConcurrencyTe
 
                             @Override
                             public void processResult(int rc, String path, Object ctx, String name) {
-                                counter++;
-                                if(counter > 7300){
+                                pending.decrementAndGet();
+                                counter.incrementAndGet();
+                                if (rc != 0) {
+                                    errors.incrementAndGet();;
+                                }
+                                if(counter.get() > 7300){
                                     sem.release();
                                 }
                             }
                         }, null);
-
+                        pending.incrementAndGet();
                         try {
                             Thread.sleep(10);
                         } catch (Exception e) {
@@ -379,13 +466,17 @@ public class FollowerResyncConcurrencyTe
 
                 @Override
                 public void processResult(int rc, String path, Object ctx, String name) {
-                    counter++;
-                    if(counter > 7300){
+                    pending.decrementAndGet();
+                    counter.incrementAndGet();
+                    if (rc != 0) {
+                        errors.incrementAndGet();;
+                    }
+                    if(counter.get() > 7300){
                         sem.release();
                     }
                 }
             }, null);
-
+            pending.incrementAndGet();
             if(i == 1000){
                 qu.shutdown(index);
                 Thread.sleep(1100);
@@ -407,12 +498,17 @@ public class FollowerResyncConcurrencyTe
 
                     @Override
                     public void processResult(int rc, String path, Object ctx, String name) {
-                        counter++;
-                        if(counter > 7300){
+                        pending.decrementAndGet();
+                        counter.incrementAndGet();
+                        if (rc != 0) {
+                            errors.incrementAndGet();
+                        }
+                        if(counter.get() > 7300){
                             sem.release();
                         }
                     }
                 }, null);
+                pending.incrementAndGet();
             }
             if(i == 1050 || i == 1100 || i == 1150) {
                 Thread.sleep(1000);
@@ -428,7 +524,8 @@ public class FollowerResyncConcurrencyTe
             LOG.error("mytestfooThread is still alive");
         }
 
-        Thread.sleep(1000);
+        assertTrue(waitForPendingRequests(60));
+        assertTrue(waitForSync(qu, index, 10));
         // Verify that server is following and has the same epoch as the leader
 
         verifyState(qu, index, leader);
@@ -450,6 +547,49 @@ public class FollowerResyncConcurrencyTe
         watcher.waitForConnected(CONNECTION_TIMEOUT);
         return zk;
     }
+    
+    /**
+     * Wait up to timeout seconds for all async operations to return, so we
+     * know that we can start verifying the state. Returns true on success.
+     */
+    private boolean waitForPendingRequests(int timeout) throws InterruptedException {
+        LOG.info("Wait for pending requests: {}", pending.get());
+        for (int i = 0; i < timeout && pending.get() != 0; ++i) {
+            Thread.sleep(1000);
+        }
+        if (pending.get() == 0) {
+            return true;
+        }
+        LOG.info("Timeout waiting for pending requests: {}", pending.get());
+        return false;
+    }
+
+    /**
+     * Wait up to timeout seconds for all servers to have the same lastProcessedZxid.
+     */
+    private boolean waitForSync(QuorumUtil qu, int index, int timeout) throws InterruptedException {
+        LOG.info("Wait for server to sync");
+        int leaderIndex = (index == 1) ? 2 : 1;
+        ZKDatabase restartedDb = qu.getPeer(index).peer.getActiveServer().getZKDatabase();
+        ZKDatabase cleanDb = qu.getPeer(3).peer.getActiveServer().getZKDatabase();
+        ZKDatabase leadDb = qu.getPeer(leaderIndex).peer.getActiveServer().getZKDatabase();
+        long leadZxid = 0;
+        long cleanZxid = 0;
+        long restartedZxid = 0;
+        for (int i = 0; i < timeout; ++i) {
+            leadZxid = leadDb.getDataTreeLastProcessedZxid();
+            cleanZxid = cleanDb.getDataTreeLastProcessedZxid();
+            restartedZxid = restartedDb.getDataTreeLastProcessedZxid();
+            if (leadZxid == cleanZxid && leadZxid == restartedZxid) {
+                return true;
+            }
+            Thread.sleep(1000);
+        }
+        LOG.info("Timeout waiting for zxid to sync: leader 0x" + Long.toHexString(leadZxid) +
+                 " clean 0x" + Long.toHexString(cleanZxid) +
+                 " restarted 0x" + Long.toHexString(restartedZxid));
+        return false;
+    }
 
     private static TestableZooKeeper createTestableClient(String hp)
         throws IOException, TimeoutException, InterruptedException
@@ -470,6 +610,7 @@ public class FollowerResyncConcurrencyTe
         }
 
     private void verifyState(QuorumUtil qu, int index, Leader leader) {
+        LOG.info("Verifying state");
         assertTrue("Not following", qu.getPeer(index).peer.follower != null);
         long epochF = (qu.getPeer(index).peer.getActiveServer().getZxid() >> 32L);
         long epochL = (leader.getEpoch() >> 32L);
@@ -487,18 +628,33 @@ public class FollowerResyncConcurrencyTe
         ZKDatabase clean =  qu.getPeer(3).peer.getActiveServer().getZKDatabase();
         ZKDatabase lead = qu.getPeer(leaderIndex).peer.getActiveServer().getZKDatabase();
         for(Long l : sessionsRestarted) {
+            LOG.info("Validating ephemeral for session id 0x" + Long.toHexString(l));
             assertTrue("Should have same set of sessions in both servers, did not expect: " + l, sessionsNotRestarted.contains(l));
             Set<String> ephemerals = restarted.getEphemerals(l);
             Set<String> cleanEphemerals = clean.getEphemerals(l);
-            for(Object o : cleanEphemerals) {
+            for(String o : cleanEphemerals) {
                 if(!ephemerals.contains(o)) {
-                    LOG.info("Restarted follower doesn't contain ephemeral " + o);
+                    LOG.info("Restarted follower doesn't contain ephemeral {} zxid 0x{}",
+                            o, Long.toHexString(clean.getDataTree().getNode(o).stat.getMzxid()));
+                }
+            }
+            for(String o : ephemerals) {
+                if(!cleanEphemerals.contains(o)) {
+                    LOG.info("Restarted follower has extra ephemeral {} zxid 0x{}",
+                            o, Long.toHexString(restarted.getDataTree().getNode(o).stat.getMzxid()));
                 }
             }
             Set<String> leadEphemerals = lead.getEphemerals(l);
-            for(Object o : leadEphemerals) {
+            for(String o : leadEphemerals) {
                 if(!cleanEphemerals.contains(o)) {
-                    LOG.info("Follower doesn't contain ephemeral from leader " + o);
+                    LOG.info("Follower doesn't contain ephemeral from leader {} zxid 0x{}",
+                            o, Long.toHexString(lead.getDataTree().getNode(o).stat.getMzxid()));
+                }
+            }
+            for(String o : cleanEphemerals) {
+                if(!leadEphemerals.contains(o)) {
+                    LOG.info("Leader doesn't contain ephemeral from follower {} zxid 0x{}",
+                            o, Long.toHexString(clean.getDataTree().getNode(o).stat.getMzxid()));
                 }
             }
             assertEquals("Should have same number of ephemerals in both followers", ephemerals.size(), cleanEphemerals.size());
@@ -528,6 +684,7 @@ public class FollowerResyncConcurrencyTe
         long lzxid = zk.testableLastZxid();
         assertTrue("lzxid:" + lzxid + " > 0", lzxid > 0);
         zk.close();
+        qu.shutdownAll();
     }
 
     private class MyWatcher extends CountdownWatcher {
@@ -584,6 +741,7 @@ public class FollowerResyncConcurrencyTe
 
         zk1.close();
         zk2.close();
+        qu.shutdownAll();
     }
 
 }