You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/07/16 21:06:33 UTC
[16/21] curator git commit: Added asyncEnsureParents()
Added asyncEnsureParents()
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/123f2ece
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/123f2ece
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/123f2ece
Branch: refs/heads/CURATOR-419
Commit: 123f2ece539f924705945f462030ec2b9692aebd
Parents: 1b6216e
Author: randgalt <ra...@apache.org>
Authored: Sat Jul 15 11:03:23 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Jul 15 11:03:23 2017 -0500
----------------------------------------------------------------------
.../apache/curator/x/async/AsyncWrappers.java | 132 +++++++++++++++++--
1 file changed, 120 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/123f2ece/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
index 9630985..f26b3b4 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
@@ -18,13 +18,16 @@
*/
package org.apache.curator.x.async;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.curator.x.async.api.ExistsOption;
-import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.zookeeper.KeeperException;
import java.util.Collections;
-import java.util.Set;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
@@ -70,8 +73,79 @@ import java.util.concurrent.TimeUnit;
public class AsyncWrappers
{
/**
- * Asynchronously call {@link org.apache.curator.framework.CuratorFramework#createContainers(String)} using
- * the given executor
+ * <p>
+ * Return the children of the given path (keyed by the full path) and the data for each node.
+ * IMPORTANT: this results in a ZooKeeper query
+ * for each child node returned. i.e. if the initial children() call returns
+ * 10 nodes an additional 10 ZooKeeper queries are made to get the data.
+ * </p>
+ *
+ * <p>
+ * Note: if any of the nodes in the path do not exist yet, {@link org.apache.zookeeper.KeeperException.NoNodeException}
+ * is <strong>NOT</strong> set. Instead the stage is completed with an empty map.
+ * </p>
+ *
+ * @return CompletionStage
+ */
+    public static CompletionStage<Map<String, byte[]>> childrenWithData(AsyncCuratorFramework client, String path)
+    {
+        // delegate to the three-argument overload, treating the node data as uncompressed
+        final boolean isCompressed = false;
+        return childrenWithData(client, path, isCompressed);
+    }
+
+ /**
+ * <p>
+ * Return the children of the given path (keyed by the full path) and the data for each node.
+ * IMPORTANT: this results in a ZooKeeper query
+ * for each child node returned. i.e. if the initial children() call returns
+ * 10 nodes an additional 10 ZooKeeper queries are made to get the data.
+ * </p>
+ *
+ * <p>
+ * Note: if any of the nodes in the path do not exist yet, {@link org.apache.zookeeper.KeeperException.NoNodeException}
+ * is <strong>NOT</strong> set. Instead the stage is completed with an empty map.
+ * </p>
+ *
+ * @param isCompressed pass true if data is compressed
+ * @return CompletionStage
+ */
+    public static CompletionStage<Map<String, byte[]>> childrenWithData(AsyncCuratorFramework client, String path, boolean isCompressed)
+    {
+        CompletableFuture<Map<String, byte[]>> result = new CompletableFuture<>();
+        client.getChildren().forPath(path).handle((childNames, error) -> {
+            if ( error == null )
+            {
+                // children were listed: completeChildren() reads each child's data and completes the result
+                completeChildren(client, result, path, childNames, isCompressed);
+                return null;
+            }
+
+            // a missing node is reported as "no children" rather than as a failure
+            boolean isMissingNode = Throwables.getRootCause(error) instanceof KeeperException.NoNodeException;
+            if ( isMissingNode )
+            {
+                result.complete(Maps.newHashMap());
+            }
+            else
+            {
+                result.completeExceptionally(error);
+            }
+            return null;
+        });
+        return result;
+    }
+
+ /**
+ * Asynchronously ensure that the parents of the given path are created
+ *
+ * @param client client
+ * @param path path to ensure
+ * @return stage
+ */
+    public static CompletionStage<Void> asyncEnsureParents(AsyncCuratorFramework client, String path)
+    {
+        // same existence check as asyncEnsureContainers(), but with the createParentsIfNeeded option
+        ExistsOption parentMode = ExistsOption.createParentsIfNeeded;
+        return ensure(client, path, parentMode);
+    }
+
+ /**
+ * Asynchronously ensure that the parents of the given path are created as containers
*
* @param client client
* @param path path to ensure
@@ -79,14 +153,7 @@ public class AsyncWrappers
*/
public static CompletionStage<Void> asyncEnsureContainers(AsyncCuratorFramework client, String path)
{
- String localPath = ZKPaths.makePath(path, "foo");
- Set<ExistsOption> options = Collections.singleton(ExistsOption.createParentsAsContainers);
- return client
- .checkExists()
- .withOptions(options)
- .forPath(localPath)
- .thenApply(__ -> null)
- ;
+ return ensure(client, path, ExistsOption.createParentsAsContainers); // same checkExists() call as the removed lines, now routed through the shared ensure() helper
}
/**
@@ -279,6 +346,47 @@ public class AsyncWrappers
}
}
+    private static void completeChildren(AsyncCuratorFramework client, CompletableFuture<Map<String, byte[]>> future, String parentPath, List<String> children, boolean isCompressed)
+    {
+        Map<String, byte[]> dataByPath = Maps.newHashMap();
+        if ( children.isEmpty() )
+        {
+            // nothing to read: complete immediately with the empty map
+            future.complete(dataByPath);
+            return;
+        }
+
+        // issue one getData() per child; the future completes with the map once every child's
+        // data has been stored, or exceptionally as soon as any read reports an error
+        for ( String child : children )
+        {
+            String childPath = ZKPaths.makePath(parentPath, child);
+            AsyncStage<byte[]> dataStage;
+            if ( isCompressed )
+            {
+                dataStage = client.getData().decompressed().forPath(childPath);
+            }
+            else
+            {
+                dataStage = client.getData().forPath(childPath);
+            }
+
+            dataStage.handle((bytes, error) -> {
+                if ( error != null )
+                {
+                    future.completeExceptionally(error);
+                    return null;
+                }
+
+                // results are keyed by the child's full path
+                dataByPath.put(childPath, bytes);
+                if ( dataByPath.size() == children.size() )
+                {
+                    future.complete(dataByPath);
+                }
+                return null;
+            });
+        }
+    }
+
+    private static CompletionStage<Void> ensure(AsyncCuratorFramework client, String path, ExistsOption option)
+    {
+        // The existence check targets a child named "foo" under the given path, with the supplied
+        // option applied. NOTE(review): presumably this makes the given path itself one of the
+        // "parents" the option acts on - confirm against the ExistsOption documentation.
+        String probePath = ZKPaths.makePath(path, "foo");
+        // callers only care about completion, so the result of the check is discarded
+        return client.checkExists().withOptions(Collections.singleton(option)).forPath(probePath).thenApply(ignored -> null);
+    }
+
private AsyncWrappers() // private and empty: not constructible from outside this class
{
}