You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/07/16 21:06:18 UTC

[01/21] curator git commit: CURATOR-311 - SharedValue could hold stale data after reconnecting

Repository: curator
Updated Branches:
  refs/heads/CURATOR-419 4ad1fb8e5 -> 7d4f06238


CURATOR-311 - SharedValue could hold stale data after reconnecting


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/8c1c5ffa
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/8c1c5ffa
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/8c1c5ffa

Branch: refs/heads/CURATOR-419
Commit: 8c1c5ffa287d22eaea18bf6f89a4a8bf6d9b871c
Parents: 35d2cc0
Author: Tsuyoshi Ozawa <oz...@apache.org>
Authored: Wed Jan 11 20:30:46 2017 +0900
Committer: Tsuyoshi Ozawa <oz...@apache.org>
Committed: Fri Feb 24 16:34:51 2017 +0900

----------------------------------------------------------------------
 .../framework/recipes/shared/SharedCount.java   |   5 +
 .../framework/recipes/shared/SharedValue.java   |  27 ++++-
 .../recipes/shared/TestSharedCount.java         | 116 ++++++++++++++++++-
 3 files changed, 146 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/8c1c5ffa/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
index 87fffdd..bdfa844 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
@@ -49,6 +49,11 @@ public class SharedCount implements Closeable, SharedCountReader, Listenable<Sha
         sharedValue = new SharedValue(client, path, toBytes(seedValue));
     }
 
+    protected SharedCount(CuratorFramework client, String path, SharedValue sv)
+    {
+        sharedValue = sv;
+    }
+
     @Override
     public int getCount()
     {

http://git-wip-us.apache.org/repos/asf/curator/blob/8c1c5ffa/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
index 1f9df37..7e3f26a 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
@@ -19,6 +19,7 @@
 
 package org.apache.curator.framework.recipes.shared;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
@@ -56,8 +57,9 @@ public class SharedValue implements Closeable, SharedValueReader
     private final byte[] seedValue;
     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
     private final AtomicReference<VersionedValue<byte[]>> currentValue;
+    private final CuratorWatcher watcher;
 
-    private final CuratorWatcher watcher = new CuratorWatcher()
+    private class SharedValueCuratorWatcher implements CuratorWatcher
     {
         @Override
         public void process(WatchedEvent event) throws Exception
@@ -76,6 +78,17 @@ public class SharedValue implements Closeable, SharedValueReader
         public void stateChanged(CuratorFramework client, ConnectionState newState)
         {
             notifyListenerOfStateChanged(newState);
+            if ( newState == ConnectionState.RECONNECTED )
+            {
+                try
+                {
+                    readValueAndNotifyListenersInBackground();
+                }
+                catch ( Exception e )
+                {
+                    log.error("Could not read value after reconnect", e);
+                }
+            }
         }
     };
 
@@ -96,6 +109,18 @@ public class SharedValue implements Closeable, SharedValueReader
         this.client = client;
         this.path = PathUtils.validatePath(path);
         this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
+        this.watcher = new SharedValueCuratorWatcher();
+        currentValue = new AtomicReference<VersionedValue<byte[]>>(new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
+    }
+
+    @VisibleForTesting
+    protected SharedValue(CuratorFramework client, String path, byte[] seedValue, CuratorWatcher watcher)
+    {
+        this.client = client;
+        this.path = PathUtils.validatePath(path);
+        this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
+        // inject watcher for testing
+        this.watcher = watcher;
         currentValue = new AtomicReference<VersionedValue<byte[]>>(new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
     }
 

http://git-wip-us.apache.org/repos/asf/curator/blob/8c1c5ffa/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
index 7939f6e..330c8f4 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
@@ -25,6 +25,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorWatcher;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.RetryNTimes;
@@ -32,6 +33,7 @@ import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.WatchedEvent;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.List;
@@ -42,6 +44,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.Phaser;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -368,6 +371,7 @@ public class TestSharedCount extends BaseClassForTests
 
             server.restart();
             Assert.assertTrue(getReconnectEvent.await(2, TimeUnit.SECONDS));
+            Assert.assertEquals(numChangeEvents.get(), 1);
 
             sharedCount.trySetCount(sharedCount.getVersionedValue(), 12);
 
@@ -381,7 +385,9 @@ public class TestSharedCount extends BaseClassForTests
             }).forPath("/count");
             flushDone.await(5, TimeUnit.SECONDS);
 
-            Assert.assertEquals(2, numChangeEvents.get());
+            // CURATOR-311: when a Curator client's state becomes RECONNECTED, the countHasChanged method is called back
+            // because SharedValue's ConnectionStateListener#stateChanged calls readValueAndNotifyListenersInBackground.
+            Assert.assertEquals(numChangeEvents.get(), 3);
         }
         finally
         {
@@ -389,4 +395,112 @@ public class TestSharedCount extends BaseClassForTests
             CloseableUtils.closeQuietly(curatorFramework);
         }
     }
+
+
+    @Test
+    public void testDisconnectReconnectWithMultipleClients() throws Exception
+    {
+        CuratorFramework curatorFramework1 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500));
+        CuratorFramework curatorFramework2 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500));
+
+        curatorFramework1.start();
+        curatorFramework1.blockUntilConnected();
+        curatorFramework2.start();
+        curatorFramework2.blockUntilConnected();
+
+        final String sharedCountPath = "/count";
+        final int initialCount = 10;
+        SharedCount sharedCount1 = new SharedCount(curatorFramework1, sharedCountPath, initialCount);
+        SharedCount sharedCountWithFaultyWatcher = createSharedCountWithFaultyWatcher(curatorFramework2, sharedCountPath, initialCount);
+
+        class MySharedCountListener implements SharedCountListener
+        {
+            final public Phaser gotSuspendEvent = new Phaser(1);
+            final public Phaser gotChangeEvent = new Phaser(1);
+            final public Phaser getReconnectEvent = new Phaser(1);
+            final public AtomicInteger numChangeEvents = new AtomicInteger(0);
+
+            @Override
+            public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception
+            {
+                numChangeEvents.incrementAndGet();
+                gotChangeEvent.arrive();
+            }
+
+            @Override
+            public void stateChanged(CuratorFramework client, ConnectionState newState)
+            {
+                if (newState == ConnectionState.SUSPENDED) {
+                    gotSuspendEvent.arrive();
+                } else if (newState == ConnectionState.RECONNECTED) {
+                    getReconnectEvent.arrive();
+                }
+            }
+        }
+
+        MySharedCountListener listener1 = new MySharedCountListener();
+        sharedCount1.addListener(listener1);
+        sharedCount1.start();
+        MySharedCountListener listener2 = new MySharedCountListener();
+        sharedCountWithFaultyWatcher.addListener(listener2);
+
+        try
+        {
+            sharedCount1.setCount(12);
+            Assert.assertEquals(listener1.gotChangeEvent.awaitAdvanceInterruptibly(0, 2, TimeUnit.SECONDS), 1);
+            Assert.assertEquals(sharedCount1.getCount(), 12);
+
+            Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 10);
+            // new counter with faultyWatcher start
+            sharedCountWithFaultyWatcher.start();
+
+            for (int i = 0; i < 10; i++) {
+                sharedCount1.setCount(13 + i);
+                Assert.assertEquals(sharedCount1.getCount(), 13 + i);
+
+                server.restart();
+
+                Assert.assertEquals(listener2.getReconnectEvent.awaitAdvanceInterruptibly(i, 2, TimeUnit.SECONDS), i + 1);
+                // CURATOR-311 makes the Curator client re-read the server's shared count value
+                // when the client's state becomes ConnectionState.RECONNECTED. The following assertions verify that.
+                Assert.assertEquals(listener2.gotChangeEvent.awaitAdvanceInterruptibly(i, 2, TimeUnit.SECONDS), i + 1);
+                Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 13 + i);
+            }
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(sharedCount1);
+            CloseableUtils.closeQuietly(curatorFramework1);
+            CloseableUtils.closeQuietly(sharedCountWithFaultyWatcher);
+            CloseableUtils.closeQuietly(curatorFramework2);
+        }
+    }
+
+    private SharedCount createSharedCountWithFaultyWatcher(CuratorFramework curatorFramework, String path, int val) {
+
+        class FaultyCuratorWatcher implements CuratorWatcher {
+            @Override
+            public void process(WatchedEvent event) throws Exception {
+                // everything will be ignored
+            }
+        }
+
+        final FaultyCuratorWatcher fautlyWatcher = new FaultyCuratorWatcher();
+
+        class FaultySharedValue extends SharedValue {
+            public FaultySharedValue(CuratorFramework client, String path, byte[] seedValue) {
+                super(client, path, seedValue, fautlyWatcher);
+            }
+        };
+
+        final SharedValue faultySharedValue = new FaultySharedValue(curatorFramework, path, SharedCount.toBytes(val));
+        class FaultySharedCount extends SharedCount {
+            public FaultySharedCount(CuratorFramework client, String path, int val) {
+                super(client, path, faultySharedValue);
+            }
+        };
+        return new FaultySharedCount(curatorFramework, path, val);
+    }
+
+
 }


[04/21] curator git commit: Make sure readValueAndNotifyListenersInBackground() is called after a connection problem

Posted by ra...@apache.org.
Make sure readValueAndNotifyListenersInBackground() is called after a connection problem


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/5de6b818
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/5de6b818
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/5de6b818

Branch: refs/heads/CURATOR-419
Commit: 5de6b818a8180291a6769e8db7d14b370dfb5221
Parents: 174faef
Author: randgalt <ra...@apache.org>
Authored: Mon Jul 10 11:05:57 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jul 10 11:05:57 2017 -0500

----------------------------------------------------------------------
 .../apache/curator/framework/recipes/shared/SharedValue.java | 5 +++--
 .../curator/framework/recipes/shared/TestSharedCount.java    | 8 +++-----
 2 files changed, 6 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/5de6b818/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
