You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/11/08 18:40:59 UTC
[curator] 01/01: CURATOR-549 - optimization: if a node that the
cache knows about gets a change event,
there's no need to query for children if the Cversion hasn't changed
This is an automated email from the ASF dual-hosted git repository.
randgalt pushed a commit to branch CURATOR-549-zk36-persistent-watcher-recipes
in repository https://gitbox.apache.org/repos/asf/curator.git
commit 01706e06f899c17c2ce8f1a10de7bc059c25c200
Author: randgalt <ra...@apache.org>
AuthorDate: Fri Nov 8 10:53:38 2019 -0500
CURATOR-549 - optimization: if a node that the cache knows about gets a change event, there's no need to query for children if the Cversion hasn't changed
---
.../framework/recipes/cache/CuratorCacheImpl.java | 15 +-
.../framework/recipes/cache/TestCuratorCache.java | 45 ------
.../recipes/cache/TestCuratorCacheEdges.java | 153 +++++++++++++++++++++
3 files changed, 164 insertions(+), 49 deletions(-)
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
index ee95570..6de4d06 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
@@ -30,6 +30,7 @@ import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
@@ -157,13 +158,18 @@ class CuratorCacheImpl implements CuratorCache
}
}
- private void nodeChildrenChanged(String fromPath)
+ private void checkChildrenChanged(String fromPath, Stat oldStat, Stat newStat)
{
if ( (state.get() != State.STARTED) || !recursive )
{
return;
}
+ if ( (oldStat != null) && (oldStat.getCversion() == newStat.getCversion()) )
+ {
+ return; // children haven't changed
+ }
+
try
{
BackgroundCallback callback = (__, event) -> {
@@ -203,8 +209,8 @@ class CuratorCacheImpl implements CuratorCache
BackgroundCallback callback = (__, event) -> {
if ( event.getResultCode() == OK.intValue() )
{
- putStorage(new ChildData(event.getPath(), event.getStat(), event.getData()));
- nodeChildrenChanged(event.getPath());
+ Optional<ChildData> childData = putStorage(new ChildData(event.getPath(), event.getStat(), event.getData()));
+ checkChildrenChanged(event.getPath(), childData.map(ChildData::getStat).orElse(null), event.getStat());
}
else if ( event.getResultCode() == NONODE.intValue() )
{
@@ -233,7 +239,7 @@ class CuratorCacheImpl implements CuratorCache
}
}
- private void putStorage(ChildData data)
+ private Optional<ChildData> putStorage(ChildData data)
{
Optional<ChildData> previousData = storage.put(data);
if ( previousData.isPresent() )
@@ -247,6 +253,7 @@ class CuratorCacheImpl implements CuratorCache
{
callListeners(l -> l.event(NODE_CREATED, null, data));
}
+ return previousData;
}
private void removeStorage(String path)
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
index 8560f87..2d6fb0d 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
@@ -40,51 +40,6 @@ import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.bu
public class TestCuratorCache extends CuratorTestBase
{
@Test
- public void testServerLoss() throws Exception // mostly copied from TestPathChildrenCacheInCluster
- {
- try (TestingCluster cluster = new TestingCluster(3))
- {
- cluster.start();
-
- try (CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
- {
- client.start();
- client.create().creatingParentsIfNeeded().forPath("/test");
-
- try (CuratorCache cache = CuratorCache.build(client, "/test"))
- {
- cache.start();
-
- CountDownLatch reconnectLatch = new CountDownLatch(1);
- client.getConnectionStateListenable().addListener((__, newState) -> {
- if ( newState == ConnectionState.RECONNECTED )
- {
- reconnectLatch.countDown();
- }
- });
- CountDownLatch latch = new CountDownLatch(3);
- cache.listenable().addListener((__, ___, ____) -> latch.countDown());
-
- client.create().forPath("/test/one");
- client.create().forPath("/test/two");
- client.create().forPath("/test/three");
-
- Assert.assertTrue(timing.awaitLatch(latch));
-
- InstanceSpec connectionInstance = cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper());
- cluster.killServer(connectionInstance);
-
- Assert.assertTrue(timing.awaitLatch(reconnectLatch));
-
- timing.sleepABit();
-
- Assert.assertEquals(cache.storage().stream().count(), 4);
- }
- }
- }
- }
-
- @Test
public void testUpdateWhenNotCachingData() throws Exception // mostly copied from TestPathChildrenCache
{
CuratorCacheStorage storage = new StandardCuratorCacheStorage(false);
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEdges.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEdges.java
new file mode 100644
index 0000000..f20f775
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEdges.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.DO_NOT_CLEAR_ON_CLOSE;
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.builder;
+
+@Test(groups = CuratorTestBase.zk36Group)
+public class TestCuratorCacheEdges extends CuratorTestBase
+{
+ @Test
+ public void testReconnectConsistency() throws Exception
+ {
+ final byte[] first = "one".getBytes();
+ final byte[] second = "two".getBytes();
+
+ try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+ {
+ client.start();
+ client.create().forPath("/root", first);
+ client.create().forPath("/root/1", first);
+ client.create().forPath("/root/2", first);
+ client.create().forPath("/root/1/11", first);
+ client.create().forPath("/root/1/12", first);
+ client.create().forPath("/root/1/13", first);
+ client.create().forPath("/root/2/21", first);
+ client.create().forPath("/root/2/22", first);
+
+ CuratorCacheStorage storage = CuratorCacheStorage.standard();
+ try (CuratorCache cache = CuratorCache.builder(client, "/root").withStorage(storage).withOptions(DO_NOT_CLEAR_ON_CLOSE).build())
+ {
+ CountDownLatch latch = new CountDownLatch(1);
+ cache.listenable().addListener(CuratorCacheListener.builder().forInitialized(latch::countDown).build());
+ cache.start();
+ Assert.assertTrue(timing.awaitLatch(latch));
+ }
+
+ // we now have a storage loaded with the initial nodes created
+
+ // simulate nodes changing during a partition
+
+ client.delete().forPath("/root/2/21");
+ client.delete().forPath("/root/2/22");
+ client.delete().forPath("/root/2");
+
+ client.setData().forPath("/root", second);
+ client.create().forPath("/root/1/11/111", second);
+ client.create().forPath("/root/1/11/111/1111", second);
+ client.create().forPath("/root/1/11/111/1112", second);
+ client.create().forPath("/root/1/13/131", second);
+ client.create().forPath("/root/1/13/132", second);
+ client.create().forPath("/root/1/13/132/1321", second);
+
+ try (CuratorCache cache = CuratorCache.builder(client, "/root").withStorage(storage).withOptions(DO_NOT_CLEAR_ON_CLOSE).build())
+ {
+ CountDownLatch latch = new CountDownLatch(1);
+ cache.listenable().addListener(CuratorCacheListener.builder().forInitialized(latch::countDown).build());
+ cache.start();
+ Assert.assertTrue(timing.awaitLatch(latch));
+ }
+
+ Assert.assertEquals(storage.size(), 11);
+ Assert.assertEquals(storage.get("/root").map(ChildData::getData).orElse(null), second);
+ Assert.assertEquals(storage.get("/root/1").map(ChildData::getData).orElse(null), first);
+ Assert.assertEquals(storage.get("/root/1/11").map(ChildData::getData).orElse(null), first);
+ Assert.assertEquals(storage.get("/root/1/11/111").map(ChildData::getData).orElse(null), second);
+ Assert.assertEquals(storage.get("/root/1/11/111/1111").map(ChildData::getData).orElse(null), second);
+ Assert.assertEquals(storage.get("/root/1/11/111/1112").map(ChildData::getData).orElse(null), second);
+ Assert.assertEquals(storage.get("/root/1/12").map(ChildData::getData).orElse(null), first);
+ Assert.assertEquals(storage.get("/root/1/13").map(ChildData::getData).orElse(null), first);
+ Assert.assertEquals(storage.get("/root/1/13/131").map(ChildData::getData).orElse(null), second);
+ Assert.assertEquals(storage.get("/root/1/13/132").map(ChildData::getData).orElse(null), second);
+ Assert.assertEquals(storage.get("/root/1/13/132/1321").map(ChildData::getData).orElse(null), second);
+ }
+ }
+
+ @Test
+ public void testServerLoss() throws Exception // mostly copied from TestPathChildrenCacheInCluster
+ {
+ try (TestingCluster cluster = new TestingCluster(3))
+ {
+ cluster.start();
+
+ try (CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+ {
+ client.start();
+ client.create().creatingParentsIfNeeded().forPath("/test");
+
+ try (CuratorCache cache = CuratorCache.build(client, "/test"))
+ {
+ cache.start();
+
+ CountDownLatch reconnectLatch = new CountDownLatch(1);
+ client.getConnectionStateListenable().addListener((__, newState) -> {
+ if ( newState == ConnectionState.RECONNECTED )
+ {
+ reconnectLatch.countDown();
+ }
+ });
+ CountDownLatch latch = new CountDownLatch(3);
+ cache.listenable().addListener((__, ___, ____) -> latch.countDown());
+
+ client.create().forPath("/test/one");
+ client.create().forPath("/test/two");
+ client.create().forPath("/test/three");
+
+ Assert.assertTrue(timing.awaitLatch(latch));
+
+ InstanceSpec connectionInstance = cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper());
+ cluster.killServer(connectionInstance);
+
+ Assert.assertTrue(timing.awaitLatch(reconnectLatch));
+
+ timing.sleepABit();
+
+ Assert.assertEquals(cache.storage().stream().count(), 4);
+ }
+ }
+ }
+ }
+}