You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ph...@apache.org on 2012/03/15 18:01:10 UTC

svn commit: r1301089 - in /zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/ src/java/main/org/apache/zookeeper/server/ src/java/test/org/apache/zookeeper/ src/java/test/org/apache/zookeeper/test/

Author: phunt
Date: Thu Mar 15 17:01:09 2012
New Revision: 1301089

URL: http://svn.apache.org/viewvc?rev=1301089&view=rev
Log:
ZOOKEEPER-1412. java client watches inconsistently triggered on reconnect (phunt)

Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/TestableZooKeeper.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1301089&r1=1301088&r2=1301089&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Thu Mar 15 17:01:09 2012
@@ -142,6 +142,9 @@ BUGFIXES:
 
   ZOOKEEPER-1277. servers stop serving when lower 32bits of zxid roll
   over (phunt)
+
+  ZOOKEEPER-1412. java client watches inconsistently triggered on
+  reconnect (phunt)
   
 IMPROVEMENTS:
 

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java?rev=1301089&r1=1301088&r2=1301089&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java Thu Mar 15 17:01:09 2012
@@ -660,6 +660,10 @@ public class ClientCnxn {
 
     private volatile long lastZxid;
 
+    public long getLastZxid() {
+        return lastZxid;
+    }
+
     static class EndOfStreamException extends IOException {
         private static final long serialVersionUID = -5438877188796231422L;
 

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java?rev=1301089&r1=1301088&r2=1301089&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java Thu Mar 15 17:01:09 2012
@@ -388,11 +388,12 @@ public class FinalRequestProcessor imple
             err = Code.MARSHALLINGERROR;
         }
 
+        long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
         ReplyHeader hdr =
-            new ReplyHeader(request.cxid, request.zxid, err.intValue());
+            new ReplyHeader(request.cxid, lastZxid, err.intValue());
 
         zks.serverStats().updateLatency(request.createTime);
-        cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
+        cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp,
                     request.createTime, System.currentTimeMillis());
 
         try {

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/TestableZooKeeper.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/TestableZooKeeper.java?rev=1301089&r1=1301088&r2=1301089&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/TestableZooKeeper.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/TestableZooKeeper.java Thu Mar 15 17:01:09 2012
@@ -46,6 +46,15 @@ public class TestableZooKeeper extends Z
         return super.getExistWatches();
     }
 
+    /**
+     * Cause this ZooKeeper object to disconnect from the server. The client
+     * will then attempt to reconnect.
+     */
+    public void testableConnloss() throws IOException {
+        synchronized(cnxn) {
+            cnxn.sendThread.testableCloseSocket();
+        }
+    }
 
     /**
      * Cause this ZooKeeper object to stop receiving from the ZooKeeperServer
@@ -83,4 +92,11 @@ public class TestableZooKeeper extends Z
     public SocketAddress testableRemoteSocketAddress() {
         return super.testableRemoteSocketAddress();
     }
+
+    /**
+     * @return the last zxid as seen by the client session
+     */
+    public long testableLastZxid() {
+        return cnxn.getLastZxid();
+    }
 }

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java?rev=1301089&r1=1301088&r2=1301089&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java Thu Mar 15 17:01:09 2012
@@ -18,13 +18,17 @@
 
 package org.apache.zookeeper.test;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -34,8 +38,11 @@ import org.apache.log4j.Logger;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.TestableZooKeeper;
+import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.ZKTestCase;
 import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.quorum.Leader;
@@ -444,6 +451,24 @@ public class FollowerResyncConcurrencyTe
         return zk;
     }
 
+    private static TestableZooKeeper createTestableClient(String hp)
+        throws IOException, TimeoutException, InterruptedException
+    {
+        CountdownWatcher watcher = new CountdownWatcher();
+        return createTestableClient(watcher, hp);
+    }
+
+    private static TestableZooKeeper createTestableClient(
+            CountdownWatcher watcher, String hp)
+            throws IOException, TimeoutException, InterruptedException
+        {
+            TestableZooKeeper zk = new TestableZooKeeper(
+                    hp, ClientBase.CONNECTION_TIMEOUT, watcher);
+
+            watcher.waitForConnected(CONNECTION_TIMEOUT);
+            return zk;
+        }
+
     private void verifyState(QuorumUtil qu, int index, Leader leader) {
         assertTrue("Not following", qu.getPeer(index).peer.follower != null);
         long epochF = (qu.getPeer(index).peer.getActiveServer().getZxid() >> 32L);
@@ -479,5 +504,86 @@ public class FollowerResyncConcurrencyTe
             assertEquals("Should have same number of ephemerals in both followers", ephemerals.size(), cleanEphemerals.size());
             assertEquals("Leader should equal follower", lead.getEphemerals(l).size(), cleanEphemerals.size());
         }
+    }      
+
+    /**
+     * Verify that the server is sending the proper zxid. See ZOOKEEPER-1412.
+     */
+    @Test
+    public void testFollowerSendsLastZxid() throws Exception {
+        QuorumUtil qu = new QuorumUtil(1);
+        qu.startAll();
+
+        int index = 1;
+        while(qu.getPeer(index).peer.follower == null) {
+            index++;
+        }
+        LOG.info("Connecting to follower:" + index);
+
+        TestableZooKeeper zk =
+                createTestableClient("localhost:" + qu.getPeer(index).peer.getClientPort());
+
+        assertEquals(0L, zk.testableLastZxid());
+        zk.exists("/", false);
+        long lzxid = zk.testableLastZxid();
+        assertTrue("lzxid:" + lzxid + " > 0", lzxid > 0);
+        zk.close();
     }
+
+    private class MyWatcher extends CountdownWatcher {
+        LinkedBlockingQueue<WatchedEvent> events =
+            new LinkedBlockingQueue<WatchedEvent>();
+
+        public void process(WatchedEvent event) {
+            super.process(event);
+            if (event.getType() != Event.EventType.None) {
+                try {
+                    events.put(event);
+                } catch (InterruptedException e) {
+                    LOG.warn("ignoring interrupt during event.put");
+                }
+            }
+        }
+    }
+
+    /**
+     * Verify that the server is sending the proper zxid, and as a result
+     * the watch doesn't fire. See ZOOKEEPER-1412.
+     */
+    @Test
+    public void testFollowerWatcherResync() throws Exception {
+        QuorumUtil qu = new QuorumUtil(1);
+        qu.startAll();
+
+        int index = 1;
+        while(qu.getPeer(index).peer.follower == null) {
+            index++;
+        }
+        LOG.info("Connecting to follower:" + index);
+
+        TestableZooKeeper zk1 = createTestableClient(
+                "localhost:" + qu.getPeer(index).peer.getClientPort());
+        zk1.create("/foo", "foo".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+
+        MyWatcher watcher = new MyWatcher();
+        TestableZooKeeper zk2 = createTestableClient(watcher,
+                "localhost:" + qu.getPeer(index).peer.getClientPort());
+
+        zk2.exists("/foo", true);
+
+        watcher.reset();
+        zk2.testableConnloss();
+        if (!watcher.clientConnected.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS))
+        {
+            fail("Unable to connect to server");
+        }
+        assertArrayEquals("foo".getBytes(), zk2.getData("/foo", false, null));
+
+        assertNull(watcher.events.poll(5, TimeUnit.SECONDS));
+
+        zk1.close();
+        zk2.close();
+    }
+
 }