You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/02/09 18:36:16 UTC
[17/47] curator git commit: wip
wip
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/10170c26
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/10170c26
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/10170c26
Branch: refs/heads/CURATOR-3.0
Commit: 10170c2691687cc9f266b6b19ab57b75b088233c
Parents: 9b84ba3
Author: randgalt <ra...@apache.org>
Authored: Fri Jan 6 12:49:17 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jan 6 12:49:17 2017 -0500
----------------------------------------------------------------------
.../framework/imps/CreateBuilderImpl.java | 17 +-
.../framework/imps/GetConfigBuilderImpl.java | 8 +
.../framework/imps/ReconfigBuilderImpl.java | 13 +-
.../imps/RemoveWatchesBuilderImpl.java | 12 ++
.../curator/framework/imps/SyncBuilderImpl.java | 6 +
.../curator/x/async/AsyncCreateBuilder.java | 33 +++
.../curator/x/async/AsyncCuratorFramework.java | 11 +-
.../x/async/AsyncCuratorFrameworkDsl.java | 21 +-
.../curator/x/async/AsyncEnsemblable.java | 29 +++
.../curator/x/async/AsyncGetConfigBuilder.java | 8 +
.../curator/x/async/AsyncReconfigBuilder.java | 51 +++++
.../x/async/AsyncRemoveWatchesBuilder.java | 33 +++
.../curator/x/async/AsyncSyncBuilder.java | 6 +
.../x/async/AsyncTransactionCheckBuilder.java | 15 ++
.../x/async/AsyncTransactionCreateBuilder.java | 17 ++
.../x/async/AsyncTransactionDeleteBuilder.java | 9 +
.../curator/x/async/AsyncTransactionOp.java | 56 +++++
.../x/async/AsyncTransactionSetDataBuilder.java | 13 ++
.../apache/curator/x/async/CreateOption.java | 10 +
.../curator/x/async/RemoveWatcherOption.java | 8 +
.../x/async/WatchedAsyncCuratorFramework.java | 6 +-
.../x/async/details/AsyncCreateBuilderImpl.java | 127 +++++++++++
.../details/AsyncCuratorFrameworkImpl.java | 51 +++--
.../details/AsyncGetConfigBuilderImpl.java | 42 ++++
.../async/details/AsyncReconfigBuilderImpl.java | 108 ++++++++++
.../details/AsyncRemoveWatchesBuilderImpl.java | 157 ++++++++++++++
.../x/async/details/AsyncTransactionOpImpl.java | 215 +++++++++++++++++++
27 files changed, 1032 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
index 457b4ee..bbb98ea 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
@@ -41,7 +41,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
-class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndBytes>, ErrorListenerPathAndBytesable<String>
+public class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndBytes>, ErrorListenerPathAndBytesable<String>
{
private final CuratorFrameworkImpl client;
private CreateMode createMode;
@@ -76,6 +76,21 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt
storingStat = null;
}
+ public CreateBuilderImpl(CuratorFrameworkImpl client, CreateMode createMode, Backgrounding backgrounding, boolean createParentsIfNeeded, boolean createParentsAsContainers, boolean doProtected, boolean compress, boolean setDataIfExists, List<ACL> aclList, Stat storingStat)
+ {
+ this.client = client;
+ this.createMode = createMode;
+ this.backgrounding = backgrounding;
+ this.createParentsIfNeeded = createParentsIfNeeded;
+ this.createParentsAsContainers = createParentsAsContainers;
+ this.doProtected = doProtected;
+ this.compress = compress;
+ this.setDataIfExists = setDataIfExists;
+ protectedId = null;
+ this.acling = new ACLing(client.getAclProvider(), aclList);
+ this.storingStat = storingStat;
+ }
+
@Override
public CreateBuilderMain orSetData()
{
http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java
index 1ab9043..db2d6e4 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java
@@ -45,6 +45,14 @@ public class GetConfigBuilderImpl implements GetConfigBuilder, BackgroundOperati
watching = new Watching(client);
}
+ public GetConfigBuilderImpl(CuratorFrameworkImpl client, Backgrounding backgrounding, Watcher watcher, Stat stat)
+ {
+ this.client = client;
+ this.backgrounding = backgrounding;
+ this.watching = new Watching(client, watcher);
+ this.stat = stat;
+ }
+
@Override
public WatchBackgroundEnsembleable<byte[]> storingStatIn(Stat stat)
{
http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
index df00785..97be59a 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
@@ -47,7 +47,18 @@ public class ReconfigBuilderImpl implements ReconfigBuilder, BackgroundOperation
this.client = client;
}
- private byte[] forEnsemble() throws Exception
+ public ReconfigBuilderImpl(CuratorFrameworkImpl client, Backgrounding backgrounding, Stat responseStat, long fromConfig, List<String> newMembers, List<String> joining, List<String> leaving)
+ {
+ this.client = client;
+ this.backgrounding = backgrounding;
+ this.responseStat = responseStat;
+ this.fromConfig = fromConfig;
+ this.newMembers = newMembers;
+ this.joining = joining;
+ this.leaving = leaving;
+ }
+
+ public byte[] forEnsemble() throws Exception
{
if ( backgrounding.inBackground() )
{
http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
index 27a3c0f..e14deff 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
@@ -54,6 +54,18 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat
this.backgrounding = new Backgrounding();
}
+ public RemoveWatchesBuilderImpl(CuratorFrameworkImpl client, Watcher watcher, CuratorWatcher curatorWatcher, WatcherType watcherType, boolean guaranteed, boolean local, boolean quietly, Backgrounding backgrounding)
+ {
+ this.client = client;
+ this.watcher = watcher;
+ this.curatorWatcher = curatorWatcher;
+ this.watcherType = watcherType;
+ this.guaranteed = guaranteed;
+ this.local = local;
+ this.quietly = quietly;
+ this.backgrounding = backgrounding;
+ }
+
void internalRemoval(Watcher watcher, String path) throws Exception
{
this.watcher = watcher;
http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java
index 542b834..3dec17c 100755
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java
@@ -40,6 +40,12 @@ public class SyncBuilderImpl implements SyncBuilder, BackgroundOperation<String>
this.client = client;
}
+ public SyncBuilderImpl(CuratorFrameworkImpl client, Backgrounding backgrounding)
+ {
+ this.client = client;
+ this.backgrounding = backgrounding;
+ }
+
@Override
public ErrorListenerPathable<Void> inBackground()
{
http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCreateBuilder.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCreateBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCreateBuilder.java
new file mode 100644
index 0000000..7e28398
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCreateBuilder.java
@@ -0,0 +1,33 @@
+package org.apache.curator.x.async;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import java.util.List;
+import java.util.Set;
+
+public interface AsyncCreateBuilder extends
+ AsyncPathAndBytesable<AsyncStage<String>>
+{
+ /**
+ * Have the operation fill the provided stat object
+ *
+ * @param stat the stat to have filled in
+ * @return this
+ */
+ AsyncPathable<AsyncStage<String>> storingStatIn(Stat stat);
+
+ AsyncPathable<AsyncStage<String>> withMode(CreateMode createMode);
+
+ AsyncPathable<AsyncStage<String>> withACL(List<ACL> aclList);
+
+ AsyncPathable<AsyncStage<String>> withOptions(Set<CreateOption> options);
+
+ AsyncPathable<AsyncStage<String>> withOptions(Set<CreateOption> options, List<ACL> aclList);
+
+ AsyncPathable<AsyncStage<String>> withOptions(Set<CreateOption> options, CreateMode createMode, List<ACL> aclList);
+
+ AsyncPathable<AsyncStage<String>> withOptions(Set<CreateOption> options, CreateMode createMode);
+
+ AsyncPathable<AsyncStage<String>> withOptions(Set<CreateOption> options, CreateMode createMode, List<ACL> aclList, Stat stat);
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java
index 2f8cdf1..604be98 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java
@@ -10,18 +10,9 @@ import org.apache.curator.framework.api.transaction.TransactionOp;
*/
public interface AsyncCuratorFramework extends AsyncCuratorFrameworkDsl
{
- CuratorFramework getCuratorFramework();
+ CuratorFramework unwrap();
WatchedAsyncCuratorFramework watched();
AsyncCuratorFrameworkDsl withUnhandledErrorListener(UnhandledErrorListener listener);
-
- /**
- * Allocate an operation that can be used with {@link #transaction()}.
- * NOTE: {@link CuratorOp} instances created by this builder are
- * reusable.
- *
- * @return operation builder
- */
- TransactionOp transactionOp(); // TODO - versions that don't throw
}
http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFrameworkDsl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFrameworkDsl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFrameworkDsl.java
index 48d1a58..b7e03d1 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFrameworkDsl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFrameworkDsl.java
@@ -1,13 +1,11 @@
package org.apache.curator.x.async;
-import org.apache.curator.framework.api.CreateBuilder;
-import org.apache.curator.framework.api.DeleteBuilder;
import org.apache.curator.framework.api.GetACLBuilder;
import org.apache.curator.framework.api.ReconfigBuilder;
import org.apache.curator.framework.api.RemoveWatchesBuilder;
import org.apache.curator.framework.api.SetACLBuilder;
-import org.apache.curator.framework.api.SetDataBuilder;
import org.apache.curator.framework.api.SyncBuilder;
+import org.apache.curator.framework.api.transaction.CuratorOp;
/**
* Zookeeper framework-style client
@@ -19,7 +17,7 @@ public interface AsyncCuratorFrameworkDsl extends WatchedAsyncCuratorFramework
*
* @return builder object
*/
- CreateBuilder create();
+ AsyncCreateBuilder create();
/**
* Start a delete builder
@@ -54,7 +52,7 @@ public interface AsyncCuratorFrameworkDsl extends WatchedAsyncCuratorFramework
*
* @return builder object
*/
- ReconfigBuilder reconfig();
+ AsyncReconfigBuilder reconfig();
/**
* Start a transaction builder
@@ -64,16 +62,25 @@ public interface AsyncCuratorFrameworkDsl extends WatchedAsyncCuratorFramework
AsyncMultiTransaction transaction();
/**
+ * Allocate an operation that can be used with {@link #transaction()}.
+ * NOTE: {@link CuratorOp} instances created by this builder are
+ * reusable.
+ *
+ * @return operation builder
+ */
+ AsyncTransactionOp transactionOp();
+
+ /**
* Start a sync builder. Note: sync is ALWAYS in the background even
* if you don't use one of the background() methods
*
* @return builder object
*/
- SyncBuilder sync();
+ AsyncSyncBuilder sync();
/**
* Start a remove watches builder.
* @return builder object
*/
- RemoveWatchesBuilder watches();
+ AsyncRemoveWatchesBuilder watches();
}
http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncEnsemblable.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncEnsemblable.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncEnsemblable.java
new file mode 100644
index 0000000..55aa918
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncEnsemblable.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async;
+
+public interface AsyncEnsemblable<T>
+{
+ /**
+ * Commit the currently building operation
+ *
+ * @return operation result if any
+ */
+ T forEnsemble();
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncGetConfigBuilder.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncGetConfigBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncGetConfigBuilder.java
new file mode 100644
index 0000000..19e5b6a
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncGetConfigBuilder.java
@@ -0,0 +1,8 @@
+package org.apache.curator.x.async;
+
+import org.apache.zookeeper.data.Stat;
+
+public interface AsyncGetConfigBuilder extends AsyncEnsemblable<AsyncStage<byte[]>>
+{
+ AsyncEnsemblable<AsyncStage<byte[]>> storingStatIn(Stat stat);
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncReconfigBuilder.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncReconfigBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncReconfigBuilder.java
new file mode 100644
index 0000000..75b898e
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncReconfigBuilder.java
@@ -0,0 +1,51 @@
+package org.apache.curator.x.async;
+
+import org.apache.zookeeper.data.Stat;
+import java.util.List;
+
+public interface AsyncReconfigBuilder
+{
+ /**
+ * Sets one or more members that are meant to be the ensemble.
+ * The expected format is server.[id]=[hostname]:[peer port]:[election port]:[type];[client port]
+ *
+ * @param servers The servers that will make up the new ensemble.
+ * @return this
+ */
+ AsyncEnsemblable<AsyncStage<Void>> withNewMembers(List<String> servers);
+
+ AsyncEnsemblable<AsyncStage<Void>> withJoiningAndLeaving(List<String> joining, List<String> leaving);
+
+ /**
+ * Sets one or more members that are meant to be the ensemble.
+ * The expected format is server.[id]=[hostname]:[peer port]:[election port]:[type];[client port]
+ *
+ * @param servers The servers that will make up the new ensemble.
+ * @return this
+ */
+ AsyncEnsemblable<AsyncStage<Void>> withNewMembers(List<String> servers, long fromConfig);
+
+ AsyncEnsemblable<AsyncStage<Void>> withJoiningAndLeaving(List<String> joining, List<String> leaving, long fromConfig);
+
+ /**
+ * Sets one or more members that are meant to be the ensemble.
+ * The expected format is server.[id]=[hostname]:[peer port]:[election port]:[type];[client port]
+ *
+ * @param servers The servers that will make up the new ensemble.
+ * @return this
+ */
+ AsyncEnsemblable<AsyncStage<Void>> withNewMembers(List<String> servers, Stat stat);
+
+ AsyncEnsemblable<AsyncStage<Void>> withJoiningAndLeaving(List<String> joining, List<String> leaving, Stat stat);
+
+ /**
+ * Sets one or more members that are meant to be the ensemble.
+ * The expected format is server.[id]=[hostname]:[peer port]:[election port]:[type];[client port]
+ *
+ * @param servers The servers that will make up the new ensemble.
+ * @return this
+ */
+ AsyncEnsemblable<AsyncStage<Void>> withNewMembers(List<String> servers, Stat stat, long fromConfig);
+
+ AsyncEnsemblable<AsyncStage<Void>> withJoiningAndLeaving(List<String> joining, List<String> leaving, Stat stat, long fromConfig);
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncRemoveWatchesBuilder.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncRemoveWatchesBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncRemoveWatchesBuilder.java
new file mode 100644
index 0000000..fe8efa1
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncRemoveWatchesBuilder.java
@@ -0,0 +1,33 @@
+package org.apache.curator.x.async;
+
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.zookeeper.Watcher;
+import java.util.Set;
+
+public interface AsyncRemoveWatchesBuilder extends
+ AsyncPathable<AsyncStage<Void>>
+{
+ AsyncPathable<AsyncStage<Void>> removing(Watcher watcher);
+
+ AsyncPathable<AsyncStage<Void>> removing(CuratorWatcher watcher);
+
+ AsyncPathable<AsyncStage<Void>> removingAll();
+
+ AsyncPathable<AsyncStage<Void>> removing(Watcher watcher, Set<RemoveWatcherOption> options);
+
+ AsyncPathable<AsyncStage<Void>> removing(CuratorWatcher watcher, Set<RemoveWatcherOption> options);
+
+ AsyncPathable<AsyncStage<Void>> removingAll(Set<RemoveWatcherOption> options);
+
+ AsyncPathable<AsyncStage<Void>> removing(Watcher watcher, Watcher.WatcherType watcherType, Set<RemoveWatcherOption> options);
+
+ AsyncPathable<AsyncStage<Void>> removing(CuratorWatcher watcher, Watcher.WatcherType watcherType, Set<RemoveWatcherOption> options);
+
+ AsyncPathable<AsyncStage<Void>> removingAll(Watcher.WatcherType watcherType, Set<RemoveWatcherOption> options);
+
+ AsyncPathable<AsyncStage<Void>> removing(Watcher watcher, Watcher.WatcherType watcherType);
+
+ AsyncPathable<AsyncStage<Void>> removing(CuratorWatcher watcher, Watcher.WatcherType watcherType);
+
+ AsyncPathable<AsyncStage<Void>> removingAll(Watcher.WatcherType watcherType);
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncSyncBuilder.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncSyncBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncSyncBuilder.java
new file mode 100644
index 0000000..0289aa7
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncSyncBuilder.java
@@ -0,0 +1,6 @@
+package org.apache.curator.x.async;
+
+public interface AsyncSyncBuilder extends
+ AsyncPathable<AsyncStage<Void>>
+{
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionCheckBuilder.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionCheckBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionCheckBuilder.java
new file mode 100644
index 0000000..032029d
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionCheckBuilder.java
@@ -0,0 +1,15 @@
+package org.apache.curator.x.async;
+
+import org.apache.curator.framework.api.transaction.CuratorOp;
+
+public interface AsyncTransactionCheckBuilder extends
+ AsyncPathable<CuratorOp>
+{
+ /**
+ * Use the given version (the default is -1)
+ *
+ * @param version version to use
+ * @return this
+ */
+ AsyncPathable<CuratorOp> withVersion(int version);
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionCreateBuilder.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionCreateBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionCreateBuilder.java
new file mode 100644
index 0000000..a94a011
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionCreateBuilder.java
@@ -0,0 +1,17 @@
+package org.apache.curator.x.async;
+
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.ACL;
+import java.util.List;
+
+public interface AsyncTransactionCreateBuilder extends AsyncPathAndBytesable<CuratorOp>
+{
+ AsyncPathable<CuratorOp> withMode(CreateMode createMode);
+
+ AsyncPathable<CuratorOp> withACL(List<ACL> aclList);
+
+ AsyncPathable<CuratorOp> compressed();
+
+ AsyncPathable<CuratorOp> withOptions(CreateMode createMode, List<ACL> aclList, boolean compressed);
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionDeleteBuilder.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionDeleteBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionDeleteBuilder.java
new file mode 100644
index 0000000..984999e
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionDeleteBuilder.java
@@ -0,0 +1,9 @@
+package org.apache.curator.x.async;
+
+import org.apache.curator.framework.api.transaction.CuratorOp;
+
+public interface AsyncTransactionDeleteBuilder extends
+ AsyncPathable<CuratorOp>
+{
+ AsyncPathable<CuratorOp> withVersion(int version);
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionOp.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionOp.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionOp.java
new file mode 100644
index 0000000..2e2f886
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionOp.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async;
+
+import org.apache.curator.framework.CuratorFramework;
+
+/**
+ * Builds operations that can be committed as a transaction
+ * via {@link CuratorFramework#transaction()}
+ */
+public interface AsyncTransactionOp
+{
+ /**
+ * Start a create builder in the transaction
+ *
+ * @return builder object
+ */
+ AsyncTransactionCreateBuilder create();
+
+ /**
+ * Start a delete builder in the transaction
+ *
+ * @return builder object
+ */
+ AsyncTransactionDeleteBuilder delete();
+
+ /**
+ * Start a setData builder in the transaction
+ *
+ * @return builder object
+ */
+ AsyncTransactionSetDataBuilder setData();
+
+ /**
+ * Start a check builder in the transaction
+ *
+ * @return builder object
+ */
+ AsyncTransactionCheckBuilder check();
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionSetDataBuilder.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionSetDataBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionSetDataBuilder.java
new file mode 100644
index 0000000..dfb1b9a
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionSetDataBuilder.java
@@ -0,0 +1,13 @@
+package org.apache.curator.x.async;
+
+import org.apache.curator.framework.api.transaction.CuratorOp;
+
+public interface AsyncTransactionSetDataBuilder extends
+ AsyncPathAndBytesable<CuratorOp>
+{
+ AsyncPathAndBytesable<CuratorOp> withVersion(int version);
+
+ AsyncPathAndBytesable<CuratorOp> compressed();
+
+ AsyncPathAndBytesable<CuratorOp> withVersionCompressed(int version);
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/CreateOption.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/CreateOption.java b/curator-x-async/src/main/java/org/apache/curator/x/async/CreateOption.java
new file mode 100644
index 0000000..5c7b741
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/CreateOption.java
@@ -0,0 +1,10 @@
+package org.apache.curator.x.async;
+
+public enum CreateOption
+{
+ createParentsIfNeeded,
+ createParentsAsContainers,
+ doProtected,
+ compress,
+ setDataIfExists
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/RemoveWatcherOption.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/RemoveWatcherOption.java b/curator-x-async/src/main/java/org/apache/curator/x/async/RemoveWatcherOption.java
new file mode 100644
index 0000000..e4e8688
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/RemoveWatcherOption.java
@@ -0,0 +1,8 @@
+package org.apache.curator.x.async;
+
+public enum RemoveWatcherOption
+{
+ guaranteed,
+ local,
+ quietly
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/WatchedAsyncCuratorFramework.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/WatchedAsyncCuratorFramework.java b/curator-x-async/src/main/java/org/apache/curator/x/async/WatchedAsyncCuratorFramework.java
index b40c623..62f03ed 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/WatchedAsyncCuratorFramework.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/WatchedAsyncCuratorFramework.java
@@ -1,9 +1,5 @@
package org.apache.curator.x.async;
-import org.apache.curator.framework.api.ExistsBuilder;
-import org.apache.curator.framework.api.GetConfigBuilder;
-import org.apache.curator.framework.api.GetDataBuilder;
-
/**
* Zookeeper framework-style client
*/
@@ -38,5 +34,5 @@ public interface WatchedAsyncCuratorFramework
*
* @return builder object
*/
- GetConfigBuilder getConfig();
+ AsyncGetConfigBuilder getConfig();
}
http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
new file mode 100644
index 0000000..921cc0d
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
@@ -0,0 +1,127 @@
+package org.apache.curator.x.async.details;
+
+import org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.curator.framework.imps.CreateBuilderImpl;
+import org.apache.curator.framework.imps.CuratorFrameworkImpl;
+import org.apache.curator.x.async.AsyncCreateBuilder;
+import org.apache.curator.x.async.AsyncPathable;
+import org.apache.curator.x.async.AsyncStage;
+import org.apache.curator.x.async.CreateOption;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+import static org.apache.curator.x.async.details.BackgroundProcs.pathProc;
+import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
+
+/**
+ * Package-private {@link AsyncCreateBuilder} implementation. It records the
+ * requested create settings and, once a path is supplied, passes them to a
+ * {@link CreateBuilderImpl} built with the backgrounding of a fresh
+ * {@code BuilderCommon}.
+ */
+class AsyncCreateBuilderImpl implements AsyncCreateBuilder
+{
+    private final CuratorFrameworkImpl client;
+    private final UnhandledErrorListener unhandledErrorListener;
+    private CreateMode createMode = CreateMode.PERSISTENT;
+    private List<ACL> aclList = null;
+    private Set<CreateOption> options = Collections.emptySet();
+    private Stat stat = null;
+
+    /**
+     * @param client client handed to the underlying {@link CreateBuilderImpl}
+     * @param unhandledErrorListener listener handed to {@code BuilderCommon}
+     */
+    AsyncCreateBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener)
+    {
+        this.client = client;
+        this.unhandledErrorListener = unhandledErrorListener;
+    }
+
+    @Override
+    public AsyncPathable<AsyncStage<String>> storingStatIn(Stat stat)
+    {
+        this.stat = stat;
+        return this;
+    }
+
+    @Override
+    public AsyncPathable<AsyncStage<String>> withMode(CreateMode createMode)
+    {
+        applyMode(createMode);
+        return this;
+    }
+
+    @Override
+    public AsyncPathable<AsyncStage<String>> withACL(List<ACL> aclList)
+    {
+        this.aclList = aclList;
+        return this;
+    }
+
+    @Override
+    public AsyncPathable<AsyncStage<String>> withOptions(Set<CreateOption> options)
+    {
+        applyOptions(options);
+        return this;
+    }
+
+    @Override
+    public AsyncPathable<AsyncStage<String>> withOptions(Set<CreateOption> options, List<ACL> aclList)
+    {
+        applyOptions(options);
+        this.aclList = aclList;
+        return this;
+    }
+
+    @Override
+    public AsyncPathable<AsyncStage<String>> withOptions(Set<CreateOption> options, CreateMode createMode, List<ACL> aclList)
+    {
+        // assignment order is kept: options, then ACL, then mode
+        applyOptions(options);
+        this.aclList = aclList;
+        applyMode(createMode);
+        return this;
+    }
+
+    @Override
+    public AsyncPathable<AsyncStage<String>> withOptions(Set<CreateOption> options, CreateMode createMode)
+    {
+        applyOptions(options);
+        applyMode(createMode);
+        return this;
+    }
+
+    @Override
+    public AsyncPathable<AsyncStage<String>> withOptions(Set<CreateOption> options, CreateMode createMode, List<ACL> aclList, Stat stat)
+    {
+        // assignment order is kept: options, ACL, mode, then stat
+        applyOptions(options);
+        this.aclList = aclList;
+        applyMode(createMode);
+        this.stat = stat;
+        return this;
+    }
+
+    @Override
+    public AsyncStage<String> forPath(String path)
+    {
+        return internalForPath(path, null, false);
+    }
+
+    @Override
+    public AsyncStage<String> forPath(String path, byte[] data)
+    {
+        return internalForPath(path, data, true);
+    }
+
+    /**
+     * Stores the create mode, rejecting {@code null}.
+     */
+    private void applyMode(CreateMode mode)
+    {
+        createMode = Objects.requireNonNull(mode, "createMode cannot be null");
+    }
+
+    /**
+     * Stores the option set, rejecting {@code null}.
+     */
+    private void applyOptions(Set<CreateOption> newOptions)
+    {
+        options = Objects.requireNonNull(newOptions, "options cannot be null");
+    }
+
+    /**
+     * Builds the delegate {@link CreateBuilderImpl} from the recorded settings and
+     * invokes it through {@code safeCall}.
+     *
+     * @param path path passed to the delegate
+     * @param data payload; only passed on when {@code useData} is true
+     * @param useData selects the delegate's two-argument {@code forPath} overload
+     * @return the value produced by {@code safeCall}
+     */
+    private AsyncStage<String> internalForPath(String path, byte[] data, boolean useData)
+    {
+        BuilderCommon<String> common = new BuilderCommon<>(unhandledErrorListener, false, pathProc);
+
+        boolean createParents = options.contains(CreateOption.createParentsIfNeeded);
+        boolean parentsAsContainers = options.contains(CreateOption.createParentsAsContainers);
+        boolean doProtected = options.contains(CreateOption.doProtected);
+        boolean compress = options.contains(CreateOption.compress);
+        boolean setDataIfExists = options.contains(CreateOption.setDataIfExists);
+
+        CreateBuilderImpl delegate = new CreateBuilderImpl(client,
+            createMode,
+            common.backgrounding,
+            createParents,
+            parentsAsContainers,
+            doProtected,
+            compress,
+            setDataIfExists,
+            aclList,
+            stat
+        );
+        return safeCall(common.internalCallback, () -> {
+            if ( useData )
+            {
+                return delegate.forPath(path, data);
+            }
+            return delegate.forPath(path);
+        });
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
index 193a428..2bb922b 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
@@ -1,24 +1,19 @@
package org.apache.curator.x.async.details;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.*;
+import org.apache.curator.framework.api.GetACLBuilder;
+import org.apache.curator.framework.api.ReconfigBuilder;
+import org.apache.curator.framework.api.RemoveWatchesBuilder;
+import org.apache.curator.framework.api.SetACLBuilder;
+import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
-import org.apache.curator.framework.api.transaction.TransactionOp;
import org.apache.curator.framework.imps.CuratorFrameworkImpl;
import org.apache.curator.framework.imps.CuratorMultiTransactionImpl;
-import org.apache.curator.x.async.AsyncCuratorFramework;
-import org.apache.curator.x.async.AsyncCuratorFrameworkDsl;
-import org.apache.curator.x.async.AsyncDeleteBuilder;
-import org.apache.curator.x.async.AsyncExistsBuilder;
-import org.apache.curator.x.async.AsyncGetChildrenBuilder;
-import org.apache.curator.x.async.AsyncGetDataBuilder;
-import org.apache.curator.x.async.AsyncMultiTransaction;
-import org.apache.curator.x.async.AsyncSetDataBuilder;
-import org.apache.curator.x.async.WatchedAsyncCuratorFramework;
+import org.apache.curator.framework.imps.SyncBuilderImpl;
+import org.apache.curator.x.async.*;
import java.util.List;
-import static org.apache.curator.x.async.details.BackgroundProcs.opResultsProc;
-import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
+import static org.apache.curator.x.async.details.BackgroundProcs.*;
public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework
{
@@ -34,9 +29,9 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework
}
@Override
- public CreateBuilder create()
+ public AsyncCreateBuilder create()
{
- return null;
+ return new AsyncCreateBuilderImpl(client, unhandledErrorListener);
}
@Override
@@ -64,9 +59,9 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework
}
@Override
- public ReconfigBuilder reconfig()
+ public AsyncReconfigBuilder reconfig()
{
- return null;
+ return new AsyncReconfigBuilderImpl(client, unhandledErrorListener);
}
@Override
@@ -80,19 +75,23 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework
}
@Override
- public SyncBuilder sync()
+ public AsyncSyncBuilder sync()
{
- return null;
+ return path -> {
+ BuilderCommon<Void> common = new BuilderCommon<>(unhandledErrorListener, false, ignoredProc);
+ SyncBuilderImpl builder = new SyncBuilderImpl(client, common.backgrounding);
+ return safeCall(common.internalCallback, () -> builder.forPath(path));
+ };
}
@Override
- public RemoveWatchesBuilder watches()
+ public AsyncRemoveWatchesBuilder watches()
{
- return null;
+ return new AsyncRemoveWatchesBuilderImpl(client, unhandledErrorListener);
}
@Override
- public CuratorFramework getCuratorFramework()
+ public CuratorFramework unwrap()
{
return client;
}
@@ -110,9 +109,9 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework
}
@Override
- public TransactionOp transactionOp()
+ public AsyncTransactionOp transactionOp()
{
- return client.transactionOp();
+ return new AsyncTransactionOpImpl(client);
}
@Override
@@ -134,8 +133,8 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework
}
@Override
- public GetConfigBuilder getConfig()
+ public AsyncGetConfigBuilder getConfig()
{
- return null;
+ return new AsyncGetConfigBuilderImpl(client, unhandledErrorListener, watched);
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java
new file mode 100644
index 0000000..5673e29
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java
@@ -0,0 +1,42 @@
+package org.apache.curator.x.async.details;
+
+import org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.curator.framework.imps.CuratorFrameworkImpl;
+import org.apache.curator.framework.imps.GetConfigBuilderImpl;
+import org.apache.curator.x.async.AsyncEnsemblable;
+import org.apache.curator.x.async.AsyncGetConfigBuilder;
+import org.apache.curator.x.async.AsyncStage;
+import org.apache.zookeeper.data.Stat;
+
+import static org.apache.curator.x.async.details.BackgroundProcs.dataProc;
+import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
+
+/**
+ * Package-private {@link AsyncGetConfigBuilder} implementation. It remembers an
+ * optional {@link Stat} and delegates to a {@link GetConfigBuilderImpl} that is
+ * given the backgrounding and watcher of a fresh {@code BuilderCommon}.
+ */
+class AsyncGetConfigBuilderImpl implements AsyncGetConfigBuilder
+{
+    private final CuratorFrameworkImpl client;
+    private final UnhandledErrorListener unhandledErrorListener;
+    private final boolean watched;
+    private Stat stat = null;
+
+    /**
+     * @param client client handed to the underlying {@link GetConfigBuilderImpl}
+     * @param unhandledErrorListener listener handed to {@code BuilderCommon}
+     * @param watched flag handed to {@code BuilderCommon}
+     */
+    AsyncGetConfigBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener, boolean watched)
+    {
+        this.client = client;
+        this.unhandledErrorListener = unhandledErrorListener;
+        this.watched = watched;
+    }
+
+    @Override
+    public AsyncEnsemblable<AsyncStage<byte[]>> storingStatIn(Stat stat)
+    {
+        this.stat = stat;
+        return this;
+    }
+
+    @Override
+    public AsyncStage<byte[]> forEnsemble()
+    {
+        BuilderCommon<byte[]> common = new BuilderCommon<>(unhandledErrorListener, watched, dataProc);
+        GetConfigBuilderImpl delegate = new GetConfigBuilderImpl(client, common.backgrounding, common.watcher, stat);
+        return safeCall(common.internalCallback, () -> delegate.forEnsemble());
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java
new file mode 100644
index 0000000..ce0c21e
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java
@@ -0,0 +1,108 @@
+package org.apache.curator.x.async.details;
+
+import org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.curator.framework.imps.CuratorFrameworkImpl;
+import org.apache.curator.framework.imps.ReconfigBuilderImpl;
+import org.apache.curator.x.async.AsyncEnsemblable;
+import org.apache.curator.x.async.AsyncReconfigBuilder;
+import org.apache.curator.x.async.AsyncStage;
+import org.apache.zookeeper.data.Stat;
+import java.util.List;
+
+import static org.apache.curator.x.async.details.BackgroundProcs.ignoredProc;
+import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
+
+/**
+ * Package-private {@link AsyncReconfigBuilder} implementation. Each
+ * {@code withNewMembers} / {@code withJoiningAndLeaving} overload records its
+ * arguments; {@link #forEnsemble()} then passes everything recorded to a
+ * {@link ReconfigBuilderImpl} built with the backgrounding of a fresh
+ * {@code BuilderCommon}.
+ */
+class AsyncReconfigBuilderImpl implements AsyncReconfigBuilder, AsyncEnsemblable<AsyncStage<Void>>
+{
+    private final CuratorFrameworkImpl client;
+    private final UnhandledErrorListener unhandledErrorListener;
+    private Stat stat = null;
+    private long fromConfig = -1;
+    private List<String> newMembers = null;
+    private List<String> joining = null;
+    private List<String> leaving = null;
+
+    /**
+     * @param client client handed to the underlying {@link ReconfigBuilderImpl}
+     * @param unhandledErrorListener listener handed to {@code BuilderCommon}
+     */
+    AsyncReconfigBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener)
+    {
+        this.client = client;
+        this.unhandledErrorListener = unhandledErrorListener;
+    }
+
+    @Override
+    public AsyncEnsemblable<AsyncStage<Void>> withNewMembers(List<String> servers)
+    {
+        this.newMembers = servers;
+        return this;
+    }
+
+    @Override
+    public AsyncEnsemblable<AsyncStage<Void>> withJoiningAndLeaving(List<String> joining, List<String> leaving)
+    {
+        this.joining = joining;
+        this.leaving = leaving;
+        return this;
+    }
+
+    @Override
+    public AsyncEnsemblable<AsyncStage<Void>> withNewMembers(List<String> servers, Stat stat)
+    {
+        this.newMembers = servers;
+        this.stat = stat;
+        return this;
+    }
+
+    @Override
+    public AsyncEnsemblable<AsyncStage<Void>> withJoiningAndLeaving(List<String> joining, List<String> leaving, Stat stat)
+    {
+        this.joining = joining;
+        this.leaving = leaving;
+        // the stat argument was previously dropped; record it like the other Stat overloads do
+        this.stat = stat;
+        return this;
+    }
+
+    @Override
+    public AsyncEnsemblable<AsyncStage<Void>> withNewMembers(List<String> servers, Stat stat, long fromConfig)
+    {
+        this.newMembers = servers;
+        this.stat = stat;
+        this.fromConfig = fromConfig;
+        return this;
+    }
+
+    @Override
+    public AsyncEnsemblable<AsyncStage<Void>> withJoiningAndLeaving(List<String> joining, List<String> leaving, Stat stat, long fromConfig)
+    {
+        this.joining = joining;
+        this.leaving = leaving;
+        this.stat = stat;
+        this.fromConfig = fromConfig;
+        return this;
+    }
+
+    @Override
+    public AsyncEnsemblable<AsyncStage<Void>> withNewMembers(List<String> servers, long fromConfig)
+    {
+        this.newMembers = servers;
+        this.fromConfig = fromConfig;
+        return this;
+    }
+
+    @Override
+    public AsyncEnsemblable<AsyncStage<Void>> withJoiningAndLeaving(List<String> joining, List<String> leaving, long fromConfig)
+    {
+        this.joining = joining;
+        this.leaving = leaving;
+        this.fromConfig = fromConfig;
+        return this;
+    }
+
+    /**
+     * Builds a {@link ReconfigBuilderImpl} from the recorded stat, fromConfig,
+     * new-member, joining and leaving values and invokes it through {@code safeCall}.
+     *
+     * @return the value produced by {@code safeCall}
+     */
+    @Override
+    public AsyncStage<Void> forEnsemble()
+    {
+        BuilderCommon<Void> common = new BuilderCommon<>(unhandledErrorListener, false, ignoredProc);
+        ReconfigBuilderImpl builder = new ReconfigBuilderImpl(client, common.backgrounding, stat, fromConfig, newMembers, joining, leaving);
+        return safeCall(common.internalCallback, () -> {
+            // the delegate's return value is discarded; the stage is typed Void
+            builder.forEnsemble();
+            return null;
+        });
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java
new file mode 100644
index 0000000..553cd68
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java
@@ -0,0 +1,157 @@
+package org.apache.curator.x.async.details;
+
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.curator.framework.imps.CuratorFrameworkImpl;
+import org.apache.curator.framework.imps.RemoveWatchesBuilderImpl;
+import org.apache.curator.x.async.AsyncPathable;
+import org.apache.curator.x.async.AsyncRemoveWatchesBuilder;
+import org.apache.curator.x.async.AsyncStage;
+import org.apache.curator.x.async.RemoveWatcherOption;
+import org.apache.zookeeper.Watcher;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+
+import static org.apache.curator.x.async.details.BackgroundProcs.ignoredProc;
+import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
+
+/**
+ * Package-private {@link AsyncRemoveWatchesBuilder} implementation.
+ * <p>
+ * Every {@code removing(...)} overload stores exactly one of the two watcher
+ * kinds and nulls the other; every {@code removingAll(...)} overload nulls both.
+ * {@link #forPath(String)} passes the recorded state to a
+ * {@link RemoveWatchesBuilderImpl} built with the backgrounding of a fresh
+ * {@code BuilderCommon}.
+ */
+class AsyncRemoveWatchesBuilderImpl implements AsyncRemoveWatchesBuilder
+{
+    private final CuratorFrameworkImpl client;
+    private final UnhandledErrorListener unhandledErrorListener;
+    private Watcher.WatcherType watcherType = Watcher.WatcherType.Any;
+    private Set<RemoveWatcherOption> options = Collections.emptySet();
+    private Watcher watcher = null;
+    private CuratorWatcher curatorWatcher = null;
+
+    /**
+     * @param client client handed to the underlying {@link RemoveWatchesBuilderImpl}
+     * @param unhandledErrorListener listener handed to {@code BuilderCommon}
+     */
+    AsyncRemoveWatchesBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener)
+    {
+        this.client = client;
+        this.unhandledErrorListener = unhandledErrorListener;
+    }
+
+    @Override
+    public AsyncPathable<AsyncStage<Void>> removing(Watcher watcher)
+    {
+        this.watcher = Objects.requireNonNull(watcher, "watcher cannot be null");
+        this.curatorWatcher = null;
+        return this;
+    }
+
+    @Override
+    public AsyncPathable<AsyncStage<Void>> removing(CuratorWatcher watcher)
+    {
+        this.curatorWatcher = Objects.requireNonNull(watcher, "watcher cannot be null");
+        this.watcher = null;
+        return this;
+    }
+
+    @Override
+    public AsyncPathable<AsyncStage<Void>> removingAll()
+    {
+        this.curatorWatcher = null;
+        this.watcher = null;
+        return this;
+    }
+
+    @Override
+    public AsyncPathable<AsyncStage<Void>> removing(Watcher watcher, Set<RemoveWatcherOption> options)
+    {
+        this.watcher = Objects.requireNonNull(watcher, "watcher cannot be null");
+        this.options = Objects.requireNonNull(options, "options cannot be null");
+        this.curatorWatcher = null;
+        return this;
+    }
+
+    @Override
+    public AsyncPathable<AsyncStage<Void>> removing(CuratorWatcher watcher, Set<RemoveWatcherOption> options)
+    {
+        this.curatorWatcher = Objects.requireNonNull(watcher, "watcher cannot be null");
+        this.options = Objects.requireNonNull(options, "options cannot be null");
+        this.watcher = null;
+        return this;
+    }
+
+    @Override
+    public AsyncPathable<AsyncStage<Void>> removingAll(Set<RemoveWatcherOption> options)
+    {
+        this.options = Objects.requireNonNull(options, "options cannot be null");
+        this.curatorWatcher = null;
+        this.watcher = null;
+        return this;
+    }
+
+    @Override
+    public AsyncPathable<AsyncStage<Void>> removing(Watcher watcher, Watcher.WatcherType watcherType, Set<RemoveWatcherOption> options)
+    {
+        this.watcher = Objects.requireNonNull(watcher, "watcher cannot be null");
+        this.options = Objects.requireNonNull(options, "options cannot be null");
+        this.watcherType = Objects.requireNonNull(watcherType, "watcherType cannot be null");
+        this.curatorWatcher = null;
+        return this;
+    }
+
+    @Override
+    public AsyncPathable<AsyncStage<Void>> removing(CuratorWatcher watcher, Watcher.WatcherType watcherType, Set<RemoveWatcherOption> options)
+    {
+        this.curatorWatcher = Objects.requireNonNull(watcher, "watcher cannot be null");
+        this.options = Objects.requireNonNull(options, "options cannot be null");
+        this.watcherType = Objects.requireNonNull(watcherType, "watcherType cannot be null");
+        this.watcher = null;
+        return this;
+    }
+
+    @Override
+    public AsyncPathable<AsyncStage<Void>> removingAll(Watcher.WatcherType watcherType, Set<RemoveWatcherOption> options)
+    {
+        this.options = Objects.requireNonNull(options, "options cannot be null");
+        this.watcherType = Objects.requireNonNull(watcherType, "watcherType cannot be null");
+        this.curatorWatcher = null;
+        this.watcher = null;
+        return this;
+    }
+
+    @Override
+    public AsyncPathable<AsyncStage<Void>> removing(Watcher watcher, Watcher.WatcherType watcherType)
+    {
+        this.watcher = Objects.requireNonNull(watcher, "watcher cannot be null");
+        this.watcherType = Objects.requireNonNull(watcherType, "watcherType cannot be null");
+        this.curatorWatcher = null;
+        return this;
+    }
+
+    @Override
+    public AsyncPathable<AsyncStage<Void>> removing(CuratorWatcher watcher, Watcher.WatcherType watcherType)
+    {
+        this.curatorWatcher = Objects.requireNonNull(watcher, "watcher cannot be null");
+        this.watcherType = Objects.requireNonNull(watcherType, "watcherType cannot be null");
+        this.watcher = null;
+        return this;
+    }
+
+    @Override
+    public AsyncPathable<AsyncStage<Void>> removingAll(Watcher.WatcherType watcherType)
+    {
+        this.watcherType = Objects.requireNonNull(watcherType, "watcherType cannot be null");
+        this.curatorWatcher = null;
+        this.watcher = null;
+        return this;
+    }
+
+    /**
+     * Builds a {@link RemoveWatchesBuilderImpl} from the recorded watcher, watcher
+     * type and options and invokes it for {@code path} through {@code safeCall}.
+     *
+     * @param path path passed to the delegate
+     * @return the value produced by {@code safeCall}
+     */
+    @Override
+    public AsyncStage<Void> forPath(String path)
+    {
+        BuilderCommon<Void> common = new BuilderCommon<>(unhandledErrorListener, false, ignoredProc);
+        // one flag per RemoveWatcherOption constant; the third flag previously
+        // re-read "guaranteed", so "quietly" was never honoured
+        RemoveWatchesBuilderImpl builder = new RemoveWatchesBuilderImpl(client,
+            watcher,
+            curatorWatcher,
+            watcherType,
+            options.contains(RemoveWatcherOption.guaranteed),
+            options.contains(RemoveWatcherOption.local),
+            options.contains(RemoveWatcherOption.quietly),
+            common.backgrounding
+        );
+        return safeCall(common.internalCallback, () -> builder.forPath(path));
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java
new file mode 100644
index 0000000..02506c1
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java
@@ -0,0 +1,215 @@
+package org.apache.curator.x.async.details;
+
+import org.apache.curator.framework.api.ACLCreateModePathAndBytesable;
+import org.apache.curator.framework.api.PathAndBytesable;
+import org.apache.curator.framework.api.VersionPathAndBytesable;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.api.transaction.TransactionCreateBuilder;
+import org.apache.curator.framework.api.transaction.TransactionSetDataBuilder;
+import org.apache.curator.framework.imps.CuratorFrameworkImpl;
+import org.apache.curator.x.async.AsyncPathAndBytesable;
+import org.apache.curator.x.async.AsyncPathable;
+import org.apache.curator.x.async.AsyncTransactionCheckBuilder;
+import org.apache.curator.x.async.AsyncTransactionCreateBuilder;
+import org.apache.curator.x.async.AsyncTransactionDeleteBuilder;
+import org.apache.curator.x.async.AsyncTransactionOp;
+import org.apache.curator.x.async.AsyncTransactionSetDataBuilder;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.ACL;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Package-private {@link AsyncTransactionOp} implementation. Each factory method
+ * returns an anonymous builder that records its settings and, in
+ * {@code forPath}, replays them on the matching builder obtained from
+ * {@code client.transactionOp()} to produce a {@link CuratorOp}.
+ */
+class AsyncTransactionOpImpl implements AsyncTransactionOp
+{
+    private final CuratorFrameworkImpl client;
+
+    /**
+     * @param client client whose {@code transactionOp()} builders are delegated to
+     */
+    AsyncTransactionOpImpl(CuratorFrameworkImpl client)
+    {
+        this.client = client;
+    }
+
+    /**
+     * @return a builder for a create operation; defaults are
+     *         {@link CreateMode#PERSISTENT}, no explicit ACL list, not compressed
+     */
+    @Override
+    public AsyncTransactionCreateBuilder create()
+    {
+        return new AsyncTransactionCreateBuilder()
+        {
+            private List<ACL> aclList = null;
+            private CreateMode createMode = CreateMode.PERSISTENT;
+            private boolean compressed = false;
+
+            @Override
+            public AsyncPathable<CuratorOp> withMode(CreateMode createMode)
+            {
+                this.createMode = Objects.requireNonNull(createMode, "createMode cannot be null");
+                return this;
+            }
+
+            @Override
+            public AsyncPathable<CuratorOp> withACL(List<ACL> aclList)
+            {
+                this.aclList = aclList;
+                return this;
+            }
+
+            @Override
+            public AsyncPathable<CuratorOp> compressed()
+            {
+                compressed = true;
+                return this;
+            }
+
+            @Override
+            public AsyncPathable<CuratorOp> withOptions(CreateMode createMode, List<ACL> aclList, boolean compressed)
+            {
+                this.createMode = Objects.requireNonNull(createMode, "createMode cannot be null");
+                this.aclList = aclList;
+                this.compressed = compressed;
+                return this;
+            }
+
+            @Override
+            public CuratorOp forPath(String path, byte[] data)
+            {
+                return internalForPath(path, data, true);
+            }
+
+            @Override
+            public CuratorOp forPath(String path)
+            {
+                return internalForPath(path, null, false);
+            }
+
+            // Replays compression, create mode and ACL on the delegate builder.
+            private CuratorOp internalForPath(String path, byte[] data, boolean useData)
+            {
+                TransactionCreateBuilder<CuratorOp> builder1 = client.transactionOp().create();
+                ACLCreateModePathAndBytesable<CuratorOp> builder2 = compressed ? builder1.compressed() : builder1;
+                // the recorded createMode was previously never applied, so the
+                // delegate's default mode was always used
+                PathAndBytesable<CuratorOp> builder3 = builder2.withMode(createMode).withACL(aclList);
+                try
+                {
+                    return useData ? builder3.forPath(path, data) : builder3.forPath(path);
+                }
+                catch ( Exception e )
+                {
+                    throw new RuntimeException(e);  // should never happen
+                }
+            }
+        };
+    }
+
+    /**
+     * @return a builder for a delete operation; version defaults to -1
+     */
+    @Override
+    public AsyncTransactionDeleteBuilder delete()
+    {
+        return new AsyncTransactionDeleteBuilder()
+        {
+            private int version = -1;
+
+            @Override
+            public AsyncPathable<CuratorOp> withVersion(int version)
+            {
+                this.version = version;
+                return this;
+            }
+
+            @Override
+            public CuratorOp forPath(String path)
+            {
+                try
+                {
+                    return client.transactionOp().delete().withVersion(version).forPath(path);
+                }
+                catch ( Exception e )
+                {
+                    throw new RuntimeException(e);  // should never happen
+                }
+            }
+        };
+    }
+
+    /**
+     * @return a builder for a set-data operation; version defaults to -1, not compressed
+     */
+    @Override
+    public AsyncTransactionSetDataBuilder setData()
+    {
+        return new AsyncTransactionSetDataBuilder()
+        {
+            private int version = -1;
+            private boolean compressed = false;
+
+            @Override
+            public AsyncPathAndBytesable<CuratorOp> withVersion(int version)
+            {
+                this.version = version;
+                return this;
+            }
+
+            @Override
+            public AsyncPathAndBytesable<CuratorOp> compressed()
+            {
+                compressed = true;
+                return this;
+            }
+
+            @Override
+            public AsyncPathAndBytesable<CuratorOp> withVersionCompressed(int version)
+            {
+                this.version = version;
+                compressed = true;
+                return this;
+            }
+
+            @Override
+            public CuratorOp forPath(String path, byte[] data)
+            {
+                return internalForPath(path, data, true);
+            }
+
+            @Override
+            public CuratorOp forPath(String path)
+            {
+                return internalForPath(path, null, false);
+            }
+
+            // Replays compression and version on the delegate builder.
+            private CuratorOp internalForPath(String path, byte[] data, boolean useData)
+            {
+                TransactionSetDataBuilder<CuratorOp> builder1 = client.transactionOp().setData();
+                VersionPathAndBytesable<CuratorOp> builder2 = compressed ? builder1.compressed() : builder1;
+                PathAndBytesable<CuratorOp> builder3 = builder2.withVersion(version);
+                try
+                {
+                    return useData ? builder3.forPath(path, data) : builder3.forPath(path);
+                }
+                catch ( Exception e )
+                {
+                    throw new RuntimeException(e);  // should never happen
+                }
+            }
+        };
+    }
+
+    /**
+     * @return a builder for a check operation; version defaults to -1
+     */
+    @Override
+    public AsyncTransactionCheckBuilder check()
+    {
+        return new AsyncTransactionCheckBuilder()
+        {
+            private int version = -1;
+
+            @Override
+            public AsyncPathable<CuratorOp> withVersion(int version)
+            {
+                this.version = version;
+                return this;
+            }
+
+            @Override
+            public CuratorOp forPath(String path)
+            {
+                try
+                {
+                    return client.transactionOp().check().withVersion(version).forPath(path);
+                }
+                catch ( Exception e )
+                {
+                    throw new RuntimeException(e);  // should never happen
+                }
+            }
+        };
+    }
+}