You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ca...@apache.org on 2011/09/03 19:37:10 UTC
svn commit: r1164891 - in /zookeeper/branches/branch-3.3: ./
src/java/main/org/apache/zookeeper/server/persistence/
src/java/main/org/apache/zookeeper/server/quorum/
src/java/test/org/apache/zookeeper/server/quorum/
Author: camille
Date: Sat Sep 3 17:37:10 2011
New Revision: 1164891
URL: http://svn.apache.org/viewvc?rev=1164891&view=rev
Log:
ZOOKEEPER-1154, ZOOKEEPER-1156:
Data inconsistency when the node(s) with the highest zxid is not present at the time of leader election
Log truncation truncating log too much - can cause data loss
Vishal Kathuria via camille
Modified:
zookeeper/branches/branch-3.3/CHANGES.txt
zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
Modified: zookeeper/branches/branch-3.3/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/CHANGES.txt?rev=1164891&r1=1164890&r2=1164891&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/CHANGES.txt (original)
+++ zookeeper/branches/branch-3.3/CHANGES.txt Sat Sep 3 17:37:10 2011
@@ -13,6 +13,10 @@ BUGFIXES:
ZOOKEEPER-1117. zookeeper 3.3.3 fails to build with gcc >= 4.6.1 on
Debian/Ubuntu (James Page via mahadev)
+ ZOOKEEPER-1154. Data inconsistency when the node(s) with the highest zxid is not present at the time of leader election. (Vishal Kathuria via camille)
+
+ ZOOKEEPER-1156. Log truncation truncating log too much - can cause data loss. (Vishal Kathuria via camille)
+
Release 3.3.3 - 2011-02-23
Backward compatible changes:
Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java?rev=1164891&r1=1164890&r2=1164891&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java Sat Sep 3 17:37:10 2011
@@ -400,21 +400,32 @@ public class FileTxnLog implements TxnLo
long position;
protected PositionInputStream(InputStream in) {
super(in);
+ position = 0;
}
@Override
public int read() throws IOException {
int rc = super.read();
- if (rc > 0) {
+ if (rc > -1) {
position++;
}
return rc;
}
+
+        /**
+         * Reads up to {@code b.length} bytes into {@code b}.
+         *
+         * Delegates to {@link #read(byte[], int, int)} so that {@code position}
+         * is advanced in exactly one place. The previous form
+         * ({@code super.read(b)} followed by {@code position += rc}) counted
+         * every byte twice when the superclass is {@code FilterInputStream}
+         * (as the {@code super(in)} constructor suggests), because
+         * {@code FilterInputStream.read(byte[])} is specified to call the
+         * overridable {@code read(b, 0, b.length)}, which already adds
+         * {@code rc} to {@code position}.
+         *
+         * @param b destination buffer
+         * @return number of bytes read, or -1 at end of stream
+         * @throws IOException if the wrapped stream fails
+         */
+        @Override
+        public int read(byte[] b) throws IOException {
+            return read(b, 0, b.length);
+        }
@Override
public int read(byte[] b, int off, int len) throws IOException {
int rc = super.read(b, off, len);
- position += rc;
+ if (rc > 0) {
+ position += rc;
+ }
return rc;
}
@@ -429,6 +440,21 @@ public class FileTxnLog implements TxnLo
public long getPosition() {
return position;
}
+
+        /**
+         * Mark/reset is not offered by this stream. Reads here only ever move
+         * {@code position} forward and nothing rewinds it, so a reset of the
+         * wrapped stream would presumably leave the tracked offset wrong.
+         */
+        @Override
+        public boolean markSupported() {
+            final boolean canRewind = false;
+            return canRewind;
+        }
+
+        /** Always fails: marking is unsupported (see {@link #markSupported()}). */
+        @Override
+        public void mark(int readLimit) {
+            final String operation = "mark";
+            throw new UnsupportedOperationException(operation);
+        }
+
+        /** Always fails: resetting is unsupported (see {@link #markSupported()}). */
+        @Override
+        public void reset() {
+            final String operation = "reset";
+            throw new UnsupportedOperationException(operation);
+        }
}
/**
@@ -504,7 +530,7 @@ public class FileTxnLog implements TxnLo
}
/**
- * read the header fomr the inputarchive
+ * read the header from the inputarchive
* @param ia the inputarchive to be read from
* @param is the inputstream
* @throws IOException
Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=1164891&r1=1164890&r2=1164891&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Sat Sep 3 17:37:10 2011
@@ -270,14 +270,52 @@ public class LearnerHandler extends Thre
rl.lock();
final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();
+ LOG.info("Synchronizing with Follower sid: " + this.sid
+ +" maxCommittedLog ="+Long.toHexString(maxCommittedLog)
+ +" minCommittedLog = "+Long.toHexString(minCommittedLog)
+ +" peerLastZxid = "+Long.toHexString(peerLastZxid));
+
LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();
+
if (proposals.size() != 0) {
if ((maxCommittedLog >= peerLastZxid)
&& (minCommittedLog <= peerLastZxid)) {
- packetToSend = Leader.DIFF;
- zxidToSend = maxCommittedLog;
+
+ // as we look through proposals, this variable keeps track of previous
+ // proposal Id.
+ long prevProposalZxid = minCommittedLog;
+
+ // Keep track of whether we are about to send the first packet.
+ // Before sending the first packet, we have to tell the learner
+ // whether to expect a trunc or a diff
+ boolean firstPacket=true;
+
for (Proposal propose: proposals) {
- if (propose.packet.getZxid() > peerLastZxid) {
+ // skip the proposals the peer already has
+ if (propose.packet.getZxid() <= peerLastZxid) {
+ prevProposalZxid = propose.packet.getZxid();
+ continue;
+ } else {
+ // If we are sending the first packet, figure out whether to trunc
+ // in case the follower has some proposals that the leader doesn't
+ if (firstPacket) {
+ firstPacket = false;
+ // Does the peer have some proposals that the leader hasn't seen yet
+ if (prevProposalZxid < peerLastZxid) {
+ // send a trunc message before sending the diff
+ packetToSend = Leader.TRUNC;
+ LOG.info("Sending TRUNC");
+ zxidToSend = prevProposalZxid;
+ updates = zxidToSend;
+ }
+ else {
+ // Just send the diff
+ packetToSend = Leader.DIFF;
+ LOG.info("Sending diff");
+ zxidToSend = maxCommittedLog;
+ }
+
+ }
queuePacket(propose.packet);
QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
null, null);
@@ -292,7 +330,7 @@ public class LearnerHandler extends Thre
} else {
// just let the state transfer happen
}
-
+
leaderLastZxid = leader.startForwarding(this, updates);
if (peerLastZxid == leaderLastZxid) {
// We are in sync so we'll do an empty diff
Modified: zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java?rev=1164891&r1=1164890&r2=1164891&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java (original)
+++ zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java Sat Sep 3 17:37:10 2011
@@ -21,6 +21,7 @@ package org.apache.zookeeper.server.quor
import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.io.LineNumberReader;
import java.io.StringReader;
import java.net.InetSocketAddress;
@@ -36,7 +37,10 @@ import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.server.quorum.QuorumPeerTestBase.MainThread;
import org.apache.zookeeper.test.ClientBase;
+import org.junit.Assert;
import org.junit.Test;
@@ -286,4 +290,165 @@ public class QuorumPeerMainTest extends
assertTrue("fastleaderelection used", found);
}
+    /**
+     * Test the case of a server with the highest zxid not being present at
+     * leader election and joining later. Reproduces ZOOKEEPER-1154 and
+     * ZOOKEEPER-1156.
+     *
+     * The leader logs a change while its followers are down, so that change
+     * is never committed. The leader is then stopped, the other two servers
+     * come back and commit a different change, and the old leader is
+     * restarted. It must discard its uncommitted change and catch up on the
+     * one it missed.
+     */
+    @Test
+    public void testHighestZxidJoinLate() throws Exception {
+        int numServers = 3;
+        Servers svrs = LaunchServers(numServers);
+        try {
+            String path = "/hzxidtest";
+            int leader = -1;
+
+            // find the leader
+            for (int i = 0; i < numServers; i++) {
+                if (svrs.mt[i].main.quorumPeer.leader != null) {
+                    leader = i;
+                }
+            }
+
+            // make sure there is a leader
+            Assert.assertTrue("There should be a leader", leader >= 0);
+
+            int nonleader = (leader + 1) % numServers;
+
+            byte[] input = new byte[1];
+            input[0] = 1;
+            byte[] output;
+
+            // Create a couple of nodes
+            svrs.zk[leader].create(path + leader, input, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            svrs.zk[leader].create(path + nonleader, input, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+            // make sure the updates indeed committed. If they did not,
+            // the following statement will throw.
+            svrs.zk[leader].getData(path + nonleader, false, null);
+
+            // Shut down everyone but the leader
+            for (int i = 0; i < numServers; i++) {
+                if (i != leader) {
+                    svrs.mt[i].shutdown();
+                }
+            }
+
+            input[0] = 2;
+
+            // Update the node on the leader. Asynchronous on purpose: with the
+            // followers down there is no quorum, so this change is expected to
+            // be logged by the leader only and never committed.
+            svrs.zk[leader].setData(path + leader, input, -1, null, null);
+
+            // wait some time to let this get written to disk
+            Thread.sleep(500);
+
+            // shut the leader down
+            svrs.mt[leader].shutdown();
+
+            System.gc();
+
+            waitForAll(svrs.zk, States.CONNECTING);
+
+            // Start everyone but the leader
+            for (int i = 0; i < numServers; i++) {
+                if (i != leader) {
+                    svrs.mt[i].start();
+                }
+            }
+
+            // wait to connect to one of these
+            waitForOne(svrs.zk[nonleader], States.CONNECTED);
+
+            // validate that the old value is there and not the new one
+            output = svrs.zk[nonleader].getData(path + leader, false, null);
+
+            Assert.assertEquals(
+                    "Expecting old value 1 since 2 isn't committed yet",
+                    1, output[0]);
+
+            // Do some other update, so we bump the maxCommittedZxid
+            // by setting the value to 2
+            svrs.zk[nonleader].setData(path + nonleader, input, -1);
+
+            // start the old leader
+            svrs.mt[leader].start();
+
+            // connect to it
+            waitForOne(svrs.zk[leader], States.CONNECTED);
+
+            // make sure it doesn't have the new value that it alone had logged
+            output = svrs.zk[leader].getData(path + leader, false, null);
+            Assert.assertEquals(
+                    "Validating that the deposed leader has rolled back that change it had written",
+                    1, output[0]);
+
+            // make sure the leader has the subsequent changes that were made while it was offline
+            output = svrs.zk[leader].getData(path + nonleader, false, null);
+            Assert.assertEquals(
+                    "Validating that the deposed leader caught up on changes it missed",
+                    2, output[0]);
+        } finally {
+            // Close every client and stop every server so later tests in this
+            // class do not inherit running servers or open sessions.
+            for (int i = 0; i < numServers; i++) {
+                svrs.zk[i].close();
+                svrs.mt[i].shutdown();
+            }
+        }
+    }
+
+    /**
+     * The server threads and one client per server, both indexed by server id
+     * ({@code mt[i]} runs myid {@code i}; {@code zk[i]} talks to its client port).
+     * Static because it never touches the enclosing test instance.
+     */
+    private static class Servers {
+        MainThread[] mt;
+        ZooKeeper[] zk;
+    }
+
+ /**
+ * This is a helper function for launching a set of servers
+ *
+ * @param numServers
+ * @return
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ private Servers LaunchServers(int numServers) throws IOException, InterruptedException {
+ int SERVER_COUNT = numServers;
+ Servers svrs = new Servers();
+ final int clientPorts[] = new int[SERVER_COUNT];
+ StringBuilder sb = new StringBuilder();
+ for(int i = 0; i < SERVER_COUNT; i++) {
+ clientPorts[i] = PortAssignment.unique();
+ sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+"\n");
+ }
+ String quorumCfgSection = sb.toString();
+
+ MainThread mt[] = new MainThread[SERVER_COUNT];
+ ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
+ for(int i = 0; i < SERVER_COUNT; i++) {
+ mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection);
+ mt[i].start();
+ zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+ }
+
+ waitForAll(zk, States.CONNECTED);
+
+ svrs.mt = mt;
+ svrs.zk = zk;
+ return svrs;
+ }
+
+    /**
+     * Polls every 500 ms until {@code zk} reports {@code state}.
+     *
+     * @param zk client to watch
+     * @param state state to wait for
+     * @throws RuntimeException if the state is not reached within
+     *         {@code CONNECTION_TIMEOUT} ms, so a server that never comes
+     *         back fails the test instead of hanging it forever
+     * @throws InterruptedException if interrupted while sleeping
+     */
+    private void waitForOne(ZooKeeper zk, States state) throws InterruptedException {
+        long deadline = System.currentTimeMillis() + CONNECTION_TIMEOUT;
+        while (zk.getState() != state) {
+            if (System.currentTimeMillis() > deadline) {
+                throw new RuntimeException("Waiting too long");
+            }
+            Thread.sleep(500);
+        }
+    }
+
+    /**
+     * Waits until every client in {@code zks} reports {@code state}, checking
+     * once a second for at most 10 seconds. Returns as soon as the condition
+     * holds instead of sleeping one extra second after it is met.
+     *
+     * @param zks clients to watch
+     * @param state state every client must reach
+     * @throws RuntimeException if some client is still in another state when
+     *         the 10-second budget is exhausted
+     * @throws InterruptedException if interrupted while sleeping
+     */
+    private void waitForAll(ZooKeeper[] zks, States state) throws InterruptedException {
+        int remainingSleeps = 10;
+        while (true) {
+            boolean allReached = true;
+            for (ZooKeeper zk : zks) {
+                if (zk.getState() != state) {
+                    allReached = false;
+                    break;
+                }
+            }
+            if (allReached) {
+                return;
+            }
+            if (remainingSleeps-- == 0) {
+                throw new RuntimeException("Waiting too long");
+            }
+            Thread.sleep(1000);
+        }
+    }
}
Modified: zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java?rev=1164891&r1=1164890&r2=1164891&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java (original)
+++ zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java Sat Sep 3 17:37:10 2011
@@ -40,7 +40,7 @@ import org.apache.zookeeper.test.QuorumB
public class QuorumPeerTestBase extends TestCase implements Watcher {
protected static final Logger LOG =
Logger.getLogger(QuorumPeerTestBase.class);
-
+
public void process(WatchedEvent event) {
// ignore for this test
}
@@ -56,7 +56,7 @@ public class QuorumPeerTestBase extends
public static class MainThread extends Thread {
final File confFile;
- final TestQPMain main;
+ volatile TestQPMain main;
public MainThread(int myid, int clientPort, String quorumCfgSection)
throws IOException
@@ -83,7 +83,7 @@ public class QuorumPeerTestBase extends
dir = dir.replace('\\', '/');
}
fwriter.write("dataDir=" + dir + "\n");
-
+
fwriter.write("clientPort=" + clientPort + "\n");
fwriter.write(quorumCfgSection + "\n");
fwriter.flush();
@@ -98,6 +98,12 @@ public class QuorumPeerTestBase extends
main = new TestQPMain();
}
+ Thread currentThread;
+ synchronized public void start() {
+ main = new TestQPMain();
+ currentThread = new Thread(this);
+ currentThread.start();
+ }
public void run() {
String args[] = new String[1];
args[0] = confFile.toString();
@@ -106,11 +112,18 @@ public class QuorumPeerTestBase extends
} catch (Exception e) {
// test will still fail even though we just log/ignore
LOG.error("unexpected exception in run", e);
+ } finally {
+ currentThread = null;
}
}
- public void shutdown() {
- main.shutdown();
+        /**
+         * Stops the server if its worker thread is still running, then waits
+         * up to 500 ms for that thread to exit. Does nothing if the server is
+         * not running. Logs a warning, rather than failing, if the thread
+         * outlives the wait.
+         *
+         * @throws InterruptedException if interrupted while waiting
+         */
+        public void shutdown() throws InterruptedException {
+            Thread t = currentThread;
+            if (t != null && t.isAlive()) {
+                main.shutdown();
+                t.join(500);
+                if (t.isAlive()) {
+                    // A still-running thread could overlap with a later start():
+                    // its cleanup would clear currentThread for the new worker.
+                    LOG.warn("Server thread " + t.getName()
+                            + " did not exit within 500 ms of shutdown");
+                }
+            }
+        }
+
}
}