index 68fd5b5..5d7abce 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
@@ -79,7 +79,7 @@ public class SharedValue implements Closeable, SharedValueReader
         public void stateChanged(CuratorFramework client, ConnectionState newState)
         {
             notifyListenerOfStateChanged(newState);
-            if ( newState == ConnectionState.RECONNECTED )
+            if ( newState.isConnected() )
             {
                 try
                 {
@@ -87,6 +87,7 @@ public class SharedValue implements Closeable, SharedValueReader
                 }
                 catch ( Exception e )
                 {
+                    ThreadUtils.checkInterrupted(e);
                     log.error("Could not read value after reconnect", e);
                 }
             }
@@ -115,7 +116,7 @@ public class SharedValue implements Closeable, SharedValueReader
     }
 
     @VisibleForTesting
-    protected SharedValue(CuratorFramework client, String path, byte[] seedValue, CuratorWatcher watcher)
+    protected SharedValue(WatcherRemoveCuratorFramework client, String path, byte[] seedValue, CuratorWatcher watcher)
     {
         this.client = client;
         this.path = PathUtils.validatePath(path);

http://git-wip-us.apache.org/repos/asf/curator/blob/5de6b818/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
index d7ebb6c..3123c7d 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
@@ -516,18 +516,16 @@ public class TestSharedCount extends BaseClassForTests
 
     private SharedCount createSharedCountWithFaultyWatcher(CuratorFramework curatorFramework, String path, int val) {
 
-        class FaultyCuratorWatcher implements CuratorWatcher {
+        final CuratorWatcher faultyWatcher = new CuratorWatcher() {
             @Override
             public void process(WatchedEvent event) throws Exception {
                 // everything will be ignored
             }
-        }
-
-        final FaultyCuratorWatcher fautlyWatcher = new FaultyCuratorWatcher();
+        };
 
         class FaultySharedValue extends SharedValue {
             public FaultySharedValue(CuratorFramework client, String path, byte[] seedValue) {
-                super(client, path, seedValue, fautlyWatcher);
+                super(client.newWatcherRemoveCuratorFramework(), path, seedValue, faultyWatcher);
             }
         };
 


[18/21] curator git commit: GetDataBuilderImpl() wasn't handling storingStatIn for async because the old DSL didn't support it

Posted by ra...@apache.org.
GetDataBuilderImpl() wasn't handling storingStatIn for async because the old DSL didn't support it


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c6f7aeb3
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c6f7aeb3
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c6f7aeb3

Branch: refs/heads/CURATOR-419
Commit: c6f7aeb39957158fad0207614583f867729e7770
Parents: 123f2ec
Author: randgalt <ra...@apache.org>
Authored: Sun Jul 16 14:44:56 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sun Jul 16 14:44:56 2017 -0500

----------------------------------------------------------------------
 .../apache/curator/framework/imps/GetDataBuilderImpl.java |  5 +++++
 .../org/apache/curator/x/async/TestBasicOperations.java   | 10 ++++++++++
 2 files changed, 15 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/c6f7aeb3/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
index 95af6dd..2319b9d 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
@@ -26,6 +26,7 @@ import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.DataTree;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.concurrent.Callable;
@@ -250,6 +251,10 @@ public class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<S
                 {
                     watching.commitWatcher(rc, false);
                     trace.setReturnCode(rc).setResponseBytesLength(data).setPath(path).setWithWatcher(watching.hasWatcher()).setStat(stat).commit();
+                    if ( (responseStat != null) && (stat != null) )
+                    {
+                        DataTree.copyStat(stat, responseStat);
+                    }
                     if ( decompress && (data != null) )
                     {
                         try

http://git-wip-us.apache.org/repos/asf/curator/blob/c6f7aeb3/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java b/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
index 3e980ec..f814146 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
@@ -189,4 +189,14 @@ public class TestBasicOperations extends CompletableBaseClassForTests
             Assert.assertEquals(v.getCode(), KeeperException.Code.CONNECTIONLOSS);
         });
     }
+
+    @Test
+    public void testGetDataWithStat()
+    {
+        complete(client.create().forPath("/test", "hey".getBytes()));
+
+        Stat stat = new Stat();
+        complete(client.getData().storingStatIn(stat).forPath("/test"));
+        Assert.assertEquals(stat.getDataLength(), "hey".length());
+    }
 }


[16/21] curator git commit: Added asyncEnsureParents()

Posted by ra...@apache.org.
Added asyncEnsureParents()


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/123f2ece
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/123f2ece
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/123f2ece

Branch: refs/heads/CURATOR-419
Commit: 123f2ece539f924705945f462030ec2b9692aebd
Parents: 1b6216e
Author: randgalt <ra...@apache.org>
Authored: Sat Jul 15 11:03:23 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Jul 15 11:03:23 2017 -0500

----------------------------------------------------------------------
 .../apache/curator/x/async/AsyncWrappers.java   | 132 +++++++++++++++++--
 1 file changed, 120 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/123f2ece/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
index 9630985..f26b3b4 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
@@ -18,13 +18,16 @@
  */
 package org.apache.curator.x.async;
 
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
 import org.apache.curator.framework.recipes.locks.InterProcessLock;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.curator.x.async.api.ExistsOption;
-import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.zookeeper.KeeperException;
 import java.util.Collections;
