You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/11/10 14:22:52 UTC

[curator] 03/03: CURATOR-549 - optimization: if a node that the cache knows about gets a change event, there's no need to query for children if the Cversion hasn't changed

This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch CURATOR-549-zk36-persistent-watcher-recipes
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 9eb0f6fe773189747620eb87f55d2709be4d82fc
Author: randgalt <ra...@apache.org>
AuthorDate: Fri Nov 8 10:53:38 2019 -0500

    CURATOR-549 - optimization: if a node that the cache knows about gets a change event, there's no need to query for children if the Cversion hasn't changed
---
 .../framework/recipes/cache/CuratorCacheImpl.java  |  17 ++-
 .../framework/recipes/watch/PersistentWatcher.java |  10 +-
 .../src/site/confluence/index.confluence           |   3 +-
 .../site/confluence/persistent-watcher.confluence  |  35 +++++
 .../framework/recipes/cache/TestCuratorCache.java  |  45 ------
 .../recipes/cache/TestCuratorCacheEdges.java       | 153 +++++++++++++++++++++
 .../recipes/watch/TestPersistentWatcher.java       |   1 +
 7 files changed, 208 insertions(+), 56 deletions(-)

diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
index ee95570..e6be71c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
@@ -30,6 +30,7 @@ import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.Collections;
@@ -157,13 +158,18 @@ class CuratorCacheImpl implements CuratorCache
         }
     }
 
