You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2020/04/01 03:53:51 UTC
[curator] 01/01: CURATOR-549
This is an automated email from the ASF dual-hosted git repository.
randgalt pushed a commit to branch CURATOR-549-zk36-persistent-watcher-tree-cache-iterator
in repository https://gitbox.apache.org/repos/asf/curator.git
commit c19607934110b7ba07a6640f8bbed08f77d5eeef
Author: randgalt <ra...@apache.org>
AuthorDate: Sun Mar 29 15:33:13 2020 -0500
CURATOR-549
The next phase of this issue will implement a bridge cache that bridges TreeCache for pre 3.6 SK and CuratorCache for ZK 3.6+. That bridge will need this TreeCache iterator.
---
.../curator/framework/recipes/cache/TreeCache.java | 63 ++++++-
.../framework/recipes/cache/TreeCacheEvent.java | 20 ++
.../framework/recipes/cache/TreeCacheIterator.java | 101 +++++++++++
.../framework/recipes/cache/BaseTestTreeCache.java | 10 +
.../cache/TestTreeCacheIteratorAndSize.java | 201 +++++++++++++++++++++
.../recipes/cache/TestTreeCacheRandomTree.java | 6 +-
6 files changed, 393 insertions(+), 8 deletions(-)
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
index 3bf804c..13b9c59 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -217,7 +218,7 @@ public class TreeCache implements Closeable
private static final ChildData DEAD = new ChildData("/", null, null);
- private static boolean isLive(ChildData cd)
+ static boolean isLive(ChildData cd)
{
return cd != null && cd != DEAD;
}
@@ -226,7 +227,7 @@ public class TreeCache implements Closeable
private static final AtomicReferenceFieldUpdater<TreeNode, ConcurrentMap<String, TreeNode>> childrenUpdater = (AtomicReferenceFieldUpdater)AtomicReferenceFieldUpdater.newUpdater(TreeNode.class, ConcurrentMap.class, "children");
- private final class TreeNode implements Watcher, BackgroundCallback
+ final class TreeNode implements Watcher, BackgroundCallback
{
volatile ChildData childData;
final TreeNode parent;
@@ -343,7 +344,7 @@ public class TreeCache implements Closeable
if ( isLive(oldChildData) )
{
- publishEvent(TreeCacheEvent.Type.NODE_REMOVED, oldChildData);
+ publishEvent(TreeCacheEvent.Type.NODE_REMOVED, oldChildData, null);
}
if ( parent == null )
@@ -482,7 +483,14 @@ public class TreeCache implements Closeable
}
if ( childDataUpdater.compareAndSet(this, oldChildData, toUpdate) )
{
- publishEvent(isLive(oldChildData) ? TreeCacheEvent.Type.NODE_UPDATED : TreeCacheEvent.Type.NODE_ADDED, toPublish);
+ if ( isLive(oldChildData) )
+ {
+ publishEvent(TreeCacheEvent.Type.NODE_UPDATED, toPublish, oldChildData);
+ }
+ else
+ {
+ publishEvent(TreeCacheEvent.Type.NODE_ADDED, toPublish, null);
+ }
break;
}
}
@@ -750,6 +758,49 @@ public class TreeCache implements Closeable
return isLive(result) ? result : null;
}
+ /**
+ * Return an iterator over all nodes in the cache. There are no
+ * guarantees of accuracy; this is merely the most recent view of the data.
+ *
+ * @return a possibly-empty iterator of nodes in the cache
+ */
+ public Iterator<ChildData> iterator()
+ {
+ return new TreeCacheIterator(root);
+ }
+
+ /**
+ * Return the number of nodes in the cache. There are no
+ * guarantees of accuracy; this is merely the most recent view of the data.
+ *
+ * @return size
+ */
+ public int size()
+ {
+ return size(root);
+ }
+
+ private int size(TreeNode node)
+ {
+ int size;
+ if ( isLive(node.childData) )
+ {
+ size = 1;
+ if ( node.children != null )
+ {
+ for ( TreeNode child : node.children.values() )
+ {
+ size += size(child);
+ }
+ }
+ }
+ else
+ {
+ size = 0;
+ }
+ return size;
+ }
+
private void callListeners(final TreeCacheEvent event)
{
listeners.forEach(listener ->
@@ -837,9 +888,9 @@ public class TreeCache implements Closeable
publishEvent(new TreeCacheEvent(type, null));
}
- private void publishEvent(TreeCacheEvent.Type type, ChildData data)
+ private void publishEvent(TreeCacheEvent.Type type, ChildData data, ChildData oldData)
{
- publishEvent(new TreeCacheEvent(type, data));
+ publishEvent(new TreeCacheEvent(type, data, oldData));
}
private void publishEvent(final TreeCacheEvent event)
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheEvent.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheEvent.java
index b151037..012b280 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheEvent.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheEvent.java
@@ -26,6 +26,7 @@ public class TreeCacheEvent
{
private final Type type;
private final ChildData data;
+ private final ChildData oldData;
/**
* Type of change
@@ -113,8 +114,19 @@ public class TreeCacheEvent
*/
public TreeCacheEvent(Type type, ChildData data)
{
+ this(type, data, null);
+ }
+
+ /**
+ * @param type event type
+ * @param data event data or null
+ * @param oldData event oldData or null
+ */
+ public TreeCacheEvent(Type type, ChildData data, ChildData oldData)
+ {
this.type = type;
this.data = data;
+ this.oldData = oldData;
}
/**
@@ -133,6 +145,14 @@ public class TreeCacheEvent
return data;
}
+ /**
+ * @return the node's old data when the type is {@link org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type#NODE_UPDATED}
+ */
+ public ChildData getOldData()
+ {
+ return oldData;
+ }
+
@Override
public String toString()
{
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheIterator.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheIterator.java
new file mode 100644
index 0000000..eed42dc
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheIterator.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.collect.Iterators;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.NoSuchElementException;
+
+// depth first iterator over tree cache nodes
+class TreeCacheIterator implements Iterator<ChildData>
+{
+ private final LinkedList<Current> stack = new LinkedList<>();
+ private Current current;
+
+ private static class Current
+ {
+ final Iterator<TreeCache.TreeNode> iterator;
+ TreeCache.TreeNode node;
+
+ Current(Iterator<TreeCache.TreeNode> iterator)
+ {
+ this.iterator = iterator;
+ node = iterator.next();
+ }
+ }
+
+ TreeCacheIterator(TreeCache.TreeNode root)
+ {
+ current = new Current(Iterators.forArray(root));
+ stack.push(current);
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return (current != null) && TreeCache.isLive(current.node.childData);
+ }
+
+ @Override
+ public ChildData next()
+ {
+ if ( current == null )
+ {
+ throw new NoSuchElementException();
+ }
+
+ ChildData result = current.node.childData; // result of next iteration is current node's data
+
+ // set the next node for the next iteration (or note completion)
+
+ do
+ {
+ setNext();
+ } while ( (current != null) && !TreeCache.isLive(current.node.childData) );
+
+ return result;
+ }
+
+ private void setNext()
+ {
+ if ( current.node.children != null )
+ {
+ stack.push(current);
+ current = new Current(current.node.children.values().iterator());
+ }
+ else while ( true )
+ {
+ if ( current.iterator.hasNext() )
+ {
+ current.node = current.iterator.next();
+ break;
+ }
+ else if ( stack.size() > 0 )
+ {
+ current = stack.pop();
+ }
+ else
+ {
+ current = null; // done
+ break;
+ }
+ }
+ }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
index 246704f..dfeb2ff 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
@@ -193,6 +193,16 @@ public class BaseTestTreeCache extends BaseClassForTests
{
Assert.assertEquals(event.getData().getData(), expectedData, message);
}
+
+ if ( event.getType() == TreeCacheEvent.Type.NODE_UPDATED)
+ {
+ Assert.assertNotNull(event.getOldData());
+ }
+ else
+ {
+ Assert.assertNull(event.getOldData());
+ }
+
return event;
}
}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheIteratorAndSize.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheIteratorAndSize.java
new file mode 100644
index 0000000..2d8dbb3
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheIteratorAndSize.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.collect.Sets;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class TestTreeCacheIteratorAndSize extends CuratorTestBase
+{
+ @Test
+ public void testBasic() throws Exception
+ {
+ final String[] nodes = {
+ "/base/test",
+ "/base/test/3",
+ "/base/test/3/0",
+ "/base/test/3/0/0",
+ "/base/test/3/0/1",
+ "/base/test/3/1",
+ "/base/test/3/1/0",
+ "/base/test/3/1/1",
+ "/base/test/3/2",
+ "/base/test/3/2/0",
+ "/base/test/3/2/1",
+ "/base/test/3/2/3",
+ "/base/test/3/3",
+ "/base/test/3/3/1",
+ "/base/test/3/3/3"
+ };
+
+ try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+ {
+ client.start();
+
+ String basePath = "/base/test";
+ try (TreeCache treeCache = new TreeCache(client, basePath) )
+ {
+ treeCache.start();
+
+ for ( String node : nodes )
+ {
+ client.create().creatingParentsIfNeeded().forPath(node, node.getBytes());
+ }
+
+ timing.sleepABit(); // let the cache settle
+
+ Iterator<ChildData> iterator = treeCache.iterator();
+ Map<String, byte[]> iteratorValues = new HashMap<>();
+ while ( iterator.hasNext() )
+ {
+ ChildData next = iterator.next();
+ iteratorValues.put(next.getPath(), next.getData());
+ }
+
+ Assert.assertEquals(iteratorValues.size(), nodes.length);
+ for ( String node : nodes )
+ {
+ Assert.assertEquals(iteratorValues.get(node), node.getBytes());
+ }
+
+ Assert.assertEquals(treeCache.size(), nodes.length);
+ }
+ }
+ }
+
+ @Test
+ public void testIteratorWithRandomGraph() throws Exception
+ {
+ Map<String, String> pathAndData = new HashMap<>();
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ int nodeQty = random.nextInt(100, 200);
+ int maxPerRow = random.nextInt(1, 10);
+ int maxDepth = random.nextInt(3, 5);
+ try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+ {
+ client.start();
+
+ String basePath = "/base/test";
+ try (TreeCache treeCache = new TreeCache(client, basePath) )
+ {
+ treeCache.start();
+
+ client.create().creatingParentsIfNeeded().forPath(basePath, "0".getBytes());
+ pathAndData.put(basePath, "0");
+
+ while ( nodeQty-- > 0 )
+ {
+ int thisDepth = random.nextInt(1, maxDepth + 1);
+ StringBuilder path = new StringBuilder(basePath);
+ for ( int i = 0; i < thisDepth; ++i )
+ {
+ path.append("/").append(random.nextInt(maxPerRow));
+ long value = random.nextLong();
+ pathAndData.put(path.toString(), Long.toString(value));
+ client.create().orSetData().forPath(path.toString(), Long.toString(value).getBytes());
+ }
+ }
+
+ timing.sleepABit(); // let the cache settle
+
+ Assert.assertEquals(treeCache.size(), pathAndData.size());
+
+ // at this point we have a cached graph of random nodes with random values
+ Iterator<ChildData> iterator = treeCache.iterator();
+ while ( iterator.hasNext() )
+ {
+ ChildData next = iterator.next();
+ Assert.assertTrue(pathAndData.containsKey(next.getPath()));
+ Assert.assertEquals(pathAndData.get(next.getPath()).getBytes(), next.getData());
+ pathAndData.remove(next.getPath());
+ }
+
+ Assert.assertEquals(pathAndData.size(), 0); // above loop should have removed all nodes
+ }
+ }
+ }
+
+ @Test
+ public void testEmptyTree() throws Exception
+ {
+ try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)))
+ {
+ client.start();
+
+ try (TreeCache treeCache = new TreeCache(client, "/base/test"))
+ {
+ treeCache.start();
+
+ Iterator<ChildData> iterator = treeCache.iterator();
+ Assert.assertFalse(iterator.hasNext());
+ Assert.assertEquals(treeCache.size(), 0);
+ }
+ }
+ }
+
+ @Test
+ public void testWithDeletedNodes() throws Exception
+ {
+ try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)))
+ {
+ client.start();
+
+ try (TreeCache treeCache = new TreeCache(client, "/foo"))
+ {
+ treeCache.start();
+
+ client.create().forPath("/foo");
+ client.create().forPath("/foo/a1");
+ client.create().forPath("/foo/a2");
+ client.create().forPath("/foo/a2/a2.1");
+ client.create().forPath("/foo/a2/a2.2");
+ client.create().forPath("/foo/a3");
+ client.create().forPath("/foo/a3/a3.1");
+ client.create().forPath("/foo/a3/a3.2");
+
+ client.delete().forPath("/foo/a2/a2.2");
+ client.delete().forPath("/foo/a3/a3.1");
+
+ timing.sleepABit(); // let the cache settle
+
+ Iterator<ChildData> iterator = treeCache.iterator();
+ Set<String> paths = new HashSet<>();
+ while ( iterator.hasNext() )
+ {
+ ChildData next = iterator.next();
+ paths.add(next.getPath());
+ }
+
+ Assert.assertEquals(paths, Sets.newHashSet("/foo", "/foo/a1", "/foo/a2", "/foo/a2/a2.1", "/foo/a3", "/foo/a3/a3.2"));
+ Assert.assertEquals(treeCache.size(), 6);
+ }
+ }
+ }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheRandomTree.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheRandomTree.java
index 96ce75c..e78bb9e 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheRandomTree.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheRandomTree.java
@@ -29,6 +29,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Random;
+import static org.testng.Assert.assertNotNull;
+
public class TestTreeCacheRandomTree extends BaseTestTreeCache
{
/**
@@ -209,7 +211,7 @@ public class TestTreeCacheRandomTree extends BaseTestTreeCache
{
String path = expectedNode.fullPath;
Map<String, ChildData> cacheChildren = cache.getCurrentChildren(path);
- Assert.assertNotNull(cacheChildren, path);
+ assertNotNull(cacheChildren, path);
if (withDepth && depth == TEST_DEPTH) {
return;
@@ -233,7 +235,7 @@ public class TestTreeCacheRandomTree extends BaseTestTreeCache
private static void assertNodeEquals(ChildData actualChild, TestNode expectedNode)
{
String path = expectedNode.fullPath;
- Assert.assertNotNull(actualChild, path);
+ assertNotNull(actualChild, path);
Assert.assertEquals(actualChild.getData(), expectedNode.data, path);
}
}