You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/07/18 13:10:47 UTC
[1/5] curator git commit: Added asyncEnsureParents()
Repository: curator
Updated Branches:
refs/heads/CURATOR-421 9385d0490 -> 33e413884
Added asyncEnsureParents()
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/9d30a8c8
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/9d30a8c8
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/9d30a8c8
Branch: refs/heads/CURATOR-421
Commit: 9d30a8c8769f803715862eb9b5479057bfd0d5af
Parents: 9385d04
Author: randgalt <ra...@apache.org>
Authored: Sat Jul 15 11:02:30 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Jul 15 11:02:30 2017 -0500
----------------------------------------------------------------------
.../apache/curator/x/async/AsyncWrappers.java | 36 ++++++++++++++------
1 file changed, 25 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/9d30a8c8/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
index d7b3cc3..f26b3b4 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
@@ -28,7 +28,6 @@ import org.apache.zookeeper.KeeperException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
@@ -134,8 +133,19 @@ public class AsyncWrappers
}
/**
- * Asynchronously call {@link org.apache.curator.framework.CuratorFramework#createContainers(String)} using
- * the given executor
+ * Asynchronously ensure that the parents of the given path are created
+ *
+ * @param client client
+ * @param path path to ensure
+ * @return stage
+ */
+ public static CompletionStage<Void> asyncEnsureParents(AsyncCuratorFramework client, String path)
+ {
+ return ensure(client, path, ExistsOption.createParentsIfNeeded);
+ }
+
+ /**
+ * Asynchronously ensure that the parents of the given path are created as containers
*
* @param client client
* @param path path to ensure
@@ -143,14 +153,7 @@ public class AsyncWrappers
*/
public static CompletionStage<Void> asyncEnsureContainers(AsyncCuratorFramework client, String path)
{
- String localPath = ZKPaths.makePath(path, "foo");
- Set<ExistsOption> options = Collections.singleton(ExistsOption.createParentsAsContainers);
- return client
- .checkExists()
- .withOptions(options)
- .forPath(localPath)
- .thenApply(__ -> null)
- ;
+ return ensure(client, path, ExistsOption.createParentsAsContainers);
}
/**
@@ -373,6 +376,17 @@ public class AsyncWrappers
});
}
+ private static CompletionStage<Void> ensure(AsyncCuratorFramework client, String path, ExistsOption option)
+ {
+ String localPath = ZKPaths.makePath(path, "foo");
+ return client
+ .checkExists()
+ .withOptions(Collections.singleton(option))
+ .forPath(localPath)
+ .thenApply(__ -> null)
+ ;
+ }
+
private AsyncWrappers()
{
}
[4/5] curator git commit: break up the help menu a bit
Posted by ra...@apache.org.
break up the help menu a bit
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c77ef823
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c77ef823
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c77ef823
Branch: refs/heads/CURATOR-421
Commit: c77ef823f91c4ef8713614aec181f681e7a7af8c
Parents: 84191f3
Author: randgalt <ra...@apache.org>
Authored: Sat Jul 15 11:28:55 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Jul 15 11:28:55 2017 -0500
----------------------------------------------------------------------
src/site/site.xml | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/c77ef823/src/site/site.xml
----------------------------------------------------------------------
diff --git a/src/site/site.xml b/src/site/site.xml
index 4b1ac98..8136c9a 100644
--- a/src/site/site.xml
+++ b/src/site/site.xml
@@ -70,15 +70,18 @@
<item name="Getting Started" href="getting-started.html"/>
<item name="Examples" href="curator-examples/index.html"/>
<item name="Recipes" href="curator-recipes/index.html"/>
- <item name="Framework" href="curator-framework/index.html"/>
- <item name="Utilities" href="utilities.html"/>
- <item name="Client" href="curator-client/index.html"/>
<item name="Java 8/Async" href="curator-x-async/index.html"/>
<item name="Strongly Typed Models" href="curator-x-async/modeled.html"/>
<item name="Migrations" href="curator-x-async/migrations.html"/>
<item name="Schema Support" href="curator-framework/schema.html"/>
</menu>
+ <menu name="Low Level" inherit="top">
+ <item name="Framework" href="curator-framework/index.html"/>
+ <item name="Utilities" href="utilities.html"/>
+ <item name="Client" href="curator-client/index.html"/>
+ </menu>
+
<menu name="Details" inherit="top">
<item name="Error Handling" href="errors.html"/>
<item name="Logging and Tracing" href="logging.html"/>
[5/5] curator git commit: The entire migration set should be 1
transaction - not each individual migration
Posted by ra...@apache.org.
The entire migration set should be 1 transaction - not each individual migration
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/33e41388
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/33e41388
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/33e41388
Branch: refs/heads/CURATOR-421
Commit: 33e4138840904635a1793084051fd50b643794f1
Parents: c77ef82
Author: randgalt <ra...@apache.org>
Authored: Tue Jul 18 08:10:43 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Jul 18 08:10:43 2017 -0500
----------------------------------------------------------------------
.../x/async/migrations/MigrationManager.java | 19 +++++++++----------
.../src/site/confluence/migrations.confluence | 4 +++-
.../x/async/migrations/TestMigrationManager.java | 2 +-
3 files changed, 13 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/33e41388/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
index 56e7f04..e51f0e4 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
@@ -106,7 +106,6 @@ public class MigrationManager
}
int compareSize = Math.min(set.migrations().size(), operationHashesInOrder.size());
- List<Migration> subList = set.migrations().subList(0, compareSize);
for ( int i = 0; i < compareSize; ++i )
{
byte[] setHash = hash(set.migrations().get(i).operations());
@@ -184,23 +183,23 @@ public class MigrationManager
}
return asyncEnsureContainers(client, thisMetaDataPath)
- .thenCompose(__ -> applyMetaDataAfterEnsure(set, toBeApplied, thisMetaDataPath));
+ .thenCompose(__ -> applyMetaDataAfterEnsure(toBeApplied, thisMetaDataPath));
}
@VisibleForTesting
volatile AtomicInteger debugCount = null;
- private CompletionStage<Void> applyMetaDataAfterEnsure(MigrationSet set, List<Migration> toBeApplied, String thisMetaDataPath)
+ private CompletionStage<Void> applyMetaDataAfterEnsure(List<Migration> toBeApplied, String thisMetaDataPath)
{
debugCount.incrementAndGet();
+ List<CuratorOp> operations = new ArrayList<>();
String metaDataBasePath = ZKPaths.makePath(thisMetaDataPath, META_DATA_NODE_NAME);
- List<CompletableFuture<Object>> stages = toBeApplied.stream().map(migration -> {
- List<CuratorOp> operations = new ArrayList<>();
- operations.addAll(migration.operations());
- operations.add(client.transactionOp().create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(metaDataBasePath, hash(operations)));
- return client.transaction().forOperations(operations).thenApply(__ -> null).toCompletableFuture();
- }).collect(Collectors.toList());
- return CompletableFuture.allOf(stages.toArray(new CompletableFuture[stages.size()]));
+ toBeApplied.forEach(migration -> {
+ List<CuratorOp> thisMigrationOperations = migration.operations();
+ operations.addAll(thisMigrationOperations);
+ operations.add(client.transactionOp().create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(metaDataBasePath, hash(thisMigrationOperations)));
+ });
+ return client.transaction().forOperations(operations).thenApply(__ -> null);
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/33e41388/curator-x-async/src/site/confluence/migrations.confluence
----------------------------------------------------------------------
diff --git a/curator-x-async/src/site/confluence/migrations.confluence b/curator-x-async/src/site/confluence/migrations.confluence
index 4775fac..bd2d36f 100644
--- a/curator-x-async/src/site/confluence/migrations.confluence
+++ b/curator-x-async/src/site/confluence/migrations.confluence
@@ -91,7 +91,9 @@ manager.migrate(set).exceptionally(e -> {
});
{code}
-Each migration in the set is applied in a transaction. MigrationManager stores a hash
+All migrations in the set are applied in a single transaction - i.e. all operations that comprise
+a migration set (the sum of all individual migration operations) are sent to ZooKeeper as a single
+transaction. MigrationManager stores a hash
of all operations in a migration so that it can be compared for future operations. i.e.
if, in the future, a migration set is attempted but the hash of one of the previous migrations
does not match, the stage completes exceptionally with {{MigrationException}}.
http://git-wip-us.apache.org/repos/asf/curator/blob/33e41388/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
index 786e704..47d09ab 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
@@ -268,7 +268,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
Assert.assertTrue(Throwables.getRootCause(e) instanceof KeeperException.NoNodeException);
}
- Assert.assertEquals(client.unwrap().getData().forPath("/test"), "something".getBytes());
+ Assert.assertNull(client.unwrap().checkExists().forPath("/test")); // should be all or nothing
}
@Test
[3/5] curator git commit: Merge branch 'master' into CURATOR-421
Posted by ra...@apache.org.
Merge branch 'master' into CURATOR-421
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/84191f3d
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/84191f3d
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/84191f3d
Branch: refs/heads/CURATOR-421
Commit: 84191f3d46d015cb49ebebb42fddc1019b3db6c3
Parents: 9d30a8c 123f2ec
Author: randgalt <ra...@apache.org>
Authored: Sat Jul 15 11:03:53 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Jul 15 11:03:53 2017 -0500
----------------------------------------------------------------------
----------------------------------------------------------------------
[2/5] curator git commit: Added asyncEnsureParents()
Posted by ra...@apache.org.
Added asyncEnsureParents()
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/123f2ece
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/123f2ece
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/123f2ece
Branch: refs/heads/CURATOR-421
Commit: 123f2ece539f924705945f462030ec2b9692aebd
Parents: 1b6216e
Author: randgalt <ra...@apache.org>
Authored: Sat Jul 15 11:03:23 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Jul 15 11:03:23 2017 -0500
----------------------------------------------------------------------
.../apache/curator/x/async/AsyncWrappers.java | 132 +++++++++++++++++--
1 file changed, 120 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/123f2ece/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
index 9630985..f26b3b4 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
@@ -18,13 +18,16 @@
*/
package org.apache.curator.x.async;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.curator.x.async.api.ExistsOption;
-import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.zookeeper.KeeperException;
import java.util.Collections;
-import java.util.Set;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
@@ -70,8 +73,79 @@ import java.util.concurrent.TimeUnit;
public class AsyncWrappers
{
/**
- * Asynchronously call {@link org.apache.curator.framework.CuratorFramework#createContainers(String)} using
- * the given executor
+ * <p>
+ * Return the children of the given path (keyed by the full path) and the data for each node.
+ * IMPORTANT: this results in a ZooKeeper query
+ * for each child node returned. i.e. if the initial children() call returns
+ * 10 nodes an additional 10 ZooKeeper queries are made to get the data.
+ * </p>
+ *
+ * <p>
+ * Note: if any of the nodes in the path do not exist yet, {@link org.apache.zookeeper.KeeperException.NoNodeException}
+ * is <strong>NOT</strong> set. Instead the stage is completed with an empty map.
+ * </p>
+ *
+ * @return CompletionStage
+ */
+ public static CompletionStage<Map<String, byte[]>> childrenWithData(AsyncCuratorFramework client, String path)
+ {
+ return childrenWithData(client, path, false);
+ }
+
+ /**
+ * <p>
+ * Return the children of the given path (keyed by the full path) and the data for each node.
+ * IMPORTANT: this results in a ZooKeeper query
+ * for each child node returned. i.e. if the initial children() call returns
+ * 10 nodes an additional 10 ZooKeeper queries are made to get the data.
+ * </p>
+ *
+ * <p>
+ * Note: if any of the nodes in the path do not exist yet, {@link org.apache.zookeeper.KeeperException.NoNodeException}
+ * is <strong>NOT</strong> set. Instead the stage is completed with an empty map.
+ * </p>
+ *
+ * @param isCompressed pass true if data is compressed
+ * @return CompletionStage
+ */
+ public static CompletionStage<Map<String, byte[]>> childrenWithData(AsyncCuratorFramework client, String path, boolean isCompressed)
+ {
+ CompletableFuture<Map<String, byte[]>> future = new CompletableFuture<>();
+ client.getChildren().forPath(path).handle((children, e) -> {
+ if ( e != null )
+ {
+ if ( Throwables.getRootCause(e) instanceof KeeperException.NoNodeException )
+ {
+ future.complete(Maps.newHashMap());
+ }
+ else
+ {
+ future.completeExceptionally(e);
+ }
+ }
+ else
+ {
+ completeChildren(client, future, path, children, isCompressed);
+ }
+ return null;
+ });
+ return future;
+ }
+
+ /**
+ * Asynchronously ensure that the parents of the given path are created
+ *
+ * @param client client
+ * @param path path to ensure
+ * @return stage
+ */
+ public static CompletionStage<Void> asyncEnsureParents(AsyncCuratorFramework client, String path)
+ {
+ return ensure(client, path, ExistsOption.createParentsIfNeeded);
+ }
+
+ /**
+ * Asynchronously ensure that the parents of the given path are created as containers
*
* @param client client
* @param path path to ensure
@@ -79,14 +153,7 @@ public class AsyncWrappers
*/
public static CompletionStage<Void> asyncEnsureContainers(AsyncCuratorFramework client, String path)
{
- String localPath = ZKPaths.makePath(path, "foo");
- Set<ExistsOption> options = Collections.singleton(ExistsOption.createParentsAsContainers);
- return client
- .checkExists()
- .withOptions(options)
- .forPath(localPath)
- .thenApply(__ -> null)
- ;
+ return ensure(client, path, ExistsOption.createParentsAsContainers);
}
/**
@@ -279,6 +346,47 @@ public class AsyncWrappers
}
}
+ private static void completeChildren(AsyncCuratorFramework client, CompletableFuture<Map<String, byte[]>> future, String parentPath, List<String> children, boolean isCompressed)
+ {
+ Map<String, byte[]> nodes = Maps.newHashMap();
+ if ( children.size() == 0 )
+ {
+ future.complete(nodes);
+ return;
+ }
+
+ children.forEach(node -> {
+ String path = ZKPaths.makePath(parentPath, node);
+ AsyncStage<byte[]> stage = isCompressed ? client.getData().decompressed().forPath(path) : client.getData().forPath(path);
+ stage.handle((data, e) -> {
+ if ( e != null )
+ {
+ future.completeExceptionally(e);
+ }
+ else
+ {
+ nodes.put(path, data);
+ if ( nodes.size() == children.size() )
+ {
+ future.complete(nodes);
+ }
+ }
+ return null;
+ });
+ });
+ }
+
+ private static CompletionStage<Void> ensure(AsyncCuratorFramework client, String path, ExistsOption option)
+ {
+ String localPath = ZKPaths.makePath(path, "foo");
+ return client
+ .checkExists()
+ .withOptions(Collections.singleton(option))
+ .forPath(localPath)
+ .thenApply(__ -> null)
+ ;
+ }
+
private AsyncWrappers()
{
}