You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2020/04/09 03:17:20 UTC

[curator] branch master updated: CURATOR-549

This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git


The following commit(s) were added to refs/heads/master by this push:
     new 844c0ad  CURATOR-549
844c0ad is described below

commit 844c0ad36340b695b2784489c078cfd78522143c
Author: randgalt <ra...@apache.org>
AuthorDate: Sun Mar 29 15:33:13 2020 -0500

    CURATOR-549
    
    The next phase of this issue will implement a bridge cache that bridges TreeCache for pre-3.6 ZK and CuratorCache for ZK 3.6+. That bridge will need this TreeCache iterator.
---
 .../curator/framework/recipes/cache/TreeCache.java |  63 ++++++-
 .../framework/recipes/cache/TreeCacheEvent.java    |  20 ++
 .../framework/recipes/cache/TreeCacheIterator.java | 101 +++++++++++
 .../framework/recipes/cache/BaseTestTreeCache.java |  10 +
 .../cache/TestTreeCacheIteratorAndSize.java        | 201 +++++++++++++++++++++
 .../recipes/cache/TestTreeCacheRandomTree.java     |   6 +-
 6 files changed, 393 insertions(+), 8 deletions(-)

diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
index 3bf804c..13b9c59 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -217,7 +218,7 @@ public class TreeCache implements Closeable
 
     private static final ChildData DEAD = new ChildData("/", null, null);
 
-    private static boolean isLive(ChildData cd)
+    static boolean isLive(ChildData cd)
     {
         return cd != null && cd != DEAD;
     }
@@ -226,7 +227,7 @@ public class TreeCache implements Closeable
 
     private static final AtomicReferenceFieldUpdater<TreeNode, ConcurrentMap<String, TreeNode>> childrenUpdater = (AtomicReferenceFieldUpdater)AtomicReferenceFieldUpdater.newUpdater(TreeNode.class, ConcurrentMap.class, "children");
 
-    private final class TreeNode implements Watcher, BackgroundCallback
+    final class TreeNode implements Watcher, BackgroundCallback
     {
         volatile ChildData childData;
         final TreeNode parent;
@@ -343,7 +344,7 @@ public class TreeCache implements Closeable
 
             if ( isLive(oldChildData) )
             {
-                publishEvent(TreeCacheEvent.Type.NODE_REMOVED, oldChildData);
+                publishEvent(TreeCacheEvent.Type.NODE_REMOVED, oldChildData, null);
             }
 
             if ( parent == null )
@@ -482,7 +483,14 @@ public class TreeCache implements Closeable
                         }
                         if ( childDataUpdater.compareAndSet(this, oldChildData, toUpdate) )
                         {
-                            publishEvent(isLive(oldChildData) ? TreeCacheEvent.Type.NODE_UPDATED : TreeCacheEvent.Type.NODE_ADDED, toPublish);
+                            if ( isLive(oldChildData) )
+                            {
+                                publishEvent(TreeCacheEvent.Type.NODE_UPDATED, toPublish, oldChildData);
+                            }
+                            else
+                            {
+                                publishEvent(TreeCacheEvent.Type.NODE_ADDED, toPublish, null);
+                            }
                             break;
                         }
                     }
@@ -750,6 +758,49 @@ public class TreeCache implements Closeable
         return isLive(result) ? result : null;
     }
 
