You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/11/08 18:40:58 UTC

[curator] branch CURATOR-549-zk36-persistent-watcher-recipes updated (f8a2dc4 -> 01706e0)

This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a change to branch CURATOR-549-zk36-persistent-watcher-recipes
in repository https://gitbox.apache.org/repos/asf/curator.git.


    omit f8a2dc4  CURATOR-549 - optimization: if a node that the cache knows about gets a change event, there's no need to query for children if the Cversion hasn't changed
     new 01706e0  CURATOR-549 - optimization: if a node that the cache knows about gets a change event, there's no need to query for children if the Cversion hasn't changed

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user force-pushes a change (git push --force), leaving the
repository with a history like this:

 * -- * -- B -- O -- O -- O   (f8a2dc4)
            \
             N -- N -- N   refs/heads/CURATOR-549-zk36-persistent-watcher-recipes (01706e0)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../framework/recipes/cache/TestCuratorCache.java  |  45 ------
 .../recipes/cache/TestCuratorCacheEdges.java       | 153 +++++++++++++++++++++
 2 files changed, 153 insertions(+), 45 deletions(-)
 create mode 100644 curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEdges.java


[curator] 01/01: CURATOR-549 - optimization: if a node that the cache knows about gets a change event, there's no need to query for children if the Cversion hasn't changed

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch CURATOR-549-zk36-persistent-watcher-recipes
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 01706e06f899c17c2ce8f1a10de7bc059c25c200
Author: randgalt <ra...@apache.org>
AuthorDate: Fri Nov 8 10:53:38 2019 -0500

    CURATOR-549 - optimization: if a node that the cache knows about gets a change event, there's no need to query for children if the Cversion hasn't changed
---
 .../framework/recipes/cache/CuratorCacheImpl.java  |  15 +-
 .../framework/recipes/cache/TestCuratorCache.java  |  45 ------
 .../recipes/cache/TestCuratorCacheEdges.java       | 153 +++++++++++++++++++++
 3 files changed, 164 insertions(+), 49 deletions(-)

diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
index ee95570..6de4d06 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
@@ -30,6 +30,7 @@ import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.Collections;
@@ -157,13 +158,18 @@ class CuratorCacheImpl implements CuratorCache
         }
     }
 
