You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ca...@apache.org on 2011/11/05 21:14:21 UTC

svn commit: r1198043 - in /zookeeper/branches/branch-3.4: ./ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/test/

Author: camille
Date: Sat Nov  5 20:14:21 2011
New Revision: 1198043

URL: http://svn.apache.org/viewvc?rev=1198043&view=rev
Log:
ZOOKEEPER-1264. FollowerResyncConcurrencyTest failing intermittently.
ZOOKEEPER-1282. Learner.java not following Zab 1.0 protocol - setCurrentEpoch should be done upon receipt of NEWLEADER (before acking it) and not upon receipt of UPTODATE.
ZOOKEEPER-1291. AcceptedEpoch not updated at leader before it proposes the epoch to followers.

Modified:
    zookeeper/branches/branch-3.4/CHANGES.txt
    zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
    zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
    zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
    zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java

Modified: zookeeper/branches/branch-3.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/CHANGES.txt?rev=1198043&r1=1198042&r2=1198043&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/CHANGES.txt (original)
+++ zookeeper/branches/branch-3.4/CHANGES.txt Sat Nov  5 20:14:21 2011
@@ -363,6 +363,15 @@ BUGFIXES: 
 
   ZOOKEEPER-1270. testEarlyLeaderAbandonment failing intermittently, 
   quorum formed, no serving. (Flavio, Camille and Alexander Shraer via mahadev)
+  
+  ZOOKEEPER-1264. FollowerResyncConcurrencyTest failing 
+  intermittently. (breed, camille and Alex Shraer via camille)
+  
+  ZOOKEEPER-1282. Learner.java not following Zab 1.0 protocol - 
+  setCurrentEpoch should be done upon receipt of NEWLEADER 
+  (before acking it) and not upon receipt of UPTODATE (breed via camille)
+  
+  ZOOKEEPER-1291. AcceptedEpoch not updated at leader before it proposes the epoch to followers. (Alex Shraer via camille)
 
 IMPROVEMENTS:
   ZOOKEEPER-724. Improve junit test integration - log harness information 

