You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ph...@apache.org on 2012/07/17 23:28:58 UTC

svn commit: r1362664 - in /zookeeper/branches/branch-3.3: ./ src/java/main/org/apache/zookeeper/server/ src/java/main/org/apache/zookeeper/server/persistence/ src/java/test/org/apache/zookeeper/server/ src/java/test/org/apache/zookeeper/server/quorum/ ...

Author: phunt
Date: Tue Jul 17 21:28:57 2012
New Revision: 1362664

URL: http://svn.apache.org/viewvc?rev=1362664&view=rev
Log:
ZOOKEEPER-1489. Data loss after truncate on transaction log (phunt)

Added:
    zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/TruncateCorruptionTest.java
    zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/util/
    zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/util/PortForwarder.java
Modified:
    zookeeper/branches/branch-3.3/CHANGES.txt
    zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
    zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
    zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
    zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/TruncateTest.java

Modified: zookeeper/branches/branch-3.3/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/CHANGES.txt?rev=1362664&r1=1362663&r2=1362664&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/CHANGES.txt (original)
+++ zookeeper/branches/branch-3.3/CHANGES.txt Tue Jul 17 21:28:57 2012
@@ -28,6 +28,8 @@ BUGFIXES:
   ZOOKEEPER-1163. Memory leak in zk_hashtable.c:do_insert_watcher_object()
   (Anupam Chanda via michim)
 
