You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by br...@apache.org on 2011/01/23 06:29:41 UTC

svn commit: r1062327 - in /zookeeper/branches/branch-3.3: ./ src/java/main/org/apache/zookeeper/server/ src/java/main/org/apache/zookeeper/server/persistence/ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/test/

Author: breed
Date: Sun Jan 23 05:29:40 2011
New Revision: 1062327

URL: http://svn.apache.org/viewvc?rev=1062327&view=rev
Log:
ZOOKEEPER-962. leader/follower coherence issue when follower is receiving a DIFF
ZOOKEEPER-882. Startup loads last transaction from snapshot

Added:
    zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java
Modified:
    zookeeper/branches/branch-3.3/CHANGES.txt
    zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
    zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
    zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
    zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
    zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
    zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
    zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
    zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Observer.java
    zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumUtil.java

Modified: zookeeper/branches/branch-3.3/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/CHANGES.txt?rev=1062327&r1=1062326&r2=1062327&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/CHANGES.txt (original)
+++ zookeeper/branches/branch-3.3/CHANGES.txt Sun Jan 23 05:29:40 2011
@@ -9,6 +9,10 @@ BUGFIXES:
   ZOOKEEPER-921. zkPython incorrectly checks for existence of required
   ACL elements (Nicholas Knight via henryr)
 
+  ZOOKEEPER-882. Startup loads last transaction from snapshot (Jared Cantwell via breed)
+
+  ZOOKEEPER-962. leader/follower coherence issue when follower is receiving a DIFF (Camille Fournier via breed)
+
 IMPROVEMENTS:
   ZOOKEEPER-963. Make Forrest work with JDK6 (Carl Steinbach via henryr)
 

Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZKDatabase.java?rev=1062327&r1=1062326&r2=1062327&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZKDatabase.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZKDatabase.java Sun Jan 23 05:29:40 2011
@@ -26,6 +26,9 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.InputArchive;
@@ -33,16 +36,16 @@ import org.apache.jute.OutputArchive;
 import org.apache.jute.Record;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog.PlayBackListener;
 import org.apache.zookeeper.server.quorum.Leader;
-import org.apache.zookeeper.server.quorum.QuorumPacket;
 import org.apache.zookeeper.server.quorum.Leader.Proposal;
+import org.apache.zookeeper.server.quorum.QuorumPacket;
 import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.txn.TxnHeader;
 