-import java.util.Set;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.Executor;
@@ -70,8 +73,79 @@ import java.util.concurrent.TimeUnit;
 public class AsyncWrappers
 {
     /**
-     * Asynchronously call {@link org.apache.curator.framework.CuratorFramework#createContainers(String)} using
-     * the given executor
+     * <p>
+     * Return the children of the given path (keyed by the full path) and the data for each node.
+     * IMPORTANT: this results in a ZooKeeper query
+     * for each child node returned. i.e. if the initial children() call returns
+     * 10 nodes an additional 10 ZooKeeper queries are made to get the data.
+     * </p>
+     *
+     * <p>
+     * Note: if any of the nodes in the path do not exist yet, {@link org.apache.zookeeper.KeeperException.NoNodeException}
+     * is <strong>NOT</strong> set. Instead the stage is completed with an empty map.
+     * </p>
+     *
+     * @return CompletionStage
+     */
+    public static CompletionStage<Map<String, byte[]>> childrenWithData(AsyncCuratorFramework client, String path)
+    {
+        return childrenWithData(client, path, false);
+    }
+
+    /**
+     * <p>
+     * Return the children of the given path (keyed by the full path) and the data for each node.
+     * IMPORTANT: this results in a ZooKeeper query
+     * for each child node returned. i.e. if the initial children() call returns
+     * 10 nodes an additional 10 ZooKeeper queries are made to get the data.
+     * </p>
+     *
+     * <p>
+     * Note: if any of the nodes in the path do not exist yet, {@link org.apache.zookeeper.KeeperException.NoNodeException}
+     * is <strong>NOT</strong> set. Instead the stage is completed with an empty map.
+     * </p>
+     *
+     * @param isCompressed pass true if data is compressed
+     * @return CompletionStage
+     */
+    public static CompletionStage<Map<String, byte[]>> childrenWithData(AsyncCuratorFramework client, String path, boolean isCompressed)
+    {
+        CompletableFuture<Map<String, byte[]>> future = new CompletableFuture<>();
+        client.getChildren().forPath(path).handle((children, e) -> {
+            if ( e != null )
+            {
+                if ( Throwables.getRootCause(e) instanceof KeeperException.NoNodeException )
+                {
+                    future.complete(Maps.newHashMap());
+                }
+                else
+                {
+                    future.completeExceptionally(e);
+                }
+            }
+            else
+            {
+                completeChildren(client, future, path, children, isCompressed);
+            }
+            return null;
+        });
+        return future;
+    }
+
+    /**
+     * Asynchronously ensure that the parents of the given path are created
+     *
+     * @param client client
+     * @param path path to ensure
+     * @return stage
+     */
+    public static CompletionStage<Void> asyncEnsureParents(AsyncCuratorFramework client, String path)
+    {
+        return ensure(client, path, ExistsOption.createParentsIfNeeded);
+    }
+
+    /**
+     * Asynchronously ensure that the parents of the given path are created as containers
      *
      * @param client client
      * @param path path to ensure
@@ -79,14 +153,7 @@ public class AsyncWrappers
      */
     public static CompletionStage<Void> asyncEnsureContainers(AsyncCuratorFramework client, String path)
     {
-        String localPath = ZKPaths.makePath(path, "foo");
-        Set<ExistsOption> options = Collections.singleton(ExistsOption.createParentsAsContainers);
-        return client
-            .checkExists()
-            .withOptions(options)
-            .forPath(localPath)
-            .thenApply(__ -> null)
-            ;
+        return ensure(client, path, ExistsOption.createParentsAsContainers);
     }
 
     /**
@@ -279,6 +346,47 @@ public class AsyncWrappers
         }
     }
 
+    private static void completeChildren(AsyncCuratorFramework client, CompletableFuture<Map<String, byte[]>> future, String parentPath, List<String> children, boolean isCompressed)
+    {
+        Map<String, byte[]> nodes = Maps.newHashMap();
+        if ( children.size() == 0 )
+        {
+            future.complete(nodes);
+            return;
+        }
+
+        children.forEach(node -> {
+            String path = ZKPaths.makePath(parentPath, node);
+            AsyncStage<byte[]> stage = isCompressed ? client.getData().decompressed().forPath(path) : client.getData().forPath(path);
+            stage.handle((data, e) -> {
+                if ( e != null )
+                {
+                    future.completeExceptionally(e);
+                }
+                else
+                {
+                    nodes.put(path, data);
+                    if ( nodes.size() == children.size() )
+                    {
+                        future.complete(nodes);
+                    }
+                }
+                return null;
+            });
+        });
+    }
+
+    private static CompletionStage<Void> ensure(AsyncCuratorFramework client, String path, ExistsOption option)
+    {
+        String localPath = ZKPaths.makePath(path, "foo");
+        return client
+            .checkExists()
+            .withOptions(Collections.singleton(option))
+            .forPath(localPath)
+            .thenApply(__ -> null)
+            ;
+    }
+
     private AsyncWrappers()
     {
     }


[20/21] curator git commit: ModeledFrameworkImpl.set(T item, int version) was ignoring the version

Posted by ra...@apache.org.
ModeledFrameworkImpl.set(T item, int version) was ignoring the version


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/05d37e96
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/05d37e96
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/05d37e96

Branch: refs/heads/CURATOR-419
Commit: 05d37e961978719ab1ebeb40ff7bd5e7a8605f5e
Parents: 666b175
Author: randgalt <ra...@apache.org>
Authored: Sun Jul 16 14:53:30 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sun Jul 16 14:53:30 2017 -0500

----------------------------------------------------------------------
 .../modeled/details/ModeledFrameworkImpl.java   |  2 +-
 .../x/async/modeled/TestModeledFramework.java   | 30 +++++++-------------
 2 files changed, 12 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/05d37e96/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
index 44011ee..aa98602 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
@@ -156,7 +156,7 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
     @Override
     public AsyncStage<String> set(T item, int version)
     {
-        return set(item, null, -1);
+        return set(item, null, version);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/curator/blob/05d37e96/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java
index 53eb517..a5ed998 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.x.async.modeled;
 
 import com.google.common.collect.Sets;
@@ -102,7 +103,8 @@ public class TestModeledFramework extends TestModeledFrameworkBase
     @Test
     public void testBadNode()
     {
-        complete(async.create().forPath(modelSpec.path().fullPath(), "fubar".getBytes()), (v, e) -> {});    // ignore error
+        complete(async.create().forPath(modelSpec.path().fullPath(), "fubar".getBytes()), (v, e) -> {
+        });    // ignore error
 
         ModeledFramework<TestModel> client = ModeledFramework.builder(async, modelSpec).watched().build();
         complete(client.read(), (model, e) -> Assert.assertTrue(e instanceof KeeperException.NoNodeException));
@@ -112,11 +114,8 @@ public class TestModeledFramework extends TestModeledFrameworkBase
     public void testSchema() throws Exception
     {
         Schema schema = modelSpec.schema();
-        try ( CuratorFramework schemaClient = CuratorFrameworkFactory.builder()
-            .connectString(server.getConnectString())
-            .retryPolicy(new RetryOneTime(1))
-            .schemaSet(new SchemaSet(Collections.singletonList(schema), false))
-            .build() ) {
+        try (CuratorFramework schemaClient = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).schemaSet(new SchemaSet(Collections.singletonList(schema), false)).build())
+        {
             schemaClient.start();
 
             try
@@ -138,7 +137,9 @@ public class TestModeledFramework extends TestModeledFrameworkBase
     public void testVersioned()
     {
         ModeledFramework<TestModel> client = ModeledFramework.wrap(async, modelSpec);
-        complete(client.set(new TestModel("John", "Galt", "Galt's Gulch", 21, BigInteger.valueOf(1010101))));
+        TestModel model = new TestModel("John", "Galt", "Galt's Gulch", 21, BigInteger.valueOf(1010101));
+        complete(client.set(model));
+        complete(client.set(model));   // so that version goes to 1
 
         VersionedModeledFramework<TestModel> versioned = client.versioned();
         complete(versioned.read().whenComplete((v, e) -> {
@@ -146,13 +147,8 @@ public class TestModeledFramework extends TestModeledFrameworkBase
             Assert.assertTrue(v.version() > 0);
         }).thenCompose(versioned::set), (s, e) -> Assert.assertNull(e)); // version is correct should succeed
 
-        complete(versioned.read().whenComplete((v, e) -> {
-            Assert.assertNull(e);
-            Assert.assertTrue(v.version() > 0);
-        }).thenCompose(value -> {
-            Versioned<TestModel> badVersion = Versioned.from(value.model(), Integer.MAX_VALUE);
-            return versioned.set(badVersion);
-        }).whenComplete((s, e) -> Assert.assertTrue(e instanceof KeeperException.BadVersionException)));
+        Versioned<TestModel> badVersion = Versioned.from(model, 100000);
+        complete(versioned.set(badVersion), (v, e) -> Assert.assertTrue(e instanceof KeeperException.BadVersionException));
     }
 
     @Test
@@ -164,11 +160,7 @@ public class TestModeledFramework extends TestModeledFrameworkBase
         complete(client.set(new TestModel("John", "Galt", "Galt's Gulch", 21, BigInteger.valueOf(1010101))));
         complete(client.update(new TestModel("John", "Galt", "Galt's Gulch", 54, BigInteger.valueOf(88))), (__, e) -> Assert.assertNotNull(e, "Should've gotten an auth failure"));
 
-        try ( CuratorFramework authCurator = CuratorFrameworkFactory.builder()
-            .connectString(server.getConnectString())
-            .retryPolicy(new RetryOneTime(1))
-            .authorization("digest", "test:test".getBytes())
-            .build() )
+        try (CuratorFramework authCurator = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).authorization("digest", "test:test".getBytes()).build())
         {
             authCurator.start();
             ModeledFramework<TestModel> authClient = ModeledFramework.wrap(AsyncCuratorFramework.wrap(authCurator), aclModelSpec);


[15/21] curator git commit: Oops - modeled ZPath snuck into the main async package

Posted by ra...@apache.org.
Oops - modeled ZPath snuck into the main async package


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/1b6216e9
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/1b6216e9
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/1b6216e9

Branch: refs/heads/CURATOR-419
Commit: 1b6216e97be2771e927b7302160957834039579c
Parents: 11be719
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 12:04:25 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 12:04:25 2017 -0500

----------------------------------------------------------------------
 .../main/java/org/apache/curator/x/async/AsyncWrappers.java    | 6 ++++--
 .../java/org/apache/curator/x/async/TestBasicOperations.java   | 3 +--
 2 files changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/1b6216e9/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
index 7da82fc..9630985 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
@@ -20,6 +20,7 @@ package org.apache.curator.x.async;
 
 import org.apache.curator.framework.recipes.locks.InterProcessLock;
 import org.apache.curator.utils.ThreadUtils;
+import org.apache.curator.utils.ZKPaths;
 import org.apache.curator.x.async.api.ExistsOption;
 import org.apache.curator.x.async.modeled.ZPath;
 import java.util.Collections;
@@ -76,13 +77,14 @@ public class AsyncWrappers
      * @param path path to ensure
      * @return stage
      */
-    public static CompletionStage<Void> asyncEnsureContainers(AsyncCuratorFramework client, ZPath path)
+    public static CompletionStage<Void> asyncEnsureContainers(AsyncCuratorFramework client, String path)
     {
+        String localPath = ZKPaths.makePath(path, "foo");
         Set<ExistsOption> options = Collections.singleton(ExistsOption.createParentsAsContainers);
         return client
             .checkExists()
             .withOptions(options)
-            .forPath(path.child("foo").fullPath())
+            .forPath(localPath)
             .thenApply(__ -> null)
             ;
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/1b6216e9/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java b/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
index 78f37c2..3e980ec 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
@@ -23,7 +23,6 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.transaction.CuratorOp;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.x.async.modeled.ZPath;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
@@ -69,7 +68,7 @@ public class TestBasicOperations extends CompletableBaseClassForTests
     @Test
     public void testCreateTransactionWithMode() throws Exception
     {
-        complete(AsyncWrappers.asyncEnsureContainers(client, ZPath.parse("/test")));
+        complete(AsyncWrappers.asyncEnsureContainers(client, "/test"));
 
         CuratorOp op1 = client.transactionOp().create().withMode(PERSISTENT_SEQUENTIAL).forPath("/test/node-");
         CuratorOp op2 = client.transactionOp().create().withMode(PERSISTENT_SEQUENTIAL).forPath("/test/node-");


[03/21] curator git commit: Merge branch 'CURATOR-311' of github.com:oza/curator into CURATOR-311

Posted by ra...@apache.org.
Merge branch 'CURATOR-311' of github.com:oza/curator into CURATOR-311


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/174faef5
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/174faef5
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/174faef5

Branch: refs/heads/CURATOR-419
Commit: 174faef5f0de10626c616d2a25eb9fb1e5572966
Parents: 0f5d10d 8c1c5ff
Author: randgalt <ra...@apache.org>
Authored: Mon Jul 10 10:55:00 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jul 10 10:55:00 2017 -0500

----------------------------------------------------------------------
 .../framework/recipes/shared/SharedCount.java   |   5 +
 .../framework/recipes/shared/SharedValue.java   |  27 ++++-
 .../recipes/shared/TestSharedCount.java         | 116 ++++++++++++++++++-
 3 files changed, 146 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/174faef5/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
index 1a3d889,7e3f26a..68fd5b5
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
@@@ -94,9 -106,21 +107,21 @@@ public class SharedValue implements Clo
       */
      public SharedValue(CuratorFramework client, String path, byte[] seedValue)
      {
 -        this.client = client;
 +        this.client = client.newWatcherRemoveCuratorFramework();
          this.path = PathUtils.validatePath(path);
          this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
+         this.watcher = new SharedValueCuratorWatcher();
+         currentValue = new AtomicReference<VersionedValue<byte[]>>(new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
+     }
+ 
+     @VisibleForTesting
+     protected SharedValue(CuratorFramework client, String path, byte[] seedValue, CuratorWatcher watcher)
+     {
+         this.client = client;
+         this.path = PathUtils.validatePath(path);
+         this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
+         // inject watcher for testing
+         this.watcher = watcher;
          currentValue = new AtomicReference<VersionedValue<byte[]>>(new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
      }
  

http://git-wip-us.apache.org/repos/asf/curator/blob/174faef5/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
index 0690d6a,330c8f4..d7ebb6c
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
@@@ -23,9 -23,9 +23,10 @@@ import com.google.common.collect.Lists
  import com.google.common.util.concurrent.ThreadFactoryBuilder;
  import org.apache.curator.framework.CuratorFramework;
  import org.apache.curator.framework.CuratorFrameworkFactory;
 +import org.apache.curator.framework.imps.TestCleanState;
  import org.apache.curator.framework.api.BackgroundCallback;
  import org.apache.curator.framework.api.CuratorEvent;
+ import org.apache.curator.framework.api.CuratorWatcher;
  import org.apache.curator.framework.state.ConnectionState;
  import org.apache.curator.framework.state.ConnectionStateListener;
  import org.apache.curator.retry.RetryNTimes;
@@@ -424,7 -392,115 +430,115 @@@ public class TestSharedCount extends Ba
          finally
          {
              CloseableUtils.closeQuietly(sharedCount);
 -            CloseableUtils.closeQuietly(curatorFramework);
 +            TestCleanState.closeAndTestClean(curatorFramework);
          }
      }
+ 
+ 
+     @Test
+     public void testDisconnectReconnectWithMultipleClients() throws Exception
+     {
+         CuratorFramework curatorFramework1 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500));
+         CuratorFramework curatorFramework2 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500));
+ 
+         curatorFramework1.start();
+         curatorFramework1.blockUntilConnected();
+         curatorFramework2.start();
+         curatorFramework2.blockUntilConnected();
+ 
+         final String sharedCountPath = "/count";
+         final int initialCount = 10;
+         SharedCount sharedCount1 = new SharedCount(curatorFramework1, sharedCountPath, initialCount);
+         SharedCount sharedCountWithFaultyWatcher = createSharedCountWithFaultyWatcher(curatorFramework2, sharedCountPath, initialCount);
+ 
+         class MySharedCountListener implements SharedCountListener
+         {
+             final public Phaser gotSuspendEvent = new Phaser(1);
+             final public Phaser gotChangeEvent = new Phaser(1);
+             final public Phaser getReconnectEvent = new Phaser(1);
+             final public AtomicInteger numChangeEvents = new AtomicInteger(0);
+ 
+             @Override
+             public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception
+             {
+                 numChangeEvents.incrementAndGet();
+                 gotChangeEvent.arrive();
+             }
+ 
+             @Override
+             public void stateChanged(CuratorFramework client, ConnectionState newState)
+             {
+                 if (newState == ConnectionState.SUSPENDED) {
+                     gotSuspendEvent.arrive();
+                 } else if (newState == ConnectionState.RECONNECTED) {
+                     getReconnectEvent.arrive();
+                 }
+             }
+         }
+ 
+         MySharedCountListener listener1 = new MySharedCountListener();
+         sharedCount1.addListener(listener1);
+         sharedCount1.start();
+         MySharedCountListener listener2 = new MySharedCountListener();
+         sharedCountWithFaultyWatcher.addListener(listener2);
+ 
+         try
+         {
+             sharedCount1.setCount(12);
+             Assert.assertEquals(listener1.gotChangeEvent.awaitAdvanceInterruptibly(0, 2, TimeUnit.SECONDS), 1);
+             Assert.assertEquals(sharedCount1.getCount(), 12);
+ 
+             Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 10);
+             // new counter with faultyWatcher start
+             sharedCountWithFaultyWatcher.start();
+ 
+             for (int i = 0; i < 10; i++) {
+                 sharedCount1.setCount(13 + i);
+                 Assert.assertEquals(sharedCount1.getCount(), 13 + i);
+ 
+                 server.restart();
+ 
+                 Assert.assertEquals(listener2.getReconnectEvent.awaitAdvanceInterruptibly(i, 2, TimeUnit.SECONDS), i + 1);
+                 // CURATOR-311 makes Curator's client re-read the server's shared count value
+                 // when the client's state becomes ConnectionState.RECONNECTED. The following assertions ensure that.
+                 Assert.assertEquals(listener2.gotChangeEvent.awaitAdvanceInterruptibly(i, 2, TimeUnit.SECONDS), i + 1);
+                 Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 13 + i);
+             }
+         }
+         finally
+         {
+             CloseableUtils.closeQuietly(sharedCount1);
+             CloseableUtils.closeQuietly(curatorFramework1);
+             CloseableUtils.closeQuietly(sharedCountWithFaultyWatcher);
+             CloseableUtils.closeQuietly(curatorFramework2);
+         }
+     }
+ 
+     private SharedCount createSharedCountWithFaultyWatcher(CuratorFramework curatorFramework, String path, int val) {
+ 
+         class FaultyCuratorWatcher implements CuratorWatcher {
+             @Override
+             public void process(WatchedEvent event) throws Exception {
+                 // everything will be ignored
+             }
+         }
+ 
+         final FaultyCuratorWatcher fautlyWatcher = new FaultyCuratorWatcher();
+ 
+         class FaultySharedValue extends SharedValue {
+             public FaultySharedValue(CuratorFramework client, String path, byte[] seedValue) {
+                 super(client, path, seedValue, fautlyWatcher);
+             }
+         };
+ 
+         final SharedValue faultySharedValue = new FaultySharedValue(curatorFramework, path, SharedCount.toBytes(val));
+         class FaultySharedCount extends SharedCount {
+             public FaultySharedCount(CuratorFramework client, String path, int val) {
+                 super(client, path, faultySharedValue);
+             }
+         };
+         return new FaultySharedCount(curatorFramework, path, val);
+     }
+ 
+ 
  }


