You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/11/09 00:27:27 UTC

[curator] branch CURATOR-549-zk36-persistent-watcher-recipes updated (3d0f55b -> 0f3add2)

This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a change to branch CURATOR-549-zk36-persistent-watcher-recipes
in repository https://gitbox.apache.org/repos/asf/curator.git.


    omit 3d0f55b  CURATOR-549 - optimization: if a node that the cache knows about gets a change event, there's no need to query for children if the Cversion hasn't changed
     new 0f3add2  CURATOR-549 - optimization: if a node that the cache knows about gets a change event, there's no need to query for children if the Cversion hasn't changed

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (3d0f55b)
            \
             N -- N -- N   refs/heads/CURATOR-549-zk36-persistent-watcher-recipes (0f3add2)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../src/site/confluence/index.confluence           |  3 +-
 .../site/confluence/persistent-watcher.confluence  | 35 ++++++++++++++++++++++
 2 files changed, 37 insertions(+), 1 deletion(-)
 create mode 100644 curator-recipes/src/site/confluence/persistent-watcher.confluence


[curator] 01/01: CURATOR-549 - optimization: if a node that the cache knows about gets a change event, there's no need to query for children if the Cversion hasn't changed

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch CURATOR-549-zk36-persistent-watcher-recipes
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 0f3add29453a7073ddba8f347f6186ba5dc29a8a
Author: randgalt <ra...@apache.org>
AuthorDate: Fri Nov 8 10:53:38 2019 -0500

    CURATOR-549 - optimization: if a node that the cache knows about gets a change event, there's no need to query for children if the Cversion hasn't changed
---
 .../framework/recipes/cache/CuratorCacheImpl.java  |  17 ++-
 .../src/site/confluence/index.confluence           |   3 +-
 .../site/confluence/persistent-watcher.confluence  |  35 +++++
 .../framework/recipes/cache/TestCuratorCache.java  |  45 ------
 .../recipes/cache/TestCuratorCacheEdges.java       | 153 +++++++++++++++++++++
 5 files changed, 202 insertions(+), 51 deletions(-)

diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
index ee95570..e6be71c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
@@ -30,6 +30,7 @@ import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.Collections;
@@ -157,13 +158,18 @@ class CuratorCacheImpl implements CuratorCache
         }
     }
 
