You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by rg...@apache.org on 2015/06/13 02:49:48 UTC

svn commit: r1685205 - in /zookeeper/branches/branch-3.4: CHANGES.txt src/java/main/org/apache/zookeeper/ClientCnxn.java src/java/test/org/apache/zookeeper/test/DisconnectedWatcherTest.java

Author: rgs
Date: Sat Jun 13 00:49:48 2015
New Revision: 1685205

URL: http://svn.apache.org/r1685205
Log:
ZOOKEEPER-706: Large numbers of watches can cause session re-establishment to fail
(Chris Thunes via rgs)

Modified:
    zookeeper/branches/branch-3.4/CHANGES.txt
    zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/ClientCnxn.java
    zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/DisconnectedWatcherTest.java

Modified: zookeeper/branches/branch-3.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/CHANGES.txt?rev=1685205&r1=1685204&r2=1685205&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/CHANGES.txt (original)
+++ zookeeper/branches/branch-3.4/CHANGES.txt Sat Jun 13 00:49:48 2015
@@ -93,6 +93,9 @@ BUGFIXES:
   ZOOKEEPER-2213: Empty path in Set crashes server and prevents restart
   (Hongchao Deng via rgs)
 
+  ZOOKEEPER-706: Large numbers of watches can cause session re-establishment to fail
+  (Chris Thunes via rgs)
+
 IMPROVEMENTS:
 
   ZOOKEEPER-1575. adding .gitattributes to prevent CRLF and LF mismatches for