[07/21] curator git commit: Merge branch 'master' into CURATOR-419

Posted by ra...@apache.org.
Merge branch 'master' into CURATOR-419


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/3b9d6062
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/3b9d6062
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/3b9d6062

Branch: refs/heads/CURATOR-419
Commit: 3b9d60626bd5f469dbe0be37a4f952ed2f7dd15e
Parents: 4ad1fb8 12cc7ce
Author: randgalt <ra...@apache.org>
Authored: Mon Jul 10 14:40:48 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jul 10 14:40:48 2017 -0500

----------------------------------------------------------------------
 .../framework/recipes/shared/SharedCount.java   |   5 +
 .../framework/recipes/shared/SharedValue.java   |  28 ++++-
 .../recipes/shared/TestSharedCount.java         | 117 ++++++++++++++++++-
 3 files changed, 148 insertions(+), 2 deletions(-)
----------------------------------------------------------------------



[21/21] curator git commit: tests cleanup was closing in wrong order causing instability

Posted by ra...@apache.org.
tests cleanup was closing in wrong order causing instability


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/7d4f0623
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/7d4f0623
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/7d4f0623

Branch: refs/heads/CURATOR-419
Commit: 7d4f0623885c8c00a28819bcb823ad3d6a8732d1
Parents: 05d37e9
Author: randgalt <ra...@apache.org>
Authored: Sun Jul 16 16:05:52 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sun Jul 16 16:05:52 2017 -0500

----------------------------------------------------------------------
 .../discovery/details/TestServiceDiscovery.java | 130 +++++++------------
 1 file changed, 49 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/7d4f0623/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
index 989edaf..47c74d5 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
@@ -33,7 +33,6 @@ import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
 import org.apache.curator.x.discovery.ServiceInstance;
 import org.testng.Assert;
 import org.testng.annotations.Test;
-import java.io.Closeable;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -54,18 +53,18 @@ public class TestServiceDiscovery extends BaseClassForTests
     @Test
     public void testCrashedServerMultiInstances() throws Exception
     {
-        List<Closeable> closeables = Lists.newArrayList();
+        CuratorFramework client = null;
+        ServiceDiscovery<String> discovery = null;
         try
         {
             Timing timing = new Timing();
-            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-            closeables.add(client);
+            client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
             client.start();
 
             final Semaphore semaphore = new Semaphore(0);
             ServiceInstance<String> instance1 = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
             ServiceInstance<String> instance2 = ServiceInstance.<String>builder().payload("thing").name("test").port(10065).build();
-            ServiceDiscovery<String> discovery = new ServiceDiscoveryImpl<String>(client, "/test", new JsonInstanceSerializer<String>(String.class), instance1, false)
+            discovery = new ServiceDiscoveryImpl<String>(client, "/test", new JsonInstanceSerializer<String>(String.class), instance1, false)
             {
                 @Override
                 protected void internalRegisterService(ServiceInstance<String> service) throws Exception
@@ -74,7 +73,6 @@ public class TestServiceDiscovery extends BaseClassForTests
                     semaphore.release();
                 }
             };
-            closeables.add(discovery);
             discovery.start();
             discovery.registerService(instance2);
 
@@ -85,34 +83,31 @@ public class TestServiceDiscovery extends BaseClassForTests
             server.stop();
 
             server.restart();
-            closeables.add(server);
 
             timing.acquireSemaphore(semaphore, 2);
             Assert.assertEquals(discovery.queryForInstances("test").size(), 2);
         }
         finally
         {
-            for ( Closeable c : closeables )
-            {
-                CloseableUtils.closeQuietly(c);
-            }
+            CloseableUtils.closeQuietly(discovery);
+            CloseableUtils.closeQuietly(client);
         }
     }
 
     @Test
     public void testCrashedServer() throws Exception
     {
-        List<Closeable> closeables = Lists.newArrayList();
+        CuratorFramework client = null;
+        ServiceDiscovery<String> discovery = null;
         try
         {
             Timing timing = new Timing();
-            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-            closeables.add(client);
+            client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
             client.start();
 
             final Semaphore semaphore = new Semaphore(0);
             ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
-            ServiceDiscovery<String> discovery = new ServiceDiscoveryImpl<String>(client, "/test", new JsonInstanceSerializer<String>(String.class), instance, false)
+            discovery = new ServiceDiscoveryImpl<String>(client, "/test", new JsonInstanceSerializer<String>(String.class), instance, false)
             {
                 @Override
                 protected void internalRegisterService(ServiceInstance<String> service) throws Exception
@@ -121,7 +116,6 @@ public class TestServiceDiscovery extends BaseClassForTests
                     semaphore.release();
                 }
             };
-            closeables.add(discovery);
             discovery.start();
 
             timing.acquireSemaphore(semaphore);
@@ -131,35 +125,31 @@ public class TestServiceDiscovery extends BaseClassForTests
             server.stop();
 
             server.restart();
-            closeables.add(server);
 
             timing.acquireSemaphore(semaphore);
             Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
         }
         finally
         {
-            for ( Closeable c : closeables )
-            {
-                CloseableUtils.closeQuietly(c);
-            }
+            CloseableUtils.closeQuietly(discovery);
+            CloseableUtils.closeQuietly(client);
         }
     }
 
     @Test
     public void testCrashedInstance() throws Exception
     {
-        List<Closeable> closeables = Lists.newArrayList();
+        CuratorFramework client = null;
+        ServiceDiscovery<String> discovery = null;
         try
         {
             Timing timing = new Timing();
 
-            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-            closeables.add(client);
+            client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
             client.start();
 
             ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
-            ServiceDiscovery<String> discovery = new ServiceDiscoveryImpl<String>(client, "/test", new JsonInstanceSerializer<String>(String.class), instance, false);
-            closeables.add(discovery);
+            discovery = new ServiceDiscoveryImpl<String>(client, "/test", new JsonInstanceSerializer<String>(String.class), instance, false);
             discovery.start();
 
             Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
@@ -171,11 +161,8 @@ public class TestServiceDiscovery extends BaseClassForTests
         }
         finally
         {
-            Collections.reverse(closeables);
-            for ( Closeable c : closeables )
-            {
-                CloseableUtils.closeQuietly(c);
-            }
+            CloseableUtils.closeQuietly(discovery);
+            CloseableUtils.closeQuietly(client);
         }
     }
 