-    private void nodeChildrenChanged(String fromPath)
+    private void checkChildrenChanged(String fromPath, Stat oldStat, Stat newStat)
     {
         if ( (state.get() != State.STARTED) || !recursive )
         {
             return;
         }
 
+        if ( (oldStat != null) && (oldStat.getCversion() == newStat.getCversion()) )
+        {
+            return; // children haven't changed
+        }
+
         try
         {
             BackgroundCallback callback = (__, event) -> {
@@ -203,8 +209,8 @@ class CuratorCacheImpl implements CuratorCache
             BackgroundCallback callback = (__, event) -> {
                 if ( event.getResultCode() == OK.intValue() )
                 {
-                    putStorage(new ChildData(event.getPath(), event.getStat(), event.getData()));
-                    nodeChildrenChanged(event.getPath());
+                    Optional<ChildData> childData = putStorage(new ChildData(event.getPath(), event.getStat(), event.getData()));
+                    checkChildrenChanged(event.getPath(), childData.map(ChildData::getStat).orElse(null), event.getStat());
                 }
                 else if ( event.getResultCode() == NONODE.intValue() )
                 {
@@ -233,12 +239,12 @@ class CuratorCacheImpl implements CuratorCache
         }
     }
 
-    private void putStorage(ChildData data)
+    private Optional<ChildData> putStorage(ChildData data)
     {
         Optional<ChildData> previousData = storage.put(data);
         if ( previousData.isPresent() )
         {
-            if ( previousData.get().getStat().getMzxid() != data.getStat().getMzxid() )
+            if ( previousData.get().getStat().getVersion() != data.getStat().getVersion() )
             {
                 callListeners(l -> l.event(NODE_CHANGED, previousData.get(), data));
             }
@@ -247,6 +253,7 @@ class CuratorCacheImpl implements CuratorCache
         {
             callListeners(l -> l.event(NODE_CREATED, null, data));
         }
+        return previousData;
     }
 
     private void removeStorage(String path)
diff --git a/curator-recipes/src/site/confluence/index.confluence b/curator-recipes/src/site/confluence/index.confluence
index d96b5ce..ab8dc53 100644
--- a/curator-recipes/src/site/confluence/index.confluence
+++ b/curator-recipes/src/site/confluence/index.confluence
@@ -29,7 +29,8 @@ regarding "Curator Recipes Own Their ZNode/Paths".
 |[[Node Cache|node-cache.html]] \- (For pre-ZooKeeper 3.6.x) A utility that attempts to keep the data from a node locally cached. This class will watch the node, respond to update/create/delete events, pull down the data, etc. You can register a listener that will get notified when changes occur.|
 |[[Tree Cache|tree-cache.html]] \- (For pre-ZooKeeper 3.6.x) A utility that attempts to keep all data from all children of a ZK path locally cached. This class will watch the ZK path, respond to update/create/delete events, pull down the data, etc. You can register a listener that will get notified when changes occur.|
 
-||Nodes||
+||Nodes/Watches||
+|[[Persistent Recursive Watcher|persistent-watcher.html]] \- A managed persistent recursive watcher. The watch will be managed such that it stays set through connection lapses, etc.|
 |[[Persistent Node|persistent-node.html]] \- A node that attempts to stay present in ZooKeeper, even through connection and session interruptions.|
 |[[Persistent TTL Node|persistent-ttl-node.html]] \- Useful when you need to create a TTL node but don't want to keep it alive manually by periodically setting data.|
 |[[Group Member|group-member.html]] \- Group membership management. Adds this instance into a group and keeps a cache of members in the group.|
diff --git a/curator-recipes/src/site/confluence/persistent-watcher.confluence b/curator-recipes/src/site/confluence/persistent-watcher.confluence
new file mode 100644
index 0000000..39fb8b9
--- /dev/null
+++ b/curator-recipes/src/site/confluence/persistent-watcher.confluence
@@ -0,0 +1,35 @@
+h1. Persistent Recursive Watcher
+
+*Note:* PersistentWatcher requires ZooKeeper 3.6\+.
+
+h2. Description
+A managed persistent recursive watcher. The watch will be managed such that it stays set through connection lapses, etc.
+
+h2. Participating Classes
+* PersistentWatcher
+
+h2. Usage
+h3. Creating a PersistentWatcher
+{code}
+public PersistentWatcher(CuratorFramework client,
+                               String basePath,
+                               boolean recursive)
+
+Parameters:
+client - the client
+basePath - path to set the watch on
+recursive - ZooKeeper persistent watches can optionally be recursive
+{code}
+
+h2. General Usage
+The instance must be started by calling {{start()}}. Call {{close()}} when you want to remove the watch.
+
+PersistentWatcher presents two listener types:
+
+* {{Listenable<Watcher> getListenable()}} \- Use this to add standard ZooKeeper watchers. These will behave in the same manner that watchers added
+via {{ZooKeeper.addWatch()}} behave.
+* {{StandardListenerManager<Runnable> getResetListenable()}} \- The Runnables added with this get called once the Persistent Watcher has been successfully set
+(or reset after a connection partition).
+
+h2. Error Handling
+PersistentWatcher instances internally monitor connection losses, etc. automatically resetting on reconnection.
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
index 8560f87..2d6fb0d 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
@@ -40,51 +40,6 @@ import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.bu
 public class TestCuratorCache extends CuratorTestBase
 {
     @Test
-    public void testServerLoss() throws Exception   // mostly copied from TestPathChildrenCacheInCluster
-    {
-        try (TestingCluster cluster = new TestingCluster(3))
-        {
-            cluster.start();
-
-            try (CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
-            {
-                client.start();
-                client.create().creatingParentsIfNeeded().forPath("/test");
-
-                try (CuratorCache cache = CuratorCache.build(client, "/test"))
-                {
-                    cache.start();
-
-                    CountDownLatch reconnectLatch = new CountDownLatch(1);
-                    client.getConnectionStateListenable().addListener((__, newState) -> {
-                        if ( newState == ConnectionState.RECONNECTED )
-                        {
-                            reconnectLatch.countDown();
-                        }
-                    });
-                    CountDownLatch latch = new CountDownLatch(3);
-                    cache.listenable().addListener((__, ___, ____) -> latch.countDown());
-
-                    client.create().forPath("/test/one");
-                    client.create().forPath("/test/two");
-                    client.create().forPath("/test/three");
-
-                    Assert.assertTrue(timing.awaitLatch(latch));
-
-                    InstanceSpec connectionInstance = cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper());
-                    cluster.killServer(connectionInstance);
-
-                    Assert.assertTrue(timing.awaitLatch(reconnectLatch));
-
-                    timing.sleepABit();
-
-                    Assert.assertEquals(cache.storage().stream().count(), 4);
-                }
-            }
-        }
-    }
-
-    @Test
     public void testUpdateWhenNotCachingData() throws Exception // mostly copied from TestPathChildrenCache
     {
         CuratorCacheStorage storage = new StandardCuratorCacheStorage(false);
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEdges.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEdges.java
new file mode 100644
index 0000000..f20f775
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEdges.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.DO_NOT_CLEAR_ON_CLOSE;
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.builder;
+
+@Test(groups = CuratorTestBase.zk36Group)
+public class TestCuratorCacheEdges extends CuratorTestBase
+{
+    @Test
+    public void testReconnectConsistency() throws Exception
+    {
+        final byte[] first = "one".getBytes();
+        final byte[] second = "two".getBytes();
+
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+        {
+            client.start();
+            client.create().forPath("/root", first);
+            client.create().forPath("/root/1", first);
+            client.create().forPath("/root/2", first);
+            client.create().forPath("/root/1/11", first);
+            client.create().forPath("/root/1/12", first);
+            client.create().forPath("/root/1/13", first);
+            client.create().forPath("/root/2/21", first);
+            client.create().forPath("/root/2/22", first);
+
+            CuratorCacheStorage storage = CuratorCacheStorage.standard();
+            try (CuratorCache cache = CuratorCache.builder(client, "/root").withStorage(storage).withOptions(DO_NOT_CLEAR_ON_CLOSE).build())
+            {
+                CountDownLatch latch = new CountDownLatch(1);
+                cache.listenable().addListener(CuratorCacheListener.builder().forInitialized(latch::countDown).build());
+                cache.start();
+                Assert.assertTrue(timing.awaitLatch(latch));
+            }
+
+            // we now have a storage loaded with the initial nodes created
+
+            // simulate nodes changing during a partition
+
+            client.delete().forPath("/root/2/21");
+            client.delete().forPath("/root/2/22");
+            client.delete().forPath("/root/2");
+
+            client.setData().forPath("/root", second);
+            client.create().forPath("/root/1/11/111", second);
+            client.create().forPath("/root/1/11/111/1111", second);
+            client.create().forPath("/root/1/11/111/1112", second);
+            client.create().forPath("/root/1/13/131", second);
+            client.create().forPath("/root/1/13/132", second);
+            client.create().forPath("/root/1/13/132/1321", second);
+
+            try (CuratorCache cache = CuratorCache.builder(client, "/root").withStorage(storage).withOptions(DO_NOT_CLEAR_ON_CLOSE).build())
+            {
+                CountDownLatch latch = new CountDownLatch(1);
+                cache.listenable().addListener(CuratorCacheListener.builder().forInitialized(latch::countDown).build());
+                cache.start();
+                Assert.assertTrue(timing.awaitLatch(latch));
+            }
+
+            Assert.assertEquals(storage.size(), 11);
+            Assert.assertEquals(storage.get("/root").map(ChildData::getData).orElse(null), second);
+            Assert.assertEquals(storage.get("/root/1").map(ChildData::getData).orElse(null), first);
+            Assert.assertEquals(storage.get("/root/1/11").map(ChildData::getData).orElse(null), first);
+            Assert.assertEquals(storage.get("/root/1/11/111").map(ChildData::getData).orElse(null), second);
+            Assert.assertEquals(storage.get("/root/1/11/111/1111").map(ChildData::getData).orElse(null), second);
+            Assert.assertEquals(storage.get("/root/1/11/111/1112").map(ChildData::getData).orElse(null), second);
+            Assert.assertEquals(storage.get("/root/1/12").map(ChildData::getData).orElse(null), first);
+            Assert.assertEquals(storage.get("/root/1/13").map(ChildData::getData).orElse(null), first);
+            Assert.assertEquals(storage.get("/root/1/13/131").map(ChildData::getData).orElse(null), second);
+            Assert.assertEquals(storage.get("/root/1/13/132").map(ChildData::getData).orElse(null), second);
+            Assert.assertEquals(storage.get("/root/1/13/132/1321").map(ChildData::getData).orElse(null), second);
+        }
+    }
+
+    @Test
+    public void testServerLoss() throws Exception   // mostly copied from TestPathChildrenCacheInCluster
+    {
+        try (TestingCluster cluster = new TestingCluster(3))
+        {
+            cluster.start();
+
+            try (CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+            {
+                client.start();
+                client.create().creatingParentsIfNeeded().forPath("/test");
+
+                try (CuratorCache cache = CuratorCache.build(client, "/test"))
+                {
+                    cache.start();
+
+                    CountDownLatch reconnectLatch = new CountDownLatch(1);
+                    client.getConnectionStateListenable().addListener((__, newState) -> {
+                        if ( newState == ConnectionState.RECONNECTED )
+                        {
+                            reconnectLatch.countDown();
+                        }
+                    });
+                    CountDownLatch latch = new CountDownLatch(3);
+                    cache.listenable().addListener((__, ___, ____) -> latch.countDown());
+
+                    client.create().forPath("/test/one");
+                    client.create().forPath("/test/two");
+                    client.create().forPath("/test/three");
+
+                    Assert.assertTrue(timing.awaitLatch(latch));
+
+                    InstanceSpec connectionInstance = cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper());
+                    cluster.killServer(connectionInstance);
+
+                    Assert.assertTrue(timing.awaitLatch(reconnectLatch));
+
+                    timing.sleepABit();
+
+                    Assert.assertEquals(cache.storage().stream().count(), 4);
+                }
+            }
+        }
+    }
+}