You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@zookeeper.apache.org by ph...@apache.org on 2012/03/15 18:02:17 UTC
svn commit: r1301091 - in /zookeeper/branches/branch-3.4: ./
src/java/main/org/apache/zookeeper/
src/java/main/org/apache/zookeeper/server/
src/java/test/org/apache/zookeeper/ src/java/test/org/apache/zookeeper/test/
Author: phunt
Date: Thu Mar 15 17:02:17 2012
New Revision: 1301091
URL: http://svn.apache.org/viewvc?rev=1301091&view=rev
Log:
ZOOKEEPER-1412. java client watches inconsistently triggered on reconnect (phunt)
Modified:
zookeeper/branches/branch-3.4/CHANGES.txt
zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/ClientCnxn.java
zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/TestableZooKeeper.java
zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java
Modified: zookeeper/branches/branch-3.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/CHANGES.txt?rev=1301091&r1=1301090&r2=1301091&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/CHANGES.txt (original)
+++ zookeeper/branches/branch-3.4/CHANGES.txt Thu Mar 15 17:02:17 2012
@@ -14,6 +14,9 @@ BUGFIXES:
ZOOKEEPER-1277. servers stop serving when lower 32bits of zxid roll
over (phunt)
+ ZOOKEEPER-1412. java client watches inconsistently triggered on
+ reconnect (phunt)
+
IMPROVEMENTS:
ZOOKEEPER-1389. it would be nice if start-foreground used exec $JAVA
Modified: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/ClientCnxn.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/ClientCnxn.java?rev=1301091&r1=1301090&r2=1301091&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/ClientCnxn.java (original)
+++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/ClientCnxn.java Thu Mar 15 17:02:17 2012
@@ -660,6 +660,10 @@ public class ClientCnxn {
private volatile long lastZxid;
+ public long getLastZxid() {
+ return lastZxid;
+ }
+
static class EndOfStreamException extends IOException {
private static final long serialVersionUID = -5438877188796231422L;
Modified: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java?rev=1301091&r1=1301090&r2=1301091&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java (original)
+++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java Thu Mar 15 17:02:17 2012
@@ -392,11 +392,12 @@ public class FinalRequestProcessor imple
err = Code.MARSHALLINGERROR;
}
+ long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
ReplyHeader hdr =
- new ReplyHeader(request.cxid, request.zxid, err.intValue());
+ new ReplyHeader(request.cxid, lastZxid, err.intValue());
zks.serverStats().updateLatency(request.createTime);
- cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
+ cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp,
request.createTime, System.currentTimeMillis());
try {
Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/TestableZooKeeper.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/TestableZooKeeper.java?rev=1301091&r1=1301090&r2=1301091&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/TestableZooKeeper.java (original)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/TestableZooKeeper.java Thu Mar 15 17:02:17 2012
@@ -20,7 +20,6 @@ package org.apache.zookeeper;
import java.io.IOException;
import java.net.SocketAddress;
-import java.nio.channels.SocketChannel;
import java.util.List;
public class TestableZooKeeper extends ZooKeeper {
@@ -47,6 +46,15 @@ public class TestableZooKeeper extends Z
return super.getExistWatches();
}
+ /**
+ * Cause this ZooKeeper object to disconnect from the server. It will then
+ * later attempt to reconnect.
+ */
+ public void testableConnloss() throws IOException {
+ synchronized(cnxn) {
+ cnxn.sendThread.testableCloseSocket();
+ }
+ }
/**
* Cause this ZooKeeper object to stop receiving from the ZooKeeperServer
@@ -84,4 +92,11 @@ public class TestableZooKeeper extends Z
public SocketAddress testableRemoteSocketAddress() {
return super.testableRemoteSocketAddress();
}
+
+ /**
+ * @return the last zxid as seen by the client session
+ */
+ public long testableLastZxid() {
+ return cnxn.getLastZxid();
+ }
}
Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java?rev=1301091&r1=1301090&r2=1301091&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java (original)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java Thu Mar 15 17:02:17 2012
@@ -18,13 +18,17 @@
package org.apache.zookeeper.test;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -34,8 +38,11 @@ import org.apache.log4j.Logger;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.TestableZooKeeper;
+import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.quorum.Leader;
@@ -462,6 +469,24 @@ public class FollowerResyncConcurrencyTe
return zk;
}
+ private static TestableZooKeeper createTestableClient(String hp)
+ throws IOException, TimeoutException, InterruptedException
+ {
+ CountdownWatcher watcher = new CountdownWatcher();
+ return createTestableClient(watcher, hp);
+ }
+
+ private static TestableZooKeeper createTestableClient(
+ CountdownWatcher watcher, String hp)
+ throws IOException, TimeoutException, InterruptedException
+ {
+ TestableZooKeeper zk = new TestableZooKeeper(
+ hp, ClientBase.CONNECTION_TIMEOUT, watcher);
+
+ watcher.waitForConnected(CONNECTION_TIMEOUT);
+ return zk;
+ }
+
private void verifyState(QuorumUtil qu, int index, Leader leader) {
assertTrue("Not following", qu.getPeer(index).peer.follower != null);
long epochF = (qu.getPeer(index).peer.getActiveServer().getZxid() >> 32L);
@@ -498,4 +523,85 @@ public class FollowerResyncConcurrencyTe
assertEquals("Leader should equal follower", lead.getEphemerals(l).size(), cleanEphemerals.size());
}
}
+
+ /**
+ * Verify that the server is sending the proper zxid. See ZOOKEEPER-1412.
+ */
+ @Test
+ public void testFollowerSendsLastZxid() throws Exception {
+ QuorumUtil qu = new QuorumUtil(1);
+ qu.startAll();
+
+ int index = 1;
+ while(qu.getPeer(index).peer.follower == null) {
+ index++;
+ }
+ LOG.info("Connecting to follower:" + index);
+
+ TestableZooKeeper zk =
+ createTestableClient("localhost:" + qu.getPeer(index).peer.getClientPort());
+
+ assertEquals(0L, zk.testableLastZxid());
+ zk.exists("/", false);
+ long lzxid = zk.testableLastZxid();
+ assertTrue("lzxid:" + lzxid + " > 0", lzxid > 0);
+ zk.close();
+ }
+
+ private class MyWatcher extends CountdownWatcher {
+ LinkedBlockingQueue<WatchedEvent> events =
+ new LinkedBlockingQueue<WatchedEvent>();
+
+ public void process(WatchedEvent event) {
+ super.process(event);
+ if (event.getType() != Event.EventType.None) {
+ try {
+ events.put(event);
+ } catch (InterruptedException e) {
+ LOG.warn("ignoring interrupt during event.put");
+ }
+ }
+ }
+ }
+
+ /**
+ * Verify that the server is sending the proper zxid, and as a result
+ * the watch doesn't fire. See ZOOKEEPER-1412.
+ */
+ @Test
+ public void testFollowerWatcherResync() throws Exception {
+ QuorumUtil qu = new QuorumUtil(1);
+ qu.startAll();
+
+ int index = 1;
+ while(qu.getPeer(index).peer.follower == null) {
+ index++;
+ }
+ LOG.info("Connecting to follower:" + index);
+
+ TestableZooKeeper zk1 = createTestableClient(
+ "localhost:" + qu.getPeer(index).peer.getClientPort());
+ zk1.create("/foo", "foo".getBytes(), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+
+ MyWatcher watcher = new MyWatcher();
+ TestableZooKeeper zk2 = createTestableClient(watcher,
+ "localhost:" + qu.getPeer(index).peer.getClientPort());
+
+ zk2.exists("/foo", true);
+
+ watcher.reset();
+ zk2.testableConnloss();
+ if (!watcher.clientConnected.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS))
+ {
+ fail("Unable to connect to server");
+ }
+ assertArrayEquals("foo".getBytes(), zk2.getData("/foo", false, null));
+
+ assertNull(watcher.events.poll(5, TimeUnit.SECONDS));
+
+ zk1.close();
+ zk2.close();
+ }
+
}