You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/07/18 13:10:48 UTC

[2/5] curator git commit: Added asyncEnsureParents()

Added asyncEnsureParents()


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/123f2ece
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/123f2ece
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/123f2ece

Branch: refs/heads/CURATOR-421
Commit: 123f2ece539f924705945f462030ec2b9692aebd
Parents: 1b6216e
Author: randgalt <ra...@apache.org>
Authored: Sat Jul 15 11:03:23 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Jul 15 11:03:23 2017 -0500

----------------------------------------------------------------------
 .../apache/curator/x/async/AsyncWrappers.java   | 132 +++++++++++++++++--
 1 file changed, 120 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/123f2ece/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
index 9630985..f26b3b4 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
@@ -18,13 +18,16 @@
  */
 package org.apache.curator.x.async;
 
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
 import org.apache.curator.framework.recipes.locks.InterProcessLock;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.curator.x.async.api.ExistsOption;
-import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.zookeeper.KeeperException;
 import java.util.Collections;
-import java.util.Set;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.Executor;
@@ -70,8 +73,79 @@ import java.util.concurrent.TimeUnit;
 public class AsyncWrappers
 {
     /**
-     * Asynchronously call {@link org.apache.curator.framework.CuratorFramework#createContainers(String)} using
-     * the given executor
+     * <p>
+     * Return the children of the given path (keyed by the full path) and the data for each node.
+     * IMPORTANT: this results in a ZooKeeper query
+     * for each child node returned, e.g. if the initial children() call returns
+     * 10 nodes, an additional 10 ZooKeeper queries are made to get the data.
+     * </p>
+     *
+     * <p>
+     * Note: if any of the nodes in the path do not exist yet, {@link org.apache.zookeeper.KeeperException.NoNodeException}
+     * is <strong>NOT</strong> set. Instead the stage is completed with an empty map.
+     * </p>
+     *
+     * @return CompletionStage
+     */
+    public static CompletionStage<Map<String, byte[]>> childrenWithData(AsyncCuratorFramework client, String path)
+    {
+        return childrenWithData(client, path, false);
+    }
+
+    /**
+     * <p>
+     * Return the children of the given path (keyed by the full path) and the data for each node.
+     * IMPORTANT: this results in a ZooKeeper query
+     * for each child node returned, e.g. if the initial children() call returns
+     * 10 nodes, an additional 10 ZooKeeper queries are made to get the data.
+     * </p>
+     *
+     * <p>
+     * Note: if any of the nodes in the path do not exist yet, {@link org.apache.zookeeper.KeeperException.NoNodeException}
+     * is <strong>NOT</strong> set. Instead the stage is completed with an empty map.
+     * </p>
+     *
+     * @param isCompressed pass true if data is compressed
+     * @return CompletionStage
+     */
+    public static CompletionStage<Map<String, byte[]>> childrenWithData(AsyncCuratorFramework client, String path, boolean isCompressed)
+    {
+        CompletableFuture<Map<String, byte[]>> future = new CompletableFuture<>();
+        client.getChildren().forPath(path).handle((children, e) -> {
+            if ( e != null )
+            {
+                if ( Throwables.getRootCause(e) instanceof KeeperException.NoNodeException )
+                {
+                    future.complete(Maps.newHashMap());
+                }
+                else
+                {
+                    future.completeExceptionally(e);
+                }
+            }
+            else
+            {
+                completeChildren(client, future, path, children, isCompressed);
+            }
+            return null;
+        });
+        return future;
+    }
+
+    /**
+     * Asynchronously ensure that the parents of the given path are created
+     *
+     * @param client client
+     * @param path path to ensure
+     * @return stage
+     */
+    public static CompletionStage<Void> asyncEnsureParents(AsyncCuratorFramework client, String path)
+    {
+        return ensure(client, path, ExistsOption.createParentsIfNeeded);
+    }
+
+    /**
+     * Asynchronously ensure that the parents of the given path are created as containers
      *
      * @param client client
      * @param path path to ensure
@@ -79,14 +153,7 @@ public class AsyncWrappers
      */
     public static CompletionStage<Void> asyncEnsureContainers(AsyncCuratorFramework client, String path)
     {
-        String localPath = ZKPaths.makePath(path, "foo");
-        Set<ExistsOption> options = Collections.singleton(ExistsOption.createParentsAsContainers);
-        return client
-            .checkExists()
-            .withOptions(options)
-            .forPath(localPath)
-            .thenApply(__ -> null)
-            ;
+        return ensure(client, path, ExistsOption.createParentsAsContainers);
     }
 
     /**
@@ -279,6 +346,47 @@ public class AsyncWrappers
         }
     }
 
+    private static void completeChildren(AsyncCuratorFramework client, CompletableFuture<Map<String, byte[]>> future, String parentPath, List<String> children, boolean isCompressed)
+    {
+        Map<String, byte[]> nodes = Maps.newHashMap();
+        if ( children.size() == 0 )
+        {
+            future.complete(nodes);
+            return;
+        }
+
+        children.forEach(node -> {
+            String path = ZKPaths.makePath(parentPath, node);
+            AsyncStage<byte[]> stage = isCompressed ? client.getData().decompressed().forPath(path) : client.getData().forPath(path);
+            stage.handle((data, e) -> {
+                if ( e != null )
+                {
+                    future.completeExceptionally(e);
+                }
+                else
+                {
+                    nodes.put(path, data);
+                    if ( nodes.size() == children.size() )
+                    {
+                        future.complete(nodes);
+                    }
+                }
+                return null;
+            });
+        });
+    }
+
+    private static CompletionStage<Void> ensure(AsyncCuratorFramework client, String path, ExistsOption option)
+    {
+        String localPath = ZKPaths.makePath(path, "foo");
+        return client
+            .checkExists()
+            .withOptions(Collections.singleton(option))
+            .forPath(localPath)
+            .thenApply(__ -> null)
+            ;
+    }
+
     private AsyncWrappers()
     {
     }