You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ca...@apache.org on 2012/07/05 18:07:48 UTC
svn commit: r1357717 - in /zookeeper/branches/branch-3.4: ./
src/java/main/org/apache/zookeeper/server/quorum/
src/java/test/org/apache/zookeeper/server/quorum/
Author: camille
Date: Thu Jul 5 16:07:47 2012
New Revision: 1357717
URL: http://svn.apache.org/viewvc?rev=1357717&view=rev
Log:
ZOOKEEPER-1465. Cluster availability following new leader election
takes a long time with large datasets - is correlated to dataset size
(fpj and Thawan Kooburat via camille)
Modified:
zookeeper/branches/branch-3.4/CHANGES.txt
zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
Modified: zookeeper/branches/branch-3.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/CHANGES.txt?rev=1357717&r1=1357716&r2=1357717&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/CHANGES.txt (original)
+++ zookeeper/branches/branch-3.4/CHANGES.txt Thu Jul 5 16:07:47 2012
@@ -73,6 +73,10 @@ BUGFIXES:
ZOOKEEPER-1471. Jute generates invalid C++ code
(Michi Mutsuzaki via phunt)
+
+ ZOOKEEPER-1465. Cluster availability following new leader election
+ takes a long time with large datasets - is correlated to dataset size
+ (fpj and Thawan Kooburat via camille)
IMPROVEMENTS:
Modified: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=1357717&r1=1357716&r2=1357717&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Thu Jul 5 16:07:47 2012
@@ -840,7 +840,11 @@ public class Leader {
}
if (ss.getCurrentEpoch() != -1) {
if (ss.isMoreRecentThan(leaderStateSummary)) {
- throw new IOException("Follower is ahead of the leader");
+ throw new IOException("Follower is ahead of the leader, leader summary: "
+ + leaderStateSummary.getCurrentEpoch()
+ + " (current epoch), "
+ + leaderStateSummary.getLastZxid()
+ + " (last zxid)");
}
electingFollowers.add(id);
}
@@ -862,4 +866,50 @@ public class Leader {
}
}
}
+
+ /**
+ * Get the string representation of a given packet type (e.g. for log messages).
+ * @param packetType numeric packet type, matched against this class's packet constants (DIFF, TRUNC, SNAP, ...)
+ * @return the name of the matching constant, or "UNKNOWN" if no case matches
+ */
+ public static String getPacketType(int packetType) {
+ switch (packetType) {
+ case DIFF:
+ return "DIFF";
+ case TRUNC:
+ return "TRUNC";
+ case SNAP:
+ return "SNAP";
+ case OBSERVERINFO:
+ return "OBSERVERINFO";
+ case NEWLEADER:
+ return "NEWLEADER";
+ case FOLLOWERINFO:
+ return "FOLLOWERINFO";
+ case UPTODATE:
+ return "UPTODATE";
+ case LEADERINFO:
+ return "LEADERINFO";
+ case ACKEPOCH:
+ return "ACKEPOCH";
+ case REQUEST:
+ return "REQUEST";
+ case PROPOSAL:
+ return "PROPOSAL";
+ case ACK:
+ return "ACK";
+ case COMMIT:
+ return "COMMIT";
+ case PING:
+ return "PING";
+ case REVALIDATE:
+ return "REVALIDATE";
+ case SYNC:
+ return "SYNC";
+ case INFORM:
+ return "INFORM";
+ default:
+ return "UNKNOWN";
+ }
+ }
}
Modified: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=1357717&r1=1357716&r2=1357717&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original)
+++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Thu Jul 5 16:07:47 2012
@@ -338,6 +338,12 @@ public class LearnerHandler extends Thre
// whether to expect a trunc or a diff
boolean firstPacket=true;
+ // If we are here, we can use committedLog to sync with
+ // follower. Then we only need to decide whether to
+ // send trunc or not
+ packetToSend = Leader.DIFF;
+ zxidToSend = maxCommittedLog;
+
for (Proposal propose: proposals) {
// skip the proposals the peer already has
if (propose.packet.getZxid() <= peerLastZxid) {
@@ -351,18 +357,10 @@ public class LearnerHandler extends Thre
// Does the peer have some proposals that the leader hasn't seen yet
if (prevProposalZxid < peerLastZxid) {
// send a trunc message before sending the diff
- packetToSend = Leader.TRUNC;
- LOG.info("Sending TRUNC");
+ packetToSend = Leader.TRUNC;
zxidToSend = prevProposalZxid;
updates = zxidToSend;
- }
- else {
- // Just send the diff
- packetToSend = Leader.DIFF;
- LOG.info("Sending diff");
- zxidToSend = maxCommittedLog;
}
-
}
queuePacket(propose.packet);
QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
@@ -381,19 +379,23 @@ public class LearnerHandler extends Thre
} else {
LOG.warn("Unhandled proposal scenario");
}
+ } else if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
+ // The leader may have recently taken a snapshot, so the committedLog
+ // is empty. We don't need to send a snapshot if the follower
+ // is already in sync with the in-memory db.
+ LOG.debug("committedLog is empty but leader and follower "
+ + "are in sync, zxid=0x{}",
+ Long.toHexString(peerLastZxid));
+ packetToSend = Leader.DIFF;
+ zxidToSend = peerLastZxid;
} else {
// just let the state transfer happen
LOG.debug("proposals is empty");
}
+ LOG.info("Sending " + Leader.getPacketType(packetToSend));
leaderLastZxid = leader.startForwarding(this, updates);
- if (peerLastZxid == leaderLastZxid) {
- LOG.debug("Leader and follower are in sync, sending empty diff. zxid=0x{}",
- Long.toHexString(leaderLastZxid));
- // We are in sync so we'll do an empty diff
- packetToSend = Leader.DIFF;
- zxidToSend = leaderLastZxid;
- }
+
} finally {
rl.unlock();
}
Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java?rev=1357717&r1=1357716&r2=1357717&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java (original)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java Thu Jul 5 16:07:47 2012
@@ -49,6 +49,7 @@ import org.apache.zookeeper.server.ZKDat
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.Leader;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
import org.apache.zookeeper.server.util.ZxidUtils;
@@ -306,6 +307,10 @@ public class Zab1_0Test {
void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) throws Exception;
}
+ static public interface PopulatedLeaderConversation { // follower-side script run against a leader whose database was populated beforehand
+ void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l, long zxid) throws Exception; // zxid: last zxid applied to the leader's database during setup
+ }
+
static public interface FollowerConversation {
void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) throws Exception;
}
@@ -352,6 +357,76 @@ public class Zab1_0Test {
}
}
+ public void testPopulatedLeaderConversation(PopulatedLeaderConversation conversation, int ops) throws Exception { // ops: number of znodes to create before the leader starts (must be >= 1)
+ Socket pair[] = getSocketPair();
+ Socket leaderSocket = pair[0];
+ Socket followerSocket = pair[1];
+ File tmpDir = File.createTempFile("test", "dir"); // createTempFile makes a regular file; it only reserves a unique name here
+ tmpDir.delete();
+ tmpDir.mkdir(); // recreate that name as a directory
+ LeadThread leadThread = null;
+ Leader leader = null;
+ try {
+ // Setup a database with `ops` znodes (/foo-1 .. /foo-ops), all created in epoch 1
+ FileTxnSnapLog snapLog = new FileTxnSnapLog(tmpDir, tmpDir);
+ ZKDatabase zkDb = new ZKDatabase(snapLog);
+
+ Assert.assertTrue(ops >= 1);
+ long zxid = ZxidUtils.makeZxid(1, 0); // overwritten in the loop; ends up as the zxid of the last create txn
+ for(int i = 1; i <= ops; i++){
+ zxid = ZxidUtils.makeZxid(1, i);
+ String path = "/foo-"+ i;
+ zkDb.processTxn(new TxnHeader(13,1000+i,zxid,30+i,ZooDefs.OpCode.create),
+ new CreateTxn(path, "fpjwasalsohere".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1));
+ Stat stat = new Stat();
+ Assert.assertEquals("fpjwasalsohere", new String(zkDb.getData(path, stat, null))); // read back to confirm the txn was applied
+ }
+ Assert.assertTrue(zxid > ZxidUtils.makeZxid(1, 0));
+
+ // Generate snapshot and close files.
+ snapLog.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts());
+ snapLog.close();
+
+ QuorumPeer peer = createQuorumPeer(tmpDir);
+
+ leader = createLeader(tmpDir, peer);
+ peer.leader = leader;
+
+ // Set the last accepted epoch and the current epoch to 1
+ peer.setAcceptedEpoch(1);
+ peer.setCurrentEpoch(1);
+
+
+ leadThread = new LeadThread(leader);
+ leadThread.start();
+
+ while(leader.cnxAcceptor == null || !leader.cnxAcceptor.isAlive()) { // poll until the leader's cnxAcceptor exists and reports alive
+ Thread.sleep(20);
+ }
+
+ LearnerHandler lh = new LearnerHandler(leaderSocket, leader);
+ lh.start();
+ leaderSocket.setSoTimeout(4000); // 4000 ms read timeout on the leader-side socket
+
+ InputArchive ia = BinaryInputArchive.getArchive(followerSocket
+ .getInputStream());
+ OutputArchive oa = BinaryOutputArchive.getArchive(followerSocket
+ .getOutputStream());
+
+ conversation.converseWithLeader(ia, oa, leader, zxid); // the conversation gets the last zxid written above
+ } finally {
+ recursiveDelete(tmpDir); // NOTE(review): the directory is removed before the leader is shut down - confirm this ordering is intentional
+ if (leader != null) {
+ leader.shutdown("end of test");
+ }
+ if (leadThread != null) {
+ leadThread.interrupt();
+ leadThread.join();
+ }
+ }
+ }
+
+
public void testFollowerConversation(FollowerConversation conversation) throws Exception {
File tmpDir = File.createTempFile("test", "dir");
tmpDir.delete();
@@ -403,6 +478,46 @@ public class Zab1_0Test {
}
@Test
+ public void testUnnecessarySnap() throws Exception { // a follower that reports the leader's own last zxid must be answered with DIFF
+ testPopulatedLeaderConversation(new PopulatedLeaderConversation() {
+ @Override
+ public void converseWithLeader(InputArchive ia, OutputArchive oa,
+ Leader l, long zxid) throws Exception {
+
+ Assert.assertEquals(1, l.self.getAcceptedEpoch()); // epochs as set by testPopulatedLeaderConversation
+ Assert.assertEquals(1, l.self.getCurrentEpoch());
+
+ /* the follower announces itself and then acks with the leader's last zxid; the leader should reply with DIFF. */
+ LearnerInfo li = new LearnerInfo(1, 0x10000);
+ byte liBytes[] = new byte[12]; // presumably an 8-byte long plus a 4-byte int for the serialized LearnerInfo - TODO confirm
+ ByteBufferOutputStream.record2ByteBuffer(li,
+ ByteBuffer.wrap(liBytes));
+ QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 1,
+ liBytes, null);
+ oa.writeRecord(qp, null);
+
+ readPacketSkippingPing(ia, qp);
+ Assert.assertEquals(Leader.LEADERINFO, qp.getType());
+ Assert.assertEquals(ZxidUtils.makeZxid(2, 0), qp.getZxid()); // LEADERINFO is expected to carry zxid (2, 0)
+ Assert.assertEquals(ByteBuffer.wrap(qp.getData()).getInt(),
+ 0x10000);
+ Assert.assertEquals(2, l.self.getAcceptedEpoch()); // accepted epoch has advanced to 2 ...
+ Assert.assertEquals(1, l.self.getCurrentEpoch()); // ... while the current epoch is still 1
+
+ byte epochBytes[] = new byte[4];
+ final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
+ wrappedEpochBytes.putInt(1);
+ qp = new QuorumPacket(Leader.ACKEPOCH, zxid, epochBytes, null); // zxid is the last zxid written to the leader's database during setup
+ oa.writeRecord(qp, null);
+
+ readPacketSkippingPing(ia, qp);
+ Assert.assertEquals(Leader.DIFF, qp.getType()); // the first sync packet must be DIFF
+
+ }
+ }, 2); // populate the leader with 2 znodes
+ }
+
+ @Test
public void testNormalFollowerRun() throws Exception {
testFollowerConversation(new FollowerConversation() {
@Override
@@ -685,9 +800,8 @@ public class Zab1_0Test {
oa.writeRecord(qp, null);
readPacketSkippingPing(ia, qp);
- Assert.assertEquals(Leader.SNAP, qp.getType());
- deserializeSnapshot(ia);
-
+ Assert.assertEquals(Leader.DIFF, qp.getType());
+
readPacketSkippingPing(ia, qp);
Assert.assertEquals(Leader.NEWLEADER, qp.getType());
Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
@@ -742,9 +856,7 @@ public class Zab1_0Test {
qp = new QuorumPacket(Leader.ACKEPOCH, 0, new byte[4], null);
oa.writeRecord(qp, null);
readPacketSkippingPing(ia, qp);
- Assert.assertEquals(Leader.SNAP, qp.getType());
- deserializeSnapshot(ia);
-
+ Assert.assertEquals(Leader.DIFF, qp.getType());
readPacketSkippingPing(ia, qp);
Assert.assertEquals(Leader.NEWLEADER, qp.getType());
Assert.assertEquals(ZxidUtils.makeZxid(21, 0), qp.getZxid());