+    /**
+     * Return a depth first iterator (parents before their children) over the live nodes in
+     * the cache. There are no guarantees of accuracy; this is merely the most recent view of the data.
+     *
+     * @return a possibly-empty iterator of nodes in the cache
+     */
+    public Iterator<ChildData> iterator()
+    {
+        return new TreeCacheIterator(root);
+    }
+
+    /**
+     * Return the number of live nodes in the cache, counted by walking the tree on each
+     * call. There are no guarantees of accuracy; this is merely the most recent view of the data.
+     *
+     * @return size
+     */
+    public int size()
+    {
+        return size(root);
+    }
+
+    // count the live nodes at and below node; nothing beneath a non-live node is counted
+    private int size(TreeNode node)
+    {
+        if ( !isLive(node.childData) )
+        {
+            return 0;
+        }
+
+        // read the concurrently-updated field once so the null check and the loop share one map
+        ConcurrentMap<String, TreeNode> childMap = node.children;
+        int size = 1;   // this node
+        if ( childMap != null )
+        {
+            for ( TreeNode child : childMap.values() )
+            {
+                size += size(child);
+            }
+        }
+        return size;
+    }
+
     private void callListeners(final TreeCacheEvent event)
     {
         listeners.forEach(listener ->
@@ -837,9 +888,9 @@ public class TreeCache implements Closeable
         publishEvent(new TreeCacheEvent(type, null));
     }
 
-    private void publishEvent(TreeCacheEvent.Type type, ChildData data)
+    private void publishEvent(TreeCacheEvent.Type type, ChildData data, ChildData oldData)
     {
-        publishEvent(new TreeCacheEvent(type, data));
+        publishEvent(new TreeCacheEvent(type, data, oldData));
     }
 
     private void publishEvent(final TreeCacheEvent event)
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheEvent.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheEvent.java
index b151037..012b280 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheEvent.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheEvent.java
@@ -26,6 +26,7 @@ public class TreeCacheEvent
 {
     private final Type type;
     private final ChildData data;
+    private final ChildData oldData;
 
     /**
      * Type of change
@@ -113,8 +114,19 @@ public class TreeCacheEvent
      */
     public TreeCacheEvent(Type type, ChildData data)
     {
+        this(type, data, null);
+    }
+
+    /**
+     * @param type event type
+     * @param data event data or null
+     * @param oldData the node's data prior to the event (TreeCache supplies it for NODE_UPDATED only) or null
+     */
+    public TreeCacheEvent(Type type, ChildData data, ChildData oldData)
+    {
         this.type = type;
         this.data = data;
+        this.oldData = oldData;
     }
 
     /**
@@ -133,6 +145,14 @@ public class TreeCacheEvent
         return data;
     }
 
+    /**
+     * @return the node's old data when the type is {@link org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type#NODE_UPDATED}; null if the event was created without old data
+     */
+    public ChildData getOldData()
+    {
+        return oldData;
+    }
+
     @Override
     public String toString()
     {
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheIterator.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheIterator.java
new file mode 100644
index 0000000..eed42dc
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheIterator.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.collect.Iterators;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.NoSuchElementException;
+
+// depth first (pre-order) iterator over the live nodes of a tree cache. The data of the
+// upcoming node is captured ahead of time so that hasNext() and next() always agree,
+// even if the cache changes between the two calls.
+class TreeCacheIterator implements Iterator<ChildData>
+{
+    // child iterators of the nodes on the path from the root to the current position
+    private final LinkedList<Iterator<TreeCache.TreeNode>> stack = new LinkedList<>();
+
+    // snapshot of the data that next() will return, or null once iteration is complete
+    private ChildData nextData;
+
+    // iteration starts at the given root and then proceeds through its descendants
+    TreeCacheIterator(TreeCache.TreeNode root)
+    {
+        stack.push(Iterators.forArray(root));
+        advance();
+    }
+
+    /**
+     * @return true if a live node remains, in which case {@link #next()} will not throw
+     */
+    @Override
+    public boolean hasNext()
+    {
+        return nextData != null;
+    }
+
+    /**
+     * @return data of the next live node, as captured when the iterator reached that node
+     * @throws NoSuchElementException if {@link #hasNext()} is false
+     */
+    @Override
+    public ChildData next()
+    {
+        if ( nextData == null )
+        {
+            throw new NoSuchElementException();
+        }
+
+        ChildData result = nextData;
+        advance();
+        return result;
+    }
+
+    // move to the next live node in depth first order, leaving nextData null when none remains
+    private void advance()
+    {
+        nextData = null;
+        while ( !stack.isEmpty() )
+        {
+            Iterator<TreeCache.TreeNode> level = stack.peek();
+            if ( !level.hasNext() )
+            {
+                stack.pop();    // level exhausted (or empty): resume the level above
+                continue;
+            }
+
+            TreeCache.TreeNode node = level.next();
+
+            // node fields are updated concurrently: read each one once so that the check and the
+            // use agree. An empty child map is fine, its iterator is just popped on the next pass.
+            java.util.Map<String, TreeCache.TreeNode> children = node.children;
+            if ( children != null )
+            {
+                stack.push(children.values().iterator());   // consumed before node's siblings
+            }
+
+            ChildData data = node.childData;
+            if ( TreeCache.isLive(data) )
+            {
+                nextData = data;
+                return;
+            }
+        }
+    }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
index 246704f..dfeb2ff 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
@@ -193,6 +193,16 @@ public class BaseTestTreeCache extends BaseClassForTests
         {
             Assert.assertEquals(event.getData().getData(), expectedData, message);
         }
+
+        if ( event.getType() == TreeCacheEvent.Type.NODE_UPDATED)
+        {
+            Assert.assertNotNull(event.getOldData());
+        }
+        else
+        {
+            Assert.assertNull(event.getOldData());
+        }
+
         return event;
     }
 }
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheIteratorAndSize.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheIteratorAndSize.java
new file mode 100644
index 0000000..2d8dbb3
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheIteratorAndSize.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.collect.Sets;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Verifies TreeCache.iterator() and TreeCache.size() against trees built on the test server.
+ */
+public class TestTreeCacheIteratorAndSize extends CuratorTestBase
+{
+    // every node of a fixed tree must be iterated with its data and counted by size()
+    @Test
+    public void testBasic() throws Exception
+    {
+        final String[] nodes = {
+            "/base/test",
+            "/base/test/3",
+            "/base/test/3/0",
+            "/base/test/3/0/0",
+            "/base/test/3/0/1",
+            "/base/test/3/1",
+            "/base/test/3/1/0",
+            "/base/test/3/1/1",
+            "/base/test/3/2",
+            "/base/test/3/2/0",
+            "/base/test/3/2/1",
+            "/base/test/3/2/3",
+            "/base/test/3/3",
+            "/base/test/3/3/1",
+            "/base/test/3/3/3"
+        };
+
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+        {
+            client.start();
+
+            String basePath = "/base/test";
+            try (TreeCache treeCache = new TreeCache(client, basePath) )
+            {
+                treeCache.start();
+
+                for ( String node : nodes )
+                {
+                    client.create().creatingParentsIfNeeded().forPath(node, node.getBytes());
+                }
+
+                timing.sleepABit(); // let the cache settle
+
+                // gather what the iterator yields, keyed by path
+                Map<String, byte[]> iteratorValues = new HashMap<>();
+                treeCache.iterator().forEachRemaining(next -> iteratorValues.put(next.getPath(), next.getData()));
+
+                Assert.assertEquals(iteratorValues.size(), nodes.length);
+                for ( String node : nodes )
+                {
+                    Assert.assertEquals(iteratorValues.get(node), node.getBytes());
+                }
+
+                Assert.assertEquals(treeCache.size(), nodes.length);
+            }
+        }
+    }
+
+    // same checks as testBasic, on a tree whose shape and data are chosen at random
+    @Test
+    public void testIteratorWithRandomGraph() throws Exception
+    {
+        Map<String, String> pathAndData = new HashMap<>();
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        int nodeQty = random.nextInt(100, 200);
+        int maxPerRow = random.nextInt(1, 10);
+        int maxDepth = random.nextInt(3, 5);
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+        {
+            client.start();
+
+            String basePath = "/base/test";
+            try (TreeCache treeCache = new TreeCache(client, basePath) )
+            {
+                treeCache.start();
+
+                client.create().creatingParentsIfNeeded().forPath(basePath, "0".getBytes());
+                pathAndData.put(basePath, "0");
+
+                while ( nodeQty-- > 0 )
+                {
+                    int thisDepth = random.nextInt(1, maxDepth + 1);
+                    StringBuilder path = new StringBuilder(basePath);
+                    // create (or overwrite) each node along a random path so that parents always exist
+                    for ( int i = 0; i < thisDepth; ++i )
+                    {
+                        path.append("/").append(random.nextInt(maxPerRow));
+                        long value = random.nextLong();
+                        pathAndData.put(path.toString(), Long.toString(value));
+                        client.create().orSetData().forPath(path.toString(), Long.toString(value).getBytes());
+                    }
+                }
+
+                timing.sleepABit(); // let the cache settle
+
+                Assert.assertEquals(treeCache.size(), pathAndData.size());
+
+                // at this point we have a cached graph of random nodes with random values
+                Iterator<ChildData> iterator = treeCache.iterator();
+                while ( iterator.hasNext() )
+                {
+                    ChildData next = iterator.next();
+                    Assert.assertTrue(pathAndData.containsKey(next.getPath()));
+                    Assert.assertEquals(next.getData(), pathAndData.get(next.getPath()).getBytes()); // TestNG order: actual, expected
+                    pathAndData.remove(next.getPath());
+                }
+
+                Assert.assertEquals(pathAndData.size(), 0); // above loop should have removed all nodes
+            }
+        }
+    }
+
+    // nothing exists at the cache's root path: the iterator is empty and size() is zero
+    @Test
+    public void testEmptyTree() throws Exception
+    {
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)))
+        {
+            client.start();
+
+            try (TreeCache treeCache = new TreeCache(client, "/base/test"))
+            {
+                treeCache.start();
+
+                Iterator<ChildData> iterator = treeCache.iterator();
+                Assert.assertFalse(iterator.hasNext());
+                Assert.assertEquals(treeCache.size(), 0);
+            }
+        }
+    }
+
+    // nodes created and then deleted must appear in neither the iterator nor size()
+    @Test
+    public void testWithDeletedNodes() throws Exception
+    {
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)))
+        {
+            client.start();
+
+            try (TreeCache treeCache = new TreeCache(client, "/foo"))
+            {
+                treeCache.start();
+
+                client.create().forPath("/foo");
+                client.create().forPath("/foo/a1");
+                client.create().forPath("/foo/a2");
+                client.create().forPath("/foo/a2/a2.1");
+                client.create().forPath("/foo/a2/a2.2");
+                client.create().forPath("/foo/a3");
+                client.create().forPath("/foo/a3/a3.1");
+                client.create().forPath("/foo/a3/a3.2");
+
+                client.delete().forPath("/foo/a2/a2.2");
+                client.delete().forPath("/foo/a3/a3.1");
+
+                timing.sleepABit(); // let the cache settle
+
+                // gather the paths the iterator yields
+                Set<String> paths = new HashSet<>();
+                treeCache.iterator().forEachRemaining(next -> paths.add(next.getPath()));
+
+                Assert.assertEquals(paths, Sets.newHashSet("/foo", "/foo/a1", "/foo/a2", "/foo/a2/a2.1", "/foo/a3", "/foo/a3/a3.2"));
+                Assert.assertEquals(treeCache.size(), 6);
+            }
+        }
+    }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheRandomTree.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheRandomTree.java
index 96ce75c..e78bb9e 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheRandomTree.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheRandomTree.java
@@ -29,6 +29,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 
+import static org.testng.Assert.assertNotNull;
+
 public class TestTreeCacheRandomTree extends BaseTestTreeCache
 {
     /**
@@ -209,7 +211,7 @@ public class TestTreeCacheRandomTree extends BaseTestTreeCache
     {
         String path = expectedNode.fullPath;
         Map<String, ChildData> cacheChildren = cache.getCurrentChildren(path);
-        Assert.assertNotNull(cacheChildren, path);
+        assertNotNull(cacheChildren, path);
 
         if (withDepth && depth == TEST_DEPTH) {
             return;
@@ -233,7 +235,7 @@ public class TestTreeCacheRandomTree extends BaseTestTreeCache
     private static void assertNodeEquals(ChildData actualChild, TestNode expectedNode)
     {
         String path = expectedNode.fullPath;
-        Assert.assertNotNull(actualChild, path);
+        assertNotNull(actualChild, path);
         Assert.assertEquals(actualChild.getData(), expectedNode.data, path);
     }
 }