@@ -185,11 +172,11 @@ public class TestServiceDiscovery extends BaseClassForTests
         final String SERVICE_ONE = "one";
         final String SERVICE_TWO = "two";
 
-        List<Closeable> closeables = Lists.newArrayList();
+        CuratorFramework client = null;
+        ServiceDiscovery<Void> discovery = null;
         try
         {
-            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
-            closeables.add(client);
+            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
             client.start();
 
             ServiceInstance<Void> s1_i1 = ServiceInstance.<Void>builder().name(SERVICE_ONE).build();
@@ -197,8 +184,7 @@ public class TestServiceDiscovery extends BaseClassForTests
             ServiceInstance<Void> s2_i1 = ServiceInstance.<Void>builder().name(SERVICE_TWO).build();
             ServiceInstance<Void> s2_i2 = ServiceInstance.<Void>builder().name(SERVICE_TWO).build();
 
-            ServiceDiscovery<Void> discovery = ServiceDiscoveryBuilder.builder(Void.class).client(client).basePath("/test").build();
-            closeables.add(discovery);
+            discovery = ServiceDiscoveryBuilder.builder(Void.class).client(client).basePath("/test").build();
             discovery.start();
 
             discovery.registerService(s1_i1);
@@ -227,27 +213,23 @@ public class TestServiceDiscovery extends BaseClassForTests
         }
         finally
         {
-            Collections.reverse(closeables);
-            for ( Closeable c : closeables )
-            {
-                CloseableUtils.closeQuietly(c);
-            }
+            CloseableUtils.closeQuietly(discovery);
+            CloseableUtils.closeQuietly(client);
         }
     }
 
     @Test
     public void testBasic() throws Exception
     {
-        List<Closeable> closeables = Lists.newArrayList();
+        CuratorFramework client = null;
+        ServiceDiscovery<String> discovery = null;
         try
         {
-            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
-            closeables.add(client);
+            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
             client.start();
 
             ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
-            ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build();
-            closeables.add(discovery);
+            discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build();
             discovery.start();
 
             Assert.assertEquals(discovery.queryForNames(), Collections.singletonList("test"));
@@ -258,11 +240,8 @@ public class TestServiceDiscovery extends BaseClassForTests
         }
         finally
         {
-            Collections.reverse(closeables);
-            for ( Closeable c : closeables )
-            {
-                CloseableUtils.closeQuietly(c);
-            }
+            CloseableUtils.closeQuietly(discovery);
+            CloseableUtils.closeQuietly(client);
         }
     }
 
@@ -271,16 +250,16 @@ public class TestServiceDiscovery extends BaseClassForTests
     {
         Timing timing = new Timing();
         server.stop();
-        List<Closeable> closeables = Lists.newArrayList();
+
+        CuratorFramework client = null;
+        ServiceDiscovery<String> discovery = null;
         try
         {
-            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
-            closeables.add(client);
+            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
             client.start();
 
             ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
-            ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build();
-            closeables.add(discovery);
+            discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build();
             discovery.start();
 
             server.restart();
@@ -293,11 +272,8 @@ public class TestServiceDiscovery extends BaseClassForTests
         }
         finally
         {
-            Collections.reverse(closeables);
-            for ( Closeable c : closeables )
-            {
-                CloseableUtils.closeQuietly(c);
-            }
+            CloseableUtils.closeQuietly(discovery);
+            CloseableUtils.closeQuietly(client);
         }
     }
 
@@ -308,7 +284,6 @@ public class TestServiceDiscovery extends BaseClassForTests
         final String name = "name";
 
         final CountDownLatch restartLatch = new CountDownLatch(1);
-        List<Closeable> closeables = Lists.newArrayList();
 
         InstanceSerializer<String> slowSerializer = new JsonInstanceSerializer<String>(String.class)
         {
@@ -333,15 +308,15 @@ public class TestServiceDiscovery extends BaseClassForTests
             }
         };
 
+        CuratorFramework client = null;
+        ServiceDiscovery<String> discovery = null;
         try
         {
-            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
-            closeables.add(client);
+            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
             client.start();
 
             ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name(name).port(10064).build();
-            ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).serializer(slowSerializer).watchInstances(true).build();
-            closeables.add(discovery);
+            discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).serializer(slowSerializer).watchInstances(true).build();
             discovery.start();
 
             Assert.assertFalse(discovery.queryForInstances(name).isEmpty(), "Service should start registered.");
@@ -358,27 +333,23 @@ public class TestServiceDiscovery extends BaseClassForTests
         }
         finally
         {
-            Collections.reverse(closeables);
-            for ( Closeable c : closeables )
-            {
-                CloseableUtils.closeQuietly(c);
-            }
+            CloseableUtils.closeQuietly(discovery);
+            CloseableUtils.closeQuietly(client);
         }
     }
 
     @Test
     public void testCleaning() throws Exception
     {
-        List<Closeable> closeables = Lists.newArrayList();
+        CuratorFramework client = null;
+        ServiceDiscovery<String> discovery = null;
         try
         {
-            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
-            closeables.add(client);
+            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
             client.start();
 
             ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
-            ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build();
-            closeables.add(discovery);
+            discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build();
             discovery.start();
             discovery.unregisterService(instance);
 
@@ -386,11 +357,8 @@ public class TestServiceDiscovery extends BaseClassForTests
         }
         finally
         {
-            Collections.reverse(closeables);
-            for ( Closeable c : closeables )
-            {
-                CloseableUtils.closeQuietly(c);
-            }
+            CloseableUtils.closeQuietly(discovery);
+            CloseableUtils.closeQuietly(client);
         }
     }
 }


[14/21] curator git commit: A few things for CURATOR-397

Posted by ra...@apache.org.
A few things for CURATOR-397

1. AsyncWrappers.asyncEnsureContainers was just wrong - this is a better implementation
2. Added raw serializer constant
3. Added ModeledOptions, which can be expanded in the future. For now it just has ignoreMissingNodesForChildren


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/11be719b
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/11be719b
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/11be719b

Branch: refs/heads/CURATOR-419
Commit: 11be719b32bcf8879c44a0c2005ba5a2107986cb
Parents: 4a0e022
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 08:13:03 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 08:16:01 2017 -0500

----------------------------------------------------------------------
 .../apache/curator/x/async/AsyncWrappers.java   | 38 +++++++-------------
 .../x/async/modeled/ModelSerializer.java        | 18 ++++++++++
 .../async/modeled/ModeledFrameworkBuilder.java  | 20 ++++++++++-
 .../curator/x/async/modeled/ModeledOptions.java | 29 +++++++++++++++
 .../modeled/details/ModeledFrameworkImpl.java   | 33 +++++++++++++----
 5 files changed, 104 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/11be719b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
index e982cf2..7da82fc 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
@@ -20,7 +20,10 @@ package org.apache.curator.x.async;
 
 import org.apache.curator.framework.recipes.locks.InterProcessLock;
 import org.apache.curator.utils.ThreadUtils;
+import org.apache.curator.x.async.api.ExistsOption;
 import org.apache.curator.x.async.modeled.ZPath;