Modified: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/ClientCnxn.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/ClientCnxn.java?rev=1685205&r1=1685204&r2=1685205&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/ClientCnxn.java (original)
+++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/ClientCnxn.java Sat Jun 13 00:49:48 2015
@@ -28,6 +28,8 @@ import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
@@ -88,6 +90,16 @@ public class ClientCnxn {
     private static final String ZK_SASL_CLIENT_USERNAME =
         "zookeeper.sasl.client.username";
 
+    /* ZOOKEEPER-706: If a session has a large number of watches set then
+     * attempting to re-establish those watches after a connection loss may
+     * fail due to the SetWatches request exceeding the server's configured
+     * jute.maxBuffer value. To avoid this we instead split the watch
+     * re-establishment across multiple SetWatches calls. This constant
+     * controls the size of each call. It is set to 128kB to be conservative
+     * with respect to the server's 1MB default for jute.maxBuffer.
+     */
+    private static final int SET_WATCHES_MAX_LENGTH = 128 * 1024;
+
     /** This controls whether automatic watch resetting is enabled.
      * Clients automatically reset watches during session reconnect, this
      * option allows the client to turn off this behavior by setting
@@ -868,15 +880,48 @@ public class ClientCnxn {
                     List<String> childWatches = zooKeeper.getChildWatches();
                     if (!dataWatches.isEmpty()
                                 || !existWatches.isEmpty() || !childWatches.isEmpty()) {
-                        SetWatches sw = new SetWatches(lastZxid,
-                                prependChroot(dataWatches),
-                                prependChroot(existWatches),
-                                prependChroot(childWatches));
-                        RequestHeader h = new RequestHeader();
-                        h.setType(ZooDefs.OpCode.setWatches);
-                        h.setXid(-8);
-                        Packet packet = new Packet(h, new ReplyHeader(), sw, null, null);
-                        outgoingQueue.addFirst(packet);
+
+                        Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();
+                        Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
+                        Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
+                        long setWatchesLastZxid = lastZxid;
+
+                        while (dataWatchesIter.hasNext()
+                                       || existWatchesIter.hasNext() || childWatchesIter.hasNext()) {
+                            List<String> dataWatchesBatch = new ArrayList<String>();
+                            List<String> existWatchesBatch = new ArrayList<String>();
+                            List<String> childWatchesBatch = new ArrayList<String>();
+                            int batchLength = 0;
+
+                            // Note, we may exceed our max length by a bit when we add the last
+                            // watch in the batch. This isn't ideal, but it makes the code simpler.
+                            while (batchLength < SET_WATCHES_MAX_LENGTH) {
+                                final String watch;
+                                if (dataWatchesIter.hasNext()) {
+                                    watch = dataWatchesIter.next();
+                                    dataWatchesBatch.add(watch);
+                                } else if (existWatchesIter.hasNext()) {
+                                    watch = existWatchesIter.next();
+                                    existWatchesBatch.add(watch);
+                                } else if (childWatchesIter.hasNext()) {
+                                    watch = childWatchesIter.next();
+                                    childWatchesBatch.add(watch);
+                                } else {
+                                    break;
+                                }
+                                batchLength += watch.length();
+                            }
+
+                            SetWatches sw = new SetWatches(setWatchesLastZxid,
+                                    dataWatchesBatch,
+                                    existWatchesBatch,
+                                    childWatchesBatch);
+                            RequestHeader h = new RequestHeader();
+                            h.setType(ZooDefs.OpCode.setWatches);
+                            h.setXid(-8);
+                            Packet packet = new Packet(h, new ReplyHeader(), sw, null, null);
+                            outgoingQueue.addFirst(packet);
+                        }
                     }
                 }
 

Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/DisconnectedWatcherTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/DisconnectedWatcherTest.java?rev=1685205&r1=1685204&r2=1685205&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/DisconnectedWatcherTest.java (original)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/DisconnectedWatcherTest.java Sat Jun 13 00:49:48 2015
@@ -18,6 +18,8 @@
 
 package org.apache.zookeeper.test;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
@@ -173,4 +175,83 @@ public class DisconnectedWatcherTest ext
         Assert.assertEquals(EventType.NodeChildrenChanged, e.getType());
         Assert.assertEquals("/are", e.getPath());
     }
+
+    // @see jira issue ZOOKEEPER-706. Test auto reset of a large number of
+    // watches which require multiple SetWatches calls.
+    @Test
+    public void testManyChildWatchersAutoReset() throws Exception {
+        ZooKeeper zk1 = createClient();
+
+        MyWatcher watcher = new MyWatcher();
+        ZooKeeper zk2 = createClient(watcher);
+
+        // 110 character base path
+        String pathBase = "/long-path-000000000-111111111-222222222-333333333-444444444-"
+                          + "555555555-666666666-777777777-888888888-999999999";
+
+        zk1.create(pathBase, null, Ids.OPEN_ACL_UNSAFE,
+                   CreateMode.PERSISTENT);
+
+        // Create 10,000 nodes. This should ensure the combined length of the
+        // watch paths set below exceeds 1MB.
+        List<String> paths = new ArrayList<String>();
+        for (int i = 0; i < 10000; i++) {
+            String path = zk1.create(pathBase + "/ch-", null, Ids.OPEN_ACL_UNSAFE,
+                                     CreateMode.PERSISTENT_SEQUENTIAL);
+            paths.add(path);
+        }
+
+        MyWatcher childWatcher = new MyWatcher();
+
+        // Set a combination of child/exists/data watches
+        int i = 0;
+        for (String path : paths) {
+            if (i % 3 == 0) {
+                zk2.getChildren(path, childWatcher);
+            } else if (i % 3 == 1) {
+                zk2.exists(path + "/foo", childWatcher);
+            } else if (i % 3 == 2) {
+                zk2.getData(path, childWatcher, null);
+            }
+
+            i++;
+        }
+
+        stopServer();
+        watcher.waitForDisconnected(30000);
+        startServer();
+        watcher.waitForConnected(30000);
+
+        // Trigger the watches and ensure they properly propagate to the client
+        i = 0;
+        for (String path : paths) {
+            if (i % 3 == 0) {
+                zk1.create(path + "/ch", null, Ids.OPEN_ACL_UNSAFE,
+                           CreateMode.PERSISTENT);
+
+                WatchedEvent e = childWatcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
+                Assert.assertNotNull(e);
+                Assert.assertEquals(EventType.NodeChildrenChanged, e.getType());
+                Assert.assertEquals(path, e.getPath());
+            } else if (i % 3 == 1) {
+                zk1.create(path + "/foo", null, Ids.OPEN_ACL_UNSAFE,
+                           CreateMode.PERSISTENT);
+
+                WatchedEvent e = childWatcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
+                Assert.assertNotNull(e);
+                Assert.assertEquals(EventType.NodeCreated, e.getType());
+                Assert.assertEquals(path + "/foo", e.getPath());
+            } else if (i % 3 == 2) {
+                zk1.setData(path, new byte[]{1, 2, 3}, -1);
+
+                WatchedEvent e = childWatcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
+                Assert.assertNotNull(e);
+                Assert.assertEquals(EventType.NodeDataChanged, e.getType());
+                Assert.assertEquals(path, e.getPath());
+            }
+
+            i++;
+        }
+    }
+
 }