+  ZOOKEEPER-1489. Data loss after truncate on transaction log (phunt)
+
 IMPROVEMENTS:
 
   ZOOKEEPER-1454. Document how to run autoreconf if cppunit is

Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZKDatabase.java?rev=1362664&r1=1362663&r2=1362664&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZKDatabase.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZKDatabase.java Tue Jul 17 21:28:57 2012
@@ -425,16 +425,23 @@ public class ZKDatabase {
     }
 
     /**
-     * truncate the zkdatabase to this zxid
+     * Truncate the ZKDatabase to the specified zxid
      * @param zxid the zxid to truncate zk database to
-     * @return true if the truncate is succesful and false if not
+     * @return true if the truncate is successful and false if not
      * @throws IOException
      */
     public boolean truncateLog(long zxid) throws IOException {
         clear();
-        boolean truncated = this.snapLog.truncateLog(zxid);
+
+        // truncate the transaction log to zxid
+        if (!snapLog.truncateLog(zxid)) {
+            // nothing truncated: report failure without reloading
+            return false;
+        }
+
+        // reload the database now that the log has been truncated
         loadDataBase();
-        return truncated;
+        return true;
     }
     
     /**
@@ -491,4 +498,4 @@ public class ZKDatabase {
         this.snapLog.close();
     }
     
-}
\ No newline at end of file
+}

Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java?rev=1362664&r1=1362663&r2=1362664&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java Tue Jul 17 21:28:57 2012
@@ -48,12 +48,12 @@ import org.apache.zookeeper.KeeperExcept
 public class FileTxnSnapLog {
     //the direcotry containing the 
     //the transaction logs
-    File dataDir; 
-    //the directory containing the 
+    private final File dataDir;
+    //the directory containing the
     //the snapshot directory
-    File snapDir;
-    TxnLog txnLog;
-    SnapShot snapLog;
+    private final File snapDir;
+    private TxnLog txnLog;
+    private SnapShot snapLog;
     public final static int VERSION = 2;
     public final static String version = "version-";
     
@@ -77,6 +77,8 @@ public class FileTxnSnapLog {
      * @param snapDir the snapshot directory
      */
     public FileTxnSnapLog(File dataDir, File snapDir) throws IOException {
+        LOG.debug("Opening datadir:" + dataDir + " snapDir:" + snapDir);
+
         this.dataDir = new File(dataDir, version + VERSION);
         this.snapDir = new File(snapDir, version + VERSION);
         if (!this.dataDir.exists()) {
@@ -266,8 +268,26 @@ public class FileTxnSnapLog {
      * @throws IOException
      */
     public boolean truncateLog(long zxid) throws IOException {
-        FileTxnLog txnLog = new FileTxnLog(dataDir);
-        return txnLog.truncate(zxid);
+        try {
+            // close the existing txnLog and snapLog
+            close();
+
+            // truncate using a fresh log, releasing its handles afterwards
+            FileTxnLog truncLog = new FileTxnLog(dataDir);
+            try {
+                return truncLog.truncate(zxid);
+            } finally {
+                truncLog.close();
+            }
+        } finally {
+            // re-open the txnLog and snapLog even if closing or truncating
+            // failed, so this object never keeps using the closed logs.
+            // I'd rather just close/reopen this object itself, however that
+            // would have a big impact outside ZKDatabase as there are other
+            // objects holding a reference to this object.
+            txnLog = new FileTxnLog(dataDir);
+            snapLog = new FileSnap(snapDir);
+        }
     }
     
     /**

Added: zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/TruncateCorruptionTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/TruncateCorruptionTest.java?rev=1362664&view=auto
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/TruncateCorruptionTest.java (added)
+++ zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/TruncateCorruptionTest.java Tue Jul 17 21:28:57 2012
@@ -0,0 +1,400 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.client.FourLetterWordMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerTestBase.MainThread;
+import org.apache.zookeeper.server.util.PortForwarder;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Verify ZOOKEEPER-1489 - cause truncation followed by continued append to the
+ * snaplog, verify that the newly appended information (after the truncation) is
+ * readable.
+ */
+public class TruncateCorruptionTest extends TestCase {
+
+    private static final Logger LOG =
+            Logger.getLogger(TruncateCorruptionTest.class);
+
+    public interface Check {
+        boolean doCheck();
+    }
+
+    public static boolean await(Check check, long timeoutMillis)
+            throws InterruptedException {
+        long end = System.currentTimeMillis() + timeoutMillis;
+        while (end > System.currentTimeMillis()) {
+            if (check.doCheck()) {
+                LOG.debug("await succeeded after "
+                        + (System.currentTimeMillis() - end + timeoutMillis));
+                return true;
+            }
+            Thread.sleep(50);
+        }
+        LOG.debug("await failed in " + timeoutMillis);
+        return false;
+    }
+
+    @Test
+    public void testTransactionLogCorruption() throws Exception {
+        // configure the ports for that test in a way so that we can disrupt the
+        // connection for wrapper1
+        ZookeeperServerWrapper wrapper1 = new ZookeeperServerWrapper(1, 7000);
+        ZookeeperServerWrapper wrapper2 = new ZookeeperServerWrapper(2, 8000);
+        ZookeeperServerWrapper wrapper3 = new ZookeeperServerWrapper(3, 8000);
+
+        // everything started below is released in the finally block, so a
+        // failed assertion cannot leave servers, clients or forwarders bound
+        // to the fixed ports this test uses
+        List<PortForwarder> pfs = null;
+        List<ZooKeeper> clients = new ArrayList<ZooKeeper>();
+        try {
+            wrapper2.start();
+            wrapper3.start();
+
+            wrapper2.await(ClientBase.CONNECTION_TIMEOUT);
+            wrapper3.await(ClientBase.CONNECTION_TIMEOUT);
+            pfs = startForwarding();
+            Thread.sleep(1000);
+            wrapper1.start();
+            wrapper1.await(ClientBase.CONNECTION_TIMEOUT);
+
+            final ZooKeeper zk1 = new ZooKeeper("localhost:8201",
+                    ClientBase.CONNECTION_TIMEOUT, new ZkWatcher("zk1"));
+            clients.add(zk1);
+            waitForConnection(zk1);
+            zk1.create("/test", "testdata".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+
+            // wait a little until stuff is synced in between servers
+            Thread.sleep(1000);
+            wrapper2.stop();
+            // wait for reconnect
+            waitForConnection(zk1);
+            zk1.create("/test2", "testdata".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+            // now we stop them to force a situation where a TRUNC event is
+            // sent to the followers
+            wrapper3.stop();
+            // simulate a short interruption in network in between
+
+            stopForwarding(pfs);
+            pfs = null;
+            LOG.info("interrupted network connection ... waiting for zk1 and zk2 to realize");
+
+            Assert.assertTrue(await(new Check() {
+
+                public boolean doCheck() {
+                    if (zk1.getState() == States.CONNECTING) {
+                        List<String> children;
+                        try {
+                            children = zk1.getChildren("/", false);
+
+                            return children.size() != 0;
+                        } catch (KeeperException.ConnectionLossException e) {
+                            // just to be sure
+                            return true;
+                        } catch (Exception e) {
+                            // silently fail
+                        }
+                    }
+                    return false;
+                }
+            }, TimeUnit.MINUTES.toMillis(2)));
+
+            // let's clean the data dir of zk3 so that an ensemble of 2 and 3
+            // is less advanced than 1 (just to force an event where we get a
+            // TRUNCATE message)
+            wrapper3.clean();
+            wrapper2.start();
+            wrapper3.start();
+            LOG.info("Waiting for zk2 and zk3 to form a quorum");
+
+            wrapper2.await(ClientBase.CONNECTION_TIMEOUT);
+            wrapper3.await(ClientBase.CONNECTION_TIMEOUT);
+            ZooKeeper zk2 = new ZooKeeper("localhost:8202",
+                    ClientBase.CONNECTION_TIMEOUT, new ZkWatcher("zk2"));
+            clients.add(zk2);
+            waitForConnection(zk2);
+
+            LOG.info("re-establishing network connection and waiting for zk1 to reconnect");
+            pfs = startForwarding();
+            waitForConnection(zk1);
+
+            // create more data ...
+            LOG.info("Creating node test3");
+            zk1.create("/test3", "testdata".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+            Thread.sleep(250);
+            LOG.info("List of children at zk2 before zk1 became master");
+            List<String> children2 = zk2.getChildren("/", false);
+            LOG.info(children2.toString());
+
+            LOG.info("List of children at zk1 before zk1 became master");
+            List<String> children1 = zk1.getChildren("/", false);
+            LOG.info(children1.toString());
+
+            // now cause zk1 to become master and test3 will be lost
+            LOG.info("restarting zk2 and zk3 while cleaning zk3 to enforce zk1 to become master");
+            wrapper2.stop();
+            wrapper3.stop();
+            wrapper3.clean();
+            wrapper3.start();
+            wrapper3.await(TimeUnit.MINUTES.toMillis(2));
+            ZooKeeper zk3 = new ZooKeeper("localhost:8203",
+                    ClientBase.CONNECTION_TIMEOUT, new ZkWatcher("zk3"));
+            clients.add(zk3);
+            waitForConnection(zk3);
+            LOG.info("Zk1 and zk3 have a quorum, now starting zk2");
+            wrapper2.start();
+            waitForConnection(zk2);
+            LOG.info("List of children at zk2");
+            children2 = zk2.getChildren("/", false);
+            LOG.info(children2.toString());
+
+            waitForConnection(zk1);
+            LOG.info("List of children at zk1");
+            children1 = zk1.getChildren("/", false);
+            Assert.assertTrue("test3 node is missing on zk1",
+                    children1.contains("test3"));
+            Assert.assertTrue("test3 node is missing on zk2",
+                    children2.contains("test3"));
+            Assert.assertEquals(children1, children2);
+        } finally {
+            cleanup(pfs, clients, wrapper1, wrapper2, wrapper3);
+        }
+    }
+
+    /**
+     * Best-effort release of everything a test run started. Each resource is
+     * released independently and failures are only logged, so that cleanup
+     * can neither mask the test's own failure nor skip the remaining
+     * resources.
+     *
+     * @param pfs active port forwarders, may be null
+     * @param clients client sessions to close
+     * @param wrappers servers to stop
+     */
+    private void cleanup(List<PortForwarder> pfs, List<ZooKeeper> clients,
+            ZookeeperServerWrapper... wrappers) {
+        for (ZooKeeper zk : clients) {
+            try {
+                zk.close();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                LOG.warn("Interrupted while closing client", e);
+            }
+        }
+        if (pfs != null) {
+            try {
+                stopForwarding(pfs);
+            } catch (Exception e) {
+                LOG.warn("Failed to stop port forwarding", e);
+            }
+        }
+        for (ZookeeperServerWrapper wrapper : wrappers) {
+            try {
+                wrapper.stop();
+            } catch (RuntimeException e) {
+                LOG.warn("Failed to stop server", e);
+            }
+        }
+    }
+
+    /**
+     * Shut down every forwarder in the list. All forwarders are shut down
+     * even if an earlier one fails; the first failure is rethrown afterwards.
+     *
+     * @param pfs the forwarders to shut down
+     * @throws Exception the first failure raised by {@link PortForwarder#shutdown()}
+     */
+    private void stopForwarding(List<PortForwarder> pfs) throws Exception {
+        Exception first = null;
+        for (PortForwarder pf : pfs) {
+            try {
+                pf.shutdown();
+            } catch (Exception e) {
+                if (first == null) {
+                    first = e;
+                } else {
+                    LOG.warn("Failed to stop port forwarder", e);
+                }
+            }
+        }
+        if (first != null) {
+            throw first;
+        }
+    }
+
+    /**
+     * Start the forwarders linking server 1 (quorum ports 7xxx) with servers 2
+     * and 3 (quorum ports 8xxx); stopping them simulates a network partition
+     * of server 1. If any forwarder cannot be created, those already started
+     * are shut down before the failure is rethrown.
+     *
+     * @return the started forwarders
+     * @throws IOException if a forwarding port cannot be bound
+     */
+    private List<PortForwarder> startForwarding() throws IOException {
+        List<PortForwarder> res = new ArrayList<PortForwarder>();
+        try {
+            res.add(new PortForwarder(8301, 7301));
+            res.add(new PortForwarder(8401, 7401));
+            res.add(new PortForwarder(7302, 8302));
+            res.add(new PortForwarder(7402, 8402));
+            res.add(new PortForwarder(7303, 8303));
+            res.add(new PortForwarder(7403, 8403));
+        } catch (IOException e) {
+            try {
+                stopForwarding(res);
+            } catch (Exception ex) {
+                LOG.warn("Failed to stop partially started forwarding", ex);
+            }
+            throw e;
+        }
+        return res;
+    }
+
+    /**
+     * Wait up to two minutes until the client is connected and can list a
+     * non-empty root, failing the test otherwise.
+     *
+     * @param zk the client to wait for
+     * @throws InterruptedException if interrupted while waiting
+     */
+    private void waitForConnection(final ZooKeeper zk)
+            throws InterruptedException {
+        Assert.assertTrue(await(new Check() {
+
+            public boolean doCheck() {
+                if (zk.getState() == States.CONNECTED) {
+                    List<String> children;
+                    try {
+                        children = zk.getChildren("/", false);
+
+                        return children.size() != 0;
+                    } catch (Exception e) {
+                        // silently fail
+                    }
+                }
+                return false;
+            }
+        }, TimeUnit.MINUTES.toMillis(2)));
+    }
+
+    static class ZkWatcher implements Watcher {
+
+        private final String clientId;
+
+        ZkWatcher(String clientId) {
+            this.clientId = clientId;
+        }
+
+        public void process(WatchedEvent event) {
+            LOG.info("<<<EVENT>>> " + clientId + " - WatchedEvent: "
+                    + event);
+        }
+    }
+
+    /**
+     * Runs one quorum peer of the test ensemble on client port 8200 + serverId.
+     */
+    public static class ZookeeperServerWrapper {
+
+        private static final Logger LOG =
+                Logger.getLogger(ZookeeperServerWrapper.class);
+
+        private final MainThread server;
+        private final int clientPort;
+
+        public ZookeeperServerWrapper(int serverId, int portBase)
+                throws IOException {
+            clientPort = 8200 + serverId;
+
+            // start client port on 8200 + serverId
+            // start servers on portbase + 300 or + 400 (+serverId)
+            String quorumCfgSection = "server.1=127.0.0.1:" + (portBase + 301)
+                    + ":" + (portBase + 401)
+                    + "\nserver.2=127.0.0.1:" + (portBase + 302) + ":"
+                    + (portBase + 402)
+                    + "\nserver.3=127.0.0.1:" + (portBase + 303) + ":"
+                    + (portBase + 403);
+
+            server = new MainThread(serverId, clientPort, quorumCfgSection);
+        }
+
+        public void start() throws Exception {
+            server.start();
+        }
+
+        public void await(long timeout) throws Exception {
+            long deadline = System.currentTimeMillis() + timeout;
+            String result = "?";
+            while (deadline > System.currentTimeMillis()) {
+                try {
+                    result = FourLetterWordMain.send4LetterWord("127.0.0.1",
+                            clientPort, "stat");
+                    if (result.startsWith("Zookeeper version:")) {
+                        LOG.info("Started zookeeper server on port "
+                                 + clientPort);
+                        return;
+                    }
+                } catch (IOException e) {
+                    // ignore as this is expected
+                }
+                // an interrupt aborts the wait rather than being swallowed
+                Thread.sleep(100);
+            }
+            LOG.info(result);
+            throw new Exception("Failed to connect to zookeeper server");
+        }
+
+        public void stop() {
+            try {
+                server.shutdown();
+            } catch (InterruptedException e) {
+                // preserve the interrupt for the caller
+                Thread.currentThread().interrupt();
+                LOG.info("Interrupted while shutting down");
+            }
+        }
+
+        public void clean() throws IOException {
+            server.clean();
+        }
+    }
+}

Modified: zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java?rev=1362664&r1=1362663&r2=1362664&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java (original)
+++ zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java Tue Jul 17 21:28:57 2012
@@ -34,8 +34,8 @@ import org.apache.zookeeper.test.ClientB
 import org.apache.zookeeper.test.QuorumBase;
 
 /**
- * Has some common functionality for tests that work with QuorumPeers.
- * Override process(WatchedEvent) to implement the Watcher interface
+ * Has some common functionality for tests that work with QuorumPeers. Override
+ * process(WatchedEvent) to implement the Watcher interface
  */
 public class QuorumPeerTestBase extends TestCase implements Watcher {
     protected static final Logger LOG =
@@ -45,7 +45,7 @@ public class QuorumPeerTestBase extends 
         // ignore for this test
     }
 
-    public static  class TestQPMain extends QuorumPeerMain {
+    public static class TestQPMain extends QuorumPeerMain {
         public void shutdown() {
             // ensure it closes - in particular wait for thread to exit
             if (quorumPeer != null) {
@@ -54,16 +54,16 @@ public class QuorumPeerTestBase extends 
         }
     }
 
-    public static class MainThread extends Thread {
+    public static class MainThread implements Runnable {
         final File confFile;
         volatile TestQPMain main;
 
         public MainThread(int myid, int clientPort, String quorumCfgSection)
             throws IOException
         {
-            super("QuorumPeer with myid:" + myid
-                    + " and clientPort:" + clientPort);
             File tmpDir = ClientBase.createTmpDir();
+            LOG.info("id = " + myid + " tmpDir = " + tmpDir + " clientPort = "
+                    + clientPort);
             confFile = new File(tmpDir, "zoo.cfg");
 
             FileWriter fwriter = new FileWriter(confFile);
@@ -99,11 +99,13 @@ public class QuorumPeerTestBase extends 
         }
 
         Thread currentThread;
+
         synchronized public void start() {
             main = new TestQPMain();
             currentThread = new Thread(this);
             currentThread.start();
         }
+
         public void run() {
             String args[] = new String[1];
             args[0] = confFile.toString();
@@ -125,5 +127,21 @@ public class QuorumPeerTestBase extends 
             }
         }
 
+        public void join(long timeout) throws InterruptedException {
+            final Thread peerThread = currentThread; // null until start()
+            if (peerThread != null) {
+                peerThread.join(timeout);
+            }
+        }
+
+        public boolean isAlive() {
+            final Thread peerThread = currentThread; // null until start()
+            return peerThread != null && peerThread.isAlive();
+        }
+
+        public void clean() {
+            File txnDir = main.quorumPeer.getTxnFactory().getDataDir();
+            ClientBase.recursiveDelete(txnDir);
+        }
     }
 }

Added: zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/util/PortForwarder.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/util/PortForwarder.java?rev=1362664&view=auto
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/util/PortForwarder.java (added)
+++ zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/util/PortForwarder.java Tue Jul 17 21:28:57 2012
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.ConnectException;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Test utility that listens on a local port and forwards every accepted
+ * connection to another port on localhost. Shutting the forwarder down drops
+ * all forwarded connections, which lets a test simulate a network partition.
+ */
+public class PortForwarder extends Thread {
+    private static final Logger LOG = Logger.getLogger(PortForwarder.class);
+
+    /**
+     * Copies bytes from one socket to the other until the source reaches end
+     * of stream, a write fails or the worker is shut down. Both sockets are
+     * closed when copying stops, so a close on one side of the forward is
+     * propagated to the other side.
+     */
+    private static class PortForwardWorker implements Runnable {
+
+        private final InputStream in;
+        private final OutputStream out;
+        private final Socket toClose;
+        private final Socket toClose2;
+
+        PortForwardWorker(Socket toClose, Socket toClose2, InputStream in,
+                OutputStream out) throws IOException {
+            this.toClose = toClose;
+            this.toClose2 = toClose2;
+            this.in = in;
+            this.out = out;
+        }
+
+        public void run() {
+            Thread.currentThread().setName(toClose.toString() + "-->"
+                    + toClose2.toString());
+            byte[] buf = new byte[1024];
+            try {
+                while (!Thread.currentThread().isInterrupted()) {
+                    int read;
+                    try {
+                        read = this.in.read(buf);
+                    } catch (SocketTimeoutException e) {
+                        LOG.error("socket timeout", e);
+                        continue;
+                    }
+                    if (read < 0) {
+                        // end of stream: the source closed its side
+                        break;
+                    }
+                    try {
+                        this.out.write(buf, 0, read);
+                    } catch (IOException e) {
+                        LOG.warn("exception during write", e);
+                        break;
+                    }
+                }
+            } catch (SocketException e) {
+                if (!"Socket closed".equals(e.getMessage())) {
+                    LOG.error("Unexpected exception", e);
+                }
+            } catch (IOException e) {
+                LOG.error("Unexpected exception", e);
+            } finally {
+                // propagate the close to the other side of the forward
+                shutdown();
+            }
+            LOG.info("Shutting down forward for " + toClose);
+        }
+
+        /** Close both sockets; this also unblocks a read in progress. */
+        void shutdown() {
+            closeQuietly(toClose);
+            closeQuietly(toClose2);
+        }
+    }
+
+    private volatile boolean stopped = false;
+    private final ExecutorService workers = Executors.newCachedThreadPool();
+    /** Every worker started, so that shutdown() can close its sockets. */
+    private final List<PortForwardWorker> activeWorkers =
+            Collections.synchronizedList(new ArrayList<PortForwardWorker>());
+    private final ServerSocket serverSocket;
+    private final int to;
+
+    /**
+     * Start forwarding connections accepted on {@code from} to {@code to} on
+     * localhost. The accept loop is already running when this returns.
+     *
+     * @param from the local port to listen on
+     * @param to the localhost port to forward to
+     * @throws IOException if {@code from} cannot be bound
+     */
+    public PortForwarder(int from, int to) throws IOException {
+        this.to = to;
+        serverSocket = new ServerSocket(from);
+        serverSocket.setSoTimeout(30000);
+        this.start();
+    }
+
+    @Override
+    public void run() {
+        try {
+            while (!stopped) {
+                Socket sock = null;
+                Socket target = null;
+                boolean forwarding = false;
+                try {
+                    LOG.info("accepting socket local:"
+                            + serverSocket.getLocalPort() + " to:" + to);
+                    sock = serverSocket.accept();
+                    LOG.info("accepted: " + describe(sock));
+                    target = connectToTarget(sock);
+                    LOG.info("connected: " + describe(sock));
+                    sock.setSoTimeout(30000);
+                    target.setSoTimeout(30000);
+                    startWorker(new PortForwardWorker(sock, target,
+                            sock.getInputStream(), target.getOutputStream()));
+                    startWorker(new PortForwardWorker(target, sock,
+                            target.getInputStream(), sock.getOutputStream()));
+                    forwarding = true;
+                } catch (SocketTimeoutException e) {
+                    // accept() timed out; loop round so that the stopped
+                    // flag is re-checked (sock may be null here)
+                    LOG.debug("socket timed out " + describe(sock));
+                } catch (ConnectException e) {
+                    LOG.warn("connection exception " + describe(sock), e);
+                } catch (IOException e) {
+                    if (!"Socket closed".equals(e.getMessage())) {
+                        LOG.warn("unexpected exception " + describe(sock), e);
+                        throw e;
+                    }
+                } finally {
+                    if (!forwarding) {
+                        // not handed to a worker: don't leak the sockets
+                        closeQuietly(sock);
+                        closeQuietly(target);
+                    }
+                }
+            }
+        } catch (IOException e) {
+            LOG.error("Unexpected exception to:" + to, e);
+        } catch (InterruptedException e) {
+            LOG.error("Interrupted to:" + to, e);
+        }
+    }
+
+    /**
+     * Connect to the target port, retrying once a second (eleven attempts in
+     * total) while the accepted socket is connected.
+     *
+     * @param sock the accepted socket the connection is made for
+     * @return the connected target socket
+     * @throws IOException the last connection failure once retries run out
+     * @throws InterruptedException if interrupted between retries
+     */
+    private Socket connectToTarget(Socket sock)
+            throws IOException, InterruptedException {
+        int retry = 10;
+        while (sock.isConnected()) {
+            try {
+                return new Socket("localhost", to);
+            } catch (IOException e) {
+                if (retry == 0) {
+                    throw e;
+                }
+                LOG.warn("connection failed, retrying(" + retry
+                        + "): " + describe(sock), e);
+            }
+            Thread.sleep(TimeUnit.SECONDS.toMillis(1));
+            retry--;
+        }
+        throw new ConnectException("accepted socket is not connected: "
+                + describe(sock));
+    }
+
+    /** Register a worker for shutdown() and run it on the worker pool. */
+    private void startWorker(PortForwardWorker worker) {
+        activeWorkers.add(worker);
+        workers.execute(worker);
+    }
+
+    /** Describe a socket for logging; {@code sock} may be null. */
+    private String describe(Socket sock) {
+        if (sock == null) {
+            return "local:" + serverSocket.getLocalPort() + " to:" + to;
+        }
+        return "local:" + sock.getLocalPort() + " from:" + sock.getPort()
+                + " to:" + to;
+    }
+
+    /** Close a socket, ignoring failures; {@code sock} may be null. */
+    private static void closeQuietly(Socket sock) {
+        if (sock == null) {
+            return;
+        }
+        try {
+            sock.close();
+        } catch (IOException e) {
+            // ignore: the socket is being discarded anyway
+        }
+    }
+
+    /**
+     * Stop accepting connections, drop every forwarded connection and wait
+     * for the worker threads to exit.
+     *
+     * @throws Exception if the workers do not stop within 5 seconds or the
+     *             wait is interrupted
+     */
+    public void shutdown() throws Exception {
+        this.stopped = true;
+        this.serverSocket.close();
+        // abort a pending connect retry, then wait for the accept loop so
+        // that no worker can be registered after this point
+        this.interrupt();
+        this.join();
+        this.workers.shutdownNow();
+        // closing the sockets unblocks workers waiting in read()
+        synchronized (activeWorkers) {
+            for (PortForwardWorker worker : activeWorkers) {
+                worker.shutdown();
+            }
+        }
+        try {
+            if (!this.workers.awaitTermination(5, TimeUnit.SECONDS)) {
+                throw new Exception(
+                        "Failed to stop forwarding within 5 seconds");
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new Exception("Failed to stop forwarding", e);
+        }
+    }
+}

Modified: zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/TruncateTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/TruncateTest.java?rev=1362664&r1=1362663&r2=1362664&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/TruncateTest.java (original)
+++ zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/TruncateTest.java Tue Jul 17 21:28:57 2012
@@ -23,8 +23,10 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.HashMap;
 
+import junit.framework.Assert;
 import junit.framework.TestCase;
 
+import org.apache.jute.Record;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -34,9 +36,15 @@ import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.persistence.TxnLog.TxnIterator;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.txn.SetDataTxn;
+import org.apache.zookeeper.txn.TxnHeader;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -67,12 +75,67 @@ public class TruncateTest extends TestCa
             connected = event.getState() == Watcher.Event.KeeperState.SyncConnected;
         }
     };
-    
+
+    @Test
+    public void testTruncationStreamReset() throws Exception {
+        File tmpdir = ClientBase.createTmpDir();
+        FileTxnSnapLog snaplog = new FileTxnSnapLog(tmpdir, tmpdir);
+        ZKDatabase zkdb = new ZKDatabase(snaplog);
+
+        for (int i = 1; i <= 100; i++) {
+            append(zkdb, i);
+        }
+
+        Assert.assertTrue("truncateLog(1) failed", zkdb.truncateLog(1));
+
+        append(zkdb, 200);
+
+        zkdb.close();
+
+        // verify that the truncation and subsequent append were processed
+        // correctly: zxid 1 is kept, 2..100 are gone and 200 follows it
+        FileTxnLog txnlog = new FileTxnLog(new File(tmpdir, "version-2"));
+        TxnIterator iter = txnlog.read(1);
+        try {
+            Assert.assertEquals(1, iter.getHeader().getZxid());
+            Assert.assertTrue(iter.getTxn() instanceof SetDataTxn);
+
+            Assert.assertTrue("no entry after zxid 1", iter.next());
+
+            Assert.assertEquals(200, iter.getHeader().getZxid());
+            Assert.assertTrue(iter.getTxn() instanceof SetDataTxn);
+            Assert.assertFalse("unexpected entry after zxid 200", iter.next());
+        } finally {
+            iter.close();
+        }
+    }
+
+    private void append(ZKDatabase zkdb, int i) throws IOException {
+        // wrap a setData txn carrying zxid i in a Request
+        Record setData = new SetDataTxn("/foo" + i, new byte[0], 1);
+        Request req = new Request(null, 0, 0, 0, null, null);
+        req.txn = setData;
+        req.hdr = new TxnHeader(1, 1, i, 1, ZooDefs.OpCode.setData);
+
+        zkdb.append(req);
+        zkdb.commit();
+    }
+
     @Test
     public void testTruncate() throws IOException, InterruptedException, KeeperException {
         // Prime the server that is going to come in late with 50 txns
-        NIOServerCnxn.Factory factory = ClientBase.createNewServerInstance(dataDir1, null, "127.0.0.1:" + baseHostPort, 100);
-        ZooKeeper zk = new ZooKeeper("127.0.0.1:" + baseHostPort, 15000, nullWatcher);
+        String hostPort = "127.0.0.1:" + baseHostPort;
+        NIOServerCnxn.Factory factory = ClientBase.createNewServerInstance(dataDir1, null, hostPort, 100);
+        ClientBase.shutdownServerInstance(factory, hostPort);
+
+        // standalone starts with 0 epoch while quorum starts with 1
+        File origfile = new File(new File(dataDir1, "version-2"), "snapshot.0");
+        File newfile = new File(new File(dataDir1, "version-2"), "snapshot.100000000");
+        origfile.renameTo(newfile);
+
+        factory = ClientBase.createNewServerInstance(dataDir1, null, hostPort, 100);
+
+        ZooKeeper zk = new ZooKeeper(hostPort, 15000, nullWatcher);
         for(int i = 0; i < 50; i++) {
             zk.create("/" + i, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         }
@@ -129,12 +192,14 @@ public class TruncateTest extends TestCa
         }
         zk1.getData("/9", false, new Stat());
         try {
-        	// 10 wont work because the session expiration
-        	// will match the zxid for 10 and so we wont
-        	// actually truncate the zxid for 10 creation
-        	// but for 11 we will for sure
-        	zk1.getData("/11", false, new Stat());
-            fail("Should have gotten an error");
+            // /10 wont work because the session expiration
+            // will match the zxid for /10 and so we wont
+            // actually truncate the zxid for /10 creation
+            // due to an artifact of switching the xid of the standalone
+            // /11 is the last entry in the log for the xid
+            // as a result /12 is the first of the truncated znodes to check for
+            zk1.getData("/12", false, new Stat());
+            Assert.fail("Should have gotten an error");
         } catch(KeeperException.NoNodeException e) {
             // this is what we want
         }