You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/11/08 18:40:59 UTC

[curator] 01/01: CURATOR-549 - optimization: if a node that the cache knows about gets a change event, there's no need to query for children if the Cversion hasn't changed

This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch CURATOR-549-zk36-persistent-watcher-recipes
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 01706e06f899c17c2ce8f1a10de7bc059c25c200
Author: randgalt <ra...@apache.org>
AuthorDate: Fri Nov 8 10:53:38 2019 -0500

    CURATOR-549 - optimization: if a node that the cache knows about gets a change event, there's no need to query for children if the Cversion hasn't changed
---
 .../framework/recipes/cache/CuratorCacheImpl.java  |  15 +-
 .../framework/recipes/cache/TestCuratorCache.java  |  45 ------
 .../recipes/cache/TestCuratorCacheEdges.java       | 153 +++++++++++++++++++++
 3 files changed, 164 insertions(+), 49 deletions(-)

diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
index ee95570..6de4d06 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
@@ -30,6 +30,7 @@ import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.Collections;
@@ -157,13 +158,18 @@ class CuratorCacheImpl implements CuratorCache
         }
     }
 
-    private void nodeChildrenChanged(String fromPath)
+    private void checkChildrenChanged(String fromPath, Stat oldStat, Stat newStat)
     {
         if ( (state.get() != State.STARTED) || !recursive )
         {
             return;
         }
 
+        if ( (oldStat != null) && (oldStat.getCversion() == newStat.getCversion()) )
+        {
+            return; // children haven't changed
+        }
+
         try
         {
             BackgroundCallback callback = (__, event) -> {
@@ -203,8 +209,8 @@ class CuratorCacheImpl implements CuratorCache
             BackgroundCallback callback = (__, event) -> {
                 if ( event.getResultCode() == OK.intValue() )
                 {
-                    putStorage(new ChildData(event.getPath(), event.getStat(), event.getData()));
-                    nodeChildrenChanged(event.getPath());
+                    Optional<ChildData> childData = putStorage(new ChildData(event.getPath(), event.getStat(), event.getData()));
+                    checkChildrenChanged(event.getPath(), childData.map(ChildData::getStat).orElse(null), event.getStat());
                 }
                 else if ( event.getResultCode() == NONODE.intValue() )
                 {
@@ -233,7 +239,7 @@ class CuratorCacheImpl implements CuratorCache
         }
     }
 
-    private void putStorage(ChildData data)
+    private Optional<ChildData> putStorage(ChildData data)
     {
         Optional<ChildData> previousData = storage.put(data);
         if ( previousData.isPresent() )
@@ -247,6 +253,7 @@ class CuratorCacheImpl implements CuratorCache
         {
             callListeners(l -> l.event(NODE_CREATED, null, data));
         }
+        return previousData;
     }
 
     private void removeStorage(String path)
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
index 8560f87..2d6fb0d 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
@@ -40,51 +40,6 @@ import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.bu
 public class TestCuratorCache extends CuratorTestBase
 {
     @Test
-    public void testServerLoss() throws Exception   // mostly copied from TestPathChildrenCacheInCluster
-    {
-        try (TestingCluster cluster = new TestingCluster(3))
-        {
-            cluster.start();
-
-            try (CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
-            {
-                client.start();
-                client.create().creatingParentsIfNeeded().forPath("/test");
-
-                try (CuratorCache cache = CuratorCache.build(client, "/test"))
-                {
-                    cache.start();
-
-                    CountDownLatch reconnectLatch = new CountDownLatch(1);
-                    client.getConnectionStateListenable().addListener((__, newState) -> {
-                        if ( newState == ConnectionState.RECONNECTED )
-                        {
-                            reconnectLatch.countDown();
-                        }
-                    });
-                    CountDownLatch latch = new CountDownLatch(3);
-                    cache.listenable().addListener((__, ___, ____) -> latch.countDown());
-
-                    client.create().forPath("/test/one");
-                    client.create().forPath("/test/two");
-                    client.create().forPath("/test/three");
-
-                    Assert.assertTrue(timing.awaitLatch(latch));
-
-                    InstanceSpec connectionInstance = cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper());
-                    cluster.killServer(connectionInstance);
-
-                    Assert.assertTrue(timing.awaitLatch(reconnectLatch));
-
-                    timing.sleepABit();
-
-                    Assert.assertEquals(cache.storage().stream().count(), 4);
-                }
-            }
-        }
-    }
-
-    @Test
     public void testUpdateWhenNotCachingData() throws Exception // mostly copied from TestPathChildrenCache
     {
         CuratorCacheStorage storage = new StandardCuratorCacheStorage(false);
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEdges.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEdges.java
new file mode 100644
index 0000000..f20f775
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEdges.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.DO_NOT_CLEAR_ON_CLOSE;
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.builder;
+
+@Test(groups = CuratorTestBase.zk36Group)
+public class TestCuratorCacheEdges extends CuratorTestBase
+{
+    @Test
+    public void testReconnectConsistency() throws Exception
+    {
+        final byte[] first = "one".getBytes();
+        final byte[] second = "two".getBytes();
+
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+        {
+            client.start();
+            client.create().forPath("/root", first);
+            client.create().forPath("/root/1", first);
+            client.create().forPath("/root/2", first);
+            client.create().forPath("/root/1/11", first);
+            client.create().forPath("/root/1/12", first);
+            client.create().forPath("/root/1/13", first);
+            client.create().forPath("/root/2/21", first);
+            client.create().forPath("/root/2/22", first);
+
+            CuratorCacheStorage storage = CuratorCacheStorage.standard();
+            try (CuratorCache cache = CuratorCache.builder(client, "/root").withStorage(storage).withOptions(DO_NOT_CLEAR_ON_CLOSE).build())
+            {
+                CountDownLatch latch = new CountDownLatch(1);
+                cache.listenable().addListener(CuratorCacheListener.builder().forInitialized(latch::countDown).build());
+                cache.start();
+                Assert.assertTrue(timing.awaitLatch(latch));
+            }
+
+            // we now have a storage loaded with the initial nodes created
+
+            // simulate nodes changing during a partition
+
+            client.delete().forPath("/root/2/21");
+            client.delete().forPath("/root/2/22");
+            client.delete().forPath("/root/2");
+
+            client.setData().forPath("/root", second);
+            client.create().forPath("/root/1/11/111", second);
+            client.create().forPath("/root/1/11/111/1111", second);
+            client.create().forPath("/root/1/11/111/1112", second);
+            client.create().forPath("/root/1/13/131", second);
+            client.create().forPath("/root/1/13/132", second);
+            client.create().forPath("/root/1/13/132/1321", second);
+
+            try (CuratorCache cache = CuratorCache.builder(client, "/root").withStorage(storage).withOptions(DO_NOT_CLEAR_ON_CLOSE).build())
+            {
+                CountDownLatch latch = new CountDownLatch(1);
+                cache.listenable().addListener(CuratorCacheListener.builder().forInitialized(latch::countDown).build());
+                cache.start();
+                Assert.assertTrue(timing.awaitLatch(latch));
+            }
+
+            Assert.assertEquals(storage.size(), 11);
+            Assert.assertEquals(storage.get("/root").map(ChildData::getData).orElse(null), second);
+            Assert.assertEquals(storage.get("/root/1").map(ChildData::getData).orElse(null), first);
+            Assert.assertEquals(storage.get("/root/1/11").map(ChildData::getData).orElse(null), first);
+            Assert.assertEquals(storage.get("/root/1/11/111").map(ChildData::getData).orElse(null), second);
+            Assert.assertEquals(storage.get("/root/1/11/111/1111").map(ChildData::getData).orElse(null), second);
+            Assert.assertEquals(storage.get("/root/1/11/111/1112").map(ChildData::getData).orElse(null), second);
+            Assert.assertEquals(storage.get("/root/1/12").map(ChildData::getData).orElse(null), first);
+            Assert.assertEquals(storage.get("/root/1/13").map(ChildData::getData).orElse(null), first);
+            Assert.assertEquals(storage.get("/root/1/13/131").map(ChildData::getData).orElse(null), second);
+            Assert.assertEquals(storage.get("/root/1/13/132").map(ChildData::getData).orElse(null), second);
+            Assert.assertEquals(storage.get("/root/1/13/132/1321").map(ChildData::getData).orElse(null), second);
+        }
+    }
+
+    @Test
+    public void testServerLoss() throws Exception   // mostly copied from TestPathChildrenCacheInCluster
+    {
+        try (TestingCluster cluster = new TestingCluster(3))
+        {
+            cluster.start();
+
+            try (CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+            {
+                client.start();
+                client.create().creatingParentsIfNeeded().forPath("/test");
+
+                try (CuratorCache cache = CuratorCache.build(client, "/test"))
+                {
+                    cache.start();
+
+                    CountDownLatch reconnectLatch = new CountDownLatch(1);
+                    client.getConnectionStateListenable().addListener((__, newState) -> {
+                        if ( newState == ConnectionState.RECONNECTED )
+                        {
+                            reconnectLatch.countDown();
+                        }
+                    });
+                    CountDownLatch latch = new CountDownLatch(3);
+                    cache.listenable().addListener((__, ___, ____) -> latch.countDown());
+
+                    client.create().forPath("/test/one");
+                    client.create().forPath("/test/two");
+                    client.create().forPath("/test/three");
+
+                    Assert.assertTrue(timing.awaitLatch(latch));
+
+                    InstanceSpec connectionInstance = cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper());
+                    cluster.killServer(connectionInstance);
+
+                    Assert.assertTrue(timing.awaitLatch(reconnectLatch));
+
+                    timing.sleepABit();
+
+                    Assert.assertEquals(cache.storage().stream().count(), 4);
+                }
+            }
+        }
+    }
+}