@@ -67,6 +70,7 @@ public class ZKDatabase {
     public static final int commitLogCount = 500;
     protected static int commitLogBuffer = 700;
     protected LinkedList<Proposal> committedLog = new LinkedList<Proposal>();
+    protected ReentrantReadWriteLock logLock = new ReentrantReadWriteLock();
     volatile private boolean initialized = false;
     
     /**
@@ -104,8 +108,12 @@ public class ZKDatabase {
          */
         dataTree = new DataTree();
         sessionsWithTimeouts.clear();
-        synchronized (committedLog) {
+        WriteLock lock = logLock.writeLock();
+        try {            
+            lock.lock();
             committedLog.clear();
+        } finally {
+            lock.unlock();
         }
         initialized = false;
     }
@@ -136,13 +144,30 @@ public class ZKDatabase {
     public long getminCommittedLog() {
         return minCommittedLog;
     }
-    
-    public LinkedList<Proposal> getCommittedLog() {
-        synchronized (this.committedLog) {
-            return new LinkedList<Proposal>(this.committedLog);
-        }
+    /**
+     * Get the lock that controls the committedLog. If you want to get the pointer to the committedLog, you need
+     * to use this lock to acquire a read lock before calling getCommittedLog()
+     * @return the lock that controls the committed log
+     */
+    public ReentrantReadWriteLock getLogLock() {
+        return logLock;
     }
     
+
+    public synchronized LinkedList<Proposal> getCommittedLog() {
+        ReadLock rl = logLock.readLock();
+        // only make a copy if this thread isn't already holding a lock
+        if(logLock.getReadHoldCount() <=0) {
+            try {
+                rl.lock();
+                return new LinkedList<Proposal>(this.committedLog);
+            } finally {
+                rl.unlock();
+            }
+        } 
+        return this.committedLog;
+    }      
+    
     /**
      * get the last processed zxid from a datatree
      * @return the last processed zxid of a datatree
@@ -206,7 +231,9 @@ public class ZKDatabase {
      * @param request committed request
      */
     public void addCommittedProposal(Request request) {
-        synchronized (committedLog) {
+        WriteLock wl = logLock.writeLock();
+        try {
+            wl.lock();
             if (committedLog.size() > commitLogCount) {
                 committedLog.removeFirst();
                 minCommittedLog = committedLog.getFirst().packet.getZxid();
@@ -234,6 +261,8 @@ public class ZKDatabase {
             p.request = request;
             committedLog.add(p);
             maxCommittedLog = p.packet.getZxid();
+        } finally {
+            wl.unlock();
         }
     }
 

Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java?rev=1062327&r1=1062326&r2=1062327&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java Sun Jan 23 05:29:40 2011
@@ -570,11 +570,14 @@ public class FileTxnLog implements TxnLo
                 inputStream.close();
                 inputStream = null;
                 ia = null;
+                hdr = null;
                 // thsi means that the file has ended
                 // we shoud go to the next file
                 if (!goToNextLog()) {
                     return false;
                 }
+                // if we went to the next log file, we should call next() again
+                return next();
             }
             return true;
         }

Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java?rev=1062327&r1=1062326&r2=1062327&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java Sun Jan 23 05:29:40 2011
@@ -123,7 +123,7 @@ public class FileTxnSnapLog {
             PlayBackListener listener) throws IOException {
         snapLog.deserialize(dt, sessions);
         FileTxnLog txnLog = new FileTxnLog(dataDir);
-        TxnIterator itr = txnLog.read(dt.lastProcessedZxid);
+        TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
         long highestZxid = dt.lastProcessedZxid;
         TxnHeader hdr;
         while (true) {

Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java?rev=1062327&r1=1062326&r2=1062327&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java Sun Jan 23 05:29:40 2011
@@ -46,6 +46,7 @@ public class CommitProcessor extends Thr
     LinkedList<Request> committedRequests = new LinkedList<Request>();
 
     RequestProcessor nextProcessor;
+    ArrayList<Request> toProcess = new ArrayList<Request>();
 
     /**
      * This flag indicates whether we need to wait for a response to come back from the
@@ -65,8 +66,7 @@ public class CommitProcessor extends Thr
     @Override
     public void run() {
         try {
-            Request nextPending = null;
-            ArrayList<Request> toProcess = new ArrayList<Request>();
+            Request nextPending = null;            
             while (!finished) {
                 int len = toProcess.size();
                 for (int i = 0; i < len; i++) {

Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Follower.java?rev=1062327&r1=1062326&r2=1062327&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Follower.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Follower.java Sun Jan 23 05:29:40 2011
@@ -125,8 +125,7 @@ public class Follower extends Learner{
             fzk.commit(qp.getZxid());
             break;
         case Leader.UPTODATE:
-            fzk.takeSnapshot();
-            self.cnxnFactory.setZooKeeperServer(fzk);
+            LOG.error("Received an UPTODATE message after Follower started");
             break;
         case Leader.REVALIDATE:
             revalidate(qp);

Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java?rev=1062327&r1=1062326&r2=1062327&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java Sun Jan 23 05:29:40 2011
@@ -28,6 +28,7 @@ import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -35,11 +36,14 @@ import org.apache.jute.BinaryInputArchiv
 import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.InputArchive;
 import org.apache.jute.OutputArchive;
+import org.apache.jute.Record;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.ServerCnxn;
 import org.apache.zookeeper.server.ZooTrace;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.util.SerializeUtils;
+import org.apache.zookeeper.txn.TxnHeader;
 
 /**
  * This class is the superclass of two of the three main actors in a ZK
@@ -47,6 +51,10 @@ import org.apache.zookeeper.server.quoru
  * a good deal of code which is moved into Peer to avoid duplication. 
  */
 public class Learner {       
+    static class PacketInFlight {
+        TxnHeader hdr;
+        Record rec;
+    }
     QuorumPeer self;
     LearnerZooKeeperServer zk;
     
@@ -276,7 +284,8 @@ public class Learner {       
         QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
         QuorumPacket qp = new QuorumPacket();
         
-        readPacket(qp);        
+        readPacket(qp);   
+        LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
         synchronized (zk) {
             if (qp.getType() == Leader.DIFF) {
                 LOG.info("Getting a diff from the leader 0x" + Long.toHexString(qp.getZxid()));                
@@ -290,7 +299,7 @@ public class Learner {       
                 String signature = leaderIs.readString("signature");
                 if (!signature.equals("BenWasHere")) {
                     LOG.error("Missing signature. Got " + signature);
-                    throw new IOException("Missing signature");
+                    throw new IOException("Missing signature");                   
                 }
             } else if (qp.getType() == Leader.TRUNC) {
                 //we need to truncate the log to the lastzxid of the leader
@@ -311,15 +320,63 @@ public class Learner {       
                 System.exit(13);
 
             }
+            zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
             if(LOG.isInfoEnabled()){
                 LOG.info("Setting leader epoch " + Long.toHexString(newLeaderZxid >> 32L));
             }
-            zk.getZKDatabase().setlastProcessedZxid(newLeaderZxid);
+                        
+            long lastQueued = 0;
+            // we are now going to start getting transactions to apply followed by an UPTODATE
+            outerLoop:
+            while (self.isRunning()) {
+                readPacket(qp);
+                switch(qp.getType()) {
+                case Leader.PROPOSAL:
+                    PacketInFlight pif = new PacketInFlight();
+                    pif.hdr = new TxnHeader();
+                    BinaryInputArchive ia = BinaryInputArchive
+                            .getArchive(new ByteArrayInputStream(qp.getData()));
+                    pif.rec     = SerializeUtils.deserializeTxn(ia, pif.hdr);
+                    if (pif.hdr.    getZxid() != lastQueued + 1) {
+                    LOG.warn("Got zxid 0x"
+                            + Long.toHexString(pif.hdr.getZxid())
+                            + " expected 0x"
+                            + Long.toHexString(lastQueued + 1));
+                    }
+                    lastQueued = pif.hdr.getZxid();
+                    packetsNotCommitted.add(pif);
+                    break;
+                case Leader.COMMIT:
+                    pif = packetsNotCommitted.peekFirst();
+                    if (pif.hdr.getZxid() != qp.getZxid()) {
+                        LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
+                    } else {
+                        zk.getZKDatabase().processTxn(pif.hdr, pif.rec);
+                        packetsNotCommitted.remove();
+                    }
+                    break;
+                case Leader.INFORM:
+                    TxnHeader hdr = new TxnHeader();
+                    ia = BinaryInputArchive
+                            .getArchive(new ByteArrayInputStream(qp.getData()));
+                    Record txn = SerializeUtils.deserializeTxn(ia, hdr);
+                    zk.getZKDatabase().processTxn(hdr, txn);
+                    break;
+                case Leader.UPTODATE:
+                    zk.takeSnapshot();
+                    self.cnxnFactory.setZooKeeperServer(zk);                
+                    break outerLoop;
+                }
+            }
         }
         ack.setZxid(newLeaderZxid & ~0xffffffffL);
         writePacket(ack, true);
         sock.setSoTimeout(self.tickTime * self.syncLimit);
         zk.startup();
+        //We have to have a commit processor to do this
+        for(PacketInFlight p: packetsNotCommitted) {
+            ((FollowerZooKeeperServer)zk).logRequest(p.hdr, p.rec);
+        }
     }
     
     protected void revalidate(QuorumPacket qp) throws IOException {

Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=1062327&r1=1062326&r2=1062327&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Sun Jan 23 05:29:40 2011
@@ -29,6 +29,8 @@ import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.util.LinkedList;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 
 import org.apache.jute.BinaryInputArchive;
 import org.apache.jute.BinaryOutputArchive;
@@ -254,79 +256,72 @@ public class LearnerHandler extends Thre
             long peerLastZxid = qp.getZxid();
             /* the default to send to the follower */
             int packetToSend = Leader.SNAP;
-            boolean logTxns = true;
             long zxidToSend = 0;
-            
+            long leaderLastZxid = 0;
             /** the packets that the follower needs to get updates from **/
             long updates = peerLastZxid;
             
             /* we are sending the diff check if we have proposals in memory to be able to 
              * send a diff to the 
              */ 
-            LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();
-            synchronized(proposals) {
+            ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock();
+            ReadLock rl = lock.readLock();
+            try {
+                rl.lock();        
+                final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
+                final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();
+                LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();
                 if (proposals.size() != 0) {
-                    if ((leader.zk.getZKDatabase().getmaxCommittedLog() >= peerLastZxid)
-                            && (leader.zk.getZKDatabase().getminCommittedLog() <= peerLastZxid)) {
+                    if ((maxCommittedLog >= peerLastZxid)
+                            && (minCommittedLog <= peerLastZxid)) {
                         packetToSend = Leader.DIFF;
-                        zxidToSend = leader.zk.getZKDatabase().getmaxCommittedLog();
+                        zxidToSend = maxCommittedLog;
                         for (Proposal propose: proposals) {
                             if (propose.packet.getZxid() > peerLastZxid) {
                                 queuePacket(propose.packet);
                                 QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
                                         null, null);
                                 queuePacket(qcommit);
-
                             }
                         }
+                    } else if (peerLastZxid > maxCommittedLog) {
+                        packetToSend = Leader.TRUNC;
+                        zxidToSend = maxCommittedLog;
+                        updates = zxidToSend;
                     }
+                } else {
+                    // just let the state transfer happen
+                }               
+                
+                leaderLastZxid = leader.startForwarding(this, updates);
+                if (peerLastZxid == leaderLastZxid) {
+                    // We are in sync so we'll do an empty diff
+                    packetToSend = Leader.DIFF;
+                    zxidToSend = leaderLastZxid;
                 }
-                else {
-                    logTxns = false;
-                }            
-            }
-            
-            //check if we decided to send a diff or we need to send a truncate
-            // we avoid using epochs for truncating because epochs make things
-            // complicated. Two epochs might have the last 32 bits as same.
-            // only if we know that there is a committed zxid in the queue that
-            // is less than the one the peer has we send a trunc else to make
-            // things simple we just send sanpshot.
-            if (logTxns && (peerLastZxid > leader.zk.getZKDatabase().getmaxCommittedLog())) {
-                // this is the only case that we are sure that
-                // we can ask the peer to truncate the log
-                packetToSend = Leader.TRUNC;
-                zxidToSend = leader.zk.getZKDatabase().getmaxCommittedLog();
-                updates = zxidToSend;
-            }
-            
-            /* see what other packets from the proposal
-             * and tobeapplied queues need to be sent
-             * and then decide if we can just send a DIFF
-             * or we actually need to send the whole snapshot
-             */
-            long leaderLastZxid = leader.startForwarding(this, updates);
-            // a special case when both the ids are the same 
-            if (peerLastZxid == leaderLastZxid) {
-                packetToSend = Leader.DIFF;
-                zxidToSend = leaderLastZxid;
+            } finally {
+                rl.unlock();
             }
 
             QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
                     leaderLastZxid, null, null);
             oa.writeRecord(newLeaderQP, "packet");
             bufferedOutput.flush();
-            
-           
+            //Need to set the zxidToSend to the latest zxid
+            if (packetToSend == Leader.SNAP) {
+                zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
+            }
             oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
             bufferedOutput.flush();
             
             /* if we are not truncating or sending a diff just send a snapshot */
             if (packetToSend == Leader.SNAP) {
-                LOG.warn("Sending snapshot last zxid of peer is 0x"
+                LOG.info("Sending snapshot last zxid of peer is 0x"
                         + Long.toHexString(peerLastZxid) + " " 
                         + " zxid of leader is 0x"
-                        + Long.toHexString(leaderLastZxid));
+                        + Long.toHexString(leaderLastZxid)
+                        + "sent zxid of db as 0x" 
+                        + Long.toHexString(zxidToSend));
                 // Dump data to peer
                 leader.zk.getZKDatabase().serializeSnapshot(oa);
                 oa.writeString("BenWasHere", "signature");
@@ -524,6 +519,6 @@ public class LearnerHandler extends Thre
 
     public boolean synced() {
         return isAlive()
-                && tickOfLastAck >= leader.self.tick - leader.self.syncLimit;
+        && tickOfLastAck >= leader.self.tick - leader.self.syncLimit;
     }
 }

Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Observer.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Observer.java?rev=1062327&r1=1062326&r2=1062327&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Observer.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Observer.java Sun Jan 23 05:29:40 2011
@@ -111,8 +111,7 @@ public class Observer extends Learner{  
             LOG.warn("Ignoring commit");            
             break;            
         case Leader.UPTODATE:
-            zk.takeSnapshot();
-            self.cnxnFactory.setZooKeeperServer(zk);
+            LOG.error("Received an UPTODATE message after Observer started");
             break;
         case Leader.REVALIDATE:
             revalidate(qp);

Added: zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java?rev=1062327&view=auto
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java (added)
+++ zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java Sun Jan 23 05:29:40 2011
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.quorum.Leader;
+import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
+import org.junit.Test;
+
+
+public class FollowerResyncConcurrencyTest extends QuorumBase {
+    volatile int counter = 0;
+    volatile int errors = 0; 
+
+    private static final Logger LOG = Logger.getLogger(FollowerResyncConcurrencyTest.class);
+    public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT;
+
+
+    /**
+     * See ZOOKEEPER-962. This tests for one of the bugs hit while fixing that issue:
+     * setting the ZXID of the SNAP packet.
+     * Starts up 3 ZKs. Shuts down one follower (F1), writes a node, then restarts the one that was shut down.
+     * The non-leader ZKs are writing to the cluster.
+     * Shuts down F1 again.
+     * Restarts it after sessions have expired, expecting it to get a snap file.
+     * Shuts it down and runs some transactions through.
+     * Restarts it to a diff while transactions are running in the leader.
+     * @throws IOException
+     * @throws InterruptedException
+     * @throws KeeperException
+     */
+    @Test
+    public void testResyncBySnapThenDiffAfterFollowerCrashes () 
+    throws IOException, InterruptedException, KeeperException,  Throwable{
+        final Semaphore sem = new Semaphore(0);
+
+        QuorumUtil qu = new QuorumUtil(1);
+        qu.startAll();
+        CountdownWatcher watcher1 = new CountdownWatcher();
+        CountdownWatcher watcher2 = new CountdownWatcher();
+        CountdownWatcher watcher3 = new CountdownWatcher();
+
+        int index = 1;
+        while(qu.getPeer(index).peer.leader == null)
+            index++;
+
+        Leader leader = qu.getPeer(index).peer.leader;
+
+        assertNotNull(leader);    
+        /*
+         * Reusing the index variable to select a follower to connect to
+         */
+        index = (index == 1) ? 2 : 1;
+        qu.shutdown(index);
+        final ZooKeeper zk3 = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(3).peer.getClientPort(), 1000,watcher3);
+        watcher3.waitForConnected(CONNECTION_TIMEOUT);
+        zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+
+        qu.restart(index);
+        ZooKeeper zk = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(), 1000, watcher1);
+
+        ZooKeeper zk2 = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(), 1000, watcher2);
+    
+        watcher1.waitForConnected(CONNECTION_TIMEOUT);
+        watcher2.waitForConnected(CONNECTION_TIMEOUT);
+        
+        zk.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);      
+        Thread t = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                for(int i = 0; i < 1000; i++) {
+                    zk3.create("/mytestfoo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
+
+                        @Override
+                        public void processResult(int rc, String path, Object ctx, String name) {
+                            counter++;
+                            if (rc != 0) {
+                                errors++;
+                            }
+                            if(counter == 14200){
+                                sem.release();
+                            }
+
+
+                        }
+                    }, null);
+                    if(i%10==0){
+                        try {
+                            Thread.sleep(100);
+                        } catch (Exception e) {
+
+                        }
+                    }
+                }
+
+            }
+        });
+
+        
+        for(int i = 0; i < 13000; i++) {
+            zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
+
+                @Override
+                public void processResult(int rc, String path, Object ctx, String name) {
+                    counter++;
+                    if (rc != 0) {
+                        errors++;
+                    }
+                    if(counter == 14200){
+                        sem.release();
+                    }
+
+
+                }
+            }, null);            
+
+            if(i == 5000){
+                qu.shutdown(index);               
+                LOG.info("Shutting down s1");
+            }
+            if(i == 12000){
+                //Restart off of snap, then get some txns for a log, then shut down
+                qu.restart(index);       
+                Thread.sleep(300);
+                qu.shutdown(index);
+                t.start();
+                Thread.sleep(300);                
+                qu.restart(index);
+                LOG.info("Setting up server: " + index);
+            }
+            if((i % 1000) == 0){
+                Thread.sleep(1000);
+            }
+
+            if(i%50 == 0) {
+                zk2.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
+
+                    @Override
+                    public void processResult(int rc, String path, Object ctx, String name) {
+                        counter++;
+                        if (rc != 0) {
+                            errors++;
+                        }
+                        if(counter == 14200){
+                            sem.release();
+                        }
+
+
+                    }
+                }, null);
+            }
+        }
+
+        // Wait until all updates return
+        if(!sem.tryAcquire(20000, TimeUnit.MILLISECONDS)) {
+            LOG.warn("Did not aquire semaphore fast enough");
+        }
+        t.join(10000);
+        Thread.sleep(1000);
+        
+            verifyState(qu, index, leader);
+        
+    }      
+    
+    /**
+     * This test:
+     * Starts up 3 ZKs. The non-leader ZKs are writing to the cluster.
+     * Shuts down one of the non-leader ZKs.
+     * Restarts it after sessions have expired but fewer than 500 txns have taken place (so it gets a diff).
+     * Shuts it down immediately after restarting, and starts a separate thread running other transactions.
+     * Restarts it to a diff while transactions are running in the leader.
+     * 
+     * 
+     * Before the fixes for ZOOKEEPER-962, restarting off of a diff could yield an inconsistent view of the data, missing
+     * transactions that completed during diff syncing. The follower would also be considered "restarted" before all forwarded
+     * transactions were completely processed, so restarting would cause a snap file with a too-high zxid to be written, and
+     * transactions would be missed.
+     * 
+     * This test should fairly reliably catch the failure of restarting the server before all diff messages have been processed.
+     * However, due to the transient nature of the system, it may not catch failures caused by concurrent processing of
+     * transactions during the leader's diff forwarding.
+     * 
+     * @throws IOException
+     * @throws InterruptedException
+     * @throws KeeperException
+     * @throws Throwable
+     */
+
+    @Test
+    public void testResyncByDiffAfterFollowerCrashes () 
+    throws IOException, InterruptedException, KeeperException, Throwable{
+        final Semaphore sem = new Semaphore(0);
+
+        QuorumUtil qu = new QuorumUtil(1);
+        qu.startAll();
+        CountdownWatcher watcher1 = new CountdownWatcher();
+        CountdownWatcher watcher2 = new CountdownWatcher();
+        CountdownWatcher watcher3 = new CountdownWatcher();
+
+
+        int index = 1;
+        while(qu.getPeer(index).peer.leader == null)
+            index++;
+
+        Leader leader = qu.getPeer(index).peer.leader;
+
+        assertNotNull(leader);
+
+        /*
+         * Reusing the index variable to select a follower to connect to
+         */
+        index = (index == 1) ? 2 : 1;
+
+        ZooKeeper zk = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(), 1000, watcher1);
+
+        ZooKeeper zk2 = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(), 1000,watcher2);
+        final ZooKeeper zk3 = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(3).peer.getClientPort(), 1000, watcher3);
+        watcher1.waitForConnected(CONNECTION_TIMEOUT);
+        watcher2.waitForConnected(CONNECTION_TIMEOUT);
+        watcher3.waitForConnected(CONNECTION_TIMEOUT);
+        zk.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);      
+        zk2.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+
+        
+        final AtomicBoolean runNow = new AtomicBoolean(false);
+        Thread t = new Thread(new Runnable() {
+
+            @Override
+            public void run() {                                
+                int inSyncCounter = 0;
+                while(inSyncCounter < 400) {    
+                    if(runNow.get()) {
+                        zk3.create("/mytestfoo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
+
+                            @Override
+                            public void processResult(int rc, String path, Object ctx, String name) {
+                                counter++;
+                                if (rc != 0) {
+                                    errors++;
+                                }
+                                if(counter > 7300){
+                                    sem.release();
+                                }
+
+
+                            }
+                        }, null);
+                        
+                        try {
+                            Thread.sleep(10);
+                        } catch (Exception e) {
+                        }
+                        inSyncCounter++;
+                    }
+                    else {
+                        Thread.yield();
+                    }
+                }
+
+            }
+        });
+
+        t.start();
+        for(int i = 0; i < 5000; i++) {
+            zk2.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
+
+                @Override
+                public void processResult(int rc, String path, Object ctx, String name) {
+                    counter++;
+                    if (rc != 0) {
+                        errors++;
+                    }
+                    if(counter > 7300){
+                        sem.release();
+                    }
+
+
+                }
+            }, null);            
+
+            if(i == 1000){
+                qu.shutdown(index);      
+                Thread.sleep(1100);
+                LOG.info("Shutting down s1");
+
+            }
+            if(i == 1100 || i == 1150 || i == 1200) {
+                Thread.sleep(1000);
+            }
+            
+            if(i == 1200){
+                qu.startThenShutdown(index);                                
+                runNow.set(true);
+                qu.restart(index);
+                LOG.info("Setting up server: " + index);
+            }
+        
+
+            if(i>=1000 &&  i%2== 0) {
+                zk3.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
+
+                    @Override
+                    public void processResult(int rc, String path, Object ctx, String name) {
+                        counter++;
+                        if (rc != 0) {
+                            errors++;
+                        }
+                        if(counter > 7300){
+                            sem.release();
+                        }
+
+
+                    }
+                }, null);
+            }
+            if(i == 1050 || i == 1100 || i == 1150) {
+                Thread.sleep(1000);
+            }
+        }
+
+        // Wait until all updates return
+        if(!sem.tryAcquire(15000, TimeUnit.MILLISECONDS)) {
+            LOG.warn("Did not aquire semaphore fast enough");
+        }
+        t.join(10000);
+        Thread.sleep(1000);
+        // Verify that server is following and has the same epoch as the leader
+        
+        verifyState(qu, index, leader);
+        
+    }
+
+    private void verifyState(QuorumUtil qu, int index, Leader leader) {
+        assertTrue("Not following", qu.getPeer(index).peer.follower != null);
+        long epochF = (qu.getPeer(index).peer.getActiveServer().getZxid() >> 32L);
+        long epochL = (leader.getEpoch() >> 32L);
+        assertTrue("Zxid: " + qu.getPeer(index).peer.getActiveServer().getZKDatabase().getDataTreeLastProcessedZxid() + 
+                "Current epoch: " + epochF, epochF == epochL);
+        int leaderIndex = (index == 1) ? 2 : 1;    
+        Collection<Long> sessionsRestarted = qu.getPeer(index).peer.getActiveServer().getZKDatabase().getSessions();
+        Collection<Long> sessionsNotRestarted = qu.getPeer(leaderIndex).peer.getActiveServer().getZKDatabase().getSessions();
+        
+        for(Long l : sessionsRestarted) {
+            assertTrue("Should have same set of sessions in both servers, did not expect: " + l, sessionsNotRestarted.contains(l));        
+        }      
+        assertEquals("Should have same number of sessions", sessionsNotRestarted.size(), sessionsRestarted.size());
+        ZKDatabase restarted = qu.getPeer(index).peer.getActiveServer().getZKDatabase();
+        ZKDatabase clean =  qu.getPeer(3).peer.getActiveServer().getZKDatabase();
+        ZKDatabase lead = qu.getPeer(leaderIndex).peer.getActiveServer().getZKDatabase();
+        for(Long l : sessionsRestarted) {
+            assertTrue("Should have same set of sessions in both servers, did not expect: " + l, sessionsNotRestarted.contains(l));
+            HashSet ephemerals = restarted.getEphemerals(l);
+            HashSet cleanEphemerals = clean.getEphemerals(l);
+            for(Object o : cleanEphemerals) {
+                if(!ephemerals.contains(o)) {
+                    LOG.info("Restarted follower doesn't contain ephemeral " + o);
+                }
+            }
+            HashSet leadEphemerals = lead.getEphemerals(l);
+            for(Object o : leadEphemerals) {
+                if(!cleanEphemerals.contains(o)) {
+                    LOG.info("Follower doesn't contain ephemeral from leader " + o);
+                }
+            }
+            assertEquals("Should have same number of ephemerals in both followers", ephemerals.size(), cleanEphemerals.size());            
+            assertEquals("Leader should equal follower", lead.getEphemerals(l).size(), cleanEphemerals.size());
+        }
+    }      
+}

