You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ma...@apache.org on 2011/09/14 08:56:14 UTC

svn commit: r1170452 - in /zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/server/ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/server/quorum/

Author: mahadev
Date: Wed Sep 14 06:56:13 2011
New Revision: 1170452

URL: http://svn.apache.org/viewvc?rev=1170452&view=rev
Log:
ZOOKEEPER-1136. NEW_LEADER should be queued not sent to match the Zab 1.0 protocol on the twiki (breed via mahadev)

Added:
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ByteBufferOutputStream.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ByteBufferInputStream.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1170452&r1=1170451&r2=1170452&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Wed Sep 14 06:56:13 2011
@@ -322,6 +322,9 @@ BUGFIXES: 
   ZOOKEEPER-961. Watch recovery after disconnection when connection string contains a prefix.
   (Matthias Spycher via mahadev)
 
+  ZOOKEEPER-1136. NEW_LEADER should be queued not sent to match the Zab 1.0 protocol 
+  on the twiki (breed via mahadev)
+
 IMPROVEMENTS:
   ZOOKEEPER-724. Improve junit test integration - log harness information 
   (phunt via mahadev)

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ByteBufferInputStream.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ByteBufferInputStream.java?rev=1170452&r1=1170451&r2=1170452&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ByteBufferInputStream.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ByteBufferInputStream.java Wed Sep 14 06:56:13 2011
@@ -22,6 +22,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.Record;
+
 public class ByteBufferInputStream extends InputStream {
     ByteBuffer bb;
 
@@ -69,4 +72,11 @@ public class ByteBufferInputStream exten
         return n;
     }
 
+    static public void byteBuffer2Record(ByteBuffer bb, Record record)
+            throws IOException {
+        BinaryInputArchive ia;
+        ia = BinaryInputArchive.getArchive(new ByteBufferInputStream(bb));
+        record.deserialize(ia, "request");
+    }
+
 }

Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ByteBufferOutputStream.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ByteBufferOutputStream.java?rev=1170452&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ByteBufferOutputStream.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ByteBufferOutputStream.java Wed Sep 14 06:56:13 2011
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.Record;
+
+public class ByteBufferOutputStream extends OutputStream {
+    ByteBuffer bb;
+    public ByteBufferOutputStream(ByteBuffer bb) {
+        this.bb = bb;
+    }
+    @Override
+    public void write(int b) throws IOException {
+        bb.put((byte)b);
+    }
+    @Override
+    public void write(byte[] b) throws IOException {
+        bb.put(b);
+    }
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        bb.put(b, off, len);
+    }
+    static public void record2ByteBuffer(Record record, ByteBuffer bb)
+    throws IOException {
+        BinaryOutputArchive oa;
+        oa = BinaryOutputArchive.getArchive(new ByteBufferOutputStream(bb));
+        record.serialize(oa, "request");
+    }
+}

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java?rev=1170452&r1=1170451&r2=1170452&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java Wed Sep 14 06:56:13 2011
@@ -880,7 +880,7 @@ public class DataTree {
                         }
                         assert(record != null);
 
