You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2013/10/22 12:54:44 UTC

svn commit: r1534599 - in /zookeeper/trunk: CHANGES.txt src/docs/src/documentation/content/xdocs/zookeeperProgrammers.xml src/java/main/org/apache/zookeeper/server/DataTree.java src/java/test/org/apache/zookeeper/test/WatchEventWhenAutoReset.java

Author: fpj
Date: Tue Oct 22 10:54:44 2013
New Revision: 1534599

URL: http://svn.apache.org/r1534599
Log:
ZOOKEEPER-1667. Watch event isn't handled correctly when a client reestablish to a server (jacky007, fpj via fpj)


Added:
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/WatchEventWhenAutoReset.java
Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperProgrammers.xml
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1534599&r1=1534598&r2=1534599&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Tue Oct 22 10:54:44 2013
@@ -453,6 +453,9 @@ BUGFIXES:
   ZOOKEEPER-1732. ZooKeeper server unable to join established
   ensemble (German Blanco via fpj)
 
+  ZOOKEEPER-1667. Watch event isn't handled correctly when 
+  a client reestablish to a server (jacky007, fpj via fpj)
+
 IMPROVEMENTS:
 
   ZOOKEEPER-1170. Fix compiler (eclipse) warnings: unused imports,

Modified: zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperProgrammers.xml
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperProgrammers.xml?rev=1534599&r1=1534598&r2=1534599&view=diff
==============================================================================
--- zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperProgrammers.xml (original)
+++ zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperProgrammers.xml Tue Oct 22 10:54:44 2013
@@ -648,6 +648,37 @@
     may be missed: a watch for the existence of a znode not yet created will
     be missed if the znode is created and deleted while disconnected.</para>
 
