You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/11/12 14:52:41 UTC
[curator] 04/04: CURATOR-549 - optimization: if a node that the
cache knows about gets a change event,
there's no need to query for children if the Cversion hasn't changed
This is an automated email from the ASF dual-hosted git repository.
randgalt pushed a commit to branch CURATOR-549-zk36-persistent-watcher-recipes
in repository https://gitbox.apache.org/repos/asf/curator.git
commit 97f5d6e26f47b954bb69a79eb2a6a2838ed8e84b
Author: randgalt <ra...@apache.org>
AuthorDate: Fri Nov 8 10:53:38 2019 -0500
CURATOR-549 - optimization: if a node that the cache knows about gets a change event, there's no need to query for children if the Cversion hasn't changed
---
.../framework/recipes/cache/CuratorCacheImpl.java | 17 ++-
.../framework/recipes/watch/PersistentWatcher.java | 10 +-
.../src/site/confluence/index.confluence | 3 +-
.../site/confluence/persistent-watcher.confluence | 35 +++++
.../framework/recipes/cache/TestCuratorCache.java | 45 ------
.../recipes/cache/TestCuratorCacheEdges.java | 153 +++++++++++++++++++++
.../recipes/watch/TestPersistentWatcher.java | 1 +
7 files changed, 208 insertions(+), 56 deletions(-)
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
index ee95570..e6be71c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
@@ -30,6 +30,7 @@ import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
@@ -157,13 +158,18 @@ class CuratorCacheImpl implements CuratorCache
}
}
- private void nodeChildrenChanged(String fromPath)
+ private void checkChildrenChanged(String fromPath, Stat oldStat, Stat newStat)
{
if ( (state.get() != State.STARTED) || !recursive )
{
return;
}
+ if ( (oldStat != null) && (oldStat.getCversion() == newStat.getCversion()) )
+ {
+ return; // children haven't changed
+ }
+
try
{
BackgroundCallback callback = (__, event) -> {
@@ -203,8 +209,8 @@ class CuratorCacheImpl implements CuratorCache
BackgroundCallback callback = (__, event) -> {
if ( event.getResultCode() == OK.intValue() )
{
- putStorage(new ChildData(event.getPath(), event.getStat(), event.getData()));
- nodeChildrenChanged(event.getPath());
+ Optional<ChildData> childData = putStorage(new ChildData(event.getPath(), event.getStat(), event.getData()));
+ checkChildrenChanged(event.getPath(), childData.map(ChildData::getStat).orElse(null), event.getStat());
}
else if ( event.getResultCode() == NONODE.intValue() )
{
@@ -233,12 +239,12 @@ class CuratorCacheImpl implements CuratorCache
}
}
- private void putStorage(ChildData data)
+ private Optional<ChildData> putStorage(ChildData data)
{
Optional<ChildData> previousData = storage.put(data);
if ( previousData.isPresent() )
{
- if ( previousData.get().getStat().getMzxid() != data.getStat().getMzxid() )
+ if ( previousData.get().getStat().getVersion() != data.getStat().getVersion() )
{
callListeners(l -> l.event(NODE_CHANGED, previousData.get(), data));
}
@@ -247,6 +253,7 @@ class CuratorCacheImpl implements CuratorCache
{
callListeners(l -> l.event(NODE_CREATED, null, data));
}
+ return previousData;
}
private void removeStorage(String path)
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
index 87ecb6e..187343a 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
@@ -114,7 +114,7 @@ public class PersistentWatcher implements Closeable
client.getConnectionStateListenable().removeListener(connectionStateListener);
try
{
- client.watches().remove(watcher).guaranteed().inBackground().forPath(basePath);
+ client.watchers().remove(watcher).guaranteed().inBackground().forPath(basePath);
}
catch ( Exception e )
{
@@ -140,7 +140,7 @@ public class PersistentWatcher implements Closeable
*
* @return listener container
*/
- public StandardListenerManager<Runnable> getResetListenable()
+ public Listenable<Runnable> getResetListenable()
{
return resetListeners;
}
@@ -150,13 +150,13 @@ public class PersistentWatcher implements Closeable
try
{
BackgroundCallback callback = (__, event) -> {
- if ( event.getResultCode() != KeeperException.Code.OK.intValue() )
+ if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
{
- reset();
+ resetListeners.forEach(Runnable::run);
}
else
{
- resetListeners.forEach(Runnable::run);
+ reset();
}
};
client.watchers().add().withMode(recursive ? AddWatchMode.PERSISTENT_RECURSIVE : AddWatchMode.PERSISTENT).inBackground(callback).usingWatcher(watcher).forPath(basePath);
diff --git a/curator-recipes/src/site/confluence/index.confluence b/curator-recipes/src/site/confluence/index.confluence
index d96b5ce..ab8dc53 100644
--- a/curator-recipes/src/site/confluence/index.confluence
+++ b/curator-recipes/src/site/confluence/index.confluence
@@ -29,7 +29,8 @@ regarding "Curator Recipes Own Their ZNode/Paths".
|[[Node Cache|node-cache.html]] \- (For pre-ZooKeeper 3.6.x) A utility that attempts to keep the data from a node locally cached. This class will watch the node, respond to update/create/delete events, pull down the data, etc. You can register a listener that will get notified when changes occur.|
|[[Tree Cache|tree-cache.html]] \- (For pre-ZooKeeper 3.6.x) A utility that attempts to keep all data from all children of a ZK path locally cached. This class will watch the ZK path, respond to update/create/delete events, pull down the data, etc. You can register a listener that will get notified when changes occur.|
-||Nodes||
+||Nodes/Watches||
+|[[Persistent Recursive Watcher|persistent-watcher.html]] \- A managed persistent recursive watcher. The watch will be managed such that it stays set through connection lapses, etc.|
|[[Persistent Node|persistent-node.html]] \- A node that attempts to stay present in ZooKeeper, even through connection and session interruptions.|
|[[Persistent TTL Node|persistent-ttl-node.html]] \- Useful when you need to create a TTL node but don't want to keep it alive manually by periodically setting data.|
|[Group Member|group-member.html]] \- Group membership management. Adds this instance into a group and keeps a cache of members in the group.|
diff --git a/curator-recipes/src/site/confluence/persistent-watcher.confluence b/curator-recipes/src/site/confluence/persistent-watcher.confluence
new file mode 100644
index 0000000..4551669
--- /dev/null
+++ b/curator-recipes/src/site/confluence/persistent-watcher.confluence
@@ -0,0 +1,35 @@
+h1. Persistent Recursive Watcher
+
+*Note: * PersistentWatcher requires ZooKeeper 3.6\+.
+
+h2. Description
+A managed persistent persistent watcher. The watch will be managed such that it stays set through connection lapses, etc.
+
+h2. Participating Classes
+* PersistentWatcher
+
+h2. Usage
+h3. Creating a PersistentWatcher
+{code}
+public PersistentWatcher(CuratorFramework client,
+ String basePath,
+ boolean recursive)
+
+Parameters:
+client - the client
+basePath - path to set the watch on
+recursive - ZooKeeper persistent watches can optionally be recursive
+{code}
+
+h2. General Usage
+The instance must be started by calling {{start()}}. Call {{close()}} when you want to remove the watch.
+
+PersistentWatcher presents two listener types:
+
+* {{Listenable<Watcher> getListenable()}} \- Use this to add watchers. These will behave in the same manner that watchers added
+via {{ZooKeeper.addWatch()}} behave.
+* {{Listenable<Runnable> getResetListenable()}} \- The Runnables added with this get called once the Persistent Watcher has been successfully set
+(or reset after a connection partition).
+
+h2. Error Handling
+PersistentWatcher instances internally monitor connection losses, etc. automatically resetting on reconnection.
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
index 8560f87..2d6fb0d 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
@@ -40,51 +40,6 @@ import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.bu
public class TestCuratorCache extends CuratorTestBase
{
@Test
- public void testServerLoss() throws Exception // mostly copied from TestPathChildrenCacheInCluster
- {
- try (TestingCluster cluster = new TestingCluster(3))
- {
- cluster.start();
-
- try (CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
- {
- client.start();
- client.create().creatingParentsIfNeeded().forPath("/test");
-
- try (CuratorCache cache = CuratorCache.build(client, "/test"))
- {
- cache.start();
-
- CountDownLatch reconnectLatch = new CountDownLatch(1);
- client.getConnectionStateListenable().addListener((__, newState) -> {
- if ( newState == ConnectionState.RECONNECTED )
- {
- reconnectLatch.countDown();
- }
- });
- CountDownLatch latch = new CountDownLatch(3);
- cache.listenable().addListener((__, ___, ____) -> latch.countDown());
-
- client.create().forPath("/test/one");
- client.create().forPath("/test/two");
- client.create().forPath("/test/three");
-
- Assert.assertTrue(timing.awaitLatch(latch));
-
- InstanceSpec connectionInstance = cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper());
- cluster.killServer(connectionInstance);
-
- Assert.assertTrue(timing.awaitLatch(reconnectLatch));
-
- timing.sleepABit();
-
- Assert.assertEquals(cache.storage().stream().count(), 4);
- }
- }
- }
- }
-
- @Test
public void testUpdateWhenNotCachingData() throws Exception // mostly copied from TestPathChildrenCache
{
CuratorCacheStorage storage = new StandardCuratorCacheStorage(false);
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEdges.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEdges.java
new file mode 100644
index 0000000..f20f775
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEdges.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.DO_NOT_CLEAR_ON_CLOSE;
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.builder;
+
+@Test(groups = CuratorTestBase.zk36Group)
+public class TestCuratorCacheEdges extends CuratorTestBase
+{
+ @Test
+ public void testReconnectConsistency() throws Exception
+ {
+ final byte[] first = "one".getBytes();
+ final byte[] second = "two".getBytes();
+
+ try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+ {
+ client.start();
+ client.create().forPath("/root", first);
+ client.create().forPath("/root/1", first);
+ client.create().forPath("/root/2", first);
+ client.create().forPath("/root/1/11", first);
+ client.create().forPath("/root/1/12", first);
+ client.create().forPath("/root/1/13", first);
+ client.create().forPath("/root/2/21", first);
+ client.create().forPath("/root/2/22", first);
+
+ CuratorCacheStorage storage = CuratorCacheStorage.standard();
+ try (CuratorCache cache = CuratorCache.builder(client, "/root").withStorage(storage).withOptions(DO_NOT_CLEAR_ON_CLOSE).build())
+ {
+ CountDownLatch latch = new CountDownLatch(1);
+ cache.listenable().addListener(CuratorCacheListener.builder().forInitialized(latch::countDown).build());
+ cache.start();
+ Assert.assertTrue(timing.awaitLatch(latch));
+ }
+
+ // we now have a storage loaded with the initial nodes created
+
+ // simulate nodes changing during a partition
+
+ client.delete().forPath("/root/2/21");
+ client.delete().forPath("/root/2/22");
+ client.delete().forPath("/root/2");
+
+ client.setData().forPath("/root", second);
+ client.create().forPath("/root/1/11/111", second);
+ client.create().forPath("/root/1/11/111/1111", second);
+ client.create().forPath("/root/1/11/111/1112", second);
+ client.create().forPath("/root/1/13/131", second);
+ client.create().forPath("/root/1/13/132", second);
+ client.create().forPath("/root/1/13/132/1321", second);
+
+ try (CuratorCache cache = CuratorCache.builder(client, "/root").withStorage(storage).withOptions(DO_NOT_CLEAR_ON_CLOSE).build())
+ {
+ CountDownLatch latch = new CountDownLatch(1);
+ cache.listenable().addListener(CuratorCacheListener.builder().forInitialized(latch::countDown).build());
+ cache.start();
+ Assert.assertTrue(timing.awaitLatch(latch));
+ }
+
+ Assert.assertEquals(storage.size(), 11);
+ Assert.assertEquals(storage.get("/root").map(ChildData::getData).orElse(null), second);
+ Assert.assertEquals(storage.get("/root/1").map(ChildData::getData).orElse(null), first);
+ Assert.assertEquals(storage.get("/root/1/11").map(ChildData::getData).orElse(null), first);
+ Assert.assertEquals(storage.get("/root/1/11/111").map(ChildData::getData).orElse(null), second);
+ Assert.assertEquals(storage.get("/root/1/11/111/1111").map(ChildData::getData).orElse(null), second);
+ Assert.assertEquals(storage.get("/root/1/11/111/1112").map(ChildData::getData).orElse(null), second);
+ Assert.assertEquals(storage.get("/root/1/12").map(ChildData::getData).orElse(null), first);
+ Assert.assertEquals(storage.get("/root/1/13").map(ChildData::getData).orElse(null), first);
+ Assert.assertEquals(storage.get("/root/1/13/131").map(ChildData::getData).orElse(null), second);
+ Assert.assertEquals(storage.get("/root/1/13/132").map(ChildData::getData).orElse(null), second);
+ Assert.assertEquals(storage.get("/root/1/13/132/1321").map(ChildData::getData).orElse(null), second);
+ }
+ }
+
+ @Test
+ public void testServerLoss() throws Exception // mostly copied from TestPathChildrenCacheInCluster
+ {
+ try (TestingCluster cluster = new TestingCluster(3))
+ {
+ cluster.start();
+
+ try (CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+ {
+ client.start();
+ client.create().creatingParentsIfNeeded().forPath("/test");
+
+ try (CuratorCache cache = CuratorCache.build(client, "/test"))
+ {
+ cache.start();
+
+ CountDownLatch reconnectLatch = new CountDownLatch(1);
+ client.getConnectionStateListenable().addListener((__, newState) -> {
+ if ( newState == ConnectionState.RECONNECTED )
+ {
+ reconnectLatch.countDown();
+ }
+ });
+ CountDownLatch latch = new CountDownLatch(3);
+ cache.listenable().addListener((__, ___, ____) -> latch.countDown());
+
+ client.create().forPath("/test/one");
+ client.create().forPath("/test/two");
+ client.create().forPath("/test/three");
+
+ Assert.assertTrue(timing.awaitLatch(latch));
+
+ InstanceSpec connectionInstance = cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper());
+ cluster.killServer(connectionInstance);
+
+ Assert.assertTrue(timing.awaitLatch(reconnectLatch));
+
+ timing.sleepABit();
+
+ Assert.assertEquals(cache.storage().stream().count(), 4);
+ }
+ }
+ }
+ }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
index 534c365..1cf7eb0 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
@@ -32,6 +32,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
+@Test(groups = CuratorTestBase.zk36Group)
public class TestPersistentWatcher extends CuratorTestBase
{
@Test