-                        ZooKeeperServer.byteBuffer2Record(bb, record);
+                        ByteBufferInputStream.byteBuffer2Record(bb, record);
                        
                         if (failed && subtxn.getType() != OpCode.error){
                             int ec = post_failed ? Code.RUNTIMEINCONSISTENCY.intValue() 

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java?rev=1170452&r1=1170451&r2=1170452&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java Wed Sep 14 06:56:13 2011
@@ -255,7 +255,7 @@ public class FinalRequestProcessor imple
             case OpCode.sync: {
                 lastOp = "SYNC";
                 SyncRequest syncRequest = new SyncRequest();
-                ZooKeeperServer.byteBuffer2Record(request.request,
+                ByteBufferInputStream.byteBuffer2Record(request.request,
                         syncRequest);
                 rsp = new SyncResponse(syncRequest.getPath());
                 break;
@@ -270,7 +270,7 @@ public class FinalRequestProcessor imple
                 lastOp = "EXIS";
                 // TODO we need to figure out the security requirement for this!
                 ExistsRequest existsRequest = new ExistsRequest();
-                ZooKeeperServer.byteBuffer2Record(request.request,
+                ByteBufferInputStream.byteBuffer2Record(request.request,
                         existsRequest);
                 String path = existsRequest.getPath();
                 if (path.indexOf('\0') != -1) {
@@ -284,7 +284,7 @@ public class FinalRequestProcessor imple
             case OpCode.getData: {
                 lastOp = "GETD";
                 GetDataRequest getDataRequest = new GetDataRequest();
-                ZooKeeperServer.byteBuffer2Record(request.request,
+                ByteBufferInputStream.byteBuffer2Record(request.request,
                         getDataRequest);
                 DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
                 if (n == null) {
@@ -308,7 +308,7 @@ public class FinalRequestProcessor imple
                 SetWatches setWatches = new SetWatches();
                 // XXX We really should NOT need this!!!!
                 request.request.rewind();
-                ZooKeeperServer.byteBuffer2Record(request.request, setWatches);
+                ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
                 long relativeZxid = setWatches.getRelativeZxid();
                 zks.getZKDatabase().setWatches(relativeZxid, 
                         setWatches.getDataWatches(), 
@@ -319,7 +319,7 @@ public class FinalRequestProcessor imple
             case OpCode.getACL: {
                 lastOp = "GETA";
                 GetACLRequest getACLRequest = new GetACLRequest();
-                ZooKeeperServer.byteBuffer2Record(request.request,
+                ByteBufferInputStream.byteBuffer2Record(request.request,
                         getACLRequest);
                 Stat stat = new Stat();
                 List<ACL> acl = 
@@ -330,7 +330,7 @@ public class FinalRequestProcessor imple
             case OpCode.getChildren: {
                 lastOp = "GETC";
                 GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
-                ZooKeeperServer.byteBuffer2Record(request.request,
+                ByteBufferInputStream.byteBuffer2Record(request.request,
                         getChildrenRequest);
                 DataNode n = zks.getZKDatabase().getNode(getChildrenRequest.getPath());
                 if (n == null) {
@@ -353,7 +353,7 @@ public class FinalRequestProcessor imple
             case OpCode.getChildren2: {
                 lastOp = "GETC";
                 GetChildren2Request getChildren2Request = new GetChildren2Request();
-                ZooKeeperServer.byteBuffer2Record(request.request,
+                ByteBufferInputStream.byteBuffer2Record(request.request,
                         getChildren2Request);
                 Stat stat = new Stat();
                 DataNode n = zks.getZKDatabase().getNode(getChildren2Request.getPath());

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java?rev=1170452&r1=1170451&r2=1170452&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java Wed Sep 14 06:56:13 2011
@@ -488,32 +488,32 @@ public class PrepRequestProcessor extend
             switch (request.type) {
                 case OpCode.create:
                 CreateRequest createRequest = new CreateRequest();
-                ZooKeeperServer.byteBuffer2Record(request.request, createRequest);
+                ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
                 pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest);
                 break;
             case OpCode.delete:
                 DeleteRequest deleteRequest = new DeleteRequest();
-                ZooKeeperServer.byteBuffer2Record(request.request, deleteRequest);
+                ByteBufferInputStream.byteBuffer2Record(request.request, deleteRequest);
                 pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest);
                 break;
             case OpCode.setData:
                 SetDataRequest setDataRequest = new SetDataRequest();
-                ZooKeeperServer.byteBuffer2Record(request.request, setDataRequest);
+                ByteBufferInputStream.byteBuffer2Record(request.request, setDataRequest);
                 pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest);
                 break;
             case OpCode.setACL:
                 SetACLRequest setAclRequest = new SetACLRequest();
-                ZooKeeperServer.byteBuffer2Record(request.request, setAclRequest);
+                ByteBufferInputStream.byteBuffer2Record(request.request, setAclRequest);
                 pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest);
                 break;
             case OpCode.check:
                 CheckVersionRequest checkRequest = new CheckVersionRequest();
-                ZooKeeperServer.byteBuffer2Record(request.request, checkRequest);
+                ByteBufferInputStream.byteBuffer2Record(request.request, checkRequest);
                 pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest);
                 break;
             case OpCode.multi:
                 MultiTransactionRecord multiRequest = new MultiTransactionRecord();
-                ZooKeeperServer.byteBuffer2Record(request.request, multiRequest);
+                ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
                 List<Txn> txns = new ArrayList<Txn>();
 
                 //Each op in a multi-op must have the same zxid!

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java?rev=1170452&r1=1170451&r2=1170452&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java Wed Sep 14 06:56:13 2011
@@ -661,13 +661,6 @@ public class ZooKeeperServer implements 
         }
     }
 
-    static public void byteBuffer2Record(ByteBuffer bb, Record record)
-            throws IOException {
-        BinaryInputArchive ia;
-        ia = BinaryInputArchive.getArchive(new ByteBufferInputStream(bb));
-        record.deserialize(ia, "request");
-    }
-
     public static int getSnapCount() {
         String sc = System.getProperty("zookeeper.snapCount");
         try {
@@ -860,7 +853,7 @@ public class ZooKeeperServer implements 
         if (h.getType() == OpCode.auth) {
             LOG.info("got auth packet " + cnxn.getRemoteSocketAddress());
             AuthPacket authPacket = new AuthPacket();
-            ZooKeeperServer.byteBuffer2Record(incomingBuffer, authPacket);
+            ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
             String scheme = authPacket.getScheme();
             AuthenticationProvider ap = ProviderRegistry.getProvider(scheme);
             Code authReturn = KeeperException.Code.AUTHFAILED;
@@ -917,7 +910,7 @@ public class ZooKeeperServer implements 
     private Record processSasl(ByteBuffer incomingBuffer, ServerCnxn cnxn) throws IOException {
         LOG.debug("Responding to client SASL token.");
         GetSASLRequest clientTokenRecord = new GetSASLRequest();
-        byteBuffer2Record(incomingBuffer,clientTokenRecord);
+        ByteBufferInputStream.byteBuffer2Record(incomingBuffer,clientTokenRecord);
         byte[] clientToken = clientTokenRecord.getToken();
         LOG.debug("Size of client SASL token: " + clientToken.length);
         byte[] responseToken = null;

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=1170452&r1=1170451&r2=1170452&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Wed Sep 14 06:56:13 2011
@@ -281,7 +281,7 @@ public class Leader {
     
     long epoch = -1;
     boolean waitingForNewEpoch = true;
-    boolean readyToStart = false;
+    volatile boolean readyToStart = false;
     
     /**
      * This method is main function that is called to lead
@@ -309,13 +309,17 @@ public class Leader {
             cnxAcceptor = new LearnerCnxAcceptor();
             cnxAcceptor.start();
             
+            readyToStart = true;
             long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
             self.setAcceptedEpoch(epoch);
+            
             zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
             
+            /*
             synchronized(this){
                 lastProposed = zk.getZxid();
             }
+            */
             
             newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
                     null, null);
@@ -328,7 +332,6 @@ public class Leader {
             outstandingProposals.put(newLeaderProposal.packet.getZxid(), newLeaderProposal);
             newLeaderProposal.ackSet.add(self.getId());
             
-            readyToStart = true;
             waitForEpochAck(self.getId(), leaderStateSummary);
             self.setCurrentEpoch(epoch);
 

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java?rev=1170452&r1=1170451&r2=1170452&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java Wed Sep 14 06:56:13 2011
@@ -290,18 +290,17 @@ public class Learner {       
         	}
         	QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
         	writePacket(ackNewEpoch, true);
-        	readPacket(qp);
+            return ZxidUtils.makeZxid(newEpoch, 0);
         } else {
         	if (newEpoch > self.getAcceptedEpoch()) {
         		self.setAcceptedEpoch(newEpoch);
         	}
+            if (qp.getType() != Leader.NEWLEADER) {
+                LOG.error("First packet should have been NEWLEADER");
+                throw new IOException("First packet should have been NEWLEADER");
+            }
+            return qp.getZxid();
         }
-        if (qp.getType() != Leader.NEWLEADER) {
-            LOG.error("First packet should have been NEWLEADER");
-            throw new IOException("First packet should have been NEWLEADER");
-        }
-        
-        return qp.getZxid();
     } 
     
     /**
@@ -353,6 +352,11 @@ public class Learner {       
             zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
                         
             long lastQueued = 0;
+
+            // In V1.0 we take a snapshot when we get the NEWLEADER message, but in pre-V1.0
+            // we take the snapshot at the UPTODATE. Since V1.0 also gets the UPTODATE (after the
+            // NEWLEADER), we need to make sure that we don't take the snapshot twice.
+            boolean snapshotTaken = false;
             // we are now going to start getting transactions to apply followed by an UPTODATE
             outerLoop:
             while (self.isRunning()) {
@@ -362,7 +366,7 @@ public class Learner {       
                     PacketInFlight pif = new PacketInFlight();
                     pif.hdr = new TxnHeader();
                     pif.rec = SerializeUtils.deserializeTxn(qp.getData(), pif.hdr);
-                    if (pif.hdr.    getZxid() != lastQueued + 1) {
+                    if (pif.hdr.getZxid() != lastQueued + 1) {
                     LOG.warn("Got zxid 0x"
                             + Long.toHexString(pif.hdr.getZxid())
                             + " expected 0x"
@@ -386,9 +390,16 @@ public class Learner {       
                     zk.getZKDatabase().processTxn(hdr, txn);
                     break;
                 case Leader.UPTODATE:
-                    zk.takeSnapshot();
+                    if (!snapshotTaken) {
+                        zk.takeSnapshot();
+                    }
                     self.cnxnFactory.setZooKeeperServer(zk);                
                     break outerLoop;
+                case Leader.NEWLEADER: // it will be NEWLEADER in v1.0
+                    zk.takeSnapshot();
+                    snapshotTaken = true;
+                    writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
+                    break;
                 }
             }
         }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=1170452&r1=1170451&r2=1170452&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Wed Sep 14 06:56:13 2011
@@ -39,8 +39,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.KeeperException.SessionExpiredException;
 import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.server.ByteBufferInputStream;
 import org.apache.zookeeper.server.Request;
-import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.ZooTrace;
 import org.apache.zookeeper.server.quorum.Leader.Proposal;
 import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
@@ -233,6 +233,7 @@ public class LearnerHandler extends Thre
     @Override
     public void run() {
         try {            
+            sock.setSoTimeout(leader.self.getTickTime()*leader.self.getInitLimit());
             ia = BinaryInputArchive.getArchive(new BufferedInputStream(sock
                     .getInputStream()));
             bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
@@ -252,7 +253,7 @@ public class LearnerHandler extends Thre
             		this.sid = bbsid.getLong();
             	} else {
             		LearnerInfo li = new LearnerInfo();
-            		ZooKeeperServer.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li);
+            		ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li);
             		this.sid = li.getServerid();
             		this.version = li.getProtocolVersion();
             	}
@@ -271,37 +272,33 @@ public class LearnerHandler extends Thre
             
             long peerLastZxid;
             StateSummary ss = null;
-            if (learnerType == LearnerType.PARTICIPANT) {
-            	long zxid = qp.getZxid();
-				long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
-				
-				if (this.getVersion() < 0x10000) {
-					// we are going to have to extrapolate the epoch information
-					long epoch = ZxidUtils.getEpochFromZxid(zxid);
-					ss = new StateSummary(epoch, zxid);
-					// fake the message
-					leader.waitForEpochAck(this.getSid(), ss);
-				} else {
-					byte ver[] = new byte[4];
-					ByteBuffer.wrap(ver).putInt(0x10000);
-				    QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
-				    oa.writeRecord(newEpochPacket, "packet");
-		            bufferedOutput.flush();
-		            QuorumPacket ackEpochPacket = new QuorumPacket();
-		            ia.readRecord(ackEpochPacket, "packet");
-		            if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
-		             	LOG.error(ackEpochPacket.toString()
-		                        + " is not ACKEPOCH");
-		                return;
-		            }
-            		ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
-		            ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
-		            leader.waitForEpochAck(this.getSid(), ss);
-				}
-            	peerLastZxid = ss.getLastZxid();
+            long zxid = qp.getZxid();
+            long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
+            
+            if (this.getVersion() < 0x10000) {
+                // we are going to have to extrapolate the epoch information
+                long epoch = ZxidUtils.getEpochFromZxid(zxid);
+                ss = new StateSummary(epoch, zxid);
+                // fake the message
+                leader.waitForEpochAck(this.getSid(), ss);
             } else {
-            	peerLastZxid = qp.getZxid();
+                byte ver[] = new byte[4];
+                ByteBuffer.wrap(ver).putInt(0x10000);
+                QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
+                oa.writeRecord(newEpochPacket, "packet");
+                bufferedOutput.flush();
+                QuorumPacket ackEpochPacket = new QuorumPacket();
+                ia.readRecord(ackEpochPacket, "packet");
+                if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
+                    LOG.error(ackEpochPacket.toString()
+                            + " is not ACKEPOCH");
+                    return;
+				}
+                ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
+                ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
+                leader.waitForEpochAck(this.getSid(), ss);
             }
+            peerLastZxid = ss.getLastZxid();
             
             /* the default to send to the follower */
             int packetToSend = Leader.SNAP;
@@ -390,9 +387,13 @@ public class LearnerHandler extends Thre
                 rl.unlock();
             }
 
-            QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
-                    leaderLastZxid, null, null);
-            oa.writeRecord(newLeaderQP, "packet");
+             QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
+                    ZxidUtils.makeZxid(newEpoch, 0), null, null);
+             if (getVersion() < 0x10000) {
+                oa.writeRecord(newLeaderQP, "packet");
+            } else {
+                queuedPackets.add(newLeaderQP);
+            }
             bufferedOutput.flush();
             //Need to set the zxidToSend to the latest zxid
             if (packetToSend == Leader.SNAP) {
@@ -415,13 +416,6 @@ public class LearnerHandler extends Thre
             }
             bufferedOutput.flush();
             
-            // Mutation packets will be queued during the serialize,
-            // so we need to mark when the peer can actually start
-            // using the data
-            //
-            queuedPackets
-                    .add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
-
             // Start sending packets
             new Thread() {
                 public void run() {
@@ -456,6 +450,12 @@ public class LearnerHandler extends Thre
                     leader.zk.wait(20);
                 }
             }
+            // Mutation packets will be queued during the serialize,
+            // so we need to mark when the peer can actually start
+            // using the data
+            //
+            queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
+
             
             while (true) {
                 qp = new QuorumPacket();

Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java?rev=1170452&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java Wed Sep 14 06:56:13 2011
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.InputArchive;
+import org.apache.jute.OutputArchive;
+import org.apache.zookeeper.ZKUtil;
+import org.apache.zookeeper.server.ByteBufferOutputStream;
+import org.apache.zookeeper.server.DataTree;
+import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooKeeperServer.DataTreeBuilder;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.Leader;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
+import org.apache.zookeeper.server.util.ZxidUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class Zab1_0Test {
+    /**
+     * Thread that drives {@link Leader#lead()} so that the test thread can
+     * keep talking to the leader over a socket while it is leading.
+     */
+    private static final class LeadThread extends Thread {
+        private final Leader target;
+
+        private LeadThread(Leader leader) {
+            target = leader;
+        }
+
+        /**
+         * Calls lead() and, whether it returns or throws, shuts the leader
+         * down afterwards. An exception from lead() is only printed.
+         */
+        public void run() {
+            try {
+                target.lead();
+            } catch (Exception leadFailure) {
+                leadFailure.printStackTrace();
+            } finally {
+                target.shutdown("lead ended");
+            }
+        }
+    }
+    /**
+     * {@link ServerCnxnFactory} stand-in whose methods all do nothing and
+     * return 0 or null; the test installs it on the QuorumPeer in place of
+     * a real client connection factory.
+     */
+    private static final class NullServerCnxnFactory extends ServerCnxnFactory {
+        // --- configuration: accepted and ignored ---
+        public void configure(InetSocketAddress addr, int maxClientCnxns)
+                throws IOException {
+        }
+        public void setMaxClientCnxnsPerHost(int max) {
+        }
+        public int getMaxClientCnxnsPerHost() {
+            return 0;
+        }
+
+        // --- lifecycle: nothing is started, so nothing needs stopping ---
+        public void startup(ZooKeeperServer zkServer) throws IOException,
+                InterruptedException {
+        }
+        public void start() {
+        }
+        public void join() throws InterruptedException {
+        }
+        public void shutdown() {
+        }
+
+        // --- connections and addressing: always 0 / null ---
+        public int getLocalPort() {
+            return 0;
+        }
+        public InetSocketAddress getLocalAddress() {
+            return null;
+        }
+        public Iterable<ServerCnxn> getConnections() {
+            return null;
+        }
+        public void closeSession(long sessionId) {
+        }
+        public void closeAll() {
+        }
+    }
+    /** {@link DataTreeBuilder} that returns a newly constructed {@link DataTree} on every call. */
+    static class MockDataTreeBuilder implements DataTreeBuilder {
+        @Override
+        public DataTree build() {
+            final DataTree freshTree = new DataTree();
+            return freshTree;
+        }
+    }
+    /**
+     * Creates two connected sockets over a temporary listening socket.
+     *
+     * @return a two-element array: index 0 is the connecting side, index 1
+     *         is the accepted side
+     * @throws IOException if binding, connecting or accepting fails
+     */
+    static Socket[] getSocketPair() throws IOException {
+        ServerSocket ss = new ServerSocket();
+        try {
+            // bind(null) lets the system choose the local address and port.
+            ss.bind(null);
+            InetSocketAddress endPoint = (InetSocketAddress) ss.getLocalSocketAddress();
+            Socket s = new Socket(endPoint.getAddress(), endPoint.getPort());
+            boolean accepted = false;
+            try {
+                Socket peer = ss.accept();
+                accepted = true;
+                return new Socket[] { s, peer };
+            } finally {
+                // Do not leak the connecting side when accept() fails.
+                if (!accepted) {
+                    s.close();
+                }
+            }
+        } finally {
+            // The listener is only needed to establish the pair; closing it
+            // does not close the socket returned by accept().
+            ss.close();
+        }
+    }
+    /**
+     * Reads records into {@code qp} until one arrives whose type is not
+     * {@link Leader#PING}; on return {@code qp} holds that record.
+     *
+     * @param ia archive to read records from
+     * @param qp packet object that is overwritten by each read
+     * @throws IOException if reading a record fails
+     */
+    static void readPacketSkippingPing(InputArchive ia, QuorumPacket qp) throws IOException {
+        do {
+            ia.readRecord(qp, null);
+        } while (qp.getType() == Leader.PING);
+    }
+    
+    /**
+     * A scripted exchange with a leader. {@code testConversation} invokes it
+     * with archives wrapped around the follower-side socket streams.
+     */
+    public static interface LeaderConversation {
+        /**
+         * @param ia archive for reading what the leader sent
+         * @param oa archive for writing to the leader
+         * @throws Exception on any failure; it propagates out of the test
+         */
+        void converseWithLeader(InputArchive ia, OutputArchive oa) throws Exception;
+    }
+    
+    /**
+     * Counterpart of LeaderConversation for scripting an exchange with a
+     * follower. NOTE(review): nothing in this file implements or calls it yet.
+     */
+    public static interface FollowerConversation {
+        /**
+         * @param ia archive to read from (presumably what the follower sent)
+         * @param oa archive to write to (presumably towards the follower)
+         * @throws Exception on any failure
+         */
+        void converseWithFollower(InputArchive ia, OutputArchive oa) throws Exception;
+    }
+    
+    /**
+     * Starts a leader on a background thread, attaches a LearnerHandler to
+     * one end of a socket pair and runs {@code conversation} against the
+     * other end.
+     *
+     * Cleanup always shuts the leader down and joins its thread first, then
+     * closes both sockets and removes the temporary data directory, so the
+     * directory is not deleted underneath a running leader.
+     *
+     * @param conversation the follower-side script to run
+     * @throws Exception if setup fails or the conversation throws
+     */
+    public void testConversation(LeaderConversation conversation) throws Exception {
+        Socket pair[] = getSocketPair();
+        Socket leaderSocket = pair[0];
+        Socket followerSocket = pair[1];
+        File tmpDir = null;
+        LeadThread leadThread = null;
+        Leader leader = null;
+        try {
+            // createTempFile only reserves a unique name; swap the file for
+            // a directory of the same name.
+            tmpDir = File.createTempFile("test", "dir");
+            tmpDir.delete();
+            tmpDir.mkdir();
+
+            QuorumPeer peer = createQuorumPeer(tmpDir);
+            leader = createLeader(tmpDir, peer);
+            peer.leader = leader;
+            leadThread = new LeadThread(leader);
+            leadThread.start();
+
+            // Poll until the leader flags that it is ready.
+            while(!leader.readyToStart) {
+                Thread.sleep(20);
+            }
+            
+            LearnerHandler lh = new LearnerHandler(leaderSocket, leader);
+            lh.start();
+            leaderSocket.setSoTimeout(4000);
+
+            InputArchive ia = BinaryInputArchive.getArchive(followerSocket
+                    .getInputStream());
+            OutputArchive oa = BinaryOutputArchive.getArchive(followerSocket
+                    .getOutputStream());
+
+            conversation.converseWithLeader(ia, oa);
+        } finally {
+            try {
+                if (leader != null) {
+                    leader.shutdown("end of test");
+                }
+                if (leadThread != null) {
+                    leadThread.interrupt();
+                    leadThread.join();
+                }
+            } finally {
+                closeQuietly(leaderSocket);
+                closeQuietly(followerSocket);
+                if (tmpDir != null && tmpDir.exists()) {
+                    recursiveDelete(tmpDir);
+                }
+            }
+        }
+    }
+
+    /** Closes a socket, ignoring any IOException, so cleanup can continue. */
+    private static void closeQuietly(Socket socket) {
+        try {
+            socket.close();
+        } catch (IOException ignored) {
+            // best effort: the test is over and the socket is being discarded
+        }
+    }
+        
+    /**
+     * Follower-side walk through the handshake with no prior epoch:
+     * FOLLOWERINFO, LEADERINFO, ACKEPOCH, DIFF, NEWLEADER, ACK, UPTODATE.
+     */
+    @Test
+    public void testNormalRun() throws Exception {
+        testConversation(new LeaderConversation() {
+            public void converseWithLeader(InputArchive ia, OutputArchive oa)
+                    throws IOException {
+                /* we test a normal run. everything should work out well. */
+                LearnerInfo li = new LearnerInfo(1, 0x10000);
+                byte[] liBytes = new byte[12];
+                ByteBufferOutputStream.record2ByteBuffer(li,
+                        ByteBuffer.wrap(liBytes));
+                QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0,
+                        liBytes, null);
+                oa.writeRecord(qp, null);
+
+                /* the leader answers with LEADERINFO for epoch 1 */
+                readPacketSkippingPing(ia, qp);
+                Assert.assertEquals(Leader.LEADERINFO, qp.getType());
+                Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+                /* expected value goes first so a failure message reads correctly */
+                Assert.assertEquals(0x10000,
+                        ByteBuffer.wrap(qp.getData()).getInt());
+
+                qp = new QuorumPacket(Leader.ACKEPOCH, 0, new byte[4], null);
+                oa.writeRecord(qp, null);
+
+                /* DIFF must come first, and NEWLEADER only after it */
+                readPacketSkippingPing(ia, qp);
+                Assert.assertEquals(Leader.DIFF, qp.getType());
+                readPacketSkippingPing(ia, qp);
+                Assert.assertEquals(Leader.NEWLEADER, qp.getType());
+                Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+
+                /* UPTODATE is expected once NEWLEADER has been acked */
+                qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null);
+                oa.writeRecord(qp, null);
+                readPacketSkippingPing(ia, qp);
+                Assert.assertEquals(Leader.UPTODATE, qp.getType());
+            }
+        });
+    }
+    
+    /**
+     * Same handshake as the normal run, except the follower reports a zxid
+     * in epoch 20 in FOLLOWERINFO; the leader is expected to propose
+     * epoch 21 in both LEADERINFO and NEWLEADER.
+     */
+    @Test
+    public void testLeaderBehind() throws Exception {
+        testConversation(new LeaderConversation() {
+            public void converseWithLeader(InputArchive ia, OutputArchive oa)
+                    throws IOException {
+                /* the follower claims a later epoch than the leader has seen */
+                LearnerInfo li = new LearnerInfo(1, 0x10000);
+                byte[] liBytes = new byte[12];
+                ByteBufferOutputStream.record2ByteBuffer(li,
+                        ByteBuffer.wrap(liBytes));
+                /* we are going to say we last acked epoch 20 */
+                QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, ZxidUtils.makeZxid(20, 0),
+                        liBytes, null);
+                oa.writeRecord(qp, null);
+
+                /* the leader must move past the follower's epoch: 20 -> 21 */
+                readPacketSkippingPing(ia, qp);
+                Assert.assertEquals(Leader.LEADERINFO, qp.getType());
+                Assert.assertEquals(ZxidUtils.makeZxid(21, 0), qp.getZxid());
+                /* expected value goes first so a failure message reads correctly */
+                Assert.assertEquals(0x10000,
+                        ByteBuffer.wrap(qp.getData()).getInt());
+
+                qp = new QuorumPacket(Leader.ACKEPOCH, 0, new byte[4], null);
+                oa.writeRecord(qp, null);
+
+                /* DIFF must come first, and NEWLEADER only after it */
+                readPacketSkippingPing(ia, qp);
+                Assert.assertEquals(Leader.DIFF, qp.getType());
+                readPacketSkippingPing(ia, qp);
+                Assert.assertEquals(Leader.NEWLEADER, qp.getType());
+                Assert.assertEquals(ZxidUtils.makeZxid(21, 0), qp.getZxid());
+
+                /* UPTODATE is expected once NEWLEADER has been acked */
+                qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null);
+                oa.writeRecord(qp, null);
+                readPacketSkippingPing(ia, qp);
+                Assert.assertEquals(Leader.UPTODATE, qp.getType());
+            }
+        });
+    }
+
+
+    /**
+     * Deletes {@code file}; when it is not a regular file its children are
+     * deleted first. Results of {@link File#delete()} are ignored.
+     *
+     * @param file the file or directory to remove
+     */
+    private void recursiveDelete(File file) {
+        if (file.isFile()) {
+            file.delete();
+            return;
+        }
+        // listFiles() returns null when the path is not a directory or an
+        // I/O error occurs; treat that as "no children" instead of an NPE.
+        File[] children = file.listFiles();
+        if (children != null) {
+            for (File c : children) {
+                recursiveDelete(c);
+            }
+        }
+        file.delete();
+    }
+
+    /**
+     * Builds a {@link Leader} for {@code peer} with both snapshot and
+     * transaction-log storage under {@code tmpDir}.
+     *
+     * @param tmpDir directory handed to FileTxnSnapLog for both of its arguments
+     * @param peer the quorum peer; its txn factory and quorum address are set here
+     * @return a leader wrapping a new LeaderZooKeeperServer
+     * @throws NoSuchFieldException if QuorumPeer has no myQuorumAddr field
+     * @throws IllegalAccessException if the reflective field write is refused
+     */
+    private Leader createLeader(File tmpDir, QuorumPeer peer)
+            throws IOException, NoSuchFieldException, IllegalAccessException {
+        final FileTxnSnapLog snapLog = new FileTxnSnapLog(tmpDir, tmpDir);
+        peer.setTxnFactory(snapLog);
+
+        // The quorum address is written reflectively (presumably because the
+        // field is not accessible from the test).
+        final Field quorumAddr = peer.getClass().getDeclaredField("myQuorumAddr");
+        quorumAddr.setAccessible(true);
+        quorumAddr.set(peer, new InetSocketAddress(33556));
+
+        final ZKDatabase database = new ZKDatabase(snapLog);
+        final LeaderZooKeeperServer server = new LeaderZooKeeperServer(
+                snapLog, peer, new MockDataTreeBuilder(), database);
+        return new Leader(peer, server);
+    }
+    /**
+     * Creates a QuorumPeer configured with two quorum servers and a
+     * QuorumMaj(3) verifier, and seeds {@code tmpDir/version-2} with
+     * currentEpoch and acceptedEpoch files that both contain "0".
+     *
+     * @param tmpDir existing directory that receives the version-2 subdirectory
+     * @return the configured peer
+     * @throws IOException if an epoch file cannot be written
+     */
+    private QuorumPeer createQuorumPeer(File tmpDir) throws IOException,
+            FileNotFoundException {
+        QuorumPeer peer = new QuorumPeer();
+        peer.syncLimit = 2;
+        peer.initLimit = 2;
+        peer.tickTime = 2000;
+        peer.quorumPeers = new HashMap<Long, QuorumServer>();
+        // Key each server by its own id; using 1L for both would let the
+        // second put silently replace the first.
+        peer.quorumPeers.put(0L, new QuorumServer(0, new InetSocketAddress(33221)));
+        peer.quorumPeers.put(1L, new QuorumServer(1, new InetSocketAddress(33223)));
+        peer.setQuorumVerifier(new QuorumMaj(3));
+        peer.setCnxnFactory(new NullServerCnxnFactory());
+        File version2 = new File(tmpDir, "version-2");
+        version2.mkdir();
+        writeEpochFile(new File(version2, "currentEpoch"), "0\n");
+        writeEpochFile(new File(version2, "acceptedEpoch"), "0\n");
+        return peer;
+    }
+
+    /** Writes {@code contents} to {@code file} and always closes the stream. */
+    private static void writeEpochFile(File file, String contents) throws IOException {
+        FileOutputStream out = new FileOutputStream(file);
+        try {
+            out.write(contents.getBytes());
+        } finally {
+            out.close();
+        }
+    }
+}