+import java.util.Collections;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.Executor;
@@ -67,38 +70,21 @@ public class AsyncWrappers
 {
     /**
      * Asynchronously call {@link org.apache.curator.framework.CuratorFramework#createContainers(String)} using
-     * the {@link java.util.concurrent.ForkJoinPool#commonPool()}.
-     *
-     * @param client client
-     * @param path path to ensure
-     * @return stage
-     */
-    public static CompletionStage<Void> asyncEnsureContainers(AsyncCuratorFramework client, ZPath path)
-    {
-        return asyncEnsureContainers(client, path, null);
-    }
-
-    /**
-     * Asynchronously call {@link org.apache.curator.framework.CuratorFramework#createContainers(String)} using
      * the given executor
      *
      * @param client client
      * @param path path to ensure
      * @return stage
      */
-    public static CompletionStage<Void> asyncEnsureContainers(AsyncCuratorFramework client, ZPath path, Executor executor)
+    public static CompletionStage<Void> asyncEnsureContainers(AsyncCuratorFramework client, ZPath path)
     {
-        Runnable proc = () -> {
-            try
-            {
-                client.unwrap().createContainers(path.fullPath());
-            }
-            catch ( Exception e )
-            {
-                throw new RuntimeException(e);
-            }
-        };
-        return (executor != null) ? CompletableFuture.runAsync(proc, executor) : CompletableFuture.runAsync(proc);
+        Set<ExistsOption> options = Collections.singleton(ExistsOption.createParentsAsContainers);
+        return client
+            .checkExists()
+            .withOptions(options)
+            .forPath(path.child("foo").fullPath())
+            .thenApply(__ -> null)
+            ;
     }
 
     /**
@@ -284,7 +270,7 @@ public class AsyncWrappers
                 future.complete(null);
             }
         }
-        catch ( Exception e )
+        catch ( Throwable e )
         {
             ThreadUtils.checkInterrupted(e);
             future.completeExceptionally(e);

http://git-wip-us.apache.org/repos/asf/curator/blob/11be719b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModelSerializer.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModelSerializer.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModelSerializer.java
index 428096e..476f314 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModelSerializer.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModelSerializer.java
@@ -40,4 +40,22 @@ public interface ModelSerializer<T>
      * @throws RuntimeException if <code>bytes</code> is invalid or there was an error deserializing
      */
     T deserialize(byte[] bytes);
+
+    /**
+     * A pass through serializer
+     */
+    ModelSerializer<byte[]> raw = new ModelSerializer<byte[]>()
+    {
+        @Override
+        public byte[] serialize(byte[] model)
+        {
+            return model;
+        }
+
+        @Override
+        public byte[] deserialize(byte[] bytes)
+        {
+            return bytes;
+        }
+    };
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/11be719b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFrameworkBuilder.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFrameworkBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFrameworkBuilder.java
index 2e8bec3..1df68e6 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFrameworkBuilder.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFrameworkBuilder.java
@@ -18,13 +18,16 @@
  */
 package org.apache.curator.x.async.modeled;
 
+import com.google.common.collect.ImmutableSet;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.x.async.AsyncCuratorFramework;
 import org.apache.curator.x.async.WatchMode;
 import org.apache.curator.x.async.modeled.details.ModeledFrameworkImpl;
 import org.apache.zookeeper.WatchedEvent;
+import java.util.Collections;
 import java.util.Objects;
+import java.util.Set;
 import java.util.function.UnaryOperator;
 
 public class ModeledFrameworkBuilder<T>
@@ -35,6 +38,7 @@ public class ModeledFrameworkBuilder<T>
     private UnaryOperator<WatchedEvent> watcherFilter;
     private UnhandledErrorListener unhandledErrorListener;
     private UnaryOperator<CuratorEvent> resultFilter;
+    private Set<ModeledOptions> modeledOptions;
 
     /**
      * Build a new ModeledFramework instance
@@ -49,7 +53,8 @@ public class ModeledFrameworkBuilder<T>
             watchMode,
             watcherFilter,
             unhandledErrorListener,
-            resultFilter
+            resultFilter,
+            modeledOptions
         );
     }
 
@@ -142,6 +147,18 @@ public class ModeledFrameworkBuilder<T>
         return this;
     }
 
+    /**
+     * Change the modeled options
+     *
+     * @param modeledOptions new options set
+     * @return this for chaining
+     */
+    public ModeledFrameworkBuilder<T> withOptions(Set<ModeledOptions> modeledOptions)
+    {
+        this.modeledOptions = ImmutableSet.copyOf(Objects.requireNonNull(modeledOptions, "client cannot be null"));
+        return this;
+    }
+
     ModeledFrameworkBuilder()
     {
     }
@@ -150,5 +167,6 @@ public class ModeledFrameworkBuilder<T>
     {
         this.client = Objects.requireNonNull(client, "client cannot be null");
         this.modelSpec = Objects.requireNonNull(modelSpec, "modelSpec cannot be null");
+        modeledOptions = Collections.singleton(ModeledOptions.ignoreMissingNodesForChildren);
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/11be719b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledOptions.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledOptions.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledOptions.java
new file mode 100644
index 0000000..434894b
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledOptions.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled;
+
+public enum ModeledOptions
+{
+    /**
+     * Causes {@link ModeledFramework#children()} and {@link ModeledFramework#childrenAsZNodes()}
+     * to ignore {@link org.apache.zookeeper.KeeperException.NoNodeException} and merely return
+     * an empty list
+     */
+    ignoreMissingNodesForChildren
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/11be719b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
index c1d19c4..44011ee 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
@@ -19,6 +19,8 @@
 package org.apache.curator.x.async.modeled.details;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.UnhandledErrorListener;
@@ -36,13 +38,16 @@ import org.apache.curator.x.async.api.CreateOption;
 import org.apache.curator.x.async.api.WatchableAsyncCuratorFramework;
 import org.apache.curator.x.async.modeled.ModelSpec;
 import org.apache.curator.x.async.modeled.ModeledFramework;
+import org.apache.curator.x.async.modeled.ModeledOptions;
 import org.apache.curator.x.async.modeled.ZNode;
 import org.apache.curator.x.async.modeled.ZPath;
 import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
 import org.apache.curator.x.async.modeled.versioned.VersionedModeledFramework;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
@@ -62,13 +67,15 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
     private final UnaryOperator<CuratorEvent> resultFilter;
     private final AsyncCuratorFrameworkDsl dslClient;
     private final boolean isWatched;
+    private final Set<ModeledOptions> modeledOptions;
 
-    public static <T> ModeledFrameworkImpl<T> build(AsyncCuratorFramework client, ModelSpec<T> model, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter)
+    public static <T> ModeledFrameworkImpl<T> build(AsyncCuratorFramework client, ModelSpec<T> model, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter, Set<ModeledOptions> modeledOptions)
     {
         boolean isWatched = (watchMode != null);
 
         Objects.requireNonNull(client, "client cannot be null");
         Objects.requireNonNull(model, "model cannot be null");
+        modeledOptions = ImmutableSet.copyOf(Objects.requireNonNull(modeledOptions, "modeledOptions cannot be null"));
 
         watchMode = (watchMode != null) ? watchMode : WatchMode.stateChangeAndSuccess;
 
@@ -84,11 +91,12 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
             watcherFilter,
             unhandledErrorListener,
             resultFilter,
-            isWatched
+            isWatched,
+            modeledOptions
         );
     }
 
-    private ModeledFrameworkImpl(AsyncCuratorFramework client, AsyncCuratorFrameworkDsl dslClient, WatchableAsyncCuratorFramework watchableClient, ModelSpec<T> modelSpec, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter, boolean isWatched)
+    private ModeledFrameworkImpl(AsyncCuratorFramework client, AsyncCuratorFrameworkDsl dslClient, WatchableAsyncCuratorFramework watchableClient, ModelSpec<T> modelSpec, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter, boolean isWatched, Set<ModeledOptions> modeledOptions)
     {
         this.client = client;
         this.dslClient = dslClient;
@@ -99,6 +107,7 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
         this.unhandledErrorListener = unhandledErrorListener;
         this.resultFilter = resultFilter;
         this.isWatched = isWatched;
+        this.modeledOptions = modeledOptions;
     }
 
     @Override
@@ -280,7 +289,14 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
         asyncStage.whenComplete((children, e) -> {
             if ( e != null )
             {
-                modelStage.completeExceptionally(e);
+                if ( modeledOptions.contains(ModeledOptions.ignoreMissingNodesForChildren) && (Throwables.getRootCause(e) instanceof KeeperException.NoNodeException) )
+                {
+                    modelStage.complete(Collections.emptyList());
+                }
+                else
+                {
+                    modelStage.completeExceptionally(e);
+                }
             }
             else
             {
@@ -303,7 +319,8 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
             watcherFilter,
             unhandledErrorListener,
             resultFilter,
-            isWatched
+            isWatched,
+            modeledOptions
         );
     }
 
@@ -320,7 +337,8 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
             watcherFilter,
             unhandledErrorListener,
             resultFilter,
-            isWatched
+            isWatched,
+            modeledOptions
         );
     }
 
@@ -337,7 +355,8 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
             watcherFilter,
             unhandledErrorListener,
             resultFilter,
-            isWatched
+            isWatched,
+            modeledOptions
         );
     }
 


[08/21] curator git commit: Merge branch 'patch-1' of github.com:nivs/curator into CURATOR-420

Posted by ra...@apache.org.
Merge branch 'patch-1' of github.com:nivs/curator into CURATOR-420


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/34be09a4
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/34be09a4
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/34be09a4

Branch: refs/heads/CURATOR-419
Commit: 34be09a44c7a941a95245a5bc31a11771e4f7e86
Parents: 3b9d606 b1ffac9
Author: randgalt <ra...@apache.org>
Authored: Tue Jul 11 08:56:51 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Jul 11 08:56:51 2017 -0500

----------------------------------------------------------------------
 .../apache/curator/framework/recipes/shared/SharedCount.java    | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/34be09a4/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
----------------------------------------------------------------------


[06/21] curator git commit: closes #200 - PR accepted and merged

Posted by ra...@apache.org.
closes #200 - PR accepted and merged


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/12cc7cec
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/12cc7cec
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/12cc7cec

Branch: refs/heads/CURATOR-419
Commit: 12cc7cec5dcb0cc512ec5900426fde74e3289614
Parents: 7442357
Author: randgalt <ra...@apache.org>
Authored: Mon Jul 10 12:10:46 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jul 10 12:10:46 2017 -0500

----------------------------------------------------------------------

----------------------------------------------------------------------



[12/21] curator git commit: license

Posted by ra...@apache.org.
license


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/716fb4aa
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/716fb4aa
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/716fb4aa

Branch: refs/heads/CURATOR-419
Commit: 716fb4aa44c27ef83f5c3f224197a9ff4c9e09b0
Parents: 1e06f40
Author: randgalt <ra...@apache.org>
Authored: Tue Jul 11 12:52:33 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Jul 11 12:52:33 2017 -0500

----------------------------------------------------------------------
 .../curator/utils/ExceptionAccumulator.java       | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/716fb4aa/curator-client/src/main/java/org/apache/curator/utils/ExceptionAccumulator.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/ExceptionAccumulator.java b/curator-client/src/main/java/org/apache/curator/utils/ExceptionAccumulator.java
index 2be2ee8..ee93ea9 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/ExceptionAccumulator.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/ExceptionAccumulator.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.curator.utils;
 
 import com.google.common.base.Throwables;


[19/21] curator git commit: Merge branch 'master' into CURATOR-419

