You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ca...@apache.org on 2011/11/05 21:14:21 UTC
svn commit: r1198043 - in /zookeeper/branches/branch-3.4: ./
src/java/main/org/apache/zookeeper/server/quorum/
src/java/test/org/apache/zookeeper/server/quorum/
src/java/test/org/apache/zookeeper/test/
Author: camille
Date: Sat Nov 5 20:14:21 2011
New Revision: 1198043
URL: http://svn.apache.org/viewvc?rev=1198043&view=rev
Log:
ZOOKEEPER-1264. FollowerResyncConcurrencyTest failing intermittently.
ZOOKEEPER-1282. Learner.java not following Zab 1.0 protocol - setCurrentEpoch should be done upon receipt of NEWLEADER (before acking it) and not upon receipt of UPTODATE.
ZOOKEEPER-1291. AcceptedEpoch not updated at leader before it proposes the epoch to followers.
Modified:
zookeeper/branches/branch-3.4/CHANGES.txt
zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java
Modified: zookeeper/branches/branch-3.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/CHANGES.txt?rev=1198043&r1=1198042&r2=1198043&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/CHANGES.txt (original)
+++ zookeeper/branches/branch-3.4/CHANGES.txt Sat Nov 5 20:14:21 2011
@@ -363,6 +363,15 @@ BUGFIXES:
ZOOKEEPER-1270. testEarlyLeaderAbandonment failing intermittently,
quorum formed, no serving. (Flavio, Camille and Alexander Shraer via mahadev)
+
+ ZOOKEEPER-1264. FollowerResyncConcurrencyTest failing
+ intermittently. (breed, camille and Alex Shraer via camille)
+
+ ZOOKEEPER-1282. Learner.java not following Zab 1.0 protocol -
+ setCurrentEpoch should be done upon receipt of NEWLEADER
+ (before acking it) and not upon receipt of UPTODATE (breed via camille)
+
+ ZOOKEEPER-1291. AcceptedEpoch not updated at leader before it proposes the epoch to followers. (Alex Shraer via camille)
IMPROVEMENTS:
ZOOKEEPER-724. Improve junit test integration - log harness information
Modified: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=1198043&r1=1198042&r2=1198043&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Sat Nov 5 20:14:21 2011
@@ -311,7 +311,6 @@ public class Leader {
readyToStart = true;
long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
- self.setAcceptedEpoch(epoch);
zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
@@ -770,7 +769,7 @@ public class Leader {
}
private HashSet<Long> connectingFollowers = new HashSet<Long>();
- public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException {
+ public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
synchronized(connectingFollowers) {
if (!waitingForNewEpoch) {
return epoch;
@@ -782,8 +781,9 @@ public class Leader {
QuorumVerifier verifier = self.getQuorumVerifier();
if (connectingFollowers.contains(self.getId()) && verifier.containsQuorum(connectingFollowers))
{
- waitingForNewEpoch = false;
- connectingFollowers.notifyAll();
+ waitingForNewEpoch = false;
+ self.setAcceptedEpoch(epoch);
+ connectingFollowers.notifyAll();
} else {
long start = System.currentTimeMillis();
long cur = start;
Modified: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Learner.java?rev=1198043&r1=1198042&r2=1198043&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Learner.java (original)
+++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Learner.java Sat Nov 5 20:14:21 2011
@@ -312,8 +312,10 @@ public class Learner {
protected void syncWithLeader(long newLeaderZxid) throws IOException, InterruptedException{
QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
QuorumPacket qp = new QuorumPacket();
+ long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
readPacket(qp);
+ LinkedList<Long> packetsCommitted = new LinkedList<Long>();
LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
synchronized (zk) {
if (qp.getType() == Leader.DIFF) {
@@ -376,12 +378,16 @@ public class Learner {
packetsNotCommitted.add(pif);
break;
case Leader.COMMIT:
- pif = packetsNotCommitted.peekFirst();
- if (pif.hdr.getZxid() != qp.getZxid()) {
- LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
+ if (!snapshotTaken) {
+ pif = packetsNotCommitted.peekFirst();
+ if (pif.hdr.getZxid() != qp.getZxid()) {
+ LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
+ } else {
+ zk.getZKDatabase().processTxn(pif.hdr, pif.rec);
+ packetsNotCommitted.remove();
+ }
} else {
- zk.getZKDatabase().processTxn(pif.hdr, pif.rec);
- packetsNotCommitted.remove();
+ packetsCommitted.add(qp.getZxid());
}
break;
case Leader.INFORM:
@@ -390,28 +396,34 @@ public class Learner {
zk.getZKDatabase().processTxn(hdr, txn);
break;
case Leader.UPTODATE:
- if (!snapshotTaken) {
+ if (!snapshotTaken) { // true for the pre v1.0 case
zk.takeSnapshot();
+ self.setCurrentEpoch(newEpoch);
}
self.cnxnFactory.setZooKeeperServer(zk);
break outerLoop;
case Leader.NEWLEADER: // it will be NEWLEADER in v1.0
zk.takeSnapshot();
+ self.setCurrentEpoch(newEpoch);
snapshotTaken = true;
writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
break;
}
}
}
- long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
- self.setCurrentEpoch(newEpoch);
ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
writePacket(ack, true);
sock.setSoTimeout(self.tickTime * self.syncLimit);
zk.startup();
- //We have to have a commit processor to do this
- for(PacketInFlight p: packetsNotCommitted) {
- ((FollowerZooKeeperServer)zk).logRequest(p.hdr, p.rec);
+ // We need to log the stuff that came in between the snapshot and the uptodate
+ if (zk instanceof FollowerZooKeeperServer) {
+ FollowerZooKeeperServer fzk = (FollowerZooKeeperServer)zk;
+ for(PacketInFlight p: packetsNotCommitted) {
+ fzk.logRequest(p.hdr, p.rec);
+ }
+ for(Long zxid: packetsCommitted) {
+ fzk.commit(zxid);
+ }
}
}
Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java?rev=1198043&r1=1198042&r2=1198043&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java (original)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java Sat Nov 5 20:14:21 2011
@@ -18,6 +18,7 @@
package org.apache.zookeeper.server.quorum;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
@@ -26,7 +27,6 @@ import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
-import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.HashMap;
@@ -34,22 +34,24 @@ import org.apache.jute.BinaryInputArchiv
import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.InputArchive;
import org.apache.jute.OutputArchive;
-import org.apache.zookeeper.ZKUtil;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.ByteBufferInputStream;
import org.apache.zookeeper.server.ByteBufferOutputStream;
-import org.apache.zookeeper.server.DataTree;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.ZooKeeperServer.DataTreeBuilder;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-import org.apache.zookeeper.server.quorum.Leader;
-import org.apache.zookeeper.server.quorum.LearnerInfo;
-import org.apache.zookeeper.server.quorum.QuorumPacket;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
-import org.apache.zookeeper.server.quorum.Zab1_0Test.LeaderConversation;
import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
import org.apache.zookeeper.server.util.ZxidUtils;
+import org.apache.zookeeper.txn.CreateTxn;
+import org.apache.zookeeper.txn.SetDataTxn;
+import org.apache.zookeeper.txn.TxnHeader;
import org.junit.Assert;
import org.junit.Test;
@@ -209,13 +211,6 @@ public class Zab1_0Test {
public void closeAll() {
}
}
- static class MockDataTreeBuilder implements DataTreeBuilder {
- @Override
- public DataTree build() {
- return new DataTree();
- }
-
- }
static Socket[] getSocketPair() throws IOException {
ServerSocket ss = new ServerSocket();
ss.bind(null);
@@ -237,10 +232,10 @@ public class Zab1_0Test {
}
static public interface FollowerConversation {
- void converseWithFollower(InputArchive ia, OutputArchive oa) throws Exception;
+ void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) throws Exception;
}
- public void testConversation(LeaderConversation conversation) throws Exception {
+ public void testLeaderConversation(LeaderConversation conversation) throws Exception {
Socket pair[] = getSocketPair();
Socket leaderSocket = pair[0];
Socket followerSocket = pair[1];
@@ -281,12 +276,215 @@ public class Zab1_0Test {
}
}
}
-
+
+ public void testFollowerConversation(FollowerConversation conversation) throws Exception {
+ File tmpDir = File.createTempFile("test", "dir");
+ tmpDir.delete();
+ tmpDir.mkdir();
+ Thread followerThread = null;
+ ConversableFollower follower = null;
+ QuorumPeer peer = null;
+ try {
+ peer = createQuorumPeer(tmpDir);
+ follower = createFollower(tmpDir, peer);
+ peer.follower = follower;
+
+ ServerSocket ss = new ServerSocket();
+ ss.bind(null);
+ follower.setLeaderSocketAddress((InetSocketAddress)ss.getLocalSocketAddress());
+ final Follower followerForThread = follower;
+
+ followerThread = new Thread() {
+ public void run() {
+ try {
+ followerForThread.followLeader();
+ } catch(Exception e) {
+ e.printStackTrace();
+ }
+ }
+ };
+ followerThread.start();
+ Socket leaderSocket = ss.accept();
+
+ InputArchive ia = BinaryInputArchive.getArchive(leaderSocket
+ .getInputStream());
+ OutputArchive oa = BinaryOutputArchive.getArchive(leaderSocket
+ .getOutputStream());
+
+ conversation.converseWithFollower(ia, oa, follower);
+ } finally {
+ if (follower != null) {
+ follower.shutdown();
+ }
+ if (followerThread != null) {
+ followerThread.interrupt();
+ followerThread.join();
+ }
+ if (peer != null) {
+ peer.shutdown();
+ }
+ recursiveDelete(tmpDir);
+ }
+ }
+
+ @Test
+ public void testNormalFollowerRun() throws Exception {
+ testFollowerConversation(new FollowerConversation() {
+ @Override
+ public void converseWithFollower(InputArchive ia, OutputArchive oa,
+ Follower f) throws Exception {
+ File tmpDir = File.createTempFile("test", "dir");
+ tmpDir.delete();
+ tmpDir.mkdir();
+ File logDir = f.fzk.getTxnLogFactory().getDataDir().getParentFile();
+ File snapDir = f.fzk.getTxnLogFactory().getSnapDir().getParentFile();
+ try {
+ Assert.assertEquals(0, f.self.getAcceptedEpoch());
+ Assert.assertEquals(0, f.self.getCurrentEpoch());
+
+ // Setup a database with a single /foo node
+ ZKDatabase zkDb = new ZKDatabase(new FileTxnSnapLog(tmpDir, tmpDir));
+ final long firstZxid = ZxidUtils.makeZxid(1, 1);
+ zkDb.processTxn(new TxnHeader(13, 1313, firstZxid, 33, ZooDefs.OpCode.create), new CreateTxn("/foo", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1));
+ Stat stat = new Stat();
+ Assert.assertEquals("data1", new String(zkDb.getData("/foo", stat, null)));
+
+ QuorumPacket qp = new QuorumPacket();
+ readPacketSkippingPing(ia, qp);
+ Assert.assertEquals(Leader.FOLLOWERINFO, qp.getType());
+ Assert.assertEquals(qp.getZxid(), 0);
+ LearnerInfo learnInfo = new LearnerInfo();
+ ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), learnInfo);
+ Assert.assertEquals(learnInfo.getProtocolVersion(), 0x10000);
+ Assert.assertEquals(learnInfo.getServerid(), 0);
+
+ // We are simulating an established leader, so the epoch is 1
+ qp.setType(Leader.LEADERINFO);
+ qp.setZxid(ZxidUtils.makeZxid(1, 0));
+ byte protoBytes[] = new byte[4];
+ ByteBuffer.wrap(protoBytes).putInt(0x10000);
+ qp.setData(protoBytes);
+ oa.writeRecord(qp, null);
+
+ readPacketSkippingPing(ia, qp);
+ Assert.assertEquals(Leader.ACKEPOCH, qp.getType());
+ Assert.assertEquals(0, qp.getZxid());
+ Assert.assertEquals(ZxidUtils.makeZxid(0, 0), ByteBuffer.wrap(qp.getData()).getInt());
+ Assert.assertEquals(1, f.self.getAcceptedEpoch());
+ Assert.assertEquals(0, f.self.getCurrentEpoch());
+
+ // Send the snapshot we created earlier
+ qp.setType(Leader.SNAP);
+ qp.setData(new byte[0]);
+ qp.setZxid(zkDb.getDataTreeLastProcessedZxid());
+ oa.writeRecord(qp, null);
+ zkDb.serializeSnapshot(oa);
+ oa.writeString("BenWasHere", null);
+ qp.setType(Leader.NEWLEADER);
+ qp.setZxid(ZxidUtils.makeZxid(1, 0));
+ oa.writeRecord(qp, null);
+
+ // Get the ack of the new leader
+ readPacketSkippingPing(ia, qp);
+ Assert.assertEquals(Leader.ACK, qp.getType());
+ Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+ Assert.assertEquals(1, f.self.getAcceptedEpoch());
+ Assert.assertEquals(1, f.self.getCurrentEpoch());
+
+ Assert.assertEquals(firstZxid, f.fzk.getLastProcessedZxid());
+
+ // Make sure the data was recorded in the filesystem ok
+ ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
+ long lastZxid = zkDb2.loadDataBase();
+ Assert.assertEquals("data1", new String(zkDb2.getData("/foo", stat, null)));
+ Assert.assertEquals(firstZxid, lastZxid);
+
+ // Propose an update
+ long proposalZxid = ZxidUtils.makeZxid(1, 1000);
+ proposeSetData(qp, proposalZxid, "data2", 2);
+ oa.writeRecord(qp, null);
+
+ // We want to track the change with a callback rather than depending on timing
+ class TrackerWatcher implements Watcher {
+ boolean changed;
+ synchronized void waitForChange() throws InterruptedException {
+ while(!changed) {
+ wait();
+ }
+ }
+ @Override
+ public void process(WatchedEvent event) {
+ if (event.getType() == EventType.NodeDataChanged) {
+ synchronized(this) {
+ changed = true;
+ notifyAll();
+ }
+ }
+ }
+ synchronized public boolean changed() {
+ return changed;
+ }
+
+ };
+ TrackerWatcher watcher = new TrackerWatcher();
+
+ // The change should not have happened yet, since we haven't committed
+ Assert.assertEquals("data1", new String(f.fzk.getZKDatabase().getData("/foo", stat, watcher)));
+
+ // The change should happen now
+ qp.setType(Leader.COMMIT);
+ qp.setZxid(proposalZxid);
+ oa.writeRecord(qp, null);
+
+ qp.setType(Leader.UPTODATE);
+ qp.setZxid(0);
+ oa.writeRecord(qp, null);
+
+ // Read the uptodate ack
+ readPacketSkippingPing(ia, qp);
+ Assert.assertEquals(Leader.ACK, qp.getType());
+ Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+
+ readPacketSkippingPing(ia, qp);
+ Assert.assertEquals(Leader.ACK, qp.getType());
+ Assert.assertEquals(proposalZxid, qp.getZxid());
+
+ watcher.waitForChange();
+ Assert.assertEquals("data2", new String(f.fzk.getZKDatabase().getData("/foo", stat, null)));
+
+ // check and make sure the change is persisted
+ zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
+ lastZxid = zkDb2.loadDataBase();
+ Assert.assertEquals("data2", new String(zkDb2.getData("/foo", stat, null)));
+ Assert.assertEquals(proposalZxid, lastZxid);
+ } finally {
+ recursiveDelete(tmpDir);
+ }
+
+ }
+
+ private void proposeSetData(QuorumPacket qp, long zxid, String data, int version) throws IOException {
+ qp.setType(Leader.PROPOSAL);
+ qp.setZxid(zxid);
+ TxnHeader hdr = new TxnHeader(4, 1414, qp.getZxid(), 55, ZooDefs.OpCode.setData);
+ SetDataTxn sdt = new SetDataTxn("/foo", data.getBytes(), version);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ OutputArchive boa = BinaryOutputArchive.getArchive(baos);
+ boa.writeRecord(hdr, null);
+ boa.writeRecord(sdt, null);
+ qp.setData(baos.toByteArray());
+ }
+ });
+ }
+
@Test
public void testNormalRun() throws Exception {
- testConversation(new LeaderConversation() {
+ testLeaderConversation(new LeaderConversation() {
public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l)
throws IOException {
+ Assert.assertEquals(0, l.self.getAcceptedEpoch());
+ Assert.assertEquals(0, l.self.getCurrentEpoch());
+
/* we test a normal run. everything should work out well. */
LearnerInfo li = new LearnerInfo(1, 0x10000);
byte liBytes[] = new byte[12];
@@ -295,20 +493,30 @@ public class Zab1_0Test {
QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0,
liBytes, null);
oa.writeRecord(qp, null);
+
readPacketSkippingPing(ia, qp);
Assert.assertEquals(Leader.LEADERINFO, qp.getType());
Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
Assert.assertEquals(ByteBuffer.wrap(qp.getData()).getInt(),
0x10000);
+ Assert.assertEquals(1, l.self.getAcceptedEpoch());
+ Assert.assertEquals(0, l.self.getCurrentEpoch());
+
qp = new QuorumPacket(Leader.ACKEPOCH, 0, new byte[4], null);
oa.writeRecord(qp, null);
+
readPacketSkippingPing(ia, qp);
Assert.assertEquals(Leader.DIFF, qp.getType());
+
readPacketSkippingPing(ia, qp);
Assert.assertEquals(Leader.NEWLEADER, qp.getType());
Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+ Assert.assertEquals(1, l.self.getAcceptedEpoch());
+ Assert.assertEquals(1, l.self.getCurrentEpoch());
+
qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null);
oa.writeRecord(qp, null);
+
readPacketSkippingPing(ia, qp);
Assert.assertEquals(Leader.UPTODATE, qp.getType());
}
@@ -317,7 +525,7 @@ public class Zab1_0Test {
@Test
public void testLeaderBehind() throws Exception {
- testConversation(new LeaderConversation() {
+ testLeaderConversation(new LeaderConversation() {
public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l)
throws IOException {
/* we test a normal run. everything should work out well. */
@@ -357,7 +565,7 @@ public class Zab1_0Test {
*/
@Test
public void testAbandonBeforeACKEpoch() throws Exception {
- testConversation(new LeaderConversation() {
+ testLeaderConversation(new LeaderConversation() {
public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l)
throws IOException, InterruptedException {
/* we test a normal run. everything should work out well. */
@@ -400,10 +608,36 @@ public class Zab1_0Test {
addrField.setAccessible(true);
addrField.set(peer, new InetSocketAddress(33556));
ZKDatabase zkDb = new ZKDatabase(logFactory);
- DataTreeBuilder treeBuilder = new MockDataTreeBuilder();
- LeaderZooKeeperServer zk = new LeaderZooKeeperServer(logFactory, peer, treeBuilder, zkDb);
+ LeaderZooKeeperServer zk = new LeaderZooKeeperServer(logFactory, peer, new ZooKeeperServer.BasicDataTreeBuilder(), zkDb);
return new Leader(peer, zk);
}
+
+ static class ConversableFollower extends Follower {
+
+ ConversableFollower(QuorumPeer self, FollowerZooKeeperServer zk) {
+ super(self, zk);
+ }
+
+ InetSocketAddress leaderAddr;
+ public void setLeaderSocketAddress(InetSocketAddress addr) {
+ leaderAddr = addr;
+ }
+
+ @Override
+ protected InetSocketAddress findLeader() {
+ return leaderAddr;
+ }
+ }
+ private ConversableFollower createFollower(File tmpDir, QuorumPeer peer)
+ throws IOException {
+ FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpDir, tmpDir);
+ peer.setTxnFactory(logFactory);
+ ZKDatabase zkDb = new ZKDatabase(logFactory);
+ FollowerZooKeeperServer zk = new FollowerZooKeeperServer(logFactory, peer, new ZooKeeperServer.BasicDataTreeBuilder(), zkDb);
+ peer.setZKDatabase(zkDb);
+ return new ConversableFollower(peer, zk);
+ }
+
private QuorumPeer createQuorumPeer(File tmpDir) throws IOException,
FileNotFoundException {
QuorumPeer peer = new QuorumPeer();
Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java?rev=1198043&r1=1198042&r2=1198043&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java (original)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java Sat Nov 5 20:14:21 2011
@@ -109,7 +109,7 @@ public class FollowerResyncConcurrencyTe
@Override
public void run() {
- for(int i = 0; i < 1000; i++) {
+ for(int i = 0; i < 3000; i++) {
zk3.create("/mytestfoo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
@Override
@@ -118,7 +118,7 @@ public class FollowerResyncConcurrencyTe
if (rc != 0) {
errors++;
}
- if(counter == 14200){
+ if(counter == 16200){
sem.release();
}
}
@@ -144,7 +144,7 @@ public class FollowerResyncConcurrencyTe
if (rc != 0) {
errors++;
}
- if(counter == 14200){
+ if(counter == 16200){
sem.release();
}
}
@@ -156,10 +156,10 @@ public class FollowerResyncConcurrencyTe
}
if(i == 12000){
//Restart off of snap, then get some txns for a log, then shut down
+ mytestfooThread.start();
qu.restart(index);
Thread.sleep(300);
- qu.shutdown(index);
- mytestfooThread.start();
+ qu.shutdown(index);
Thread.sleep(300);
qu.restart(index);
LOG.info("Setting up server: " + index);
@@ -176,7 +176,7 @@ public class FollowerResyncConcurrencyTe
if (rc != 0) {
errors++;
}
- if(counter == 14200){
+ if(counter == 16200){
sem.release();
}
}