Modified: zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumUtil.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumUtil.java?rev=1062327&r1=1062326&r2=1062327&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumUtil.java (original)
+++ zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumUtil.java Sun Jan 23 05:29:40 2011
@@ -61,9 +61,9 @@ public class QuorumUtil {
 
     private final Map<Integer, PeerStruct> peers = new HashMap<Integer, PeerStruct>();
 
-    private final int N;
+    public final int N;
 
-    private final int ALL;
+    public final int ALL;
 
     private String hostPort;
 
@@ -123,6 +123,7 @@ public class QuorumUtil {
     }
 
     public void startAll() throws IOException {
+        shutdownAll();
         for (int i = 1; i <= ALL; ++i) {
             start(i);
             LOG.info("Started QuorumPeer " + i);
@@ -182,7 +183,26 @@ public class QuorumUtil {
                 ps.id, tickTime, initLimit, syncLimit);
         Assert.assertEquals(ps.clientPort, ps.peer.getClientPort());
 
+        ps.peer.start();    
+    }
+    
+    public void restart(int id) throws IOException {
+        start(id);
+        Assert.assertTrue("Waiting for server up", ClientBase.waitForServerUp("127.0.0.1:"
+                + getPeer(id).clientPort, ClientBase.CONNECTION_TIMEOUT));
+    }
+    
+    public void startThenShutdown(int id) throws IOException {
+        PeerStruct ps = getPeer(id);
+        LOG.info("Creating QuorumPeer " + ps.id + "; public port " + ps.clientPort);
+        ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg,
+                ps.id, tickTime, initLimit, syncLimit);
+        Assert.assertEquals(ps.clientPort, ps.peer.getClientPort());
+
         ps.peer.start();
+        Assert.assertTrue("Waiting for server up", ClientBase.waitForServerUp("127.0.0.1:"
+                + getPeer(id).clientPort, ClientBase.CONNECTION_TIMEOUT));
+        shutdown(id);
     }
 
     public void shutdownAll() {