Posted by ra...@apache.org.
Merge branch 'master' into CURATOR-419


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/666b1752
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/666b1752
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/666b1752

Branch: refs/heads/CURATOR-419
Commit: 666b175228d7e642d1b0bbf049119a628501900e
Parents: 7401345 c6f7aeb
Author: randgalt <ra...@apache.org>
Authored: Sun Jul 16 14:46:26 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sun Jul 16 14:46:26 2017 -0500

----------------------------------------------------------------------
 .../org/apache/curator/framework/imps/GetDataBuilderImpl.java   | 5 +++++
 .../java/org/apache/curator/x/async/TestBasicOperations.java    | 4 ++--
 2 files changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------



[17/21] curator git commit: wip for fixing tests

Posted by ra...@apache.org.
wip for fixing tests


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/74013456
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/74013456
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/74013456

Branch: refs/heads/CURATOR-419
Commit: 74013456da1260f9155a72fa651d5d6fa449d984
Parents: 123f2ec
Author: randgalt <ra...@apache.org>
Authored: Sun Jul 16 14:21:23 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sun Jul 16 14:21:23 2017 -0500

----------------------------------------------------------------------
 .../org/apache/curator/x/async/TestBasicOperations.java   | 10 ++++++++++
 .../curator/x/async/modeled/TestModeledFramework.java     |  8 ++++----
 2 files changed, 14 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/74013456/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java b/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
index 3e980ec..12d3014 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
@@ -189,4 +189,14 @@ public class TestBasicOperations extends CompletableBaseClassForTests
             Assert.assertEquals(v.getCode(), KeeperException.Code.CONNECTIONLOSS);
         });
     }
+
+    @Test
+    public void testGetDataWithStat()
+    {
+        complete(client.create().forPath("/test"));
+
+        Stat stat = new Stat();
+        complete(client.getData().storingStatIn(stat).forPath("/test"));
+        Assert.assertEquals(stat.getVersion(), 1);
+    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/74013456/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java
index 42a9e63..53eb517 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java
@@ -102,10 +102,10 @@ public class TestModeledFramework extends TestModeledFrameworkBase
     @Test
     public void testBadNode()
     {
-        complete(async.create().forPath(modelSpec.path().fullPath(), "fubar".getBytes()));
+        complete(async.create().forPath(modelSpec.path().fullPath(), "fubar".getBytes()), (v, e) -> {});    // ignore error
 
         ModeledFramework<TestModel> client = ModeledFramework.builder(async, modelSpec).watched().build();
-        complete(client.read().whenComplete((model, e) -> Assert.assertTrue(e instanceof RuntimeException)));
+        complete(client.read(), (model, e) -> Assert.assertTrue(e instanceof KeeperException.NoNodeException));
     }
 
     @Test
@@ -138,13 +138,13 @@ public class TestModeledFramework extends TestModeledFrameworkBase
     public void testVersioned()
     {
         ModeledFramework<TestModel> client = ModeledFramework.wrap(async, modelSpec);
-        client.set(new TestModel("John", "Galt", "Galt's Gulch", 21, BigInteger.valueOf(1010101)));
+        complete(client.set(new TestModel("John", "Galt", "Galt's Gulch", 21, BigInteger.valueOf(1010101))));
 
         VersionedModeledFramework<TestModel> versioned = client.versioned();
         complete(versioned.read().whenComplete((v, e) -> {
             Assert.assertNull(e);
             Assert.assertTrue(v.version() > 0);
-        }).thenCompose(versioned::set).whenComplete((s, e) -> Assert.assertNull(e))); // version is correct should succeed
+        }).thenCompose(versioned::set), (s, e) -> Assert.assertNull(e)); // version is correct should succeed
 
         complete(versioned.read().whenComplete((v, e) -> {
             Assert.assertNull(e);


[11/21] curator git commit: closes #225 - merged (with slight modifications)

Posted by ra...@apache.org.
closes #225 - merged (with slight modifications)


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/1e06f406
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/1e06f406
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/1e06f406

Branch: refs/heads/CURATOR-419
Commit: 1e06f406281aa893163c8639eebf1baa0cad673b
Parents: 4909f57
Author: randgalt <ra...@apache.org>
Authored: Tue Jul 11 10:07:27 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Jul 11 10:07:27 2017 -0500

----------------------------------------------------------------------

----------------------------------------------------------------------



[05/21] curator git commit: fixed flakiness with TestSharedCount tests

Posted by ra...@apache.org.
fixed flakiness with TestSharedCount tests


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/7442357b
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/7442357b
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/7442357b

Branch: refs/heads/CURATOR-419
Commit: 7442357b7e6dc76ff76c6bc6a3d361b65815f626
Parents: 5de6b81
Author: randgalt <ra...@apache.org>
Authored: Mon Jul 10 12:00:16 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jul 10 12:00:16 2017 -0500

----------------------------------------------------------------------
 .../framework/recipes/shared/TestSharedCount.java      | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/7442357b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
index 3123c7d..6a0b7c2 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
@@ -78,6 +78,7 @@ public class TestSharedCount extends BaseClassForTests
                                 CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
                                 clients.add(client);
                                 client.start();
+                                client.checkExists().forPath("/");  // clear initial connect event
 
                                 SharedCount count = new SharedCount(client, "/count", 10);
                                 counts.add(count);
@@ -121,6 +122,7 @@ public class TestSharedCount extends BaseClassForTests
             CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
             clients.add(client);
             client.start();
+            client.checkExists().forPath("/");  // clear initial connect event
 
             Assert.assertTrue(startLatch.await(10, TimeUnit.SECONDS));
 
@@ -438,13 +440,14 @@ public class TestSharedCount extends BaseClassForTests
     @Test
     public void testDisconnectReconnectWithMultipleClients() throws Exception
     {
+        Timing timing = new Timing();
         CuratorFramework curatorFramework1 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500));
         CuratorFramework curatorFramework2 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500));
 
         curatorFramework1.start();
-        curatorFramework1.blockUntilConnected();
+        curatorFramework1.checkExists().forPath("/");   // clear initial connect events
         curatorFramework2.start();
-        curatorFramework2.blockUntilConnected();
+        curatorFramework2.checkExists().forPath("/");   // clear initial connect events
 
         final String sharedCountPath = "/count";
         final int initialCount = 10;
@@ -485,7 +488,7 @@ public class TestSharedCount extends BaseClassForTests
         try
         {
             sharedCount1.setCount(12);
-            Assert.assertEquals(listener1.gotChangeEvent.awaitAdvanceInterruptibly(0, 2, TimeUnit.SECONDS), 1);
+            Assert.assertEquals(listener1.gotChangeEvent.awaitAdvanceInterruptibly(0, timing.seconds(), TimeUnit.SECONDS), 1);
             Assert.assertEquals(sharedCount1.getCount(), 12);
 
             Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 10);
@@ -498,10 +501,10 @@ public class TestSharedCount extends BaseClassForTests
 
                 server.restart();
 
-                Assert.assertEquals(listener2.getReconnectEvent.awaitAdvanceInterruptibly(i, 2, TimeUnit.SECONDS), i + 1);
+                Assert.assertEquals(listener2.getReconnectEvent.awaitAdvanceInterruptibly(i, timing.forWaiting().seconds(), TimeUnit.SECONDS), i + 1);
                 // CURATOR-311 introduces to Curator's client reading server's shared count value
                 // when client's state gets ConnectionState.RECONNECTED. Following tests ensures that.
-                Assert.assertEquals(listener2.gotChangeEvent.awaitAdvanceInterruptibly(i, 2, TimeUnit.SECONDS), i + 1);
+                Assert.assertEquals(listener2.gotChangeEvent.awaitAdvanceInterruptibly(i, timing.forWaiting().seconds(), TimeUnit.SECONDS), i + 1);
                 Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 13 + i);
             }
         }


[09/21] curator git commit: ServiceDiscoveryImpl.close() was double-closing caches. Also introduce an ExceptionAccumulator

Posted by ra...@apache.org.
ServiceDiscoveryImpl.close() was double-closing caches. Also introduce an ExceptionAccumulator


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/e158bbc7
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/e158bbc7
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/e158bbc7

Branch: refs/heads/CURATOR-419
Commit: e158bbc75affef29828b98373e864b2c460c1f4d
Parents: 34be09a
Author: randgalt <ra...@apache.org>
Authored: Tue Jul 11 09:57:54 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Jul 11 09:57:54 2017 -0500

----------------------------------------------------------------------
 .../curator/utils/ExceptionAccumulator.java     | 51 ++++++++++++++++++++
 .../discovery/details/ServiceDiscoveryImpl.java | 21 +++++---
 2 files changed, 64 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/e158bbc7/curator-client/src/main/java/org/apache/curator/utils/ExceptionAccumulator.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/ExceptionAccumulator.java b/curator-client/src/main/java/org/apache/curator/utils/ExceptionAccumulator.java
