You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ph...@apache.org on 2012/07/17 23:28:58 UTC
svn commit: r1362664 - in /zookeeper/branches/branch-3.3: ./
src/java/main/org/apache/zookeeper/server/
src/java/main/org/apache/zookeeper/server/persistence/
src/java/test/org/apache/zookeeper/server/
src/java/test/org/apache/zookeeper/server/quorum/ ...
Author: phunt
Date: Tue Jul 17 21:28:57 2012
New Revision: 1362664
URL: http://svn.apache.org/viewvc?rev=1362664&view=rev
Log:
ZOOKEEPER-1489. Data loss after truncate on transaction log (phunt)
Added:
zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/TruncateCorruptionTest.java
zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/util/
zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/util/PortForwarder.java
Modified:
zookeeper/branches/branch-3.3/CHANGES.txt
zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/TruncateTest.java
Modified: zookeeper/branches/branch-3.3/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/CHANGES.txt?rev=1362664&r1=1362663&r2=1362664&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/CHANGES.txt (original)
+++ zookeeper/branches/branch-3.3/CHANGES.txt Tue Jul 17 21:28:57 2012
@@ -28,6 +28,8 @@ BUGFIXES:
ZOOKEEPER-1163. Memory leak in zk_hashtable.c:do_insert_watcher_object()
(Anupam Chanda via michim)
+ ZOOKEEPER-1489. Data loss after truncate on transaction log (phunt)
+
IMPROVEMENTS:
ZOOKEEPER-1454. Document how to run autoreconf if cppunit is
Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZKDatabase.java?rev=1362664&r1=1362663&r2=1362664&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZKDatabase.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZKDatabase.java Tue Jul 17 21:28:57 2012
@@ -425,16 +425,23 @@ public class ZKDatabase {
}
/**
- * truncate the zkdatabase to this zxid
+ * Truncate the ZKDatabase to the specified zxid
* @param zxid the zxid to truncate zk database to
- * @return true if the truncate is succesful and false if not
+ * @return true if the truncate is successful and false if not
* @throws IOException
*/
public boolean truncateLog(long zxid) throws IOException {
clear();
- boolean truncated = this.snapLog.truncateLog(zxid);
+
+ // truncate the log
+ boolean truncated = snapLog.truncateLog(zxid);
+
+ if (!truncated) {
+ return false;
+ }
+
loadDataBase();
- return truncated;
+ return true;
}
/**
@@ -491,4 +498,4 @@ public class ZKDatabase {
this.snapLog.close();
}
-}
\ No newline at end of file
+}
Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java?rev=1362664&r1=1362663&r2=1362664&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java Tue Jul 17 21:28:57 2012
@@ -48,12 +48,12 @@ import org.apache.zookeeper.KeeperExcept
public class FileTxnSnapLog {
//the directory containing the
//the transaction logs
- File dataDir;
- //the directory containing the
+ private final File dataDir;
+ //the directory containing the
//the snapshot directory
- File snapDir;
- TxnLog txnLog;
- SnapShot snapLog;
+ private final File snapDir;
+ private TxnLog txnLog;
+ private SnapShot snapLog;
public final static int VERSION = 2;
public final static String version = "version-";
@@ -77,6 +77,8 @@ public class FileTxnSnapLog {
* @param snapDir the snapshot directory
*/
public FileTxnSnapLog(File dataDir, File snapDir) throws IOException {
+ LOG.debug("Opening datadir:" + dataDir + " snapDir:{}" + snapDir);
+
this.dataDir = new File(dataDir, version + VERSION);
this.snapDir = new File(snapDir, version + VERSION);
if (!this.dataDir.exists()) {
@@ -266,8 +268,22 @@ public class FileTxnSnapLog {
* @throws IOException
*/
public boolean truncateLog(long zxid) throws IOException {
- FileTxnLog txnLog = new FileTxnLog(dataDir);
- return txnLog.truncate(zxid);
+ // close the existing txnLog and snapLog
+ close();
+
+ // truncate it
+ FileTxnLog truncLog = new FileTxnLog(dataDir);
+ boolean truncated = truncLog.truncate(zxid);
+ truncLog.close();
+
+ // re-open the txnLog and snapLog
+ // I'd rather just close/reopen this object itself, however that
+ // would have a big impact outside ZKDatabase as there are other
+ // objects holding a reference to this object.
+ txnLog = new FileTxnLog(dataDir);
+ snapLog = new FileSnap(snapDir);
+
+ return truncated;
}
/**
Added: zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/TruncateCorruptionTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/TruncateCorruptionTest.java?rev=1362664&view=auto
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/TruncateCorruptionTest.java (added)
+++ zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/TruncateCorruptionTest.java Tue Jul 17 21:28:57 2012
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.client.FourLetterWordMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerTestBase.MainThread;
+import org.apache.zookeeper.server.util.PortForwarder;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Verify ZOOKEEPER-1489 - cause truncation followed by continued append to the
+ * snaplog, verify that the newly appended information (after the truncation) is
+ * readable.
+ */
+public class TruncateCorruptionTest extends TestCase {
+
+ private static final Logger LOG =
+ Logger.getLogger(TruncateCorruptionTest.class);
+
+ /** Condition polled by {@link #await(Check, long)}. */
+ public interface Check {
+ /** @return true once the awaited condition holds */
+ boolean doCheck();
+ }
+
+    /**
+     * Polls {@code check} roughly every 50 ms until it passes or the timeout
+     * elapses.
+     *
+     * @param check condition to poll
+     * @param timeoutMillis maximum time to keep polling, in milliseconds
+     * @return true as soon as the check passes, false if the timeout elapses
+     *         first
+     * @throws InterruptedException if interrupted while sleeping between polls
+     */
+    public static boolean await(Check check, long timeoutMillis)
+            throws InterruptedException {
+        final long deadline = System.currentTimeMillis() + timeoutMillis;
+        while (System.currentTimeMillis() < deadline) {
+            if (!check.doCheck()) {
+                Thread.sleep(50);
+                continue;
+            }
+            long elapsed = System.currentTimeMillis() - deadline + timeoutMillis;
+            LOG.debug("await succeeded after " + elapsed);
+            return true;
+        }
+        LOG.debug("await failed in " + timeoutMillis);
+        return false;
+    }
+
+ /**
+ * End-to-end scenario for ZOOKEEPER-1489 using three quorum members.
+ * Server 1 is configured with port base 7000 while servers 2 and 3 use
+ * port base 8000; the PortForwarders created by startForwarding() bridge
+ * the two port ranges, so shutting them down cuts server 1 off from the
+ * other two. The test creates /test and /test2, cuts the forwarding,
+ * wipes server 3's data and restarts servers 2 and 3, re-establishes
+ * forwarding, creates /test3, then restarts servers 2 and 3 once more
+ * (server 3 wiped again) and asserts that /test3 is visible through both
+ * zk1 and zk2.
+ */
+ @Test
+ public void testTransactionLogCorruption() throws Exception {
+ // configure the ports for that test in a way so that we can disrupt the
+ // connection for wrapper1
+ ZookeeperServerWrapper wrapper1 = new ZookeeperServerWrapper(1, 7000);
+ ZookeeperServerWrapper wrapper2 = new ZookeeperServerWrapper(2, 8000);
+ ZookeeperServerWrapper wrapper3 = new ZookeeperServerWrapper(3, 8000);
+
+ wrapper2.start();
+ wrapper3.start();
+
+ wrapper2.await(ClientBase.CONNECTION_TIMEOUT);
+ wrapper3.await(ClientBase.CONNECTION_TIMEOUT);
+ List<PortForwarder> pfs = startForwarding();
+ Thread.sleep(1000);
+ wrapper1.start();
+ wrapper1.await(ClientBase.CONNECTION_TIMEOUT);
+
+ // zk1 talks to server 1 (client port 8200 + 1)
+ final ZooKeeper zk1 = new ZooKeeper("localhost:8201",
+ ClientBase.CONNECTION_TIMEOUT, new ZkWatcher("zk1"));
+ waitForConnection(zk1);
+ zk1.create("/test", "testdata".getBytes(), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+
+ // wait a little until stuff is synced in between servers
+ Thread.sleep(1000);
+ wrapper2.stop();
+ // wait for reconnect
+ waitForConnection(zk1);
+ zk1.create("/test2", "testdata".getBytes(), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ // now we stop them to force a situation where a TRUNC event is sent to
+ // the followers
+ wrapper3.stop();
+ // simulate a short interruption in network in between
+
+ stopForwarding(pfs);
+ LOG.info("interrupted network connection ... waiting for zk1 and zk2 to realize");
+
+ // poll (for up to two minutes) until zk1 reports CONNECTING and a
+ // read through it either loses the connection or returns children
+ Assert.assertTrue(await(new Check() {
+
+ public boolean doCheck() {
+ if (zk1.getState() == States.CONNECTING) {
+ List<String> children;
+ try {
+ children = zk1.getChildren("/", false);
+
+ return children.size() != 0;
+ } catch (KeeperException.ConnectionLossException e) {
+ // just to be sure
+ return true;
+ } catch (Exception e) {
+ // silently fail
+ }
+ }
+ return false;
+ }
+ }, TimeUnit.MINUTES.toMillis(2)));
+
+ // let's clean the data dir of zk3 so that an ensemble of 2 and 3 is
+ // less advanced than 1 (just to force an event where we get a TRUNCATE
+ // message)
+ wrapper3.clean();
+ wrapper2.start();
+ wrapper3.start();
+ LOG.info("Waiting for zk2 and zk3 to form a quorum");
+
+ wrapper2.await(ClientBase.CONNECTION_TIMEOUT);
+ wrapper3.await(ClientBase.CONNECTION_TIMEOUT);
+ ZooKeeper zk2 = new ZooKeeper("localhost:8202",
+ ClientBase.CONNECTION_TIMEOUT, new ZkWatcher("zk2"));
+ waitForConnection(zk2);
+
+ LOG.info("re-establishing network connection and waiting for zk1 to reconnect");
+ pfs = startForwarding();
+ waitForConnection(zk1);
+
+ // create more data ...
+ LOG.info("Creating node test3");
+ zk1.create("/test3", "testdata".getBytes(), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ Thread.sleep(250);
+ LOG.info("List of children at zk2 before zk1 became master");
+ List<String> children2 = zk2.getChildren("/", false);
+ LOG.info(children2.toString());
+
+ LOG.info("List of children at zk1 before zk1 became master");
+ List<String> children1 = zk1.getChildren("/", false);
+ LOG.info(children1.toString());
+
+ // now cause zk1 to become master and test3 will be lost
+ LOG.info("restarting zk2 and zk3 while cleaning zk3 to enforce zk1 to become master");
+ wrapper2.stop();
+ wrapper3.stop();
+ wrapper3.clean();
+ wrapper3.start();
+ wrapper3.await(TimeUnit.MINUTES.toMillis(2));
+ ZooKeeper zk3 = new ZooKeeper("localhost:8203",
+ ClientBase.CONNECTION_TIMEOUT, new ZkWatcher("zk3"));
+ waitForConnection(zk3);
+ LOG.info("Zk1 and zk3 have a quorum, now starting zk2");
+ wrapper2.start();
+ waitForConnection(zk2);
+ LOG.info("List of children at zk2");
+ children2 = zk2.getChildren("/", false);
+ LOG.info(children2.toString());
+
+ waitForConnection(zk1);
+ LOG.info("List of children at zk1");
+ children1 = zk1.getChildren("/", false);
+ Assert.assertTrue("test3 node is missing on zk1",
+ children1.contains("test3"));
+ Assert.assertTrue("test3 node is missing on zk2",
+ children2.contains("test3"));
+ Assert.assertEquals(children1, children2);
+ // NOTE(review): only the forwarders are released here; zk1/zk2/zk3 are
+ // never closed and the three wrappers are not stopped in this method,
+ // and nothing is released at all if an assertion above fails.
+ stopForwarding(pfs);
+ }
+
+    /**
+     * Shuts down every forwarder in the list. A failure to stop one
+     * forwarder does not prevent the remaining ones from being stopped;
+     * the first failure is rethrown once all of them have been attempted.
+     *
+     * @param pfs forwarders to shut down
+     * @throws Exception the first exception raised by a forwarder's
+     *         shutdown, if any
+     */
+    private void stopForwarding(List<PortForwarder> pfs) throws Exception {
+        Exception firstFailure = null;
+        for (PortForwarder pf : pfs) {
+            try {
+                pf.shutdown();
+            } catch (Exception e) {
+                // keep going so the remaining listening ports are released
+                LOG.warn("Failed to shut down port forwarder", e);
+                if (firstFailure == null) {
+                    firstFailure = e;
+                }
+            }
+        }
+        if (firstFailure != null) {
+            throw firstFailure;
+        }
+    }
+
+ /**
+ * @return
+ * @throws IOException
+ */
+ private List<PortForwarder> startForwarding() throws IOException {
+ List<PortForwarder> res = new ArrayList<PortForwarder>();
+ res.add(new PortForwarder(8301, 7301));
+ res.add(new PortForwarder(8401, 7401));
+ res.add(new PortForwarder(7302, 8302));
+ res.add(new PortForwarder(7402, 8402));
+ res.add(new PortForwarder(7303, 8303));
+ res.add(new PortForwarder(7403, 8403));
+ return res;
+ }
+
+    /**
+     * Asserts that, within two minutes, the client reports CONNECTED and a
+     * getChildren("/") call through it returns at least one child.
+     *
+     * @param zk client to wait for
+     * @throws InterruptedException if interrupted while waiting
+     */
+    private void waitForConnection(final ZooKeeper zk)
+            throws InterruptedException {
+        Check connectedAndReadable = new Check() {
+
+            public boolean doCheck() {
+                if (zk.getState() != States.CONNECTED) {
+                    return false;
+                }
+                try {
+                    List<String> children = zk.getChildren("/", false);
+                    return children.size() != 0;
+                } catch (Exception e) {
+                    // silently fail
+                    return false;
+                }
+            }
+        };
+        Assert.assertTrue(
+                await(connectedAndReadable, TimeUnit.MINUTES.toMillis(2)));
+    }
+
+ static class ZkWatcher implements Watcher {
+
+ private final String clientId;
+
+ ZkWatcher(String clientId) {
+ this.clientId = clientId;
+ }
+
+ public void process(WatchedEvent event) {
+ LOG.info("<<<EVENT>>> " + clientId + " - WatchedEvent: "
+ + event);
+ }
+ }
+
+    /**
+     * Test helper that owns one quorum member (a {@link MainThread}) and
+     * the client port it listens on.
+     */
+    public static class ZookeeperServerWrapper {
+
+        private static final Logger LOG =
+            Logger.getLogger(ZookeeperServerWrapper.class);
+
+        private final MainThread server;
+        private final int clientPort;
+
+        /**
+         * @param serverId id of this quorum member; its client port is
+         *        8200 + serverId
+         * @param portBase base of the server addresses written into this
+         *        member's quorum configuration: server.N is configured as
+         *        127.0.0.1:(portBase + 300 + N):(portBase + 400 + N)
+         * @throws IOException if the MainThread cannot be created
+         */
+        public ZookeeperServerWrapper(int serverId, int portBase)
+                throws IOException {
+            clientPort = 8200 + serverId;
+
+            // start client port on 8200 + serverId
+            // start servers on portbase + 300 or + 400 (+serverId)
+            String quorumCfgSection = "server.1=127.0.0.1:" + (portBase + 301)
+                    + ":" + (portBase + 401)
+                    + "\nserver.2=127.0.0.1:" + (portBase + 302) + ":"
+                    + (portBase + 402)
+                    + "\nserver.3=127.0.0.1:" + (portBase + 303) + ":"
+                    + (portBase + 403);
+
+            server = new MainThread(serverId, clientPort, quorumCfgSection);
+        }
+
+        /** Starts the wrapped server. */
+        public void start() throws Exception {
+            server.start();
+        }
+
+        /**
+         * Polls the server with the "stat" four letter word every 100 ms
+         * until it answers with a version banner.
+         *
+         * @param timeout maximum time to wait, in milliseconds
+         * @throws InterruptedException if the calling thread is interrupted
+         *         while waiting
+         * @throws Exception if the server did not answer within the timeout
+         */
+        public void await(long timeout) throws Exception {
+            long deadline = System.currentTimeMillis() + timeout;
+            String result = "?";
+            while (deadline > System.currentTimeMillis()) {
+                try {
+                    result = FourLetterWordMain.send4LetterWord("127.0.0.1",
+                            clientPort, "stat");
+                    if (result.startsWith("Zookeeper version:")) {
+                        LOG.info("Started zookeeper server on port "
+                                + clientPort);
+                        return;
+                    }
+                } catch (IOException ignored) {
+                    // ignore as this is expected
+                }
+                // let an interrupt abort the wait instead of swallowing it
+                // and polling on until the deadline
+                Thread.sleep(100);
+            }
+            LOG.info(result);
+            throw new Exception("Failed to connect to zookeeper server");
+        }
+
+        /**
+         * Shuts the wrapped server down. If the calling thread is
+         * interrupted meanwhile, the interrupt status is restored so the
+         * caller can still observe it.
+         */
+        public void stop() {
+            try {
+                server.shutdown();
+            } catch (InterruptedException e) {
+                LOG.info("Interrupted while shutting down");
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        /** Deletes the wrapped server's data via {@code MainThread.clean()}. */
+        public void clean() throws IOException {
+            server.clean();
+        }
+    }
+}
Modified: zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java?rev=1362664&r1=1362663&r2=1362664&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java (original)
+++ zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java Tue Jul 17 21:28:57 2012
@@ -34,8 +34,8 @@ import org.apache.zookeeper.test.ClientB
import org.apache.zookeeper.test.QuorumBase;
/**
- * Has some common functionality for tests that work with QuorumPeers.
- * Override process(WatchedEvent) to implement the Watcher interface
+ * Has some common functionality for tests that work with QuorumPeers. Override
+ * process(WatchedEvent) to implement the Watcher interface
*/
public class QuorumPeerTestBase extends TestCase implements Watcher {
protected static final Logger LOG =
@@ -45,7 +45,7 @@ public class QuorumPeerTestBase extends
// ignore for this test
}
- public static class TestQPMain extends QuorumPeerMain {
+ public static class TestQPMain extends QuorumPeerMain {
public void shutdown() {
// ensure it closes - in particular wait for thread to exit
if (quorumPeer != null) {
@@ -54,16 +54,16 @@ public class QuorumPeerTestBase extends
}
}
- public static class MainThread extends Thread {
+ public static class MainThread implements Runnable {
final File confFile;
volatile TestQPMain main;
public MainThread(int myid, int clientPort, String quorumCfgSection)
throws IOException
{
- super("QuorumPeer with myid:" + myid
- + " and clientPort:" + clientPort);
File tmpDir = ClientBase.createTmpDir();
+ LOG.info("id = " + myid + " tmpDir = " + tmpDir + " clientPort = "
+ + clientPort);
confFile = new File(tmpDir, "zoo.cfg");
FileWriter fwriter = new FileWriter(confFile);
@@ -99,11 +99,13 @@ public class QuorumPeerTestBase extends
}
Thread currentThread;
+
synchronized public void start() {
main = new TestQPMain();
currentThread = new Thread(this);
currentThread.start();
}
+
public void run() {
String args[] = new String[1];
args[0] = confFile.toString();
@@ -125,5 +127,21 @@ public class QuorumPeerTestBase extends
}
}
+        /**
+         * Waits up to {@code timeout} milliseconds for the most recently
+         * started thread to finish; returns immediately if none was started.
+         */
+        public void join(long timeout) throws InterruptedException {
+            final Thread running = currentThread;
+            if (running == null) {
+                return;
+            }
+            running.join(timeout);
+        }
+
+        /** @return true if a thread has been started and is still alive */
+        public boolean isAlive() {
+            final Thread running = currentThread;
+            if (running == null) {
+                return false;
+            }
+            return running.isAlive();
+        }
+
+        /**
+         * Recursively deletes the data directory reported by the quorum
+         * peer's transaction factory.
+         */
+        public void clean() {
+            File txnDataDir = main.quorumPeer.getTxnFactory().getDataDir();
+            ClientBase.recursiveDelete(txnDataDir);
+        }
}
}
Added: zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/util/PortForwarder.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/util/PortForwarder.java?rev=1362664&view=auto
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/util/PortForwarder.java (added)
+++ zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/server/util/PortForwarder.java Tue Jul 17 21:28:57 2012
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.zookeeper.server.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.ConnectException;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+
+public class PortForwarder extends Thread {
+ private static final Logger LOG = Logger.getLogger(PortForwarder.class);
+
+    /**
+     * Pumps bytes one way between two sockets: reads from {@code in} and
+     * writes to {@code out} until the stream ends, a read or write fails,
+     * or the worker thread is interrupted. Both sockets are closed when
+     * {@link #run()} exits, so the opposite direction and the remote peers
+     * see the disconnect as well.
+     */
+    private static class PortForwardWorker implements Runnable {
+
+        private final InputStream in;
+        private final OutputStream out;
+        private final Socket toClose;
+        private final Socket toClose2;
+
+        /**
+         * @param toClose socket that {@code in} belongs to
+         * @param toClose2 socket that {@code out} belongs to
+         * @param in stream to read from
+         * @param out stream to write to
+         */
+        PortForwardWorker(Socket toClose, Socket toClose2, InputStream in,
+                OutputStream out) throws IOException {
+            this.toClose = toClose;
+            this.toClose2 = toClose2;
+            this.in = in;
+            this.out = out;
+        }
+
+        public void run() {
+            Thread.currentThread().setName(toClose.toString() + "-->"
+                    + toClose2.toString());
+            byte[] buf = new byte[1024];
+            try {
+                while (true) {
+                    try {
+                        int read = this.in.read(buf);
+                        if (read < 0) {
+                            // End of stream: the peer closed its side.
+                            // Without this check the loop would keep
+                            // polling a dead stream forever and the other
+                            // side would never learn of the disconnect.
+                            break;
+                        }
+                        if (read > 0) {
+                            try {
+                                this.out.write(buf, 0, read);
+                            } catch (IOException e) {
+                                LOG.warn("exception during write", e);
+                                break;
+                            }
+                        }
+                    } catch (SocketTimeoutException e) {
+                        // read timed out without data; keep forwarding
+                        LOG.error("socket timeout", e);
+                    }
+                    Thread.sleep(1);
+                }
+            } catch (InterruptedException e) {
+                LOG.warn("Interrupted", e);
+            } catch (SocketException e) {
+                // "Socket closed" is the expected result of a shutdown
+                if (!"Socket closed".equals(e.getMessage())) {
+                    LOG.error("Unexpected exception", e);
+                }
+            } catch (IOException e) {
+                LOG.error("Unexpected exception", e);
+            } finally {
+                closeSockets();
+            }
+            LOG.info("Shutting down forward for " + toClose);
+        }
+
+        /** Closes both sockets, ignoring failures to close. */
+        private void closeSockets() {
+            try {
+                toClose.close();
+            } catch (IOException ignored) {
+                // best effort: nothing more can be done for this socket
+            }
+            try {
+                toClose2.close();
+            } catch (IOException ignored) {
+                // best effort: nothing more can be done for this socket
+            }
+        }
+
+    }
+
+ private volatile boolean stopped = false;
+ private ExecutorService workers = Executors.newCachedThreadPool();
+ private ServerSocket serverSocket;
+ private final int to;
+
+    /**
+     * Opens a listening socket on {@code from} with a 30 second accept
+     * timeout and immediately starts the forwarding thread.
+     *
+     * @param from local port to listen on
+     * @param to port on localhost that accepted connections are forwarded to
+     * @throws IOException if the listening socket cannot be opened
+     */
+    public PortForwarder(int from, int to) throws IOException {
+        ServerSocket listener = new ServerSocket(from);
+        listener.setSoTimeout(30000);
+        this.serverSocket = listener;
+        this.to = to;
+        start();
+    }
+
+    /**
+     * Accept loop. For every accepted connection a socket to
+     * localhost:{@code to} is opened (retrying up to ten times, one second
+     * apart) and two workers are started, one per direction.
+     *
+     * The listening socket has an accept timeout, so {@code accept()}
+     * regularly throws SocketTimeoutException while no client connects.
+     * In that case no socket has been accepted yet, so the handlers below
+     * must not dereference {@code sock} unconditionally.
+     */
+    @Override
+    public void run() {
+        try {
+            while (!stopped) {
+                Socket sock = null;
+                try {
+                    LOG.info("accepting socket local:"
+                            + serverSocket.getLocalPort() + " to:" + to);
+                    sock = serverSocket.accept();
+                    LOG.info("accepted: " + describeConnection(sock));
+                    Socket target = null;
+                    int retry = 10;
+                    while (sock.isConnected()) {
+                        try {
+                            target = new Socket("localhost", to);
+                            break;
+                        } catch (IOException e) {
+                            if (retry == 0) {
+                                throw e;
+                            }
+                            LOG.warn("connection failed, retrying(" + retry
+                                    + "): " + describeConnection(sock), e);
+                        }
+                        Thread.sleep(TimeUnit.SECONDS.toMillis(1));
+                        retry--;
+                    }
+                    LOG.info("connected: " + describeConnection(sock));
+                    sock.setSoTimeout(30000);
+                    target.setSoTimeout(30000);
+                    this.workers.execute(new PortForwardWorker(sock, target,
+                            sock.getInputStream(), target.getOutputStream()));
+                    this.workers.execute(new PortForwardWorker(target, sock,
+                            target.getInputStream(), sock.getOutputStream()));
+                } catch (SocketTimeoutException e) {
+                    // normally the accept() timeout: just poll again
+                    LOG.warn("socket timed out " + describeConnection(sock),
+                            e);
+                } catch (ConnectException e) {
+                    LOG.warn("connection exception "
+                            + describeConnection(sock), e);
+                    if (sock != null) {
+                        sock.close();
+                    }
+                } catch (IOException e) {
+                    // "Socket closed" is how shutdown() unblocks accept()
+                    if (!"Socket closed".equals(e.getMessage())) {
+                        LOG.warn("unexpected exception "
+                                + describeConnection(sock), e);
+                        throw e;
+                    }
+                }
+
+            }
+        } catch (IOException e) {
+            LOG.error("Unexpected exception to:" + to, e);
+        } catch (InterruptedException e) {
+            LOG.error("Interrupted to:" + to, e);
+        }
+    }
+
+    /**
+     * Formats the ports of a forwarded connection for log messages; falls
+     * back to the listening port when no connection has been accepted yet.
+     */
+    private String describeConnection(Socket sock) {
+        if (sock == null) {
+            return "local:" + serverSocket.getLocalPort()
+                    + " from:<none> to:" + to;
+        }
+        return "local:" + sock.getLocalPort()
+                + " from:" + sock.getPort()
+                + " to:" + to;
+    }
+
+    /**
+     * Stops forwarding: closes the listening socket, interrupts the
+     * workers, waits up to five seconds for them to finish and then waits
+     * for the accept thread to exit.
+     *
+     * @throws Exception if the workers do not terminate within five
+     *         seconds, or if the calling thread is interrupted while
+     *         waiting (the interruption is attached as the cause and the
+     *         thread's interrupt status is restored)
+     */
+    public void shutdown() throws Exception {
+        this.stopped = true;
+        this.serverSocket.close();
+        this.workers.shutdownNow();
+        try {
+            if (!this.workers.awaitTermination(5, TimeUnit.SECONDS)) {
+                throw new Exception(
+                        "Failed to stop forwarding within 5 seconds");
+            }
+        } catch (InterruptedException e) {
+            // keep the interruption visible to the caller and keep the cause
+            Thread.currentThread().interrupt();
+            throw new Exception("Failed to stop forwarding", e);
+        }
+        this.join();
+    }
+}
Modified: zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/TruncateTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/TruncateTest.java?rev=1362664&r1=1362663&r2=1362664&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/TruncateTest.java (original)
+++ zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/TruncateTest.java Tue Jul 17 21:28:57 2012
@@ -23,8 +23,10 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
+import junit.framework.Assert;
import junit.framework.TestCase;
+import org.apache.jute.Record;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -34,9 +36,15 @@ import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.persistence.TxnLog.TxnIterator;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.txn.SetDataTxn;
+import org.apache.zookeeper.txn.TxnHeader;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -67,12 +75,67 @@ public class TruncateTest extends TestCa
connected = event.getState() == Watcher.Event.KeeperState.SyncConnected;
}
};
-
+
+    /**
+     * Appends 100 transactions, truncates the log back to zxid 1, appends
+     * one more transaction (zxid 200) and verifies that the log then
+     * contains exactly zxid 1 followed by zxid 200.
+     */
+    @Test
+    public void testTruncationStreamReset() throws Exception {
+        File tmpdir = ClientBase.createTmpDir();
+        FileTxnSnapLog snaplog = new FileTxnSnapLog(tmpdir, tmpdir);
+        ZKDatabase zkdb = new ZKDatabase(snaplog);
+
+        for (int i = 1; i <= 100; i++) {
+            append(zkdb, i);
+        }
+
+        // fail here, with a clear message, if the truncation itself failed
+        Assert.assertTrue("truncateLog(1) reported failure",
+                zkdb.truncateLog(1));
+
+        append(zkdb, 200);
+
+        zkdb.close();
+
+        // verify that the truncation and subsequent append were processed
+        // correctly
+        FileTxnLog txnlog = new FileTxnLog(new File(tmpdir, "version-2"));
+        TxnIterator iter = txnlog.read(1);
+
+        TxnHeader hdr = iter.getHeader();
+        Record txn = iter.getTxn();
+        Assert.assertEquals(1, hdr.getZxid());
+        Assert.assertTrue(txn instanceof SetDataTxn);
+
+        // the record appended after the truncation must be readable
+        Assert.assertTrue("no transaction found after zxid 1", iter.next());
+
+        hdr = iter.getHeader();
+        txn = iter.getTxn();
+        Assert.assertEquals(200, hdr.getZxid());
+        Assert.assertTrue(txn instanceof SetDataTxn);
+    }
+
+    /**
+     * Appends and commits one setData transaction with zxid {@code i} for
+     * the path "/foo" + i.
+     */
+    private void append(ZKDatabase zkdb, int i) throws IOException {
+        Request req = new Request(null, 0, 0, 0, null, null);
+        req.hdr = new TxnHeader(1, 1, i, 1, ZooDefs.OpCode.setData);
+        req.txn = new SetDataTxn("/foo" + i, new byte[0], 1);
+
+        zkdb.append(req);
+        zkdb.commit();
+    }
+
@Test
public void testTruncate() throws IOException, InterruptedException, KeeperException {
// Prime the server that is going to come in late with 50 txns
- NIOServerCnxn.Factory factory = ClientBase.createNewServerInstance(dataDir1, null, "127.0.0.1:" + baseHostPort, 100);
- ZooKeeper zk = new ZooKeeper("127.0.0.1:" + baseHostPort, 15000, nullWatcher);
+ String hostPort = "127.0.0.1:" + baseHostPort;
+ NIOServerCnxn.Factory factory = ClientBase.createNewServerInstance(dataDir1, null, hostPort, 100);
+ ClientBase.shutdownServerInstance(factory, hostPort);
+
+ // standalone starts with 0 epoch while quorum starts with 1
+ File origfile = new File(new File(dataDir1, "version-2"), "snapshot.0");
+ File newfile = new File(new File(dataDir1, "version-2"), "snapshot.100000000");
+ origfile.renameTo(newfile);
+
+ factory = ClientBase.createNewServerInstance(dataDir1, null, hostPort, 100);
+
+ ZooKeeper zk = new ZooKeeper(hostPort, 15000, nullWatcher);
for(int i = 0; i < 50; i++) {
zk.create("/" + i, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
@@ -129,12 +192,14 @@ public class TruncateTest extends TestCa
}
zk1.getData("/9", false, new Stat());
try {
- // 10 wont work because the session expiration
- // will match the zxid for 10 and so we wont
- // actually truncate the zxid for 10 creation
- // but for 11 we will for sure
- zk1.getData("/11", false, new Stat());
- fail("Should have gotten an error");
+ // /10 wont work because the session expiration
+ // will match the zxid for /10 and so we wont
+ // actually truncate the zxid for /10 creation
+ // due to an artifact of switching the xid of the standalone
+ // /11 is the last entry in the log for the xid
+ // as a result /12 is the first of the truncated znodes to check for
+ zk1.getData("/12", false, new Stat());
+ Assert.fail("Should have gotten an error");
} catch(KeeperException.NoNodeException e) {
// this is what we want
}