You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/02/09 18:36:12 UTC

[13/47] curator git commit: watched version of getConfig

watched version of getConfig


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/3aa51d50
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/3aa51d50
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/3aa51d50

Branch: refs/heads/CURATOR-3.0
Commit: 3aa51d503afe2f5952686976681194418afe171f
Parents: 81f0ab2
Author: randgalt <ra...@apache.org>
Authored: Thu Jan 5 23:17:03 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Thu Jan 5 23:17:03 2017 -0500

----------------------------------------------------------------------
 .../curator/x/crimps/async/AsyncCrimps.java     | 69 ++++++++--------
 .../crimps/async/CrimpedBackgroundCallback.java |  6 +-
 .../crimps/async/CrimpedConfigEnsembleable.java |  9 +--
 .../x/crimps/async/CrimpedEnsembleable.java     |  4 +-
 .../x/crimps/async/CrimpedEnsembleableImpl.java |  2 +-
 .../async/CrimpedWatchedEnsembleable.java       | 29 +++++++
 .../async/CrimpedWatchedEnsembleableImpl.java   | 83 ++++++++++++++++++++
 .../curator/x/crimps/async/TestCrimps.java      |  9 +--
 8 files changed, 155 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/3aa51d50/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/AsyncCrimps.java
----------------------------------------------------------------------
diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/AsyncCrimps.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/AsyncCrimps.java
index 639deca..bc3a2c3 100644
--- a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/AsyncCrimps.java
+++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/AsyncCrimps.java
@@ -22,7 +22,6 @@ import org.apache.curator.framework.api.*;
 import org.apache.curator.framework.api.transaction.CuratorMultiTransactionMain;
 import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import java.util.List;
@@ -98,28 +97,28 @@ public class AsyncCrimps
         return build(builder, dataProc);
     }
 
+    public CrimpedPathable<Crimped<byte[]>> dataWatched(Watchable<BackgroundPathable<byte[]>> builder)
+    {
+        return build(builder, dataProc);
+    }
+
     public CrimpedPathable<CompletionStage<List<String>>> children(BackgroundPathable<List<String>> builder)
     {
         return build(builder, childrenProc);
     }
 
-    public CrimpedPathable<CompletionStage<Stat>> stat(BackgroundPathable<Stat> builder)
+    public CrimpedPathable<Crimped<List<String>>> childrenWatched(Watchable<BackgroundPathable<List<String>>> builder)
     {
-        return build(builder, statProc);
+        return build(builder, childrenProc);
     }
 
-    public CrimpedPathable<CompletionStage<Stat>> safeStat(BackgroundPathable<Stat> builder)
+    public CrimpedPathable<CompletionStage<Stat>> stat(BackgroundPathable<Stat> builder)
     {
         return build(builder, safeStatProc);
     }
 
     public CrimpedPathable<Crimped<Stat>> statWatched(Watchable<BackgroundPathable<Stat>> builder)
     {
-        return build(builder, statProc);
-    }
-
-    public CrimpedPathable<Crimped<Stat>> safeStatWatched(Watchable<BackgroundPathable<Stat>> builder)
-    {
         return build(builder, safeStatProc);
     }
 
@@ -133,26 +132,23 @@ public class AsyncCrimps
         return build(builder, statProc);
     }
 
-    public CrimpedEnsembleable ensemble(Backgroundable<ErrorListenerEnsembleable<byte[]>> builder)
+    public CrimpedWatchedEnsembleable ensembleWatched(Watchable<BackgroundEnsembleable<byte[]>> builder)
     {
-        CrimpedBackgroundCallback<byte[]> callback = new CrimpedBackgroundCallback<>(dataProc);
-
-        Ensembleable<byte[]> main;
-        if ( unhandledErrorListener != null )
-        {
-            main = builder.inBackground(callback).withUnhandledErrorListener(unhandledErrorListener);
-        }
-        else
-        {
-            main = builder.inBackground(callback);
-        }
+        CrimpedWatcher crimpedWatcher = new CrimpedWatcher();
+        CrimpedBackgroundCallback<byte[]> callback = new CrimpedBackgroundCallback<>(dataProc, crimpedWatcher);
+        BackgroundEnsembleable<byte[]> localBuilder = builder.usingWatcher(crimpedWatcher);
+        return new CrimpedWatchedEnsembleableImpl(toFinalBuilder(callback, localBuilder), callback);
+    }
 
