You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ma...@apache.org on 2009/08/25 07:45:25 UTC
svn commit: r807484 - in /hadoop/zookeeper/trunk: ./
src/java/main/org/apache/zookeeper/server/persistence/
src/java/main/org/apache/zookeeper/server/quorum/
src/java/test/org/apache/zookeeper/test/
Author: mahadev
Date: Tue Aug 25 05:45:24 2009
New Revision: 807484
URL: http://svn.apache.org/viewvc?rev=807484&view=rev
Log:
ZOOKEEPER-508. proposals and commits for DIFF and Truncate messages from the leader to the followers is buggy. (mahadev and ben via mahadev)
Added:
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/TruncateTest.java
Modified:
hadoop/zookeeper/trunk/CHANGES.txt
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java
Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=807484&r1=807483&r2=807484&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Tue Aug 25 05:45:24 2009
@@ -65,6 +65,9 @@
ZOOKEEPER-498. Unending Leader Elections : WAN configuration (flavio via
mahadev)
+ ZOOKEEPER-508. proposals and commits for DIFF and Truncate messages from the
+ leader to the followers is buggy. (mahadev and ben via mahadev)
+
IMPROVEMENTS:
ZOOKEEPER-473. cleanup junit tests to eliminate false positives due to
"socket reuse" and failure to close client (phunt via mahadev)
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java?rev=807484&r1=807483&r2=807484&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java Tue Aug 25 05:45:24 2009
@@ -24,6 +24,7 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
+import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
@@ -271,8 +272,8 @@
*/
public boolean truncate(long zxid) throws IOException {
FileTxnIterator itr = new FileTxnIterator(this.logDir, zxid);
- FileInputStream input = itr.inputStream;
- long pos = input.getChannel().position();
+ PositionInputStream input = itr.inputStream;
+ long pos = input.getPosition();
// now, truncate at the current position
RandomAccessFile raf=new RandomAccessFile(itr.logFile,"rw");
raf.setLength(pos);
@@ -322,6 +323,48 @@
}
/**
+ * a class that keeps track of the position
+ * in the input stream. The position points to offset
+ * that has been consumed by the applications. It can
+ * wrap buffered input streams to provide the right offset
+ * for the application.
+ */
+ static class PositionInputStream extends FilterInputStream {
+ long position;
+ protected PositionInputStream(InputStream in) {
+ super(in);
+ }
+
+ @Override
+ public int read() throws IOException {
+ int rc = super.read();
+ if (rc > 0) {
+ position++;
+ }
+ return rc;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ int rc = super.read(b, off, len);
+ position += rc;
+ return rc;
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ long rc = super.skip(n);
+ if (rc > 0) {
+ position += rc;
+ }
+ return rc;
+ }
+ public long getPosition() {
+ return position;
+ }
+ }
+
+ /**
* this class implements the txnlog iterator interface
* which is used for reading the transaction logs
*/
@@ -333,7 +376,8 @@
File logFile;
InputArchive ia;
static final String CRC_ERROR="CRC check failed";
- FileInputStream inputStream=null;
+
+ PositionInputStream inputStream=null;
//stored files is the list of files greater than
//the zxid we are looking for.
private ArrayList<File> storedFiles;
@@ -398,7 +442,7 @@
* @param is the inputstream
* @throws IOException
*/
- protected void inStreamCreated(InputArchive ia, FileInputStream is)
+ protected void inStreamCreated(InputArchive ia, InputStream is)
throws IOException{
FileHeader header= new FileHeader();
header.deserialize(ia, "fileheader");
@@ -416,9 +460,9 @@
**/
protected InputArchive createInputArchive(File logFile) throws IOException {
if(inputStream==null){
- inputStream= new FileInputStream(logFile);
+ inputStream= new PositionInputStream(new BufferedInputStream(new FileInputStream(logFile)));
LOG.debug("Created new input stream " + logFile);
- ia = BinaryInputArchive.getArchive(new BufferedInputStream(inputStream));
+ ia = BinaryInputArchive.getArchive(inputStream);
inStreamCreated(ia,inputStream);
LOG.debug("created new input archive " + logFile);
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java?rev=807484&r1=807483&r2=807484&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java Tue Aug 25 05:45:24 2009
@@ -200,7 +200,7 @@
readPacket(qp);
synchronized (zk) {
if (qp.getType() == Leader.DIFF) {
- LOG.info("Getting a diff from the leader!");
+ LOG.info("Getting a diff from the leader 0x" + Long.toHexString(qp.getZxid()));
zk.loadData();
}
else if (qp.getType() == Leader.SNAP) {
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java?rev=807484&r1=807483&r2=807484&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java Tue Aug 25 05:45:24 2009
@@ -62,7 +62,7 @@
long getSid(){
return sid;
- }
+ }
/**
* The packets to be sent to the follower
@@ -224,15 +224,23 @@
} else {
this.sid = leader.followerCounter.getAndDecrement();
}
- LOG.info("The follower sid: " + this.sid);
+ LOG.info("Follower sid: " + this.sid + " : info : "
+ + leader.self.quorumPeers.get(this.sid));
+ /* this is the last zxid from the follower but the leader might have to
+ restart the follower from a different zxid depending on truncate and diff. */
long peerLastZxid = qp.getZxid();
-
+ /* the default to send to the follower */
int packetToSend = Leader.SNAP;
boolean logTxns = true;
-
long zxidToSend = 0;
- // we are sending the diff
+
+ /** the packets that the follower needs to get updates from **/
+ long updates = peerLastZxid;
+
+ /* we are sending the diff check if we have proposals in memory to be able to
+ * send a diff to the
+ */
synchronized(leader.zk.committedLog) {
if (leader.zk.committedLog.size() != 0) {
if ((leader.zk.maxCommittedLog >= peerLastZxid)
@@ -252,16 +260,7 @@
}
else {
logTxns = false;
- } }
- long leaderLastZxid = leader.startForwarding(this, peerLastZxid);
- QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
- leaderLastZxid, null, null);
- oa.writeRecord(newLeaderQP, "packet");
- bufferedOutput.flush();
- // a special case when both the ids are the same
- if (peerLastZxid == leaderLastZxid) {
- packetToSend = Leader.DIFF;
- zxidToSend = leaderLastZxid;
+ }
}
//check if we decided to send a diff or we need to send a truncate
// we avoid using epochs for truncating because epochs make things
@@ -274,11 +273,31 @@
// we can ask the follower to truncate the log
packetToSend = Leader.TRUNC;
zxidToSend = leader.zk.maxCommittedLog;
-
+ updates = zxidToSend;
}
+
+ /* see what other packets from the proposal
+ * and tobeapplied queues need to be sent
+ * and then decide if we can just send a DIFF
+ * or we actually need to send the whole snapshot
+ */
+ long leaderLastZxid = leader.startForwarding(this, updates);
+ // a special case when both the ids are the same
+ if (peerLastZxid == leaderLastZxid) {
+ packetToSend = Leader.DIFF;
+ zxidToSend = leaderLastZxid;
+ }
+
+ QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
+ leaderLastZxid, null, null);
+ oa.writeRecord(newLeaderQP, "packet");
+ bufferedOutput.flush();
+
+
oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
bufferedOutput.flush();
- // only if we are not truncating or fast sycning
+
+ /* if we are not truncating or sending a diff just send a snapshot */
if (packetToSend == Leader.SNAP) {
LOG.warn("Sending snapshot last zxid of peer is 0x"
+ Long.toHexString(peerLastZxid) + " "
@@ -289,7 +308,7 @@
oa.writeString("BenWasHere", "signature");
}
bufferedOutput.flush();
- //
+
// Mutation packets will be queued during the serialize,
// so we need to mark when the follower can actually start
// using the data
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=807484&r1=807483&r2=807484&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Tue Aug 25 05:45:24 2009
@@ -647,8 +647,7 @@
}
handler.queuePacket(p.packet);
// Since the proposal has been committed we need to send the
- // commit message
- // also
+ // commit message also
QuorumPacket qp = new QuorumPacket(Leader.COMMIT, p.packet
.getZxid(), null, null);
handler.queuePacket(qp);
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java?rev=807484&r1=807483&r2=807484&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java Tue Aug 25 05:45:24 2009
@@ -43,13 +43,33 @@
try {
follower.writePacket(qp, false);
} catch (IOException e) {
- LOG.warn("Ignoring unexpected exception during packet send", e);
+ LOG.warn("Closing connection to leader, exception during packet send", e);
+ try {
+ if (!follower.sock.isClosed()) {
+ follower.sock.close();
+ }
+ } catch (IOException e1) {
+ // Nothing to do, we are shutting things down, so an exception here is irrelevant
+ LOG.debug("Ignoring error closing the connection", e1);
+ }
}
}
}
public void flush() throws IOException {
- follower.writePacket(null, true);
+ try {
+ follower.writePacket(null, true);
+ } catch(IOException e) {
+ LOG.warn("Closing connection to leader, exception during packet send", e);
+ try {
+ if (!follower.sock.isClosed()) {
+ follower.sock.close();
+ }
+ } catch (IOException e1) {
+ // Nothing to do, we are shutting things down, so an exception here is irrelevant
+ LOG.debug("Ignoring error closing the connection", e1);
+ }
+ }
}
public void shutdown() {
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=807484&r1=807483&r2=807484&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java Tue Aug 25 05:45:24 2009
@@ -261,7 +261,7 @@
File tmpFile = File.createTempFile("test", ".junit", parentDir);
// don't delete tmpFile - this ensures we don't attempt to create
// a tmpDir with a duplicate name
-
+ tmpFile.delete();
File tmpDir = new File(tmpFile + ".dir");
assertFalse(tmpDir.exists()); // never true if tmpfile does it's job
assertTrue(tmpDir.mkdirs());
@@ -395,7 +395,7 @@
return JMXEnv.conn();
}
- private static boolean recursiveDelete(File d) {
+ public static boolean recursiveDelete(File d) {
if (d.isDirectory()) {
File children[] = d.listFiles();
for (File f : children) {
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java?rev=807484&r1=807483&r2=807484&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java Tue Aug 25 05:45:24 2009
@@ -23,12 +23,16 @@
import java.util.ArrayList;
import org.apache.log4j.Logger;
+import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.quorum.FollowerHandler;
+import org.apache.zookeeper.server.quorum.Leader;
import org.apache.zookeeper.ZooDefs.Ids;
import org.junit.Before;
import org.junit.Test;
@@ -52,7 +56,7 @@
ct.tearDownAll();
qb.tearDown();
}
-
+
@Test
public void testDeleteWithChildren() throws Exception {
ct.testDeleteWithChildren();
@@ -93,13 +97,64 @@
{
ct.testClientWithWatcherObj();
}
+
+ volatile int counter = 0;
+ volatile int errors = 0;
+ @Test
+ public void testLeaderShutdown() throws IOException, InterruptedException, KeeperException {
+ ZooKeeper zk = new DisconnectableZooKeeper(qb.hostPort, ClientBase.CONNECTION_TIMEOUT, new Watcher() {
+ public void process(WatchedEvent event) {
+ }});
+ zk.create("/blah", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.create("/blah/blah", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ Leader leader = qb.s1.leader;
+ if (leader == null) leader = qb.s2.leader;
+ if (leader == null) leader = qb.s3.leader;
+ if (leader == null) leader = qb.s4.leader;
+ if (leader == null) leader = qb.s5.leader;
+ assertNotNull(leader);
+ for(int i = 0; i < 5000; i++) {
+ zk.setData("/blah/blah", new byte[0], -1, new AsyncCallback.StatCallback() {
+ public void processResult(int rc, String path, Object ctx,
+ Stat stat) {
+ counter++;
+ if (rc != 0) {
+ errors++;
+ }
+ }
+ }, null);
+ }
+ ArrayList<FollowerHandler> fhs = new ArrayList<FollowerHandler>(leader.forwardingFollowers);
+ for(FollowerHandler f: fhs) {
+ f.sock.shutdownInput();
+ }
+ for(int i = 0; i < 5000; i++) {
+ zk.setData("/blah/blah", new byte[0], -1, new AsyncCallback.StatCallback() {
+ public void processResult(int rc, String path, Object ctx,
+ Stat stat) {
+ counter++;
+ if (rc != 0) {
+ errors++;
+ }
+ }
+ }, null);
+ }
+ // check if all the followers are alive
+ assertTrue(qb.s1.isAlive());
+ assertTrue(qb.s2.isAlive());
+ assertTrue(qb.s3.isAlive());
+ assertTrue(qb.s4.isAlive());
+ assertTrue(qb.s5.isAlive());
+ zk.close();
+ }
+
@Test
public void testMultipleWatcherObjs() throws IOException,
InterruptedException, KeeperException
{
ct.testMutipleWatcherObjs();
}
-
+
/**
* Make sure that we can change sessions
* from follower to leader.
@@ -134,6 +189,7 @@
}
zk.close();
}
+
@Test
/**
* Connect to two different servers with two different handles using the same session and
@@ -171,6 +227,5 @@
}
zk.close();
}
-
- // skip superhammer and clientcleanup as they are too expensive for quorum
+ // skip superhammer and clientcleanup as they are too expensive for quorum
}
Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/TruncateTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/TruncateTest.java?rev=807484&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/TruncateTest.java (added)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/TruncateTest.java Tue Aug 25 05:45:24 2009
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+
+import junit.framework.TestCase;
+
+public class TruncateTest extends TestCase {
+ private static final Logger LOG = Logger.getLogger(TruncateTest.class);
+ File dataDir1, dataDir2, dataDir3;
+ final int baseHostPort = 12233;
+
+ @Before
+ public void setUp() throws IOException {
+ dataDir1 = ClientBase.createTmpDir();
+ dataDir2 = ClientBase.createTmpDir();
+ dataDir3 = ClientBase.createTmpDir();
+ }
+
+ @After
+ public void tearDown() {
+ ClientBase.recursiveDelete(dataDir1);
+ ClientBase.recursiveDelete(dataDir2);
+ ClientBase.recursiveDelete(dataDir3);
+ }
+
+ volatile boolean connected;
+ Watcher nullWatcher = new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ connected = event.getState() == Watcher.Event.KeeperState.SyncConnected;
+ }
+ };
+
+ @Test
+ public void testTruncate() throws IOException, InterruptedException, KeeperException {
+ // Prime the server that is going to come in late with 50 txns
+ NIOServerCnxn.Factory factory = ClientBase.createNewServerInstance(dataDir1, null, "127.0.0.1:" + baseHostPort, 100);
+ ZooKeeper zk = new ZooKeeper("127.0.0.1:" + baseHostPort, 15000, nullWatcher);
+ for(int i = 0; i < 50; i++) {
+ zk.create("/" + i, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+ factory.shutdown();
+ zk.close();
+ int tickTime = 2000;
+ int initLimit = 3;
+ int syncLimit = 3;
+ int port1 = baseHostPort+1;
+ int port2 = baseHostPort+2;
+ int port3 = baseHostPort+3;
+
+ // Start up two of the quorum and add 10 txns
+ HashMap<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>();
+ peers.put(Long.valueOf(1), new QuorumServer(1, new InetSocketAddress("127.0.0.1", port1 + 1000)));
+ peers.put(Long.valueOf(2), new QuorumServer(2, new InetSocketAddress("127.0.0.1", port2 + 1000)));
+ peers.put(Long.valueOf(3), new QuorumServer(3, new InetSocketAddress("127.0.0.1", port3 + 1000)));
+
+ QuorumPeer s2 = new QuorumPeer(peers, dataDir2, dataDir2, port2, 0, 2, tickTime, initLimit, syncLimit);
+ s2.start();
+ QuorumPeer s3 = new QuorumPeer(peers, dataDir3, dataDir3, port3, 0, 3, tickTime, initLimit, syncLimit);
+ s3.start();
+ connected = false;
+ zk = new ZooKeeper("127.0.0.1:" + port2, 15000, nullWatcher);
+ while(!connected) {
+ Thread.sleep(1000);
+ }
+ for(int i = 0; i < 10; i++) {
+ zk.create("/" + i, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+ zk.close();
+
+ final ZooKeeper zk2 = new ZooKeeper("127.0.0.1:" + port2, 15000, nullWatcher);
+ zk2.getData("/9", false, new Stat());
+ try {
+ zk2.getData("/10", false, new Stat());
+ fail("Should have gotten an error");
+ } catch(KeeperException.NoNodeException e) {
+ // this is what we want
+ }
+ QuorumPeer s1 = new QuorumPeer(peers, dataDir1, dataDir1, port1, 0, 1, tickTime, initLimit, syncLimit);
+ s1.start();
+
+ connected = false;
+ ZooKeeper zk1 = new ZooKeeper("127.0.0.1:" + port1, 15000, nullWatcher);
+ while(!connected) {
+ Thread.sleep(1000);
+ }
+ zk1.getData("/9", false, new Stat());
+ try {
+ // 10 wont work because the session expiration
+ // will match the zxid for 10 and so we wont
+ // actually truncate the zxid for 10 creation
+ // but for 11 we will for sure
+ zk1.getData("/11", false, new Stat());
+ fail("Should have gotten an error");
+ } catch(KeeperException.NoNodeException e) {
+ // this is what we want
+ }
+ zk1.close();
+ s1.shutdown();
+ s1.join();
+ s2.shutdown();
+ s2.join();
+ s3.shutdown();
+ s3.join();
+ }
+}