-    private void nodeChildrenChanged(String fromPath)
+    private void checkChildrenChanged(String fromPath, Stat oldStat, Stat newStat)
     {
         if ( (state.get() != State.STARTED) || !recursive )
         {
             return;
         }
 
+        if ( (oldStat != null) && (oldStat.getCversion() == newStat.getCversion()) )
+        {
+            return; // children haven't changed
+        }
+
         try
         {
             BackgroundCallback callback = (__, event) -> {
@@ -203,8 +209,8 @@ class CuratorCacheImpl implements CuratorCache
             BackgroundCallback callback = (__, event) -> {
                 if ( event.getResultCode() == OK.intValue() )
                 {
-                    putStorage(new ChildData(event.getPath(), event.getStat(), event.getData()));
-                    nodeChildrenChanged(event.getPath());
+                    Optional<ChildData> childData = putStorage(new ChildData(event.getPath(), event.getStat(), event.getData()));
+                    checkChildrenChanged(event.getPath(), childData.map(ChildData::getStat).orElse(null), event.getStat());
                 }
                 else if ( event.getResultCode() == NONODE.intValue() )
                 {
@@ -233,7 +239,7 @@ class CuratorCacheImpl implements CuratorCache
         }
     }
 
-    private void putStorage(ChildData data)
+    private Optional<ChildData> putStorage(ChildData data)
     {
         Optional<ChildData> previousData = storage.put(data);
         if ( previousData.isPresent() )
@@ -247,6 +253,7 @@ class CuratorCacheImpl implements CuratorCache
         {
             callListeners(l -> l.event(NODE_CREATED, null, data));
         }
+        return previousData;
     }
 
     private void removeStorage(String path)
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
index 8560f87..2d6fb0d 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
@@ -40,51 +40,6 @@ import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.bu
 public class TestCuratorCache extends CuratorTestBase
 {
     @Test
-    public void testServerLoss() throws Exception   // mostly copied from TestPathChildrenCacheInCluster
-    {
-        try (TestingCluster cluster = new TestingCluster(3))
-        {
-            cluster.start();
-
-            try (CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
-            {
-                client.start();
-                client.create().creatingParentsIfNeeded().forPath("/test");
-
-                try (CuratorCache cache = CuratorCache.build(client, "/test"))
-                {
-                    cache.start();
-
-                    CountDownLatch reconnectLatch = new CountDownLatch(1);
-                    client.getConnectionStateListenable().addListener((__, newState) -> {
-                        if ( newState == ConnectionState.RECONNECTED )
-                        {
-                            reconnectLatch.countDown();
-                        }
-                    });
-                    CountDownLatch latch = new CountDownLatch(3);
-                    cache.listenable().addListener((__, ___, ____) -> latch.countDown());
-
-                    client.create().forPath("/test/one");
-                    client.create().forPath("/test/two");
-                    client.create().forPath("/test/three");
-
-                    Assert.assertTrue(timing.awaitLatch(latch));
-
-                    InstanceSpec connectionInstance = cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper());
-                    cluster.killServer(connectionInstance);
-
-                    Assert.assertTrue(timing.awaitLatch(reconnectLatch));
-
-                    timing.sleepABit();
-
-                    Assert.assertEquals(cache.storage().stream().count(), 4);
-                }
-            }
-        }
-    }
-
-    @Test
     public void testUpdateWhenNotCachingData() throws Exception // mostly copied from TestPathChildrenCache
     {
         CuratorCacheStorage storage = new StandardCuratorCacheStorage(false);
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEdges.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEdges.java
new file mode 100644
index 0000000..f20f775
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEdges.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.DO_NOT_CLEAR_ON_CLOSE;
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.builder;
+
+@Test(groups = CuratorTestBase.zk36Group)
+public class TestCuratorCacheEdges extends CuratorTestBase
+{
+    @Test
+    public void testReconnectConsistency() throws Exception
+    {
+        final byte[] first = "one".getBytes();
+        final byte[] second = "two".getBytes();
+
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+        {
+            client.start();
+            client.create().forPath("/root", first);
+            client.create().forPath("/root/1", first);
+            client.create().forPath("/root/2", first);
+            client.create().forPath("/root/1/11", first);
+            client.create().forPath("/root/1/12", first);
+            client.create().forPath("/root/1/13", first);
+            client.create().forPath("/root/2/21", first);
+            client.create().forPath("/root/2/22", first);
+
+            CuratorCacheStorage storage = CuratorCacheStorage.standard();
+            try (CuratorCache cache = CuratorCache.builder(client, "/root").withStorage(storage).withOptions(DO_NOT_CLEAR_ON_CLOSE).build())
+            {
+                CountDownLatch latch = new CountDownLatch(1);
+                cache.listenable().addListener(CuratorCacheListener.builder().forInitialized(latch::countDown).build());
+                cache.start();
+                Assert.assertTrue(timing.awaitLatch(latch));
+            }
+
+            // we now have a storage loaded with the initial nodes created
+
+            // simulate nodes changing during a partition
+
+            client.delete().forPath("/root/2/21");
+            client.delete().forPath("/root/2/22");
+            client.delete().forPath("/root/2");
+
+            client.setData().forPath("/root", second);
+            client.create().forPath("/root/1/11/111", second);
+            client.create().forPath("/root/1/11/111/1111", second);
+            client.create().forPath("/root/1/11/111/1112", second);
+            client.create().forPath("/root/1/13/131", second);
+            client.create().forPath("/root/1/13/132", second);
+            client.create().forPath("/root/1/13/132/1321", second);
+
+            try (CuratorCache cache = CuratorCache.builder(client, "/root").withStorage(storage).withOptions(DO_NOT_CLEAR_ON_CLOSE).build())
+            {
+                CountDownLatch latch = new CountDownLatch(1);
+                cache.listenable().addListener(CuratorCacheListener.builder().forInitialized(latch::countDown).build());
+                cache.start();
+                Assert.assertTrue(timing.awaitLatch(latch));
+            }
+
+            Assert.assertEquals(storage.size(), 11);
+            Assert.assertEquals(storage.get("/root").map(ChildData::getData).orElse(null), second);
+            Assert.assertEquals(storage.get("/root/1").map(ChildData::getData).orElse(null), first);
+            Assert.assertEquals(storage.get("/root/1/11").map(ChildData::getData).orElse(null), first);
+            Assert.assertEquals(storage.get("/root/1/11/111").map(ChildData::getData).orElse(null), second);
+            Assert.assertEquals(storage.get("/root/1/11/111/1111").map(ChildData::getData).orElse(null), second);
+            Assert.assertEquals(storage.get("/root/1/11/111/1112").map(ChildData::getData).orElse(null), second);
+            Assert.assertEquals(storage.get("/root/1/12").map(ChildData::getData).orElse(null), first);
+            Assert.assertEquals(storage.get("/root/1/13").map(ChildData::getData).orElse(null), first);
+            Assert.assertEquals(storage.get("/root/1/13/131").map(ChildData::getData).orElse(null), second);
+            Assert.assertEquals(storage.get("/root/1/13/132").map(ChildData::getData).orElse(null), second);
+            Assert.assertEquals(storage.get("/root/1/13/132/1321").map(ChildData::getData).orElse(null), second);
+        }
+    }
+
+    @Test
+    public void testServerLoss() throws Exception   // mostly copied from TestPathChildrenCacheInCluster
+    {
+        try (TestingCluster cluster = new TestingCluster(3))
+        {
+            cluster.start();
+
+            try (CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+            {
+                client.start();
+                client.create().creatingParentsIfNeeded().forPath("/test");
+
+                try (CuratorCache cache = CuratorCache.build(client, "/test"))
+                {
+                    cache.start();
+
+                    CountDownLatch reconnectLatch = new CountDownLatch(1);
+                    client.getConnectionStateListenable().addListener((__, newState) -> {
+                        if ( newState == ConnectionState.RECONNECTED )
+                        {
+                            reconnectLatch.countDown();
+                        }
+                    });
+                    CountDownLatch latch = new CountDownLatch(3);
+                    cache.listenable().addListener((__, ___, ____) -> latch.countDown());
+
+                    client.create().forPath("/test/one");
+                    client.create().forPath("/test/two");
+                    client.create().forPath("/test/three");
+
+                    Assert.assertTrue(timing.awaitLatch(latch));
+
+                    InstanceSpec connectionInstance = cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper());
+                    cluster.killServer(connectionInstance);
+
+                    Assert.assertTrue(timing.awaitLatch(reconnectLatch));
+
+                    timing.sleepABit();
+
+                    Assert.assertEquals(cache.storage().stream().count(), 4);
+                }
+            }
+        }
+    }
+}