You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ma...@apache.org on 2009/08/25 07:45:25 UTC

svn commit: r807484 - in /hadoop/zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/server/persistence/ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/test/

Author: mahadev
Date: Tue Aug 25 05:45:24 2009
New Revision: 807484

URL: http://svn.apache.org/viewvc?rev=807484&view=rev
Log:
ZOOKEEPER-508. proposals and commits for DIFF and Truncate messages from the leader to the followers is buggy. (mahadev and ben via mahadev)

Added:
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/TruncateTest.java
Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=807484&r1=807483&r2=807484&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Tue Aug 25 05:45:24 2009
@@ -65,6 +65,9 @@
   ZOOKEEPER-498. Unending Leader Elections : WAN configuration (flavio via
   mahadev)
 
+  ZOOKEEPER-508. proposals and commits for DIFF and Truncate messages from the
+  leader to the followers is buggy. (mahadev and ben via mahadev)
+
 IMPROVEMENTS:
   ZOOKEEPER-473. cleanup junit tests to eliminate false positives due to
   "socket reuse" and failure to close client (phunt via mahadev)

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java?rev=807484&r1=807483&r2=807484&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java Tue Aug 25 05:45:24 2009
@@ -24,6 +24,7 @@
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
+import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.RandomAccessFile;
@@ -271,8 +272,8 @@
      */
     public boolean truncate(long zxid) throws IOException {
         FileTxnIterator itr = new FileTxnIterator(this.logDir, zxid);
-        FileInputStream input = itr.inputStream;
-        long pos = input.getChannel().position();
+        PositionInputStream input = itr.inputStream;
+        long pos = input.getPosition();
         // now, truncate at the current position
         RandomAccessFile raf=new RandomAccessFile(itr.logFile,"rw");
         raf.setLength(pos);
@@ -322,6 +323,48 @@
     }
 
     /**
+     * a class that keeps track of the position 
+     * in the input stream. The position points to offset
+     * that has been consumed by the applications. It can 
+     * wrap buffered input streams to provide the right offset 
+     * for the application.
+     */
+    static class PositionInputStream extends FilterInputStream {
+        long position;
+        protected PositionInputStream(InputStream in) {
+            super(in);
+        }
+        
+        @Override
+        public int read() throws IOException {
+            int rc = super.read();
+            if (rc > 0) {
+                position++;
+            }
+            return rc;
+        }
+        
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {
+            int rc = super.read(b, off, len);
+            position += rc;
+            return rc;
+        }
+        
+        @Override
+        public long skip(long n) throws IOException {
+            long rc = super.skip(n);
+            if (rc > 0) {
+                position += rc;
+            }
+            return rc;
+        }
+        public long getPosition() {
+            return position;
+        }
+    }
+    
+    /**
      * this class implements the txnlog iterator interface
      * which is used for reading the transaction logs
      */
@@ -333,7 +376,8 @@
         File logFile;
         InputArchive ia;
         static final String CRC_ERROR="CRC check failed";
-        FileInputStream inputStream=null;
+       
+        PositionInputStream inputStream=null;
         //stored files is the list of files greater than
         //the zxid we are looking for.
         private ArrayList<File> storedFiles;
@@ -398,7 +442,7 @@
          * @param is the inputstream
          * @throws IOException
          */