-        return new CrimpedEnsembleableImpl(main, callback);
+    public CrimpedEnsembleable ensemble(Backgroundable<ErrorListenerEnsembleable<byte[]>> builder)
+    {
+        CrimpedBackgroundCallback<byte[]> callback = new CrimpedBackgroundCallback<>(dataProc, null);
+        return new CrimpedEnsembleableImpl(toFinalBuilder(callback, builder), callback);
     }
 
     public CrimpedEnsembleable ensemble(Backgroundable<ErrorListenerReconfigBuilderMain> builder, List<String> newMembers)
     {
-        CrimpedBackgroundCallback<byte[]> callback = new CrimpedBackgroundCallback<>(dataProc);
+        CrimpedBackgroundCallback<byte[]> callback = new CrimpedBackgroundCallback<>(dataProc, null);
 
         ReconfigBuilderMain main;
         if ( unhandledErrorListener != null )
@@ -169,7 +165,7 @@ public class AsyncCrimps
 
     public CrimpedEnsembleable ensemble(Backgroundable<ErrorListenerReconfigBuilderMain> builder, List<String> joining, List<String> leaving)
     {
-        CrimpedBackgroundCallback<byte[]> callback = new CrimpedBackgroundCallback<>(dataProc);
+        CrimpedBackgroundCallback<byte[]> callback = new CrimpedBackgroundCallback<>(dataProc, null);
 
         ReconfigBuilderMain main;
         if ( unhandledErrorListener != null )
@@ -204,7 +200,7 @@ public class AsyncCrimps
 
     public CrimpedMultiTransaction opResults(Backgroundable<ErrorListenerMultiTransactionMain> builder)
     {
-        CrimpedBackgroundCallback<List<CuratorTransactionResult>> callback = new CrimpedBackgroundCallback<>(opResultsProc);
+        CrimpedBackgroundCallback<List<CuratorTransactionResult>> callback = new CrimpedBackgroundCallback<>(opResultsProc, null);
         ErrorListenerMultiTransactionMain main = builder.inBackground(callback);
         CuratorMultiTransactionMain finalBuilder = (unhandledErrorListener != null) ? main.withUnhandledErrorListener(unhandledErrorListener) : main;
         return ops -> {
@@ -222,7 +218,7 @@ public class AsyncCrimps
 
     public <T> CrimpledPathAndBytesable<CompletionStage<T>> build(BackgroundPathAndBytesable<T> builder, BackgroundProc<T> backgroundProc)
     {
-        CrimpedBackgroundCallback<T> callback = new CrimpedBackgroundCallback<T>(backgroundProc);
+        CrimpedBackgroundCallback<T> callback = new CrimpedBackgroundCallback<T>(backgroundProc, null);
         ErrorListenerPathAndBytesable<T> localBuilder = builder.inBackground(callback);
         PathAndBytesable<T> finalLocalBuilder = (unhandledErrorListener != null) ? localBuilder.withUnhandledErrorListener(unhandledErrorListener) : localBuilder;
         return new CrimpledPathAndBytesableImpl<>(finalLocalBuilder, callback, null);
@@ -231,28 +227,27 @@ public class AsyncCrimps
     public <T> CrimpedPathable<Crimped<T>> build(Watchable<BackgroundPathable<T>> builder, BackgroundProc<T> backgroundProc)
     {
         CrimpedWatcher crimpedWatcher = new CrimpedWatcher();
-        CrimpedBackgroundCallback<T> callback = new CrimpedBackgroundCallback<T>(backgroundProc)
-        {
-            @Override
-            public CompletionStage<WatchedEvent> event()
-            {
-                return crimpedWatcher;
-            }
-        };
+        CrimpedBackgroundCallback<T> callback = new CrimpedBackgroundCallback<>(backgroundProc, crimpedWatcher);
         Pathable<T> finalLocalBuilder = toFinalBuilder(callback, builder.usingWatcher(crimpedWatcher));
         return new CrimpledPathAndBytesableImpl<T, Crimped<T>>(finalLocalBuilder, callback, crimpedWatcher);
     }
 
     public <T> CrimpedPathable<CompletionStage<T>> build(BackgroundPathable<T> builder, BackgroundProc<T> backgroundProc)
     {
-        CrimpedBackgroundCallback<T> callback = new CrimpedBackgroundCallback<T>(backgroundProc);
+        CrimpedBackgroundCallback<T> callback = new CrimpedBackgroundCallback<T>(backgroundProc, null);
         Pathable<T> finalLocalBuilder = toFinalBuilder(callback, builder);
         return new CrimpledPathAndBytesableImpl<>(finalLocalBuilder, callback, null);
     }
 
-    private <T> Pathable<T> toFinalBuilder(CrimpedBackgroundCallback<T> callback, BackgroundPathable<T> backgroundPathable)
+    private Ensembleable<byte[]> toFinalBuilder(CrimpedBackgroundCallback<byte[]> callback, Backgroundable<ErrorListenerEnsembleable<byte[]>> builder)
+    {
+        ErrorListenerEnsembleable<byte[]> localBuilder = builder.inBackground(callback);
+        return (unhandledErrorListener != null) ? localBuilder.withUnhandledErrorListener(unhandledErrorListener) : localBuilder;
+    }
+
+    private <T> Pathable<T> toFinalBuilder(CrimpedBackgroundCallback<T> callback, BackgroundPathable<T> builder)
     {
-        ErrorListenerPathable<T> localBuilder = backgroundPathable.inBackground(callback);
+        ErrorListenerPathable<T> localBuilder = builder.inBackground(callback);
         return (unhandledErrorListener != null) ? localBuilder.withUnhandledErrorListener(unhandledErrorListener) : localBuilder;
     }
 

http://git-wip-us.apache.org/repos/asf/curator/blob/3aa51d50/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedBackgroundCallback.java
----------------------------------------------------------------------
diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedBackgroundCallback.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedBackgroundCallback.java
index b3c20d2..9d9a6fe 100644
--- a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedBackgroundCallback.java
+++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedBackgroundCallback.java
@@ -28,16 +28,18 @@ import java.util.concurrent.CompletionStage;
 class CrimpedBackgroundCallback<T> extends CompletableFuture<T> implements BackgroundCallback, Crimped<T>
 {
     private final BackgroundProc<T> resultFunction;
+    private final CrimpedWatcher watcher;
 
-    CrimpedBackgroundCallback(BackgroundProc<T> resultFunction)
+    CrimpedBackgroundCallback(BackgroundProc<T> resultFunction, CrimpedWatcher watcher)
     {
         this.resultFunction = resultFunction;
+        this.watcher = watcher;
     }
 
     @Override
     public CompletionStage<WatchedEvent> event()
     {
-        return null;
+        return watcher;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/curator/blob/3aa51d50/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedConfigEnsembleable.java
----------------------------------------------------------------------
diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedConfigEnsembleable.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedConfigEnsembleable.java
index 10c0c35..6d8811f 100644
--- a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedConfigEnsembleable.java
+++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedConfigEnsembleable.java
@@ -18,16 +18,13 @@
  */
 package org.apache.curator.x.crimps.async;
 
-import org.apache.curator.framework.api.Ensembleable;
-import java.util.concurrent.CompletionStage;
-
-public interface CrimpedConfigEnsembleable extends
-    CrimpledEnsembleable<CompletionStage<byte[]>>
+public interface CrimpedConfigEnsembleable<T> extends
+    CrimpledEnsembleable<T>
 {
     /**
      * Sets the configuration version to use.
      * @param config The version of the configuration.
      * @return this
      */
-    CrimpledEnsembleable<CompletionStage<byte[]>> fromConfig(long config);
+    CrimpledEnsembleable<T> fromConfig(long config);
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/3aa51d50/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedEnsembleable.java
----------------------------------------------------------------------
diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedEnsembleable.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedEnsembleable.java
index c8166e7..7824fb6 100644
--- a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedEnsembleable.java
+++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedEnsembleable.java
@@ -22,8 +22,8 @@ import org.apache.curator.framework.api.Statable;
 import java.util.concurrent.CompletionStage;
 
 public interface CrimpedEnsembleable extends
-    CrimpedConfigEnsembleable,
-    Statable<CrimpedConfigEnsembleable>,
+    CrimpedConfigEnsembleable<CompletionStage<byte[]>>,
+    Statable<CrimpedConfigEnsembleable<CompletionStage<byte[]>>>,
     CrimpledEnsembleable<CompletionStage<byte[]>>
 {
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/3aa51d50/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedEnsembleableImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedEnsembleableImpl.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedEnsembleableImpl.java
index d94d242..b5b7a43 100644
--- a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedEnsembleableImpl.java
+++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedEnsembleableImpl.java
@@ -62,7 +62,7 @@ class CrimpedEnsembleableImpl implements CrimpedEnsembleable
     }
 
     @Override
-    public CrimpedConfigEnsembleable storingStatIn(Stat stat)
+    public CrimpedConfigEnsembleable<CompletionStage<byte[]>> storingStatIn(Stat stat)
     {
         ensembleable = configureEnsembleable = configBuilder.storingStatIn(stat);
         return this;

http://git-wip-us.apache.org/repos/asf/curator/blob/3aa51d50/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedWatchedEnsembleable.java
----------------------------------------------------------------------
diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedWatchedEnsembleable.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedWatchedEnsembleable.java
new file mode 100644
index 0000000..be0cc58
--- /dev/null
+++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedWatchedEnsembleable.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.crimps.async;
+
+import org.apache.curator.framework.api.Statable;
+import java.util.concurrent.CompletionStage;
+
+public interface CrimpedWatchedEnsembleable extends
+    CrimpedConfigEnsembleable<Crimped<byte[]>>,
+    Statable<CrimpedConfigEnsembleable<Crimped<byte[]>>>,
+    CrimpledEnsembleable<Crimped<byte[]>>
+{
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/3aa51d50/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedWatchedEnsembleableImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedWatchedEnsembleableImpl.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedWatchedEnsembleableImpl.java
new file mode 100644
index 0000000..78c7304
--- /dev/null
+++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedWatchedEnsembleableImpl.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.crimps.async;
+
+import org.apache.curator.framework.api.ConfigureEnsembleable;
+import org.apache.curator.framework.api.Ensembleable;
+import org.apache.curator.framework.api.Statable;
+import org.apache.zookeeper.data.Stat;
+
+class CrimpedWatchedEnsembleableImpl implements CrimpedWatchedEnsembleable
+{
+    private final CrimpedBackgroundCallback<byte[]> callback;
+    private final Statable<ConfigureEnsembleable> configBuilder;
+    private Ensembleable<byte[]> ensembleable;
+    private ConfigureEnsembleable configureEnsembleable;
+
+    CrimpedWatchedEnsembleableImpl(Statable<ConfigureEnsembleable> configBuilder, CrimpedBackgroundCallback<byte[]> callback)
+    {
+        this.configBuilder = configBuilder;
+        this.callback = callback;
+        configureEnsembleable = configBuilder.storingStatIn(new Stat());
+        ensembleable = configureEnsembleable;
+    }
+
+    CrimpedWatchedEnsembleableImpl(Ensembleable<byte[]> ensembleable, CrimpedBackgroundCallback<byte[]> callback)
+    {
+        this.ensembleable = ensembleable;
+        this.configBuilder = null;
+        this.callback = callback;
+        configureEnsembleable = null;
+    }
+
+    @Override
+    public Crimped<byte[]> forEnsemble()
+    {
+        try
+        {
+            ensembleable.forEnsemble();
+        }
+        catch ( Exception e )
+        {
+            callback.completeExceptionally(e);
+        }
+        return callback;
+    }
+
+    @Override
+    public CrimpedConfigEnsembleable<Crimped<byte[]>> storingStatIn(Stat stat)
+    {
+        ensembleable = configureEnsembleable = configBuilder.storingStatIn(stat);
+        return this;
+    }
+
+    @Override
+    public CrimpledEnsembleable<Crimped<byte[]>> fromConfig(long config)
+    {
+        try
+        {
+            ensembleable = configureEnsembleable.fromConfig(config);
+        }
+        catch ( Exception e )
+        {
+            callback.completeExceptionally(e);
+        }
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/3aa51d50/curator-x-crimps/src/test/java/org/apache/curator/x/crimps/async/TestCrimps.java
----------------------------------------------------------------------
diff --git a/curator-x-crimps/src/test/java/org/apache/curator/x/crimps/async/TestCrimps.java b/curator-x-crimps/src/test/java/org/apache/curator/x/crimps/async/TestCrimps.java
index d5b7a6d..62065c3 100644
--- a/curator-x-crimps/src/test/java/org/apache/curator/x/crimps/async/TestCrimps.java
+++ b/curator-x-crimps/src/test/java/org/apache/curator/x/crimps/async/TestCrimps.java
@@ -108,20 +108,13 @@ public class TestCrimps extends BaseClassForTests
         {
             client.start();
 
-            CompletionStage<Stat> f = async.safeStat(client.checkExists()).forPath("/test");
+            CompletionStage<Stat> f = async.stat(client.checkExists()).forPath("/test");
             complete(f.handle((stat, e) -> {
                 Assert.assertNull(e);
                 Assert.assertNull(stat);
                 return null;
             }));
 
-            f = async.stat(client.checkExists()).forPath("/test");
-            complete(f.handle((stat, e) -> {
-                Assert.assertNotNull(e);
-                Assert.assertNull(stat);
-                return null;
-            }));
-
             async.path(client.create()).forPath("/test").toCompletableFuture().get();
             f = async.stat(client.checkExists()).forPath("/test");
             complete(f.handle((stat, e) -> {