You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by rg...@apache.org on 2015/06/13 02:42:49 UTC
svn commit: r1685200 - in /zookeeper/trunk: CHANGES.txt
src/java/main/org/apache/zookeeper/ClientCnxn.java
src/java/test/org/apache/zookeeper/test/DisconnectedWatcherTest.java
Author: rgs
Date: Sat Jun 13 00:42:49 2015
New Revision: 1685200
URL: http://svn.apache.org/r1685200
Log:
ZOOKEEPER-706: Large numbers of watches can cause session re-establishment to fail
(Chris Thunes via rgs)
Modified:
zookeeper/trunk/CHANGES.txt
zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DisconnectedWatcherTest.java
Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1685200&r1=1685199&r2=1685200&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Sat Jun 13 00:42:49 2015
@@ -128,6 +128,9 @@ BUGFIXES:
ZOOKEEPER-2213: Empty path in Set crashes server and prevents restart
(Hongchao Deng via rgs)
+ ZOOKEEPER-706: Large numbers of watches can cause session re-establishment to fail
+ (Chris Thunes via rgs)
+
IMPROVEMENTS:
ZOOKEEPER-1660 Documentation for Dynamic Reconfiguration (Reed Wanderman-Milne via shralex)
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java?rev=1685200&r1=1685199&r2=1685200&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java Sat Jun 13 00:42:49 2015
@@ -28,6 +28,7 @@ import java.net.Socket;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.HashSet;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -99,6 +100,16 @@ public class ClientCnxn {
private static final String ZK_SASL_CLIENT_USERNAME =
"zookeeper.sasl.client.username";
+ /* ZOOKEEPER-706: If a session has a large number of watches set then
+ * attempting to re-establish those watches after a connection loss may
+ * fail due to the SetWatches request exceeding the server's configured
+ * jute.maxBuffer value. To avoid this we instead split the watch
+ * re-establishment across multiple SetWatches calls. This constant
+ * controls the size of each call. It is set to 128kB to be conservative
+ * with respect to the server's 1MB default for jute.maxBuffer.
+ */
+ private static final int SET_WATCHES_MAX_LENGTH = 128 * 1024;
+
/** This controls whether automatic watch resetting is enabled.
* Clients automatically reset watches during session reconnect, this
* option allows the client to turn off this behavior by setting
@@ -983,15 +994,45 @@ public class ClientCnxn {
List<String> childWatches = zooKeeper.getChildWatches();
if (!dataWatches.isEmpty()
|| !existWatches.isEmpty() || !childWatches.isEmpty()) {
- SetWatches sw = new SetWatches(lastZxid,
- prependChroot(dataWatches),
- prependChroot(existWatches),
- prependChroot(childWatches));
- RequestHeader h = new RequestHeader();
- h.setType(ZooDefs.OpCode.setWatches);
- h.setXid(-8);
- Packet packet = new Packet(h, new ReplyHeader(), sw, null, null);
- outgoingQueue.addFirst(packet);
+ Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();
+ Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
+ Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
+ long setWatchesLastZxid = lastZxid;
+
+ while (dataWatchesIter.hasNext()
+ || existWatchesIter.hasNext() || childWatchesIter.hasNext()) {
+ List<String> dataWatchesBatch = new ArrayList<String>();
+ List<String> existWatchesBatch = new ArrayList<String>();
+ List<String> childWatchesBatch = new ArrayList<String>();
+ int batchLength = 0;
+
+ // Note, we may exceed our max length by a bit when we add the last
+ // watch in the batch. This isn't ideal, but it makes the code simpler.
+ while (batchLength < SET_WATCHES_MAX_LENGTH) {
+ final String watch;
+ if (dataWatchesIter.hasNext()) {
+ watch = dataWatchesIter.next();
+ dataWatchesBatch.add(watch);
+ } else if (existWatchesIter.hasNext()) {
+ watch = existWatchesIter.next();
+ existWatchesBatch.add(watch);
+ } else if (childWatchesIter.hasNext()) {
+ watch = childWatchesIter.next();
+ childWatchesBatch.add(watch);
+ } else {
+ break;
+ }
+ batchLength += watch.length();
+ }
+
+ SetWatches sw = new SetWatches(setWatchesLastZxid,
+ dataWatchesBatch,
+ existWatchesBatch,
+ childWatchesBatch);
+ RequestHeader header = new RequestHeader(-8, OpCode.setWatches);
+ Packet packet = new Packet(header, new ReplyHeader(), sw, null, null);
+ outgoingQueue.addFirst(packet);
+ }
}
}
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DisconnectedWatcherTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DisconnectedWatcherTest.java?rev=1685200&r1=1685199&r2=1685200&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DisconnectedWatcherTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DisconnectedWatcherTest.java Sat Jun 13 00:42:49 2015
@@ -18,6 +18,8 @@
package org.apache.zookeeper.test;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -173,4 +175,80 @@ public class DisconnectedWatcherTest ext
Assert.assertEquals(EventType.NodeChildrenChanged, e.getType());
Assert.assertEquals("/are", e.getPath());
}
+
+ // @see jira issue ZOOKEEPER-706. Test auto reset of a large number of
+ // watches which require multiple SetWatches calls.
+ @Test
+ public void testManyChildWatchersAutoReset() throws Exception {
+ ZooKeeper zk1 = createClient();
+
+ MyWatcher watcher = new MyWatcher();
+ ZooKeeper zk2 = createClient(watcher);
+
+ // 110 character base path
+ String pathBase = "/long-path-000000000-111111111-222222222-333333333-444444444-"
+ + "555555555-666666666-777777777-888888888-999999999";
+
+ zk1.create(pathBase, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ // Create 10,000 nodes. This should ensure the length of our
+ // watches set below exceeds 1MB.
+ List<String> paths = new ArrayList<String>();
+ for (int i = 0; i < 10000; i++) {
+ String path = zk1.create(pathBase + "/ch-", null, Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT_SEQUENTIAL);
+ paths.add(path);
+ }
+
+ MyWatcher childWatcher = new MyWatcher();
+
+ // Set a combination of child/exists/data watches
+ int i = 0;
+ for (String path : paths) {
+ if (i % 3 == 0) {
+ zk2.getChildren(path, childWatcher);
+ } else if (i % 3 == 1) {
+ zk2.exists(path + "/foo", childWatcher);
+ } else if (i % 3 == 2) {
+ zk2.getData(path, childWatcher, null);
+ }
+
+ i++;
+ }
+
+ stopServer();
+ watcher.waitForDisconnected(30000);
+ startServer();
+ watcher.waitForConnected(30000);
+
+ // Trigger the watches and ensure they properly propagate to the client
+ i = 0;
+ for (String path : paths) {
+ if (i % 3 == 0) {
+ zk1.create(path + "/ch", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ WatchedEvent e = childWatcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
+ Assert.assertNotNull(e);
+ Assert.assertEquals(EventType.NodeChildrenChanged, e.getType());
+ Assert.assertEquals(path, e.getPath());
+ } else if (i % 3 == 1) {
+ zk1.create(path + "/foo", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ WatchedEvent e = childWatcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
+ Assert.assertNotNull(e);
+ Assert.assertEquals(EventType.NodeCreated, e.getType());
+ Assert.assertEquals(path + "/foo", e.getPath());
+ } else if (i % 3 == 2) {
+ zk1.setData(path, new byte[]{1, 2, 3}, -1);
+
+ WatchedEvent e = childWatcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
+ Assert.assertNotNull(e);
+ Assert.assertEquals(EventType.NodeDataChanged, e.getType());
+ Assert.assertEquals(path, e.getPath());
+ }
+
+ i++;
+ }
+ }
+
}