-        protected void inStreamCreated(InputArchive ia, FileInputStream is)
+        protected void inStreamCreated(InputArchive ia, InputStream is)
             throws IOException{
             FileHeader header= new FileHeader();
             header.deserialize(ia, "fileheader");
@@ -416,9 +460,9 @@
          **/
         protected InputArchive createInputArchive(File logFile) throws IOException {
             if(inputStream==null){
-                inputStream= new FileInputStream(logFile);
+                inputStream= new PositionInputStream(new BufferedInputStream(new FileInputStream(logFile)));
                 LOG.debug("Created new input stream " + logFile);
-                ia  = BinaryInputArchive.getArchive(new BufferedInputStream(inputStream));
+                ia  = BinaryInputArchive.getArchive(inputStream);
                 inStreamCreated(ia,inputStream);
                 LOG.debug("created new input archive " + logFile);
             }

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java?rev=807484&r1=807483&r2=807484&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java Tue Aug 25 05:45:24 2009
@@ -200,7 +200,7 @@
                 readPacket(qp);
                 synchronized (zk) {
                     if (qp.getType() == Leader.DIFF) {
-                        LOG.info("Getting a diff from the leader!");
+                        LOG.info("Getting a diff from the leader 0x" + Long.toHexString(qp.getZxid()));
                         zk.loadData();
                     }
                     else if (qp.getType() == Leader.SNAP) {

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java?rev=807484&r1=807483&r2=807484&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java Tue Aug 25 05:45:24 2009
@@ -62,7 +62,7 @@
     
     long getSid(){
         return sid;
-    }
+    }                    
 
     /**
      * The packets to be sent to the follower
@@ -224,15 +224,23 @@
             } else {
             	this.sid = leader.followerCounter.getAndDecrement();
             }
-            LOG.info("The follower sid: " + this.sid);
+            LOG.info("Follower sid: " + this.sid + " : info : "
+                    + leader.self.quorumPeers.get(this.sid));
             
+            /* this is the last zxid from the follower but the leader might have to
+              restart the follower from a different zxid depending on truncate and diff. */
             long peerLastZxid = qp.getZxid();
-            
+            /* the default to send to the follower */
             int packetToSend = Leader.SNAP;
             boolean logTxns = true;
-
             long zxidToSend = 0;
-            // we are sending the diff
+            
+            /** the packets that the follower needs to get updates from **/
+            long updates = peerLastZxid;
+            
+            /* we are sending the diff check if we have proposals in memory to be able to 
+             * send a diff to the 
+             */ 
             synchronized(leader.zk.committedLog) {
                 if (leader.zk.committedLog.size() != 0) {
                     if ((leader.zk.maxCommittedLog >= peerLastZxid)
@@ -252,16 +260,7 @@
                 }
                 else {
                     logTxns = false;
-                }            }
-            long leaderLastZxid = leader.startForwarding(this, peerLastZxid);
-            QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
-                    leaderLastZxid, null, null);
-            oa.writeRecord(newLeaderQP, "packet");
-            bufferedOutput.flush();
-            // a special case when both the ids are the same
-            if (peerLastZxid == leaderLastZxid) {
-                packetToSend = Leader.DIFF;
-                zxidToSend = leaderLastZxid;
+                }            
             }
             //check if we decided to send a diff or we need to send a truncate
             // we avoid using epochs for truncating because epochs make things
@@ -274,11 +273,31 @@
                 // we can ask the follower to truncate the log
                 packetToSend = Leader.TRUNC;
                 zxidToSend = leader.zk.maxCommittedLog;
-
+                updates = zxidToSend;
             }
+            
+            /* see what other packets from the proposal
+             * and tobeapplied queues need to be sent
+             * and then decide if we can just send a DIFF
+             * or we actually need to send the whole snapshot
+             */
+            long leaderLastZxid = leader.startForwarding(this, updates);
+            // a special case when both the ids are the same 
+            if (peerLastZxid == leaderLastZxid) {
+                packetToSend = Leader.DIFF;
+                zxidToSend = leaderLastZxid;
+            }
+
+            QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
+                    leaderLastZxid, null, null);
+            oa.writeRecord(newLeaderQP, "packet");
+            bufferedOutput.flush();
+            
+           
             oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
             bufferedOutput.flush();
-            // only if we are not truncating or fast sycning
+            
+            /* if we are not truncating or sending a diff just send a snapshot */
             if (packetToSend == Leader.SNAP) {
                 LOG.warn("Sending snapshot last zxid of peer is 0x"
                         + Long.toHexString(peerLastZxid) + " " 
@@ -289,7 +308,7 @@
                 oa.writeString("BenWasHere", "signature");
             }
             bufferedOutput.flush();
-            //
+            
             // Mutation packets will be queued during the serialize,
             // so we need to mark when the follower can actually start
             // using the data

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=807484&r1=807483&r2=807484&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Tue Aug 25 05:45:24 2009
@@ -647,8 +647,7 @@
                 }
                 handler.queuePacket(p.packet);
                 // Since the proposal has been committed we need to send the