-    private void nodeChildrenChanged(String fromPath)
+    private void checkChildrenChanged(String fromPath, Stat oldStat, Stat newStat)
     {
         if ( (state.get() != State.STARTED) || !recursive )
         {
             return;
         }
 
+        if ( (oldStat != null) && (oldStat.getCversion() == newStat.getCversion()) )
+        {
+            return; // children haven't changed
+        }
+
         try
         {
             BackgroundCallback callback = (__, event) -> {
@@ -203,8 +209,8 @@ class CuratorCacheImpl implements CuratorCache
             BackgroundCallback callback = (__, event) -> {
                 if ( event.getResultCode() == OK.intValue() )
                 {
-                    putStorage(new ChildData(event.getPath(), event.getStat(), event.getData()));
-                    nodeChildrenChanged(event.getPath());
+                    Optional<ChildData> childData = putStorage(new ChildData(event.getPath(), event.getStat(), event.getData()));
+                    checkChildrenChanged(event.getPath(), childData.map(ChildData::getStat).orElse(null), event.getStat());
                 }
                 else if ( event.getResultCode() == NONODE.intValue() )
                 {
@@ -233,12 +239,12 @@ class CuratorCacheImpl implements CuratorCache
         }
     }
 
-    private void putStorage(ChildData data)
+    private Optional<ChildData> putStorage(ChildData data)
     {
         Optional<ChildData> previousData = storage.put(data);
         if ( previousData.isPresent() )
         {
-            if ( previousData.get().getStat().getMzxid() != data.getStat().getMzxid() )
+            if ( previousData.get().getStat().getVersion() != data.getStat().getVersion() )
             {
                 callListeners(l -> l.event(NODE_CHANGED, previousData.get(), data));
             }
@@ -247,6 +253,7 @@ class CuratorCacheImpl implements CuratorCache
         {
             callListeners(l -> l.event(NODE_CREATED, null, data));
         }
+        return previousData;
     }
 
     private void removeStorage(String path)
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
index 87ecb6e..187343a 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
@@ -114,7 +114,7 @@ public class PersistentWatcher implements Closeable
             client.getConnectionStateListenable().removeListener(connectionStateListener);
             try
             {
-                client.watches().remove(watcher).guaranteed().inBackground().forPath(basePath);
+                client.watchers().remove(watcher).guaranteed().inBackground().forPath(basePath);
             }
             catch ( Exception e )
             {
@@ -140,7 +140,7 @@ public class PersistentWatcher implements Closeable
      *
      * @return listener container
      */
-    public StandardListenerManager<Runnable> getResetListenable()
+    public Listenable<Runnable> getResetListenable()
     {
         return resetListeners;
     }
@@ -150,13 +150,13 @@ public class PersistentWatcher implements Closeable
         try
         {
             BackgroundCallback callback = (__, event) -> {
-                if ( event.getResultCode() != KeeperException.Code.OK.intValue() )
+                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                 {
-                    reset();
+                    resetListeners.forEach(Runnable::run);
                 }
                 else
                 {
-                    resetListeners.forEach(Runnable::run);
+                    reset();
                 }
             };
             client.watchers().add().withMode(recursive ? AddWatchMode.PERSISTENT_RECURSIVE : AddWatchMode.PERSISTENT).inBackground(callback).usingWatcher(watcher).forPath(basePath);
diff --git a/curator-recipes/src/site/confluence/index.confluence b/curator-recipes/src/site/confluence/index.confluence
index d96b5ce..ab8dc53 100644
--- a/curator-recipes/src/site/confluence/index.confluence
+++ b/curator-recipes/src/site/confluence/index.confluence
@@ -29,7 +29,8 @@ regarding "Curator Recipes Own Their ZNode/Paths".
 |[[Node Cache|node-cache.html]] \- (For pre-ZooKeeper 3.6.x) A utility that attempts to keep the data from a node locally cached. This class will watch the node, respond to update/create/delete events, pull down the data, etc. You can register a listener that will get notified when changes occur.|
 |[[Tree Cache|tree-cache.html]] \- (For pre-ZooKeeper 3.6.x) A utility that attempts to keep all data from all children of a ZK path locally cached. This class will watch the ZK path, respond to update/create/delete events, pull down the data, etc. You can register a listener that will get notified when changes occur.|
 
-||Nodes||
+||Nodes/Watches||
+|[[Persistent Recursive Watcher|persistent-watcher.html]] \- A managed persistent recursive watcher. The watch will be managed such that it stays set through connection lapses, etc.|
 |[[Persistent Node|persistent-node.html]] \- A node that attempts to stay present in ZooKeeper, even through connection and session interruptions.|
 |[[Persistent TTL Node|persistent-ttl-node.html]] \- Useful when you need to create a TTL node but don't want to keep it alive manually by periodically setting data.|
 |[Group Member|group-member.html]] \- Group membership management. Adds this instance into a group and keeps a cache of members in the group.|
diff --git a/curator-recipes/src/site/confluence/persistent-watcher.confluence b/curator-recipes/src/site/confluence/persistent-watcher.confluence
new file mode 100644
index 0000000..4551669
--- /dev/null
+++ b/curator-recipes/src/site/confluence/persistent-watcher.confluence
@@ -0,0 +1,35 @@
+h1. Persistent Recursive Watcher
+
+*Note:* PersistentWatcher requires ZooKeeper 3.6\+.
+
+h2. Description
+A managed persistent watcher. The watch will be managed such that it stays set through connection lapses, etc.
+
+h2. Participating Classes
+* PersistentWatcher
+
+h2. Usage
+h3. Creating a PersistentWatcher
+{code}
+public PersistentWatcher(CuratorFramework client,
+                               String basePath,
+                               boolean recursive)
+
+Parameters:
+client - the client
+basePath - path to set the watch on
+recursive - ZooKeeper persistent watches can optionally be recursive
+{code}
+
+h2. General Usage
+The instance must be started by calling {{start()}}. Call {{close()}} when you want to remove the watch.
+
+PersistentWatcher presents two listener types:
+
+* {{Listenable<Watcher> getListenable()}} \- Use this to add watchers. These will behave in the same manner that watchers added
+via {{ZooKeeper.addWatch()}} behave.
+* {{Listenable<Runnable> getResetListenable()}} \- The Runnables added with this get called once the Persistent Watcher has been successfully set
+(or reset after a connection partition).
+
+h2. Error Handling
+PersistentWatcher instances internally monitor connection losses, etc. and automatically reset the watch on reconnection.
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
index 8560f87..2d6fb0d 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
@@ -40,51 +40,6 @@ import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.bu
 public class TestCuratorCache extends CuratorTestBase
 {
     @Test
-    public void testServerLoss() throws Exception   // mostly copied from TestPathChildrenCacheInCluster
-    {
-        try (TestingCluster cluster = new TestingCluster(3))
-        {
-            cluster.start();
-
-            try (CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
-            {
-                client.start();
-                client.create().creatingParentsIfNeeded().forPath("/test");
-
-                try (CuratorCache cache = CuratorCache.build(client, "/test"))
-                {
-                    cache.start();
-
-                    CountDownLatch reconnectLatch = new CountDownLatch(1);
-                    client.getConnectionStateListenable().addListener((__, newState) -> {
-                        if ( newState == ConnectionState.RECONNECTED )
-                        {
-                            reconnectLatch.countDown();
-                        }
-                    });
-                    CountDownLatch latch = new CountDownLatch(3);
-                    cache.listenable().addListener((__, ___, ____) -> latch.countDown());
-
-                    client.create().forPath("/test/one");
-                    client.create().forPath("/test/two");
-                    client.create().forPath("/test/three");
-
-                    Assert.assertTrue(timing.awaitLatch(latch));
-
-                    InstanceSpec connectionInstance = cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper());
-                    cluster.killServer(connectionInstance);
-
-                    Assert.assertTrue(timing.awaitLatch(reconnectLatch));
-
-                    timing.sleepABit();
-
-                    Assert.assertEquals(cache.storage().stream().count(), 4);
-                }
-            }
-        }
-    }
-
-    @Test
     public void testUpdateWhenNotCachingData() throws Exception // mostly copied from TestPathChildrenCache
     {
         CuratorCacheStorage storage = new StandardCuratorCacheStorage(false);
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEdges.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEdges.java
new file mode 100644
index 0000000..f20f775
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEdges.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.DO_NOT_CLEAR_ON_CLOSE;
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.builder;
+
+@Test(groups = CuratorTestBase.zk36Group)
+public class TestCuratorCacheEdges extends CuratorTestBase
+{
+    @Test
+    public void testReconnectConsistency() throws Exception
+    {
+        final byte[] first = "one".getBytes();
+        final byte[] second = "two".getBytes();
+
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+        {
+            client.start();
+            client.create().forPath("/root", first);
+            client.create().forPath("/root/1", first);
+            client.create().forPath("/root/2", first);
+            client.create().forPath("/root/1/11", first);
+            client.create().forPath("/root/1/12", first);
+            client.create().forPath("/root/1/13", first);
+            client.create().forPath("/root/2/21", first);
+            client.create().forPath("/root/2/22", first);
+
+            CuratorCacheStorage storage = CuratorCacheStorage.standard();
+            try (CuratorCache cache = CuratorCache.builder(client, "/root").withStorage(storage).withOptions(DO_NOT_CLEAR_ON_CLOSE).build())
+            {
+                CountDownLatch latch = new CountDownLatch(1);
+                cache.listenable().addListener(CuratorCacheListener.builder().forInitialized(latch::countDown).build());
+                cache.start();
+                Assert.assertTrue(timing.awaitLatch(latch));
+            }
+
+            // we now have a storage loaded with the initial nodes created
+
+            // simulate nodes changing during a partition
+
+            client.delete().forPath("/root/2/21");
+            client.delete().forPath("/root/2/22");
+            client.delete().forPath("/root/2");
+
+            client.setData().forPath("/root", second);
+            client.create().forPath("/root/1/11/111", second);
+            client.create().forPath("/root/1/11/111/1111", second);
+            client.create().forPath("/root/1/11/111/1112", second);
+            client.create().forPath("/root/1/13/131", second);
+            client.create().forPath("/root/1/13/132", second);
+            client.create().forPath("/root/1/13/132/1321", second);
+
+            try (CuratorCache cache = CuratorCache.builder(client, "/root").withStorage(storage).withOptions(DO_NOT_CLEAR_ON_CLOSE).build())
+            {
+                CountDownLatch latch = new CountDownLatch(1);
+                cache.listenable().addListener(CuratorCacheListener.builder().forInitialized(latch::countDown).build());
+                cache.start();
+                Assert.assertTrue(timing.awaitLatch(latch));
+            }
+
+            Assert.assertEquals(storage.size(), 11);
+            Assert.assertEquals(storage.get("/root").map(ChildData::getData).orElse(null), second);
+            Assert.assertEquals(storage.get("/root/1").map(ChildData::getData).orElse(null), first);
+            Assert.assertEquals(storage.get("/root/1/11").map(ChildData::getData).orElse(null), first);
+            Assert.assertEquals(storage.get("/root/1/11/111").map(ChildData::getData).orElse(null), second);
+            Assert.assertEquals(storage.get("/root/1/11/111/1111").map(ChildData::getData).orElse(null), second);
+            Assert.assertEquals(storage.get("/root/1/11/111/1112").map(ChildData::getData).orElse(null), second);
+            Assert.assertEquals(storage.get("/root/1/12").map(ChildData::getData).orElse(null), first);
+            Assert.assertEquals(storage.get("/root/1/13").map(ChildData::getData).orElse(null), first);
+            Assert.assertEquals(storage.get("/root/1/13/131").map(ChildData::getData).orElse(null), second);
+            Assert.assertEquals(storage.get("/root/1/13/132").map(ChildData::getData).orElse(null), second);
+            Assert.assertEquals(storage.get("/root/1/13/132/1321").map(ChildData::getData).orElse(null), second);
+        }
+    }
+
+    @Test
+    public void testServerLoss() throws Exception   // mostly copied from TestPathChildrenCacheInCluster
+    {
+        try (TestingCluster cluster = new TestingCluster(3))
+        {
+            cluster.start();
+
+            try (CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+            {
+                client.start();
+                client.create().creatingParentsIfNeeded().forPath("/test");
+
+                try (CuratorCache cache = CuratorCache.build(client, "/test"))
+                {
+                    cache.start();
+
+                    CountDownLatch reconnectLatch = new CountDownLatch(1);
+                    client.getConnectionStateListenable().addListener((__, newState) -> {
+                        if ( newState == ConnectionState.RECONNECTED )
+                        {
+                            reconnectLatch.countDown();
+                        }
+                    });
+                    CountDownLatch latch = new CountDownLatch(3);
+                    cache.listenable().addListener((__, ___, ____) -> latch.countDown());
+
+                    client.create().forPath("/test/one");
+                    client.create().forPath("/test/two");
+                    client.create().forPath("/test/three");
+
+                    Assert.assertTrue(timing.awaitLatch(latch));
+
+                    InstanceSpec connectionInstance = cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper());
+                    cluster.killServer(connectionInstance);
+
+                    Assert.assertTrue(timing.awaitLatch(reconnectLatch));
+
+                    timing.sleepABit();
+
+                    Assert.assertEquals(cache.storage().stream().count(), 4);
+                }
+            }
+        }
+    }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
index 534c365..1cf7eb0 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
@@ -32,6 +32,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 
+@Test(groups = CuratorTestBase.zk36Group)
 public class TestPersistentWatcher extends CuratorTestBase
 {
     @Test