Modified: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=1198043&r1=1198042&r2=1198043&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Sat Nov  5 20:14:21 2011
@@ -311,7 +311,6 @@ public class Leader {
             
             readyToStart = true;
             long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
-            self.setAcceptedEpoch(epoch);
             
             zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
             
@@ -770,7 +769,7 @@ public class Leader {
     }
 
     private HashSet<Long> connectingFollowers = new HashSet<Long>();
-	public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException {
+	public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
 		synchronized(connectingFollowers) {
 			if (!waitingForNewEpoch) {
 				return epoch;
@@ -782,8 +781,9 @@ public class Leader {
 			QuorumVerifier verifier = self.getQuorumVerifier();
 			if (connectingFollowers.contains(self.getId()) && verifier.containsQuorum(connectingFollowers)) 
 {
-				waitingForNewEpoch = false;
-				connectingFollowers.notifyAll();
+			    waitingForNewEpoch = false;
+			    self.setAcceptedEpoch(epoch);
+			    connectingFollowers.notifyAll();
 			} else {
                    long start = System.currentTimeMillis();
                    long cur = start;

Modified: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Learner.java?rev=1198043&r1=1198042&r2=1198043&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Learner.java (original)
+++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Learner.java Sat Nov  5 20:14:21 2011
@@ -312,8 +312,10 @@ public class Learner {       
     protected void syncWithLeader(long newLeaderZxid) throws IOException, InterruptedException{
         QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
         QuorumPacket qp = new QuorumPacket();
+        long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
         
         readPacket(qp);   
+        LinkedList<Long> packetsCommitted = new LinkedList<Long>();
         LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
         synchronized (zk) {
             if (qp.getType() == Leader.DIFF) {
@@ -376,12 +378,16 @@ public class Learner {       
                     packetsNotCommitted.add(pif);
                     break;
                 case Leader.COMMIT:
-                    pif = packetsNotCommitted.peekFirst();
-                    if (pif.hdr.getZxid() != qp.getZxid()) {
-                        LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
+                    if (!snapshotTaken) {
+                        pif = packetsNotCommitted.peekFirst();
+                        if (pif.hdr.getZxid() != qp.getZxid()) {
+                            LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
+                        } else {
+                            zk.getZKDatabase().processTxn(pif.hdr, pif.rec);
+                            packetsNotCommitted.remove();
+                        }
                     } else {
-                        zk.getZKDatabase().processTxn(pif.hdr, pif.rec);
-                        packetsNotCommitted.remove();
+                        packetsCommitted.add(qp.getZxid());
                     }
                     break;
                 case Leader.INFORM:
@@ -390,28 +396,34 @@ public class Learner {       
                     zk.getZKDatabase().processTxn(hdr, txn);
                     break;
                 case Leader.UPTODATE:
-                    if (!snapshotTaken) {
+                    if (!snapshotTaken) { // true for the pre v1.0 case
                         zk.takeSnapshot();
+                        self.setCurrentEpoch(newEpoch);
                     }
                     self.cnxnFactory.setZooKeeperServer(zk);                
                     break outerLoop;
                 case Leader.NEWLEADER: // it will be NEWLEADER in v1.0
                     zk.takeSnapshot();
+                    self.setCurrentEpoch(newEpoch);
                     snapshotTaken = true;
                     writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
                     break;
                 }
             }
         }
-        long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
-        self.setCurrentEpoch(newEpoch);
         ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
         writePacket(ack, true);
         sock.setSoTimeout(self.tickTime * self.syncLimit);
         zk.startup();
-        //We have to have a commit processor to do this
-        for(PacketInFlight p: packetsNotCommitted) {
-            ((FollowerZooKeeperServer)zk).logRequest(p.hdr, p.rec);
+        // We need to log the stuff that came in between the snapshot and the uptodate
+        if (zk instanceof FollowerZooKeeperServer) {
+            FollowerZooKeeperServer fzk = (FollowerZooKeeperServer)zk;
+            for(PacketInFlight p: packetsNotCommitted) {
+                fzk.logRequest(p.hdr, p.rec);
+            }
+            for(Long zxid: packetsCommitted) {
+                fzk.commit(zxid);
+            }
         }
     }
     

Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java?rev=1198043&r1=1198042&r2=1198043&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java (original)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java Sat Nov  5 20:14:21 2011
@@ -18,6 +18,7 @@
 
 package org.apache.zookeeper.server.quorum;
 
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
@@ -26,7 +27,6 @@ import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
-import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 
@@ -34,22 +34,24 @@ import org.apache.jute.BinaryInputArchiv
 import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.InputArchive;
 import org.apache.jute.OutputArchive;
-import org.apache.zookeeper.ZKUtil;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.ByteBufferInputStream;
 import org.apache.zookeeper.server.ByteBufferOutputStream;
-import org.apache.zookeeper.server.DataTree;
 import org.apache.zookeeper.server.ServerCnxn;
 import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.ZooKeeperServer.DataTreeBuilder;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-import org.apache.zookeeper.server.quorum.Leader;
-import org.apache.zookeeper.server.quorum.LearnerInfo;
-import org.apache.zookeeper.server.quorum.QuorumPacket;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
-import org.apache.zookeeper.server.quorum.Zab1_0Test.LeaderConversation;
 import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
 import org.apache.zookeeper.server.util.ZxidUtils;
+import org.apache.zookeeper.txn.CreateTxn;
+import org.apache.zookeeper.txn.SetDataTxn;
+import org.apache.zookeeper.txn.TxnHeader;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -209,13 +211,6 @@ public class Zab1_0Test {
         public void closeAll() {
         }
     }
-    static class MockDataTreeBuilder implements DataTreeBuilder {
-        @Override
-        public DataTree build() {
-            return new DataTree();
-        }
-        
-    }
     static Socket[] getSocketPair() throws IOException {
         ServerSocket ss = new ServerSocket();
         ss.bind(null);
@@ -237,10 +232,10 @@ public class Zab1_0Test {
     }
     
     static public interface FollowerConversation {
-        void converseWithFollower(InputArchive ia, OutputArchive oa) throws Exception;
+        void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) throws Exception;
     }
     
-    public void testConversation(LeaderConversation conversation) throws Exception {
+    public void testLeaderConversation(LeaderConversation conversation) throws Exception {
         Socket pair[] = getSocketPair();
         Socket leaderSocket = pair[0];
         Socket followerSocket = pair[1];
@@ -281,12 +276,215 @@ public class Zab1_0Test {
             }
         }
     }
-        
+    
+    public void testFollowerConversation(FollowerConversation conversation) throws Exception {
+        File tmpDir = File.createTempFile("test", "dir");
+        tmpDir.delete();
+        tmpDir.mkdir();
+        Thread followerThread = null;
+        ConversableFollower follower = null;
+        QuorumPeer peer = null;
+        try {
+            peer = createQuorumPeer(tmpDir);
+            follower = createFollower(tmpDir, peer);
+            peer.follower = follower;
+            
+            ServerSocket ss = new ServerSocket();
+            ss.bind(null);
+            follower.setLeaderSocketAddress((InetSocketAddress)ss.getLocalSocketAddress());
+            final Follower followerForThread = follower;
+            
+            followerThread = new Thread() {
+                public void run() {
+                    try {
+                        followerForThread.followLeader();
+                    } catch(Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            };
+            followerThread.start();
+            Socket leaderSocket = ss.accept();
+            
+            InputArchive ia = BinaryInputArchive.getArchive(leaderSocket
+                    .getInputStream());
+            OutputArchive oa = BinaryOutputArchive.getArchive(leaderSocket
+                    .getOutputStream());
+
+            conversation.converseWithFollower(ia, oa, follower);
+        } finally {
+            if (follower != null) {
+                follower.shutdown();
+            }
+            if (followerThread != null) {
+                followerThread.interrupt();
+                followerThread.join();
+            }
+            if (peer != null) {
+                peer.shutdown();
+            }
+            recursiveDelete(tmpDir);
+        }
+    }
+
+    @Test
+    public void testNormalFollowerRun() throws Exception {
+        testFollowerConversation(new FollowerConversation() {
+            @Override
+            public void converseWithFollower(InputArchive ia, OutputArchive oa,
+                    Follower f) throws Exception {
+                File tmpDir = File.createTempFile("test", "dir");
+                tmpDir.delete();
+                tmpDir.mkdir();
+                File logDir = f.fzk.getTxnLogFactory().getDataDir().getParentFile();
+                File snapDir = f.fzk.getTxnLogFactory().getSnapDir().getParentFile();
+                try {
+                    Assert.assertEquals(0, f.self.getAcceptedEpoch());
+                    Assert.assertEquals(0, f.self.getCurrentEpoch());
+
+                    // Setup a database with a single /foo node
+                    ZKDatabase zkDb = new ZKDatabase(new FileTxnSnapLog(tmpDir, tmpDir));
+                    final long firstZxid = ZxidUtils.makeZxid(1, 1);
+                    zkDb.processTxn(new TxnHeader(13, 1313, firstZxid, 33, ZooDefs.OpCode.create), new CreateTxn("/foo", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1));
+                    Stat stat = new Stat();
+                    Assert.assertEquals("data1", new String(zkDb.getData("/foo", stat, null)));
+
+                    QuorumPacket qp = new QuorumPacket();
+                    readPacketSkippingPing(ia, qp);
+                    Assert.assertEquals(Leader.FOLLOWERINFO, qp.getType());
+                    Assert.assertEquals(qp.getZxid(), 0);
+                    LearnerInfo learnInfo = new LearnerInfo();
+                    ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), learnInfo);
+                    Assert.assertEquals(learnInfo.getProtocolVersion(), 0x10000);
+                    Assert.assertEquals(learnInfo.getServerid(), 0);
+                
+                    // We are simulating an established leader, so the epoch is 1
+                    qp.setType(Leader.LEADERINFO);
+                    qp.setZxid(ZxidUtils.makeZxid(1, 0));
+                    byte protoBytes[] = new byte[4];
+                    ByteBuffer.wrap(protoBytes).putInt(0x10000);
+                    qp.setData(protoBytes);
+                    oa.writeRecord(qp, null);
+                
+                    readPacketSkippingPing(ia, qp);
+                    Assert.assertEquals(Leader.ACKEPOCH, qp.getType());
+                    Assert.assertEquals(0, qp.getZxid());
+                    Assert.assertEquals(ZxidUtils.makeZxid(0, 0), ByteBuffer.wrap(qp.getData()).getInt());
+                    Assert.assertEquals(1, f.self.getAcceptedEpoch());
+                    Assert.assertEquals(0, f.self.getCurrentEpoch());
+                    
+                    // Send the snapshot we created earlier
+                    qp.setType(Leader.SNAP);
+                    qp.setData(new byte[0]);
+                    qp.setZxid(zkDb.getDataTreeLastProcessedZxid());
+                    oa.writeRecord(qp, null);
+                    zkDb.serializeSnapshot(oa);
+                    oa.writeString("BenWasHere", null);
+                    qp.setType(Leader.NEWLEADER);
+                    qp.setZxid(ZxidUtils.makeZxid(1, 0));
+                    oa.writeRecord(qp, null);
+
+                    // Get the ack of the new leader
+                    readPacketSkippingPing(ia, qp);
+                    Assert.assertEquals(Leader.ACK, qp.getType());
+                    Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+                    Assert.assertEquals(1, f.self.getAcceptedEpoch());
+                    Assert.assertEquals(1, f.self.getCurrentEpoch());
+                    
+                    Assert.assertEquals(firstZxid, f.fzk.getLastProcessedZxid());
+                    
+                    // Make sure the data was recorded in the filesystem ok
+                    ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
+                    long lastZxid = zkDb2.loadDataBase();
+                    Assert.assertEquals("data1", new String(zkDb2.getData("/foo", stat, null)));
+                    Assert.assertEquals(firstZxid, lastZxid);
+
+                    // Propose an update
+                    long proposalZxid = ZxidUtils.makeZxid(1, 1000);
+                    proposeSetData(qp, proposalZxid, "data2", 2);
+                    oa.writeRecord(qp, null);
+                    
+                    // We want to track the change with a callback rather than depending on timing
+                    class TrackerWatcher implements Watcher {
+                        boolean changed;
+                        synchronized void waitForChange() throws InterruptedException {
+                            while(!changed) {
+                                wait();
+                            }
+                        }
+                        @Override
+                        public void process(WatchedEvent event) {
+                            if (event.getType() == EventType.NodeDataChanged) {
+                                synchronized(this) {
+                                    changed = true;
+                                    notifyAll();
+                                }
+                            }
+                        }
+                        synchronized public boolean changed() {
+                            return changed;
+                        }
+                        
+                    };
+                    TrackerWatcher watcher = new TrackerWatcher();
+                    
+                    // The change should not have happened yet, since we haven't committed
+                    Assert.assertEquals("data1", new String(f.fzk.getZKDatabase().getData("/foo", stat, watcher)));
+                    
+                    // The change should happen now
+                    qp.setType(Leader.COMMIT);
+                    qp.setZxid(proposalZxid);
+                    oa.writeRecord(qp, null);
+                    
+                    qp.setType(Leader.UPTODATE);
+                    qp.setZxid(0);
+                    oa.writeRecord(qp, null);
+                    
+                    // Read the uptodate ack
+                    readPacketSkippingPing(ia, qp);
+                    Assert.assertEquals(Leader.ACK, qp.getType());
+                    Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+                    
+                    readPacketSkippingPing(ia, qp);
+                    Assert.assertEquals(Leader.ACK, qp.getType());
+                    Assert.assertEquals(proposalZxid, qp.getZxid());
+                    
+                    watcher.waitForChange();
+                    Assert.assertEquals("data2", new String(f.fzk.getZKDatabase().getData("/foo", stat, null)));
+                    
+                    // check and make sure the change is persisted
+                    zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
+                    lastZxid = zkDb2.loadDataBase();
+                    Assert.assertEquals("data2", new String(zkDb2.getData("/foo", stat, null)));
+                    Assert.assertEquals(proposalZxid, lastZxid);
+                } finally {
+                    recursiveDelete(tmpDir);
+                }
+                
+            }
+
+            private void proposeSetData(QuorumPacket qp, long zxid, String data, int version) throws IOException {
+                qp.setType(Leader.PROPOSAL);
+                qp.setZxid(zxid);
+                TxnHeader hdr = new TxnHeader(4, 1414, qp.getZxid(), 55, ZooDefs.OpCode.setData);
+                SetDataTxn sdt = new SetDataTxn("/foo", data.getBytes(), version);
+                ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                OutputArchive boa = BinaryOutputArchive.getArchive(baos);
+                boa.writeRecord(hdr, null);
+                boa.writeRecord(sdt, null);
+                qp.setData(baos.toByteArray());
+            }
+        });
+    }
+    
     @Test
     public void testNormalRun() throws Exception {
-        testConversation(new LeaderConversation() {
+        testLeaderConversation(new LeaderConversation() {
             public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l)
                     throws IOException {
+                Assert.assertEquals(0, l.self.getAcceptedEpoch());
+                Assert.assertEquals(0, l.self.getCurrentEpoch());
+                
                 /* we test a normal run. everything should work out well. */
                 LearnerInfo li = new LearnerInfo(1, 0x10000);
                 byte liBytes[] = new byte[12];
@@ -295,20 +493,30 @@ public class Zab1_0Test {
                 QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0,
                         liBytes, null);
                 oa.writeRecord(qp, null);
+                
                 readPacketSkippingPing(ia, qp);
                 Assert.assertEquals(Leader.LEADERINFO, qp.getType());
                 Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
                 Assert.assertEquals(ByteBuffer.wrap(qp.getData()).getInt(),
                         0x10000);
+                Assert.assertEquals(1, l.self.getAcceptedEpoch());
+                Assert.assertEquals(0, l.self.getCurrentEpoch());
+                
                 qp = new QuorumPacket(Leader.ACKEPOCH, 0, new byte[4], null);
                 oa.writeRecord(qp, null);
+                
                 readPacketSkippingPing(ia, qp);
                 Assert.assertEquals(Leader.DIFF, qp.getType());
+               
                 readPacketSkippingPing(ia, qp);
                 Assert.assertEquals(Leader.NEWLEADER, qp.getType());
                 Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+                Assert.assertEquals(1, l.self.getAcceptedEpoch());
+                Assert.assertEquals(1, l.self.getCurrentEpoch());
+                
                 qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null);
                 oa.writeRecord(qp, null);
+
                 readPacketSkippingPing(ia, qp);
                 Assert.assertEquals(Leader.UPTODATE, qp.getType());
             }
@@ -317,7 +525,7 @@ public class Zab1_0Test {
     
     @Test
     public void testLeaderBehind() throws Exception {
-        testConversation(new LeaderConversation() {
+        testLeaderConversation(new LeaderConversation() {
             public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l)
                     throws IOException {
                 /* we test a normal run. everything should work out well. */
@@ -357,7 +565,7 @@ public class Zab1_0Test {
      */
     @Test
     public void testAbandonBeforeACKEpoch() throws Exception {
-        testConversation(new LeaderConversation() {
+        testLeaderConversation(new LeaderConversation() {
             public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l)
                     throws IOException, InterruptedException {
                 /* we test a normal run. everything should work out well. */            	
@@ -400,10 +608,36 @@ public class Zab1_0Test {
         addrField.setAccessible(true);
         addrField.set(peer, new InetSocketAddress(33556));
         ZKDatabase zkDb = new ZKDatabase(logFactory);
-        DataTreeBuilder treeBuilder = new MockDataTreeBuilder();
-        LeaderZooKeeperServer zk = new LeaderZooKeeperServer(logFactory, peer, treeBuilder, zkDb);
+        LeaderZooKeeperServer zk = new LeaderZooKeeperServer(logFactory, peer, new ZooKeeperServer.BasicDataTreeBuilder(), zkDb);
         return new Leader(peer, zk);
     }
+
+    static class ConversableFollower extends Follower {
+
+        ConversableFollower(QuorumPeer self, FollowerZooKeeperServer zk) {
+            super(self, zk);
+        }
+
+        InetSocketAddress leaderAddr;
+        public void setLeaderSocketAddress(InetSocketAddress addr) {
+            leaderAddr = addr;
+        }
+        
+        @Override
+        protected InetSocketAddress findLeader() {
+            return leaderAddr;
+        }
+    }
+    private ConversableFollower createFollower(File tmpDir, QuorumPeer peer)
+    throws IOException {
+        FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpDir, tmpDir);
+        peer.setTxnFactory(logFactory);
+        ZKDatabase zkDb = new ZKDatabase(logFactory);
+        FollowerZooKeeperServer zk = new FollowerZooKeeperServer(logFactory, peer, new ZooKeeperServer.BasicDataTreeBuilder(), zkDb);
+        peer.setZKDatabase(zkDb);
+        return new ConversableFollower(peer, zk);
+    }
+
     private QuorumPeer createQuorumPeer(File tmpDir) throws IOException,
             FileNotFoundException {
         QuorumPeer peer = new QuorumPeer();

Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java?rev=1198043&r1=1198042&r2=1198043&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java (original)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java Sat Nov  5 20:14:21 2011
@@ -109,7 +109,7 @@ public class FollowerResyncConcurrencyTe
 
             @Override
             public void run() {
-                for(int i = 0; i < 1000; i++) {
+                for(int i = 0; i < 3000; i++) {
                     zk3.create("/mytestfoo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
 
                         @Override
@@ -118,7 +118,7 @@ public class FollowerResyncConcurrencyTe
                             if (rc != 0) {
                                 errors++;
                             }
-                            if(counter == 14200){
+                            if(counter == 16200){
                                 sem.release();
                             }
                         }
@@ -144,7 +144,7 @@ public class FollowerResyncConcurrencyTe
                     if (rc != 0) {
                         errors++;
                     }
-                    if(counter == 14200){
+                    if(counter == 16200){
                         sem.release();
                     }
                 }
@@ -156,10 +156,10 @@ public class FollowerResyncConcurrencyTe
             }
             if(i == 12000){
                 //Restart off of snap, then get some txns for a log, then shut down
+                mytestfooThread.start();
                 qu.restart(index);       
                 Thread.sleep(300);
-                qu.shutdown(index);
-                mytestfooThread.start();
+                qu.shutdown(index);               
                 Thread.sleep(300);                
                 qu.restart(index);
                 LOG.info("Setting up server: " + index);
@@ -176,7 +176,7 @@ public class FollowerResyncConcurrencyTe
                         if (rc != 0) {
                             errors++;
                         }
-                        if(counter == 14200){
+                        if(counter == 16200){
                             sem.release();
                         }
                     }