-                // commit message
-                // also
+                // commit message also
                 QuorumPacket qp = new QuorumPacket(Leader.COMMIT, p.packet
                         .getZxid(), null, null);
                 handler.queuePacket(qp);

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java?rev=807484&r1=807483&r2=807484&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java Tue Aug 25 05:45:24 2009
@@ -43,13 +43,33 @@
             try {
                 follower.writePacket(qp, false);
             } catch (IOException e) {
-                LOG.warn("Ignoring unexpected exception during packet send", e);
+                LOG.warn("Closing connection to leader, exception during packet send", e);
+                try {
+                    if (!follower.sock.isClosed()) {
+                        follower.sock.close();
+                    }
+                } catch (IOException e1) {
+                    // Nothing to do, we are shutting things down, so an exception here is irrelevant
+                    LOG.debug("Ignoring error closing the connection", e1);
+                }
             }
         }
     }
     
     public void flush() throws IOException {
-        follower.writePacket(null, true);
+        try {
+            follower.writePacket(null, true);
+        } catch(IOException e) {
+            LOG.warn("Closing connection to leader, exception during packet send", e);
+            try {
+                if (!follower.sock.isClosed()) {
+                    follower.sock.close();
+                }
+            } catch (IOException e1) {
+                    // Nothing to do, we are shutting things down, so an exception here is irrelevant
+                    LOG.debug("Ignoring error closing the connection", e1);
+            }
+        }
     }
 
     public void shutdown() {

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=807484&r1=807483&r2=807484&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java Tue Aug 25 05:45:24 2009
@@ -261,7 +261,7 @@
         File tmpFile = File.createTempFile("test", ".junit", parentDir);
         // don't delete tmpFile - this ensures we don't attempt to create
         // a tmpDir with a duplicate name
-
+        tmpFile.delete();
         File tmpDir = new File(tmpFile + ".dir");
         assertFalse(tmpDir.exists()); // never true if tmpfile does it's job
         assertTrue(tmpDir.mkdirs());
@@ -395,7 +395,7 @@
         return JMXEnv.conn();
     }
 