new file mode 100644
index 0000000..2be2ee8
--- /dev/null
+++ b/curator-client/src/main/java/org/apache/curator/utils/ExceptionAccumulator.java
@@ -0,0 +1,51 @@
+package org.apache.curator.utils;
+
+import com.google.common.base.Throwables;
+
+/**
+ * Utility to accumulate multiple potential exceptions into one that
+ * is thrown at the end
+ */
+public class ExceptionAccumulator
+{
+    private volatile Throwable mainEx = null;
+
+    /**
+     * If there is an accumulated exception, throw it
+     */
+    public void propagate()
+    {
+        if ( mainEx != null )
+        {
+            Throwables.propagate(mainEx);
+        }
+    }
+
+    /**
+     * Add an exception into the accumulated exceptions. Note:
+     * if the exception is {@link java.lang.InterruptedException}
+     * then <code>Thread.currentThread().interrupt()</code> is called.
+     *
+     * @param e the exception
+     */
+    public void add(Throwable e)
+    {
+        if ( e instanceof InterruptedException )
+        {
+            if ( mainEx != null )
+            {
+                e.addSuppressed(mainEx);
+            }
+            Thread.currentThread().interrupt();
+        }
+
+        if ( mainEx == null )
+        {
+            mainEx = e;
+        }
+        else
+        {
+            mainEx.addSuppressed(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/e158bbc7/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
index 762c9a8..476705c 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
@@ -31,6 +31,7 @@ import org.apache.curator.framework.recipes.cache.NodeCacheListener;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.ExceptionAccumulator;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.curator.x.discovery.ServiceCache;
@@ -39,7 +40,6 @@ import org.apache.curator.x.discovery.ServiceDiscovery;
 import org.apache.curator.x.discovery.ServiceInstance;
 import org.apache.curator.x.discovery.ServiceProvider;
 import org.apache.curator.x.discovery.ServiceProviderBuilder;
-import org.apache.curator.x.discovery.ServiceType;
 import org.apache.curator.x.discovery.strategies.RoundRobinStrategy;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -77,9 +77,12 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
                     log.debug("Re-registering due to reconnection");
                     reRegisterServices();
                 }
+                catch (InterruptedException ex)
+                {
+                    Thread.currentThread().interrupt();
+                }
                 catch ( Exception e )
                 {
-                    ThreadUtils.checkInterrupted(e);
                     log.error("Could not re-register instances after reconnection", e);
                 }
             }
@@ -140,10 +143,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
     @Override
     public void close() throws IOException
     {
-        for ( ServiceCache<T> cache : Lists.newArrayList(caches) )
-        {
-            CloseableUtils.closeQuietly(cache);
-        }
+        ExceptionAccumulator accumulator = new ExceptionAccumulator();
         for ( ServiceProvider<T> provider : Lists.newArrayList(providers) )
         {
             CloseableUtils.closeQuietly(provider);
@@ -161,12 +161,13 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
             }
             catch ( Exception e )
             {
-                ThreadUtils.checkInterrupted(e);
+                accumulator.add(e);
                 log.error("Could not unregister instance: " + entry.service.getName(), e);
             }
         }
 
         client.getConnectionStateListenable().removeListener(connectionStateListener);
+        accumulator.propagate();
     }
 
     /**
@@ -469,9 +470,13 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
         {
             nodeCache.start(true);
         }
+        catch ( InterruptedException e)
+        {
+            Thread.currentThread().interrupt();
+            return null;
+        }
         catch ( Exception e )
         {
-            ThreadUtils.checkInterrupted(e);
             log.error("Could not start node cache for: " + instance, e);
         }
         NodeCacheListener listener = new NodeCacheListener()


[02/21] curator git commit: SharedCount: fix removeListener

Posted by ra...@apache.org.
SharedCount: fix removeListener

`removeListener` only removed the listener from the SharedCount's `listeners` member, which had no effect.
This fix retrieves the `SharedValueListener` created for wrapping the `SharedCountListener` and removes it from the SharedValue's listeners.

Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/b1ffac91
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/b1ffac91
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/b1ffac91

Branch: refs/heads/CURATOR-419
Commit: b1ffac9145750e03c01c4ce6823f4a8536054a1e
Parents: 8b28b12
Author: Niv Singer <ni...@innerlogics.com>
Authored: Wed Jul 5 11:47:13 2017 +0300
Committer: GitHub <no...@github.com>
Committed: Wed Jul 5 11:47:13 2017 +0300

----------------------------------------------------------------------
 .../apache/curator/framework/recipes/shared/SharedCount.java    | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/b1ffac91/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
index 87fffdd..50a9f0e 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
@@ -141,7 +141,10 @@ public class SharedCount implements Closeable, SharedCountReader, Listenable<Sha
     @Override
     public void     removeListener(SharedCountListener listener)
     {
-        listeners.remove(listener);
+        SharedValueListener valueListener = listeners.remove(listener);
+        if(valueListener != null) {
+            sharedValue.getListenable().removeListener(valueListener);
+        }
     }
 
     /**


[10/21] curator git commit: Add comment to close to make clear that the provider should close its caches

Posted by ra...@apache.org.
Add comment to close to make clear that the provider should close its caches


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/4909f57c
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/4909f57c
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/4909f57c

Branch: refs/heads/CURATOR-419
Commit: 4909f57c172c3f90f9aedeac07772d228a9c948c
Parents: e158bbc
Author: randgalt <ra...@apache.org>
Authored: Tue Jul 11 09:59:36 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Jul 11 09:59:36 2017 -0500

----------------------------------------------------------------------
 .../java/org/apache/curator/x/discovery/ServiceProvider.java  | 7 +++++++
 1 file changed, 7 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/4909f57c/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java
index d606649..f542ed3 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java
@@ -21,6 +21,7 @@ package org.apache.curator.x.discovery;
 
 import org.apache.curator.x.discovery.details.InstanceProvider;
 import java.io.Closeable;
+import java.io.IOException;
 import java.util.Collection;
 
 /**
@@ -61,4 +62,10 @@ public interface ServiceProvider<T> extends Closeable
      * @param instance instance that had an error
      */
     public void noteError(ServiceInstance<T> instance);
+
+    /**
+     * Close the provider. Note: it's the provider's responsibility to close any caches it manages
+     */
+    @Override
+    void close() throws IOException;
 }


[13/21] curator git commit: Fixed create mode for AsyncCuratorFramework.transactionOp().create(). It was being ignored. Added a test as well

Posted by ra...@apache.org.
Fixed create mode for AsyncCuratorFramework.transactionOp().create(). It was being ignored. Added a test as well


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/4a0e022c
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/4a0e022c
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/4a0e022c

Branch: refs/heads/CURATOR-419
Commit: 4a0e022ce361e21b9f1751434166c5a90c4a8845
Parents: 716fb4a
Author: randgalt <ra...@apache.org>
Authored: Thu Jul 13 23:51:43 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Thu Jul 13 23:51:43 2017 -0500

----------------------------------------------------------------------
 .../x/async/details/AsyncTransactionOpImpl.java |  4 ++--
 .../x/async/CompletableBaseClassForTests.java   |  8 ++++++-
 .../curator/x/async/TestBasicOperations.java    | 22 ++++++++++++++------
 3 files changed, 25 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/4a0e022c/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java
index 7b158f8..2a2c293 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java
@@ -18,7 +18,7 @@
  */
 package org.apache.curator.x.async.details;
 
-import org.apache.curator.framework.api.ACLCreateModePathAndBytesable;
+import org.apache.curator.framework.api.ACLPathAndBytesable;
 import org.apache.curator.framework.api.PathAndBytesable;
 import org.apache.curator.framework.api.VersionPathAndBytesable;
 import org.apache.curator.framework.api.transaction.CuratorOp;
@@ -115,7 +115,7 @@ class AsyncTransactionOpImpl implements AsyncTransactionOp
             private CuratorOp internalForPath(String path, byte[] data, boolean useData)
             {
                 TransactionCreateBuilder2<CuratorOp> builder1 = (ttl > 0) ? client.transactionOp().create().withTtl(ttl) : client.transactionOp().create();
-                ACLCreateModePathAndBytesable<CuratorOp> builder2 = compressed ? builder1.compressed() : builder1;
+                ACLPathAndBytesable<CuratorOp> builder2 = compressed ? builder1.compressed().withMode(createMode) : builder1.withMode(createMode);
                 PathAndBytesable<CuratorOp> builder3 = builder2.withACL(aclList);
                 try
                 {

http://git-wip-us.apache.org/repos/asf/curator/blob/4a0e022c/curator-x-async/src/test/java/org/apache/curator/x/async/CompletableBaseClassForTests.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/CompletableBaseClassForTests.java b/curator-x-async/src/test/java/org/apache/curator/x/async/CompletableBaseClassForTests.java
index 232d301..4a964b1 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/CompletableBaseClassForTests.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/CompletableBaseClassForTests.java
@@ -18,6 +18,7 @@
  */
 package org.apache.curator.x.async;
 
+import com.google.common.base.Throwables;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
 import org.testng.Assert;
@@ -33,7 +34,12 @@ public abstract class CompletableBaseClassForTests extends BaseClassForTests
 
     protected <T, U> void complete(CompletionStage<T> stage)
     {
-        complete(stage, (v, e) -> {});
+        complete(stage, (v, e) -> {
+            if ( e != null )
+            {
+                Throwables.propagate(e);
+            }
+        });
     }
 
     protected <T, U> void complete(CompletionStage<T> stage, BiConsumer<? super T, Throwable> handler)

http://git-wip-us.apache.org/repos/asf/curator/blob/4a0e022c/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java b/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
index 0274413..78f37c2 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
@@ -20,10 +20,10 @@ package org.apache.curator.x.async;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.transaction.CuratorOp;
 import org.apache.curator.retry.RetryOneTime;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.x.async.modeled.ZPath;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
@@ -32,17 +32,15 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.function.BiConsumer;
 
 import static java.util.EnumSet.of;
 import static org.apache.curator.x.async.api.CreateOption.compress;
 import static org.apache.curator.x.async.api.CreateOption.setDataIfExists;
 import static org.apache.zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL;
+import static org.apache.zookeeper.CreateMode.PERSISTENT_SEQUENTIAL;
 
 public class TestBasicOperations extends CompletableBaseClassForTests
 {
@@ -69,6 +67,18 @@ public class TestBasicOperations extends CompletableBaseClassForTests
     }
 
     @Test
+    public void testCreateTransactionWithMode() throws Exception
+    {
+        complete(AsyncWrappers.asyncEnsureContainers(client, ZPath.parse("/test")));
+
+        CuratorOp op1 = client.transactionOp().create().withMode(PERSISTENT_SEQUENTIAL).forPath("/test/node-");
+        CuratorOp op2 = client.transactionOp().create().withMode(PERSISTENT_SEQUENTIAL).forPath("/test/node-");
+        complete(client.transaction().forOperations(Arrays.asList(op1, op2)));
+
+        Assert.assertEquals(client.unwrap().getChildren().forPath("/test").size(), 2);
+    }
+
+    @Test
     public void testCrud()
     {
         AsyncStage<String> createStage = client.create().forPath("/test", "one".getBytes());