+	<section id="sc_WatchSemantics">
+      <title>Semantics of Watches</title>
+	  
+	  <para> We can set watches with the three calls that read the state of 
+	  ZooKeeper: exists, getData, and getChildren. The following list details
+	  the events that a watch can trigger and the calls that enable them:
+	  </para>
+	  
+	  <itemizedlist>
+        <listitem>
+          <para><emphasis role="bold">Created event:</emphasis></para>
+          <para>Enabled with a call to exists.</para>
+        </listitem>
+        
+        <listitem>
+          <para><emphasis role="bold">Deleted event:</emphasis></para>
+          <para>Enabled with a call to exists, getData, and getChildren.</para>
+        </listitem>
+        
+        <listitem>
+          <para><emphasis role="bold">Changed event:</emphasis></para>
+          <para>Enabled with a call to exists and getData.</para>
+        </listitem>
+        
+        <listitem>
+          <para><emphasis role="bold">Child event:</emphasis></para>
+          <para>Enabled with a call to getChildren.</para>
+        </listitem>
+      </itemizedlist>
+	</section>
+	
     <section id="sc_WatchGuarantees">
       <title>What ZooKeeper Guarantees about Watches</title>
 

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java?rev=1534599&r1=1534598&r2=1534599&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java Tue Oct 22 10:54:44 2013
@@ -1295,56 +1295,37 @@ public class DataTree {
             DataNode node = getNode(path);
             WatchedEvent e = null;
             if (node == null) {
-                e = new WatchedEvent(EventType.NodeDeleted,
-                        KeeperState.SyncConnected, path);
-            } else if (node.stat.getCzxid() > relativeZxid) {
-                e = new WatchedEvent(EventType.NodeCreated,
-                        KeeperState.SyncConnected, path);
+                watcher.process(new WatchedEvent(EventType.NodeDeleted, 
+                            KeeperState.SyncConnected, path));
             } else if (node.stat.getMzxid() > relativeZxid) {
-                e = new WatchedEvent(EventType.NodeDataChanged,
-                        KeeperState.SyncConnected, path);
-            }
-            if (e == null) {
-                this.dataWatches.addWatch(path, watcher);
+                watcher.process(new WatchedEvent(EventType.NodeDataChanged, 
+                            KeeperState.SyncConnected, path));
             } else {
-                watcher.process(e);
-            }
-        }
+                this.dataWatches.addWatch(path, watcher);
+            }    
+        }    
         for (String path : existWatches) {
             DataNode node = getNode(path);
-            WatchedEvent e = null;
-            if (node == null) {
-                // This is the case when the watch was registered
-            } else if (node.stat.getMzxid() > relativeZxid) {
-                e = new WatchedEvent(EventType.NodeDataChanged,
-                        KeeperState.SyncConnected, path);
+            if (node != null) {
+                watcher.process(new WatchedEvent(EventType.NodeCreated, 
+                            KeeperState.SyncConnected, path));
             } else {
-                e = new WatchedEvent(EventType.NodeCreated,
-                        KeeperState.SyncConnected, path);
-            }
-            if (e == null) {
                 this.dataWatches.addWatch(path, watcher);
-            } else {
-                watcher.process(e);
-            }
-        }
+            }    
+        }    
         for (String path : childWatches) {
             DataNode node = getNode(path);
-            WatchedEvent e = null;
             if (node == null) {
-                e = new WatchedEvent(EventType.NodeDeleted,
-                        KeeperState.SyncConnected, path);
+                watcher.process(new WatchedEvent(EventType.NodeDeleted, 
+                            KeeperState.SyncConnected, path));
             } else if (node.stat.getPzxid() > relativeZxid) {
-                e = new WatchedEvent(EventType.NodeChildrenChanged,
-                        KeeperState.SyncConnected, path);
-            }
-            if (e == null) {
-                this.childWatches.addWatch(path, watcher);
+                watcher.process(new WatchedEvent(EventType.NodeChildrenChanged, 
+                            KeeperState.SyncConnected, path));
             } else {
-                watcher.process(e);
-            }
-        }
-    }
+                this.childWatches.addWatch(path, watcher);
+            }    
+        }    
+    }    
 
      /**
       * This method sets the Cversion and Pzxid for the specified node to the

Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/WatchEventWhenAutoReset.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/WatchEventWhenAutoReset.java?rev=1534599&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/WatchEventWhenAutoReset.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/WatchEventWhenAutoReset.java Tue Oct 22 10:54:44 2013
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import junit.framework.TestCase;
+
+public class WatchEventWhenAutoReset extends TestCase {
+    protected static final Logger LOG = LoggerFactory
+            .getLogger(WatchEventWhenAutoReset.class);
+
+    // waiting time for expected condition
+    private static final int TIMEOUT = 30000;
+
+    static public class EventsWatcher extends CountdownWatcher {
+        private LinkedBlockingQueue<WatchedEvent> dataEvents = new LinkedBlockingQueue<WatchedEvent>();
+
+        @Override
+        public void process(WatchedEvent event) {
+            super.process(event);
+            try {
+                if (event.getType() != Event.EventType.None) {
+                    dataEvents.put(event);
+                }
+            } catch (InterruptedException e) {
+                LOG.warn("ignoring interrupt during EventsWatcher process");
+            }
+        }
+
+        public void assertEvent(long timeout, EventType eventType) {
+            try {
+                WatchedEvent event = dataEvents.poll(timeout,
+                        TimeUnit.MILLISECONDS);
+                Assert.assertNotNull("do not receive a " + eventType, event);
+                Assert.assertEquals(eventType, event.getType());
+            } catch (InterruptedException e) {
+                LOG.warn("ignoring interrupt during EventsWatcher assertEvent");
+            }
+        }
+    }
+
+    private ZooKeeper createClient(QuorumUtil qu, int id, EventsWatcher watcher)
+            throws IOException {
+        String hostPort = "127.0.0.1:" + qu.getPeer(id).clientPort;
+        ZooKeeper zk = new ZooKeeper(hostPort, TIMEOUT, watcher);
+        try {
+            watcher.waitForConnected(TIMEOUT);
+        } catch (InterruptedException e) {
+            // ignoring the interrupt
+        } catch (TimeoutException e) {
+            fail("can not connect to " + hostPort);
+        }
+        return zk;
+    }
+
+    private ZooKeeper createClient(QuorumUtil qu, int id) throws IOException {
+        return createClient(qu, id, new EventsWatcher());
+    }
+
+    @Test
+    public void testNodeDataChanged() throws Exception {
+        QuorumUtil qu = new QuorumUtil(1);
+        qu.startAll();
+
+        EventsWatcher watcher = new EventsWatcher();
+        ZooKeeper zk1 = createClient(qu, 1, watcher);
+        ZooKeeper zk2 = createClient(qu, 2);
+
+        String path = "/test-changed";
+
+        zk1.create(path, new byte[1], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        zk1.getData(path, watcher, null);
+        qu.shutdown(1);
+        zk2.delete(path, -1);
+        zk2.create(path, new byte[2], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        qu.start(1);
+        watcher.waitForConnected(TIMEOUT);
+        watcher.assertEvent(TIMEOUT, EventType.NodeDataChanged);
+
+        zk1.exists(path, watcher);
+        qu.shutdown(1);
+        zk2.delete(path, -1);
+        zk2.create(path, new byte[2], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        qu.start(1);
+        watcher.waitForConnected(TIMEOUT * 1000L);
+        watcher.assertEvent(TIMEOUT, EventType.NodeDataChanged);
+
+        qu.shutdownAll();
+    }
+
+    @Test
+    public void testNodeCreated() throws Exception {
+        QuorumUtil qu = new QuorumUtil(1);
+        qu.startAll();
+
+        EventsWatcher watcher = new EventsWatcher();
+        ZooKeeper zk1 = createClient(qu, 1, watcher);
+        ZooKeeper zk2 = createClient(qu, 2);
+
+        String path = "/test1-created";
+
+        zk1.exists(path, watcher);
+        qu.shutdown(1);
+        zk2.create(path, new byte[2], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        qu.start(1);
+        watcher.waitForConnected(TIMEOUT * 1000L);
+        watcher.assertEvent(TIMEOUT, EventType.NodeCreated);
+
+        qu.shutdownAll();
+    }
+
+    @Test
+    public void testNodeDeleted() throws Exception {
+        QuorumUtil qu = new QuorumUtil(1);
+        qu.startAll();
+
+        EventsWatcher watcher = new EventsWatcher();
+        ZooKeeper zk1 = createClient(qu, 1, watcher);
+        ZooKeeper zk2 = createClient(qu, 2);
+
+        String path = "/test-deleted";
+
+        zk1.create(path, new byte[1], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        zk1.getData(path, watcher, null);
+        qu.shutdown(1);
+        zk2.delete(path, -1);
+        qu.start(1);
+        watcher.waitForConnected(TIMEOUT * 1000L);
+        watcher.assertEvent(TIMEOUT, EventType.NodeDeleted);
+
+        zk1.create(path, new byte[1], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        zk1.exists(path, watcher);
+        qu.shutdown(1);
+        zk2.delete(path, -1);
+        qu.start(1);
+        watcher.waitForConnected(TIMEOUT * 1000L);
+        watcher.assertEvent(TIMEOUT, EventType.NodeDeleted);
+
+        zk1.create(path, new byte[1], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        zk1.getChildren(path, watcher);
+        qu.shutdown(1);
+        zk2.delete(path, -1);
+        qu.start(1);
+        watcher.waitForConnected(TIMEOUT * 1000L);
+        watcher.assertEvent(TIMEOUT, EventType.NodeDeleted);
+
+        qu.shutdownAll();
+    }
+
+    @Test
+    public void testNodeChildrenChanged() throws Exception {
+        QuorumUtil qu = new QuorumUtil(1);
+        qu.startAll();
+
+        EventsWatcher watcher = new EventsWatcher();
+        ZooKeeper zk1 = createClient(qu, 1, watcher);
+        ZooKeeper zk2 = createClient(qu, 2);
+
+        String path = "/test-children-changed";
+
+        zk1.create(path, new byte[1], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        zk1.getChildren(path, watcher);
+        qu.shutdown(1);
+        zk2.create(path + "/children-1", new byte[2],
+                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        qu.start(1);
+        watcher.waitForConnected(TIMEOUT * 1000L);
+        watcher.assertEvent(TIMEOUT, EventType.NodeChildrenChanged);
+
+        qu.shutdownAll();
+    }
+}
+