You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/02/09 18:36:12 UTC
[13/47] curator git commit: watched version of getConfig
watched version of getConfig
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/3aa51d50
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/3aa51d50
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/3aa51d50
Branch: refs/heads/CURATOR-3.0
Commit: 3aa51d503afe2f5952686976681194418afe171f
Parents: 81f0ab2
Author: randgalt <ra...@apache.org>
Authored: Thu Jan 5 23:17:03 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Thu Jan 5 23:17:03 2017 -0500
----------------------------------------------------------------------
.../curator/x/crimps/async/AsyncCrimps.java | 69 ++++++++--------
.../crimps/async/CrimpedBackgroundCallback.java | 6 +-
.../crimps/async/CrimpedConfigEnsembleable.java | 9 +--
.../x/crimps/async/CrimpedEnsembleable.java | 4 +-
.../x/crimps/async/CrimpedEnsembleableImpl.java | 2 +-
.../async/CrimpedWatchedEnsembleable.java | 29 +++++++
.../async/CrimpedWatchedEnsembleableImpl.java | 83 ++++++++++++++++++++
.../curator/x/crimps/async/TestCrimps.java | 9 +--
8 files changed, 155 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/3aa51d50/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/AsyncCrimps.java
----------------------------------------------------------------------
diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/AsyncCrimps.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/AsyncCrimps.java
index 639deca..bc3a2c3 100644
--- a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/AsyncCrimps.java
+++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/AsyncCrimps.java
@@ -22,7 +22,6 @@ import org.apache.curator.framework.api.*;
import org.apache.curator.framework.api.transaction.CuratorMultiTransactionMain;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import java.util.List;
@@ -98,28 +97,28 @@ public class AsyncCrimps
return build(builder, dataProc);
}
+ public CrimpedPathable<Crimped<byte[]>> dataWatched(Watchable<BackgroundPathable<byte[]>> builder)
+ {
+ return build(builder, dataProc);
+ }
+
public CrimpedPathable<CompletionStage<List<String>>> children(BackgroundPathable<List<String>> builder)
{
return build(builder, childrenProc);
}
- public CrimpedPathable<CompletionStage<Stat>> stat(BackgroundPathable<Stat> builder)
+ public CrimpedPathable<Crimped<List<String>>> childrenWatched(Watchable<BackgroundPathable<List<String>>> builder)
{
- return build(builder, statProc);
+ return build(builder, childrenProc);
}
- public CrimpedPathable<CompletionStage<Stat>> safeStat(BackgroundPathable<Stat> builder)
+ public CrimpedPathable<CompletionStage<Stat>> stat(BackgroundPathable<Stat> builder)
{
return build(builder, safeStatProc);
}
public CrimpedPathable<Crimped<Stat>> statWatched(Watchable<BackgroundPathable<Stat>> builder)
{
- return build(builder, statProc);
- }
-
- public CrimpedPathable<Crimped<Stat>> safeStatWatched(Watchable<BackgroundPathable<Stat>> builder)
- {
return build(builder, safeStatProc);
}
@@ -133,26 +132,23 @@ public class AsyncCrimps
return build(builder, statProc);
}
- public CrimpedEnsembleable ensemble(Backgroundable<ErrorListenerEnsembleable<byte[]>> builder)
+ public CrimpedWatchedEnsembleable ensembleWatched(Watchable<BackgroundEnsembleable<byte[]>> builder)
{
- CrimpedBackgroundCallback<byte[]> callback = new CrimpedBackgroundCallback<>(dataProc);
-
- Ensembleable<byte[]> main;
- if ( unhandledErrorListener != null )
- {
- main = builder.inBackground(callback).withUnhandledErrorListener(unhandledErrorListener);
- }
- else
- {
- main = builder.inBackground(callback);
- }
+ CrimpedWatcher crimpedWatcher = new CrimpedWatcher();
+ CrimpedBackgroundCallback<byte[]> callback = new CrimpedBackgroundCallback<>(dataProc, crimpedWatcher);
+ BackgroundEnsembleable<byte[]> localBuilder = builder.usingWatcher(crimpedWatcher);
+ return new CrimpedWatchedEnsembleableImpl(toFinalBuilder(callback, localBuilder), callback);
+ }
- return new CrimpedEnsembleableImpl(main, callback);
+ public CrimpedEnsembleable ensemble(Backgroundable<ErrorListenerEnsembleable<byte[]>> builder)
+ {
+ CrimpedBackgroundCallback<byte[]> callback = new CrimpedBackgroundCallback<>(dataProc, null);
+ return new CrimpedEnsembleableImpl(toFinalBuilder(callback, builder), callback);
}
public CrimpedEnsembleable ensemble(Backgroundable<ErrorListenerReconfigBuilderMain> builder, List<String> newMembers)
{
- CrimpedBackgroundCallback<byte[]> callback = new CrimpedBackgroundCallback<>(dataProc);
+ CrimpedBackgroundCallback<byte[]> callback = new CrimpedBackgroundCallback<>(dataProc, null);
ReconfigBuilderMain main;
if ( unhandledErrorListener != null )
@@ -169,7 +165,7 @@ public class AsyncCrimps
public CrimpedEnsembleable ensemble(Backgroundable<ErrorListenerReconfigBuilderMain> builder, List<String> joining, List<String> leaving)
{
- CrimpedBackgroundCallback<byte[]> callback = new CrimpedBackgroundCallback<>(dataProc);
+ CrimpedBackgroundCallback<byte[]> callback = new CrimpedBackgroundCallback<>(dataProc, null);
ReconfigBuilderMain main;
if ( unhandledErrorListener != null )
@@ -204,7 +200,7 @@ public class AsyncCrimps
public CrimpedMultiTransaction opResults(Backgroundable<ErrorListenerMultiTransactionMain> builder)
{
- CrimpedBackgroundCallback<List<CuratorTransactionResult>> callback = new CrimpedBackgroundCallback<>(opResultsProc);
+ CrimpedBackgroundCallback<List<CuratorTransactionResult>> callback = new CrimpedBackgroundCallback<>(opResultsProc, null);
ErrorListenerMultiTransactionMain main = builder.inBackground(callback);
CuratorMultiTransactionMain finalBuilder = (unhandledErrorListener != null) ? main.withUnhandledErrorListener(unhandledErrorListener) : main;
return ops -> {
@@ -222,7 +218,7 @@ public class AsyncCrimps
public <T> CrimpledPathAndBytesable<CompletionStage<T>> build(BackgroundPathAndBytesable<T> builder, BackgroundProc<T> backgroundProc)
{
- CrimpedBackgroundCallback<T> callback = new CrimpedBackgroundCallback<T>(backgroundProc);
+ CrimpedBackgroundCallback<T> callback = new CrimpedBackgroundCallback<T>(backgroundProc, null);
ErrorListenerPathAndBytesable<T> localBuilder = builder.inBackground(callback);
PathAndBytesable<T> finalLocalBuilder = (unhandledErrorListener != null) ? localBuilder.withUnhandledErrorListener(unhandledErrorListener) : localBuilder;
return new CrimpledPathAndBytesableImpl<>(finalLocalBuilder, callback, null);
@@ -231,28 +227,27 @@ public class AsyncCrimps
public <T> CrimpedPathable<Crimped<T>> build(Watchable<BackgroundPathable<T>> builder, BackgroundProc<T> backgroundProc)
{
CrimpedWatcher crimpedWatcher = new CrimpedWatcher();
- CrimpedBackgroundCallback<T> callback = new CrimpedBackgroundCallback<T>(backgroundProc)
- {
- @Override
- public CompletionStage<WatchedEvent> event()
- {
- return crimpedWatcher;
- }
- };
+ CrimpedBackgroundCallback<T> callback = new CrimpedBackgroundCallback<>(backgroundProc, crimpedWatcher);
Pathable<T> finalLocalBuilder = toFinalBuilder(callback, builder.usingWatcher(crimpedWatcher));
return new CrimpledPathAndBytesableImpl<T, Crimped<T>>(finalLocalBuilder, callback, crimpedWatcher);
}
public <T> CrimpedPathable<CompletionStage<T>> build(BackgroundPathable<T> builder, BackgroundProc<T> backgroundProc)
{
- CrimpedBackgroundCallback<T> callback = new CrimpedBackgroundCallback<T>(backgroundProc);
+ CrimpedBackgroundCallback<T> callback = new CrimpedBackgroundCallback<T>(backgroundProc, null);
Pathable<T> finalLocalBuilder = toFinalBuilder(callback, builder);
return new CrimpledPathAndBytesableImpl<>(finalLocalBuilder, callback, null);
}
- private <T> Pathable<T> toFinalBuilder(CrimpedBackgroundCallback<T> callback, BackgroundPathable<T> backgroundPathable)
+ private Ensembleable<byte[]> toFinalBuilder(CrimpedBackgroundCallback<byte[]> callback, Backgroundable<ErrorListenerEnsembleable<byte[]>> builder)
+ {
+ ErrorListenerEnsembleable<byte[]> localBuilder = builder.inBackground(callback);
+ return (unhandledErrorListener != null) ? localBuilder.withUnhandledErrorListener(unhandledErrorListener) : localBuilder;
+ }
+
+ private <T> Pathable<T> toFinalBuilder(CrimpedBackgroundCallback<T> callback, BackgroundPathable<T> builder)
{
- ErrorListenerPathable<T> localBuilder = backgroundPathable.inBackground(callback);
+ ErrorListenerPathable<T> localBuilder = builder.inBackground(callback);
return (unhandledErrorListener != null) ? localBuilder.withUnhandledErrorListener(unhandledErrorListener) : localBuilder;
}
http://git-wip-us.apache.org/repos/asf/curator/blob/3aa51d50/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedBackgroundCallback.java
----------------------------------------------------------------------
diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedBackgroundCallback.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedBackgroundCallback.java
index b3c20d2..9d9a6fe 100644
--- a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedBackgroundCallback.java
+++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedBackgroundCallback.java
@@ -28,16 +28,18 @@ import java.util.concurrent.CompletionStage;
class CrimpedBackgroundCallback<T> extends CompletableFuture<T> implements BackgroundCallback, Crimped<T>
{
private final BackgroundProc<T> resultFunction;
+ private final CrimpedWatcher watcher;
- CrimpedBackgroundCallback(BackgroundProc<T> resultFunction)
+ CrimpedBackgroundCallback(BackgroundProc<T> resultFunction, CrimpedWatcher watcher)
{
this.resultFunction = resultFunction;
+ this.watcher = watcher;
}
@Override
public CompletionStage<WatchedEvent> event()
{
- return null;
+ return watcher;
}
@Override
http://git-wip-us.apache.org/repos/asf/curator/blob/3aa51d50/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedConfigEnsembleable.java
----------------------------------------------------------------------
diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedConfigEnsembleable.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedConfigEnsembleable.java
index 10c0c35..6d8811f 100644
--- a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedConfigEnsembleable.java
+++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedConfigEnsembleable.java
@@ -18,16 +18,13 @@
*/
package org.apache.curator.x.crimps.async;
-import org.apache.curator.framework.api.Ensembleable;
-import java.util.concurrent.CompletionStage;
-
-public interface CrimpedConfigEnsembleable extends
- CrimpledEnsembleable<CompletionStage<byte[]>>
+public interface CrimpedConfigEnsembleable<T> extends
+ CrimpledEnsembleable<T>
{
/**
* Sets the configuration version to use.
* @param config The version of the configuration.
* @return this
*/
- CrimpledEnsembleable<CompletionStage<byte[]>> fromConfig(long config);
+ CrimpledEnsembleable<T> fromConfig(long config);
}
http://git-wip-us.apache.org/repos/asf/curator/blob/3aa51d50/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedEnsembleable.java
----------------------------------------------------------------------
diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedEnsembleable.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedEnsembleable.java
index c8166e7..7824fb6 100644
--- a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedEnsembleable.java
+++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedEnsembleable.java
@@ -22,8 +22,8 @@ import org.apache.curator.framework.api.Statable;
import java.util.concurrent.CompletionStage;
public interface CrimpedEnsembleable extends
- CrimpedConfigEnsembleable,
- Statable<CrimpedConfigEnsembleable>,
+ CrimpedConfigEnsembleable<CompletionStage<byte[]>>,
+ Statable<CrimpedConfigEnsembleable<CompletionStage<byte[]>>>,
CrimpledEnsembleable<CompletionStage<byte[]>>
{
}
http://git-wip-us.apache.org/repos/asf/curator/blob/3aa51d50/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedEnsembleableImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedEnsembleableImpl.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedEnsembleableImpl.java
index d94d242..b5b7a43 100644
--- a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedEnsembleableImpl.java
+++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedEnsembleableImpl.java
@@ -62,7 +62,7 @@ class CrimpedEnsembleableImpl implements CrimpedEnsembleable
}
@Override
- public CrimpedConfigEnsembleable storingStatIn(Stat stat)
+ public CrimpedConfigEnsembleable<CompletionStage<byte[]>> storingStatIn(Stat stat)
{
ensembleable = configureEnsembleable = configBuilder.storingStatIn(stat);
return this;
http://git-wip-us.apache.org/repos/asf/curator/blob/3aa51d50/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedWatchedEnsembleable.java
----------------------------------------------------------------------
diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedWatchedEnsembleable.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedWatchedEnsembleable.java
new file mode 100644
index 0000000..be0cc58
--- /dev/null
+++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedWatchedEnsembleable.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.crimps.async;
+
+import org.apache.curator.framework.api.Statable;
+import java.util.concurrent.CompletionStage;
+
+public interface CrimpedWatchedEnsembleable extends
+ CrimpedConfigEnsembleable<Crimped<byte[]>>,
+ Statable<CrimpedConfigEnsembleable<Crimped<byte[]>>>,
+ CrimpledEnsembleable<Crimped<byte[]>>
+{
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/3aa51d50/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedWatchedEnsembleableImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedWatchedEnsembleableImpl.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedWatchedEnsembleableImpl.java
new file mode 100644
index 0000000..78c7304
--- /dev/null
+++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedWatchedEnsembleableImpl.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.crimps.async;
+
+import org.apache.curator.framework.api.ConfigureEnsembleable;
+import org.apache.curator.framework.api.Ensembleable;
+import org.apache.curator.framework.api.Statable;
+import org.apache.zookeeper.data.Stat;
+
+class CrimpedWatchedEnsembleableImpl implements CrimpedWatchedEnsembleable
+{
+ private final CrimpedBackgroundCallback<byte[]> callback;
+ private final Statable<ConfigureEnsembleable> configBuilder;
+ private Ensembleable<byte[]> ensembleable;
+ private ConfigureEnsembleable configureEnsembleable;
+
+ CrimpedWatchedEnsembleableImpl(Statable<ConfigureEnsembleable> configBuilder, CrimpedBackgroundCallback<byte[]> callback)
+ {
+ this.configBuilder = configBuilder;
+ this.callback = callback;
+ configureEnsembleable = configBuilder.storingStatIn(new Stat());
+ ensembleable = configureEnsembleable;
+ }
+
+ CrimpedWatchedEnsembleableImpl(Ensembleable<byte[]> ensembleable, CrimpedBackgroundCallback<byte[]> callback)
+ {
+ this.ensembleable = ensembleable;
+ this.configBuilder = null;
+ this.callback = callback;
+ configureEnsembleable = null;
+ }
+
+ @Override
+ public Crimped<byte[]> forEnsemble()
+ {
+ try
+ {
+ ensembleable.forEnsemble();
+ }
+ catch ( Exception e )
+ {
+ callback.completeExceptionally(e);
+ }
+ return callback;
+ }
+
+ @Override
+ public CrimpedConfigEnsembleable<Crimped<byte[]>> storingStatIn(Stat stat)
+ {
+ ensembleable = configureEnsembleable = configBuilder.storingStatIn(stat);
+ return this;
+ }
+
+ @Override
+ public CrimpledEnsembleable<Crimped<byte[]>> fromConfig(long config)
+ {
+ try
+ {
+ ensembleable = configureEnsembleable.fromConfig(config);
+ }
+ catch ( Exception e )
+ {
+ callback.completeExceptionally(e);
+ }
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/3aa51d50/curator-x-crimps/src/test/java/org/apache/curator/x/crimps/async/TestCrimps.java
----------------------------------------------------------------------
diff --git a/curator-x-crimps/src/test/java/org/apache/curator/x/crimps/async/TestCrimps.java b/curator-x-crimps/src/test/java/org/apache/curator/x/crimps/async/TestCrimps.java
index d5b7a6d..62065c3 100644
--- a/curator-x-crimps/src/test/java/org/apache/curator/x/crimps/async/TestCrimps.java
+++ b/curator-x-crimps/src/test/java/org/apache/curator/x/crimps/async/TestCrimps.java
@@ -108,20 +108,13 @@ public class TestCrimps extends BaseClassForTests
{
client.start();
- CompletionStage<Stat> f = async.safeStat(client.checkExists()).forPath("/test");
+ CompletionStage<Stat> f = async.stat(client.checkExists()).forPath("/test");
complete(f.handle((stat, e) -> {
Assert.assertNull(e);
Assert.assertNull(stat);
return null;
}));
- f = async.stat(client.checkExists()).forPath("/test");
- complete(f.handle((stat, e) -> {
- Assert.assertNotNull(e);
- Assert.assertNull(stat);
- return null;
- }));
-
async.path(client.create()).forPath("/test").toCompletableFuture().get();
f = async.stat(client.checkExists()).forPath("/test");
complete(f.handle((stat, e) -> {