You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ph...@apache.org on 2012/03/15 18:03:12 UTC
svn commit: r1301092 - in /zookeeper/branches/branch-3.3: ./
src/java/main/org/apache/zookeeper/server/
src/java/test/org/apache/zookeeper/ src/java/test/org/apache/zookeeper/test/
Author: phunt
Date: Thu Mar 15 17:03:11 2012
New Revision: 1301092
URL: http://svn.apache.org/viewvc?rev=1301092&view=rev
Log:
ZOOKEEPER-1412. java client watches inconsistently triggered on reconnect (phunt)
Modified:
zookeeper/branches/branch-3.3/CHANGES.txt
zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/TestableZooKeeper.java
zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java
Modified: zookeeper/branches/branch-3.3/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/CHANGES.txt?rev=1301092&r1=1301091&r2=1301092&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/CHANGES.txt (original)
+++ zookeeper/branches/branch-3.3/CHANGES.txt Thu Mar 15 17:03:11 2012
@@ -29,6 +29,9 @@ BUGFIXES:
ZOOKEEPER-1277. servers stop serving when lower 32bits of zxid roll
over (phunt)
+
+ ZOOKEEPER-1412. java client watches inconsistently triggered on
+ reconnect (phunt)
IMPROVEMENTS:
Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java?rev=1301092&r1=1301091&r2=1301092&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java Thu Mar 15 17:03:11 2012
@@ -346,12 +346,13 @@ public class FinalRequestProcessor imple
err = Code.MARSHALLINGERROR;
}
+ long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
ReplyHeader hdr =
- new ReplyHeader(request.cxid, request.zxid, err.intValue());
+ new ReplyHeader(request.cxid, lastZxid, err.intValue());
zks.serverStats().updateLatency(request.createTime);
((CnxnStats)cnxn.getStats())
- .updateForResponse(request.cxid, request.zxid, lastOp,
+ .updateForResponse(request.cxid, lastZxid, lastOp,
request.createTime, System.currentTimeMillis());
try {
Modified: zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/TestableZooKeeper.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/TestableZooKeeper.java?rev=1301092&r1=1301091&r2=1301092&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/TestableZooKeeper.java (original)
+++ zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/TestableZooKeeper.java Thu Mar 15 17:03:11 2012
@@ -47,6 +47,15 @@ public class TestableZooKeeper extends Z
return super.getExistWatches();
}
+ /**
+ * Cause this ZooKeeper object to disconnect from the server. It will then
+ * later attempt to reconnect.
+ */
+ public void testableConnloss() throws IOException {
+ synchronized(cnxn) {
+ ((SocketChannel)cnxn.sendThread.sockKey.channel()).socket().close();
+ }
+ }
/**
* Cause this ZooKeeper object to stop receiving from the ZooKeeperServer
@@ -84,4 +93,11 @@ public class TestableZooKeeper extends Z
public SocketAddress testableRemoteSocketAddress() {
return super.testableRemoteSocketAddress();
}
+
+ /**
+ * @return the last zxid as seen by the client session
+ */
+ public long testableLastZxid() {
+ return cnxn.lastZxid;
+ }
}
Modified: zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java?rev=1301092&r1=1301091&r2=1301092&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java (original)
+++ zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java Thu Mar 15 17:03:11 2012
@@ -18,6 +18,7 @@
package org.apache.zookeeper.test;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -25,6 +26,7 @@ import static org.junit.Assert.assertTru
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -35,10 +37,13 @@ import org.apache.log4j.Logger;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.TestableZooKeeper;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher.Event;
+import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.quorum.Leader;
import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
@@ -422,4 +427,85 @@ public class FollowerResyncConcurrencyTe
assertEquals("Leader should equal follower", lead.getEphemerals(l).size(), cleanEphemerals.size());
}
}
+
+ /**
+ * Verify that the server is sending the proper zxid. See ZOOKEEPER-1412.
+ */
+ @Test
+ public void testFollowerSendsLastZxid() throws Exception {
+ QuorumUtil qu = new QuorumUtil(1);
+ qu.startAll();
+
+ int index = 1;
+ while(qu.getPeer(index).peer.follower == null) {
+ index++;
+ }
+ LOG.info("Connecting to follower:" + index);
+
+ TestableZooKeeper zk =
+ createClient("localhost:" + qu.getPeer(index).peer.getClientPort());
+
+ assertEquals(0L, zk.testableLastZxid());
+ zk.exists("/", false);
+ long lzxid = zk.testableLastZxid();
+ assertTrue("lzxid:" + lzxid + " > 0", lzxid > 0);
+ zk.close();
+ }
+
+ private class MyWatcher extends CountdownWatcher {
+ LinkedBlockingQueue<WatchedEvent> events =
+ new LinkedBlockingQueue<WatchedEvent>();
+
+ public void process(WatchedEvent event) {
+ super.process(event);
+ if (event.getType() != Event.EventType.None) {
+ try {
+ events.put(event);
+ } catch (InterruptedException e) {
+ LOG.warn("ignoring interrupt during event.put");
+ }
+ }
+ }
+ }
+
+ /**
+ * Verify that the server is sending the proper zxid, and as a result
+ * the watch doesn't fire. See ZOOKEEPER-1412.
+ */
+ @Test
+ public void testFollowerWatcherResync() throws Exception {
+ QuorumUtil qu = new QuorumUtil(1);
+ qu.startAll();
+
+ int index = 1;
+ while(qu.getPeer(index).peer.follower == null) {
+ index++;
+ }
+ LOG.info("Connecting to follower:" + index);
+
+ TestableZooKeeper zk1 = createClient(
+ "localhost:" + qu.getPeer(index).peer.getClientPort());
+ zk1.create("/foo", "foo".getBytes(), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+
+ MyWatcher watcher = new MyWatcher();
+ TestableZooKeeper zk2 = createClient(watcher,
+ "localhost:" + qu.getPeer(index).peer.getClientPort());
+
+ zk2.exists("/foo", true);
+
+ watcher.reset();
+ zk2.testableConnloss();
+ if (!watcher.clientConnected.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS))
+ {
+ fail("Unable to connect to server");
+ }
+ assertArrayEquals("foo".getBytes(), zk2.getData("/foo", false, null));
+
+ assertNull(watcher.events.poll(5, TimeUnit.SECONDS));
+
+ zk1.close();
+ zk2.close();
+ }
+
}