-    private static boolean recursiveDelete(File d) {
+    public static boolean recursiveDelete(File d) {
         if (d.isDirectory()) {
             File children[] = d.listFiles();
             for (File f : children) {

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java?rev=807484&r1=807483&r2=807484&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java Tue Aug 25 05:45:24 2009
@@ -23,12 +23,16 @@
 import java.util.ArrayList;
 
 import org.apache.log4j.Logger;
+import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.quorum.FollowerHandler;
+import org.apache.zookeeper.server.quorum.Leader;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.junit.Before;
 import org.junit.Test;
@@ -52,7 +56,7 @@
         ct.tearDownAll();
         qb.tearDown();
     }
-
+    
     @Test
     public void testDeleteWithChildren() throws Exception {
         ct.testDeleteWithChildren();
@@ -93,13 +97,64 @@
     {
         ct.testClientWithWatcherObj();
     }
+    
+    volatile int counter = 0;
+    volatile int errors = 0;
+    @Test
+    public void testLeaderShutdown() throws IOException, InterruptedException, KeeperException {
+        ZooKeeper zk = new DisconnectableZooKeeper(qb.hostPort, ClientBase.CONNECTION_TIMEOUT, new Watcher() {
+            public void process(WatchedEvent event) {
+        }});
+        zk.create("/blah", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.create("/blah/blah", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        Leader leader = qb.s1.leader;
+        if (leader == null) leader = qb.s2.leader;
+        if (leader == null) leader = qb.s3.leader;
+        if (leader == null) leader = qb.s4.leader;
+        if (leader == null) leader = qb.s5.leader;
+        assertNotNull(leader);
+        for(int i = 0; i < 5000; i++) {
+            zk.setData("/blah/blah", new byte[0], -1, new AsyncCallback.StatCallback() {
+                public void processResult(int rc, String path, Object ctx,
+                        Stat stat) {
+                    counter++;
+                    if (rc != 0) {
+                        errors++;
+                    }
+                }
+            }, null);
+        }
+        ArrayList<FollowerHandler> fhs = new ArrayList<FollowerHandler>(leader.forwardingFollowers);
+        for(FollowerHandler f: fhs) {
+            f.sock.shutdownInput();
+        }
+        for(int i = 0; i < 5000; i++) {
+            zk.setData("/blah/blah", new byte[0], -1, new AsyncCallback.StatCallback() {
+                public void processResult(int rc, String path, Object ctx,
+                        Stat stat) {
+                    counter++;
+                    if (rc != 0) {
+                        errors++;
+                    }
+                }
+            }, null);
+        }
+        // check if all the followers are alive
+        assertTrue(qb.s1.isAlive());
+        assertTrue(qb.s2.isAlive());
+        assertTrue(qb.s3.isAlive());
+        assertTrue(qb.s4.isAlive());
+        assertTrue(qb.s5.isAlive());
+        zk.close();
+    }
+    
     @Test
     public void testMultipleWatcherObjs() throws IOException,
             InterruptedException, KeeperException
     {
         ct.testMutipleWatcherObjs();
     }
-
+	
     /**
      * Make sure that we can change sessions 
      *  from follower to leader.
@@ -134,6 +189,7 @@
         }
         zk.close();
     }
+    
     @Test
     /**
      * Connect to two different servers with two different handles using the same session and
@@ -171,6 +227,5 @@
         }
         zk.close();
     }
-
-    // skip superhammer and clientcleanup as they are too expensive for quorum
+	// skip superhammer and clientcleanup as they are too expensive for quorum
 }

Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/TruncateTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/TruncateTest.java?rev=807484&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/TruncateTest.java (added)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/TruncateTest.java Tue Aug 25 05:45:24 2009
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+
+import junit.framework.TestCase;
+
+public class TruncateTest extends TestCase {
+	private static final Logger LOG = Logger.getLogger(TruncateTest.class);
+    File dataDir1, dataDir2, dataDir3;
+    final int baseHostPort = 12233;
+    
+    @Before
+    public void setUp() throws IOException {
+        dataDir1 = ClientBase.createTmpDir();
+        dataDir2 = ClientBase.createTmpDir();
+        dataDir3 = ClientBase.createTmpDir();
+    }
+    
+    @After
+    public void tearDown() {
+        ClientBase.recursiveDelete(dataDir1);
+        ClientBase.recursiveDelete(dataDir2);
+        ClientBase.recursiveDelete(dataDir3);
+    }
+    
+    volatile boolean connected;
+    Watcher nullWatcher = new Watcher() {
+        @Override
+        public void process(WatchedEvent event) {
+            connected = event.getState() == Watcher.Event.KeeperState.SyncConnected;
+        }
+    };
+    
+    @Test
+    public void testTruncate() throws IOException, InterruptedException, KeeperException {
+        // Prime the server that is going to come in late with 50 txns
+        NIOServerCnxn.Factory factory = ClientBase.createNewServerInstance(dataDir1, null, "127.0.0.1:" + baseHostPort, 100);
+        ZooKeeper zk = new ZooKeeper("127.0.0.1:" + baseHostPort, 15000, nullWatcher);
+        for(int i = 0; i < 50; i++) {
+            zk.create("/" + i, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        }
+        factory.shutdown();
+        zk.close();
+        int tickTime = 2000;
+        int initLimit = 3;
+        int syncLimit = 3;
+        int port1 = baseHostPort+1;
+        int port2 = baseHostPort+2;
+        int port3 = baseHostPort+3;
+        
+        // Start up two of the quorum and add 10 txns
+        HashMap<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>();
+        peers.put(Long.valueOf(1), new QuorumServer(1, new InetSocketAddress("127.0.0.1", port1 + 1000)));
+        peers.put(Long.valueOf(2), new QuorumServer(2, new InetSocketAddress("127.0.0.1", port2 + 1000)));
+        peers.put(Long.valueOf(3), new QuorumServer(3, new InetSocketAddress("127.0.0.1", port3 + 1000)));
+
+        QuorumPeer s2 = new QuorumPeer(peers, dataDir2, dataDir2, port2, 0, 2, tickTime, initLimit, syncLimit);
+        s2.start();
+        QuorumPeer s3 = new QuorumPeer(peers, dataDir3, dataDir3, port3, 0, 3, tickTime, initLimit, syncLimit);
+        s3.start();
+        connected = false;
+        zk = new ZooKeeper("127.0.0.1:" + port2, 15000, nullWatcher);
+        while(!connected) {
+            Thread.sleep(1000);
+        }
+        for(int i = 0; i < 10; i++) {
+            zk.create("/" + i, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        }
+        zk.close();
+        
+        final ZooKeeper zk2 = new ZooKeeper("127.0.0.1:" + port2, 15000, nullWatcher);
+        zk2.getData("/9", false, new Stat());
+        try {
+            zk2.getData("/10", false, new Stat());
+            fail("Should have gotten an error");
+        } catch(KeeperException.NoNodeException e) {
+            // this is what we want
+        }
+        QuorumPeer s1 = new QuorumPeer(peers, dataDir1, dataDir1, port1, 0, 1, tickTime, initLimit, syncLimit);
+        s1.start();
+
+        connected = false;
+        ZooKeeper zk1 = new ZooKeeper("127.0.0.1:" + port1, 15000, nullWatcher);
+        while(!connected) {
+            Thread.sleep(1000);
+        }
+        zk1.getData("/9", false, new Stat());
+        try {
+        	// 10 wont work because the session expiration
+        	// will match the zxid for 10 and so we wont
+        	// actually truncate the zxid for 10 creation
+        	// but for 11 we will for sure
+        	zk1.getData("/11", false, new Stat());
+            fail("Should have gotten an error");
+        } catch(KeeperException.NoNodeException e) {
+            // this is what we want
+        }
+        zk1.close();
+        s1.shutdown();
+        s1.join();
+        s2.shutdown();
+        s2.join();
+        s3.shutdown();
+        s3.join();
+    }
+}