You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/07/16 21:06:18 UTC
[01/21] curator git commit: CURATOR-311 - SharedValue could hold
stale data after reconnecting
Repository: curator
Updated Branches:
refs/heads/CURATOR-419 4ad1fb8e5 -> 7d4f06238
CURATOR-311 - SharedValue could hold stale data after reconnecting
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/8c1c5ffa
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/8c1c5ffa
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/8c1c5ffa
Branch: refs/heads/CURATOR-419
Commit: 8c1c5ffa287d22eaea18bf6f89a4a8bf6d9b871c
Parents: 35d2cc0
Author: Tsuyoshi Ozawa <oz...@apache.org>
Authored: Wed Jan 11 20:30:46 2017 +0900
Committer: Tsuyoshi Ozawa <oz...@apache.org>
Committed: Fri Feb 24 16:34:51 2017 +0900
----------------------------------------------------------------------
.../framework/recipes/shared/SharedCount.java | 5 +
.../framework/recipes/shared/SharedValue.java | 27 ++++-
.../recipes/shared/TestSharedCount.java | 116 ++++++++++++++++++-
3 files changed, 146 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/8c1c5ffa/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
index 87fffdd..bdfa844 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
@@ -49,6 +49,11 @@ public class SharedCount implements Closeable, SharedCountReader, Listenable<Sha
sharedValue = new SharedValue(client, path, toBytes(seedValue));
}
+ protected SharedCount(CuratorFramework client, String path, SharedValue sv)
+ {
+ sharedValue = sv;
+ }
+
@Override
public int getCount()
{
http://git-wip-us.apache.org/repos/asf/curator/blob/8c1c5ffa/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
index 1f9df37..7e3f26a 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
@@ -19,6 +19,7 @@
package org.apache.curator.framework.recipes.shared;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import org.apache.curator.framework.CuratorFramework;
@@ -56,8 +57,9 @@ public class SharedValue implements Closeable, SharedValueReader
private final byte[] seedValue;
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
private final AtomicReference<VersionedValue<byte[]>> currentValue;
+ private final CuratorWatcher watcher;
- private final CuratorWatcher watcher = new CuratorWatcher()
+ private class SharedValueCuratorWatcher implements CuratorWatcher
{
@Override
public void process(WatchedEvent event) throws Exception
@@ -76,6 +78,17 @@ public class SharedValue implements Closeable, SharedValueReader
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
notifyListenerOfStateChanged(newState);
+ if ( newState == ConnectionState.RECONNECTED )
+ {
+ try
+ {
+ readValueAndNotifyListenersInBackground();
+ }
+ catch ( Exception e )
+ {
+ log.error("Could not read value after reconnect", e);
+ }
+ }
}
};
@@ -96,6 +109,18 @@ public class SharedValue implements Closeable, SharedValueReader
this.client = client;
this.path = PathUtils.validatePath(path);
this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
+ this.watcher = new SharedValueCuratorWatcher();
+ currentValue = new AtomicReference<VersionedValue<byte[]>>(new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
+ }
+
+ @VisibleForTesting
+ protected SharedValue(CuratorFramework client, String path, byte[] seedValue, CuratorWatcher watcher)
+ {
+ this.client = client;
+ this.path = PathUtils.validatePath(path);
+ this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
+ // inject watcher for testing
+ this.watcher = watcher;
currentValue = new AtomicReference<VersionedValue<byte[]>>(new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
}
http://git-wip-us.apache.org/repos/asf/curator/blob/8c1c5ffa/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
index 7939f6e..330c8f4 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
@@ -25,6 +25,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryNTimes;
@@ -32,6 +33,7 @@ import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.Timing;
import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.WatchedEvent;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.List;
@@ -42,6 +44,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.Phaser;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -368,6 +371,7 @@ public class TestSharedCount extends BaseClassForTests
server.restart();
Assert.assertTrue(getReconnectEvent.await(2, TimeUnit.SECONDS));
+ Assert.assertEquals(numChangeEvents.get(), 1);
sharedCount.trySetCount(sharedCount.getVersionedValue(), 12);
@@ -381,7 +385,9 @@ public class TestSharedCount extends BaseClassForTests
}).forPath("/count");
flushDone.await(5, TimeUnit.SECONDS);
- Assert.assertEquals(2, numChangeEvents.get());
+ // CURATOR-311: when a Curator client's state became RECONNECTED, countHasChanged method is called back
+ // because the Curator client calls readValueAndNotifyListenersInBackground in SharedValue#ConnectionStateListener#stateChanged.
+ Assert.assertEquals(numChangeEvents.get(), 3);
}
finally
{
@@ -389,4 +395,112 @@ public class TestSharedCount extends BaseClassForTests
CloseableUtils.closeQuietly(curatorFramework);
}
}
+
+
+ @Test
+ public void testDisconnectReconnectWithMultipleClients() throws Exception
+ {
+ CuratorFramework curatorFramework1 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500));
+ CuratorFramework curatorFramework2 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500));
+
+ curatorFramework1.start();
+ curatorFramework1.blockUntilConnected();
+ curatorFramework2.start();
+ curatorFramework2.blockUntilConnected();
+
+ final String sharedCountPath = "/count";
+ final int initialCount = 10;
+ SharedCount sharedCount1 = new SharedCount(curatorFramework1, sharedCountPath, initialCount);
+ SharedCount sharedCountWithFaultyWatcher = createSharedCountWithFaultyWatcher(curatorFramework2, sharedCountPath, initialCount);
+
+ class MySharedCountListener implements SharedCountListener
+ {
+ final public Phaser gotSuspendEvent = new Phaser(1);
+ final public Phaser gotChangeEvent = new Phaser(1);
+ final public Phaser getReconnectEvent = new Phaser(1);
+ final public AtomicInteger numChangeEvents = new AtomicInteger(0);
+
+ @Override
+ public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception
+ {
+ numChangeEvents.incrementAndGet();
+ gotChangeEvent.arrive();
+ }
+
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ if (newState == ConnectionState.SUSPENDED) {
+ gotSuspendEvent.arrive();
+ } else if (newState == ConnectionState.RECONNECTED) {
+ getReconnectEvent.arrive();
+ }
+ }
+ }
+
+ MySharedCountListener listener1 = new MySharedCountListener();
+ sharedCount1.addListener(listener1);
+ sharedCount1.start();
+ MySharedCountListener listener2 = new MySharedCountListener();
+ sharedCountWithFaultyWatcher.addListener(listener2);
+
+ try
+ {
+ sharedCount1.setCount(12);
+ Assert.assertEquals(listener1.gotChangeEvent.awaitAdvanceInterruptibly(0, 2, TimeUnit.SECONDS), 1);
+ Assert.assertEquals(sharedCount1.getCount(), 12);
+
+ Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 10);
+ // new counter with faultyWatcher start
+ sharedCountWithFaultyWatcher.start();
+
+ for (int i = 0; i < 10; i++) {
+ sharedCount1.setCount(13 + i);
+ Assert.assertEquals(sharedCount1.getCount(), 13 + i);
+
+ server.restart();
+
+ Assert.assertEquals(listener2.getReconnectEvent.awaitAdvanceInterruptibly(i, 2, TimeUnit.SECONDS), i + 1);
+ // CURATOR-311 makes the Curator client re-read the server's shared count value
+ // when the client's state becomes ConnectionState.RECONNECTED. The following assertions verify that.
+ Assert.assertEquals(listener2.gotChangeEvent.awaitAdvanceInterruptibly(i, 2, TimeUnit.SECONDS), i + 1);
+ Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 13 + i);
+ }
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(sharedCount1);
+ CloseableUtils.closeQuietly(curatorFramework1);
+ CloseableUtils.closeQuietly(sharedCountWithFaultyWatcher);
+ CloseableUtils.closeQuietly(curatorFramework2);
+ }
+ }
+
+ private SharedCount createSharedCountWithFaultyWatcher(CuratorFramework curatorFramework, String path, int val) {
+
+ class FaultyCuratorWatcher implements CuratorWatcher {
+ @Override
+ public void process(WatchedEvent event) throws Exception {
+ // everything will be ignored
+ }
+ }
+
+ final FaultyCuratorWatcher fautlyWatcher = new FaultyCuratorWatcher();
+
+ class FaultySharedValue extends SharedValue {
+ public FaultySharedValue(CuratorFramework client, String path, byte[] seedValue) {
+ super(client, path, seedValue, fautlyWatcher);
+ }
+ };
+
+ final SharedValue faultySharedValue = new FaultySharedValue(curatorFramework, path, SharedCount.toBytes(val));
+ class FaultySharedCount extends SharedCount {
+ public FaultySharedCount(CuratorFramework client, String path, int val) {
+ super(client, path, faultySharedValue);
+ }
+ };
+ return new FaultySharedCount(curatorFramework, path, val);
+ }
+
+
}
[04/21] curator git commit: Make sure
readValueAndNotifyListenersInBackground() is called after a connection
problem
Posted by ra...@apache.org.
Make sure readValueAndNotifyListenersInBackground() is called after a connection problem
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/5de6b818
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/5de6b818
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/5de6b818
Branch: refs/heads/CURATOR-419
Commit: 5de6b818a8180291a6769e8db7d14b370dfb5221
Parents: 174faef
Author: randgalt <ra...@apache.org>
Authored: Mon Jul 10 11:05:57 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jul 10 11:05:57 2017 -0500
----------------------------------------------------------------------
.../apache/curator/framework/recipes/shared/SharedValue.java | 5 +++--
.../curator/framework/recipes/shared/TestSharedCount.java | 8 +++-----
2 files changed, 6 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/5de6b818/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
index 68fd5b5..5d7abce 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
@@ -79,7 +79,7 @@ public class SharedValue implements Closeable, SharedValueReader
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
notifyListenerOfStateChanged(newState);
- if ( newState == ConnectionState.RECONNECTED )
+ if ( newState.isConnected() )
{
try
{
@@ -87,6 +87,7 @@ public class SharedValue implements Closeable, SharedValueReader
}
catch ( Exception e )
{
+ ThreadUtils.checkInterrupted(e);
log.error("Could not read value after reconnect", e);
}
}
@@ -115,7 +116,7 @@ public class SharedValue implements Closeable, SharedValueReader
}
@VisibleForTesting
- protected SharedValue(CuratorFramework client, String path, byte[] seedValue, CuratorWatcher watcher)
+ protected SharedValue(WatcherRemoveCuratorFramework client, String path, byte[] seedValue, CuratorWatcher watcher)
{
this.client = client;
this.path = PathUtils.validatePath(path);
http://git-wip-us.apache.org/repos/asf/curator/blob/5de6b818/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
index d7ebb6c..3123c7d 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
@@ -516,18 +516,16 @@ public class TestSharedCount extends BaseClassForTests
private SharedCount createSharedCountWithFaultyWatcher(CuratorFramework curatorFramework, String path, int val) {
- class FaultyCuratorWatcher implements CuratorWatcher {
+ final CuratorWatcher faultyWatcher = new CuratorWatcher() {
@Override
public void process(WatchedEvent event) throws Exception {
// everything will be ignored
}
- }
-
- final FaultyCuratorWatcher fautlyWatcher = new FaultyCuratorWatcher();
+ };
class FaultySharedValue extends SharedValue {
public FaultySharedValue(CuratorFramework client, String path, byte[] seedValue) {
- super(client, path, seedValue, fautlyWatcher);
+ super(client.newWatcherRemoveCuratorFramework(), path, seedValue, faultyWatcher);
}
};
[18/21] curator git commit: GetDataBuilderImpl() wasn't handling
storingStatIn for async because the old DSL didn't support it
Posted by ra...@apache.org.
GetDataBuilderImpl() wasn't handling storingStatIn for async because the old DSL didn't support it
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c6f7aeb3
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c6f7aeb3
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c6f7aeb3
Branch: refs/heads/CURATOR-419
Commit: c6f7aeb39957158fad0207614583f867729e7770
Parents: 123f2ec
Author: randgalt <ra...@apache.org>
Authored: Sun Jul 16 14:44:56 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sun Jul 16 14:44:56 2017 -0500
----------------------------------------------------------------------
.../apache/curator/framework/imps/GetDataBuilderImpl.java | 5 +++++
.../org/apache/curator/x/async/TestBasicOperations.java | 10 ++++++++++
2 files changed, 15 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/c6f7aeb3/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
index 95af6dd..2319b9d 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
@@ -26,6 +26,7 @@ import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.DataTree;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Callable;
@@ -250,6 +251,10 @@ public class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<S
{
watching.commitWatcher(rc, false);
trace.setReturnCode(rc).setResponseBytesLength(data).setPath(path).setWithWatcher(watching.hasWatcher()).setStat(stat).commit();
+ if ( (responseStat != null) && (stat != null) )
+ {
+ DataTree.copyStat(stat, responseStat);
+ }
if ( decompress && (data != null) )
{
try
http://git-wip-us.apache.org/repos/asf/curator/blob/c6f7aeb3/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java b/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
index 3e980ec..f814146 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
@@ -189,4 +189,14 @@ public class TestBasicOperations extends CompletableBaseClassForTests
Assert.assertEquals(v.getCode(), KeeperException.Code.CONNECTIONLOSS);
});
}
+
+ @Test
+ public void testGetDataWithStat()
+ {
+ complete(client.create().forPath("/test", "hey".getBytes()));
+
+ Stat stat = new Stat();
+ complete(client.getData().storingStatIn(stat).forPath("/test"));
+ Assert.assertEquals(stat.getDataLength(), "hey".length());
+ }
}
[16/21] curator git commit: Added asyncEnsureParents()
Posted by ra...@apache.org.
Added asyncEnsureParents()
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/123f2ece
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/123f2ece
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/123f2ece
Branch: refs/heads/CURATOR-419
Commit: 123f2ece539f924705945f462030ec2b9692aebd
Parents: 1b6216e
Author: randgalt <ra...@apache.org>
Authored: Sat Jul 15 11:03:23 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Jul 15 11:03:23 2017 -0500
----------------------------------------------------------------------
.../apache/curator/x/async/AsyncWrappers.java | 132 +++++++++++++++++--
1 file changed, 120 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/123f2ece/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
index 9630985..f26b3b4 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
@@ -18,13 +18,16 @@
*/
package org.apache.curator.x.async;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.curator.x.async.api.ExistsOption;
-import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.zookeeper.KeeperException;
import java.util.Collections;
-import java.util.Set;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
@@ -70,8 +73,79 @@ import java.util.concurrent.TimeUnit;
public class AsyncWrappers
{
/**
- * Asynchronously call {@link org.apache.curator.framework.CuratorFramework#createContainers(String)} using
- * the given executor
+ * <p>
+ * Return the children of the given path (keyed by the full path) and the data for each node.
+ * IMPORTANT: this results in a ZooKeeper query
+ * for each child node returned. i.e. if the initial children() call returns
+ * 10 nodes an additional 10 ZooKeeper queries are made to get the data.
+ * </p>
+ *
+ * <p>
+ * Note: if any of the nodes in the path do not exist yet, {@link org.apache.zookeeper.KeeperException.NoNodeException}
+ * is <strong>NOT</strong> set. Instead the stage is completed with an empty map.
+ * </p>
+ *
+ * @return CompletionStage
+ */
+ public static CompletionStage<Map<String, byte[]>> childrenWithData(AsyncCuratorFramework client, String path)
+ {
+ return childrenWithData(client, path, false);
+ }
+
+ /**
+ * <p>
+ * Return the children of the given path (keyed by the full path) and the data for each node.
+ * IMPORTANT: this results in a ZooKeeper query
+ * for each child node returned. i.e. if the initial children() call returns
+ * 10 nodes an additional 10 ZooKeeper queries are made to get the data.
+ * </p>
+ *
+ * <p>
+ * Note: if any of the nodes in the path do not exist yet, {@link org.apache.zookeeper.KeeperException.NoNodeException}
+ * is <strong>NOT</strong> set. Instead the stage is completed with an empty map.
+ * </p>
+ *
+ * @param isCompressed pass true if data is compressed
+ * @return CompletionStage
+ */
+ public static CompletionStage<Map<String, byte[]>> childrenWithData(AsyncCuratorFramework client, String path, boolean isCompressed)
+ {
+ CompletableFuture<Map<String, byte[]>> future = new CompletableFuture<>();
+ client.getChildren().forPath(path).handle((children, e) -> {
+ if ( e != null )
+ {
+ if ( Throwables.getRootCause(e) instanceof KeeperException.NoNodeException )
+ {
+ future.complete(Maps.newHashMap());
+ }
+ else
+ {
+ future.completeExceptionally(e);
+ }
+ }
+ else
+ {
+ completeChildren(client, future, path, children, isCompressed);
+ }
+ return null;
+ });
+ return future;
+ }
+
+ /**
+ * Asynchronously ensure that the parents of the given path are created
+ *
+ * @param client client
+ * @param path path to ensure
+ * @return stage
+ */
+ public static CompletionStage<Void> asyncEnsureParents(AsyncCuratorFramework client, String path)
+ {
+ return ensure(client, path, ExistsOption.createParentsIfNeeded);
+ }
+
+ /**
+ * Asynchronously ensure that the parents of the given path are created as containers
*
* @param client client
* @param path path to ensure
@@ -79,14 +153,7 @@ public class AsyncWrappers
*/
public static CompletionStage<Void> asyncEnsureContainers(AsyncCuratorFramework client, String path)
{
- String localPath = ZKPaths.makePath(path, "foo");
- Set<ExistsOption> options = Collections.singleton(ExistsOption.createParentsAsContainers);
- return client
- .checkExists()
- .withOptions(options)
- .forPath(localPath)
- .thenApply(__ -> null)
- ;
+ return ensure(client, path, ExistsOption.createParentsAsContainers);
}
/**
@@ -279,6 +346,47 @@ public class AsyncWrappers
}
}
+ private static void completeChildren(AsyncCuratorFramework client, CompletableFuture<Map<String, byte[]>> future, String parentPath, List<String> children, boolean isCompressed)
+ {
+ Map<String, byte[]> nodes = Maps.newHashMap();
+ if ( children.size() == 0 )
+ {
+ future.complete(nodes);
+ return;
+ }
+
+ children.forEach(node -> {
+ String path = ZKPaths.makePath(parentPath, node);
+ AsyncStage<byte[]> stage = isCompressed ? client.getData().decompressed().forPath(path) : client.getData().forPath(path);
+ stage.handle((data, e) -> {
+ if ( e != null )
+ {
+ future.completeExceptionally(e);
+ }
+ else
+ {
+ nodes.put(path, data);
+ if ( nodes.size() == children.size() )
+ {
+ future.complete(nodes);
+ }
+ }
+ return null;
+ });
+ });
+ }
+
+ private static CompletionStage<Void> ensure(AsyncCuratorFramework client, String path, ExistsOption option)
+ {
+ String localPath = ZKPaths.makePath(path, "foo");
+ return client
+ .checkExists()
+ .withOptions(Collections.singleton(option))
+ .forPath(localPath)
+ .thenApply(__ -> null)
+ ;
+ }
+
private AsyncWrappers()
{
}
[20/21] curator git commit: ModeledFrameworkImpl.set(T item,
int version) was ignoring the version
Posted by ra...@apache.org.
ModeledFrameworkImpl.set(T item, int version) was ignoring the version
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/05d37e96
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/05d37e96
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/05d37e96
Branch: refs/heads/CURATOR-419
Commit: 05d37e961978719ab1ebeb40ff7bd5e7a8605f5e
Parents: 666b175
Author: randgalt <ra...@apache.org>
Authored: Sun Jul 16 14:53:30 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sun Jul 16 14:53:30 2017 -0500
----------------------------------------------------------------------
.../modeled/details/ModeledFrameworkImpl.java | 2 +-
.../x/async/modeled/TestModeledFramework.java | 30 +++++++-------------
2 files changed, 12 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/05d37e96/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
index 44011ee..aa98602 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
@@ -156,7 +156,7 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
@Override
public AsyncStage<String> set(T item, int version)
{
- return set(item, null, -1);
+ return set(item, null, version);
}
@Override
http://git-wip-us.apache.org/repos/asf/curator/blob/05d37e96/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java
index 53eb517..a5ed998 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.curator.x.async.modeled;
import com.google.common.collect.Sets;
@@ -102,7 +103,8 @@ public class TestModeledFramework extends TestModeledFrameworkBase
@Test
public void testBadNode()
{
- complete(async.create().forPath(modelSpec.path().fullPath(), "fubar".getBytes()), (v, e) -> {}); // ignore error
+ complete(async.create().forPath(modelSpec.path().fullPath(), "fubar".getBytes()), (v, e) -> {
+ }); // ignore error
ModeledFramework<TestModel> client = ModeledFramework.builder(async, modelSpec).watched().build();
complete(client.read(), (model, e) -> Assert.assertTrue(e instanceof KeeperException.NoNodeException));
@@ -112,11 +114,8 @@ public class TestModeledFramework extends TestModeledFrameworkBase
public void testSchema() throws Exception
{
Schema schema = modelSpec.schema();
- try ( CuratorFramework schemaClient = CuratorFrameworkFactory.builder()
- .connectString(server.getConnectString())
- .retryPolicy(new RetryOneTime(1))
- .schemaSet(new SchemaSet(Collections.singletonList(schema), false))
- .build() ) {
+ try (CuratorFramework schemaClient = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).schemaSet(new SchemaSet(Collections.singletonList(schema), false)).build())
+ {
schemaClient.start();
try
@@ -138,7 +137,9 @@ public class TestModeledFramework extends TestModeledFrameworkBase
public void testVersioned()
{
ModeledFramework<TestModel> client = ModeledFramework.wrap(async, modelSpec);
- complete(client.set(new TestModel("John", "Galt", "Galt's Gulch", 21, BigInteger.valueOf(1010101))));
+ TestModel model = new TestModel("John", "Galt", "Galt's Gulch", 21, BigInteger.valueOf(1010101));
+ complete(client.set(model));
+ complete(client.set(model)); // so that version goes to 1
VersionedModeledFramework<TestModel> versioned = client.versioned();
complete(versioned.read().whenComplete((v, e) -> {
@@ -146,13 +147,8 @@ public class TestModeledFramework extends TestModeledFrameworkBase
Assert.assertTrue(v.version() > 0);
}).thenCompose(versioned::set), (s, e) -> Assert.assertNull(e)); // version is correct should succeed
- complete(versioned.read().whenComplete((v, e) -> {
- Assert.assertNull(e);
- Assert.assertTrue(v.version() > 0);
- }).thenCompose(value -> {
- Versioned<TestModel> badVersion = Versioned.from(value.model(), Integer.MAX_VALUE);
- return versioned.set(badVersion);
- }).whenComplete((s, e) -> Assert.assertTrue(e instanceof KeeperException.BadVersionException)));
+ Versioned<TestModel> badVersion = Versioned.from(model, 100000);
+ complete(versioned.set(badVersion), (v, e) -> Assert.assertTrue(e instanceof KeeperException.BadVersionException));
}
@Test
@@ -164,11 +160,7 @@ public class TestModeledFramework extends TestModeledFrameworkBase
complete(client.set(new TestModel("John", "Galt", "Galt's Gulch", 21, BigInteger.valueOf(1010101))));
complete(client.update(new TestModel("John", "Galt", "Galt's Gulch", 54, BigInteger.valueOf(88))), (__, e) -> Assert.assertNotNull(e, "Should've gotten an auth failure"));
- try ( CuratorFramework authCurator = CuratorFrameworkFactory.builder()
- .connectString(server.getConnectString())
- .retryPolicy(new RetryOneTime(1))
- .authorization("digest", "test:test".getBytes())
- .build() )
+ try (CuratorFramework authCurator = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).authorization("digest", "test:test".getBytes()).build())
{
authCurator.start();
ModeledFramework<TestModel> authClient = ModeledFramework.wrap(AsyncCuratorFramework.wrap(authCurator), aclModelSpec);
[15/21] curator git commit: Oops - modeled ZPath snuck into the main
async package
Posted by ra...@apache.org.
Oops - modeled ZPath snuck into the main async package
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/1b6216e9
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/1b6216e9
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/1b6216e9
Branch: refs/heads/CURATOR-419
Commit: 1b6216e97be2771e927b7302160957834039579c
Parents: 11be719
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 12:04:25 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 12:04:25 2017 -0500
----------------------------------------------------------------------
.../main/java/org/apache/curator/x/async/AsyncWrappers.java | 6 ++++--
.../java/org/apache/curator/x/async/TestBasicOperations.java | 3 +--
2 files changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/1b6216e9/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
index 7da82fc..9630985 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
@@ -20,6 +20,7 @@ package org.apache.curator.x.async;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.utils.ThreadUtils;
+import org.apache.curator.utils.ZKPaths;
import org.apache.curator.x.async.api.ExistsOption;
import org.apache.curator.x.async.modeled.ZPath;
import java.util.Collections;
@@ -76,13 +77,14 @@ public class AsyncWrappers
* @param path path to ensure
* @return stage
*/
- public static CompletionStage<Void> asyncEnsureContainers(AsyncCuratorFramework client, ZPath path)
+ public static CompletionStage<Void> asyncEnsureContainers(AsyncCuratorFramework client, String path)
{
+ String localPath = ZKPaths.makePath(path, "foo");
Set<ExistsOption> options = Collections.singleton(ExistsOption.createParentsAsContainers);
return client
.checkExists()
.withOptions(options)
- .forPath(path.child("foo").fullPath())
+ .forPath(localPath)
.thenApply(__ -> null)
;
}
http://git-wip-us.apache.org/repos/asf/curator/blob/1b6216e9/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java b/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
index 78f37c2..3e980ec 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
@@ -23,7 +23,6 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.x.async.modeled.ZPath;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
@@ -69,7 +68,7 @@ public class TestBasicOperations extends CompletableBaseClassForTests
@Test
public void testCreateTransactionWithMode() throws Exception
{
- complete(AsyncWrappers.asyncEnsureContainers(client, ZPath.parse("/test")));
+ complete(AsyncWrappers.asyncEnsureContainers(client, "/test"));
CuratorOp op1 = client.transactionOp().create().withMode(PERSISTENT_SEQUENTIAL).forPath("/test/node-");
CuratorOp op2 = client.transactionOp().create().withMode(PERSISTENT_SEQUENTIAL).forPath("/test/node-");
[03/21] curator git commit: Merge branch 'CURATOR-311' of
github.com:oza/curator into CURATOR-311
Posted by ra...@apache.org.
Merge branch 'CURATOR-311' of github.com:oza/curator into CURATOR-311
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/174faef5
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/174faef5
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/174faef5
Branch: refs/heads/CURATOR-419
Commit: 174faef5f0de10626c616d2a25eb9fb1e5572966
Parents: 0f5d10d 8c1c5ff
Author: randgalt <ra...@apache.org>
Authored: Mon Jul 10 10:55:00 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jul 10 10:55:00 2017 -0500
----------------------------------------------------------------------
.../framework/recipes/shared/SharedCount.java | 5 +
.../framework/recipes/shared/SharedValue.java | 27 ++++-
.../recipes/shared/TestSharedCount.java | 116 ++++++++++++++++++-
3 files changed, 146 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/174faef5/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
index 1a3d889,7e3f26a..68fd5b5
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
@@@ -94,9 -106,21 +107,21 @@@ public class SharedValue implements Clo
*/
public SharedValue(CuratorFramework client, String path, byte[] seedValue)
{
- this.client = client;
+ this.client = client.newWatcherRemoveCuratorFramework();
this.path = PathUtils.validatePath(path);
this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
+ this.watcher = new SharedValueCuratorWatcher();
+ currentValue = new AtomicReference<VersionedValue<byte[]>>(new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
+ }
+
+ @VisibleForTesting
+ protected SharedValue(CuratorFramework client, String path, byte[] seedValue, CuratorWatcher watcher)
+ {
+ this.client = client;
+ this.path = PathUtils.validatePath(path);
+ this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
+ // inject watcher for testing
+ this.watcher = watcher;
currentValue = new AtomicReference<VersionedValue<byte[]>>(new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
}
http://git-wip-us.apache.org/repos/asf/curator/blob/174faef5/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
index 0690d6a,330c8f4..d7ebb6c
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
@@@ -23,9 -23,9 +23,10 @@@ import com.google.common.collect.Lists
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
+ import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryNTimes;
@@@ -424,7 -392,115 +430,115 @@@ public class TestSharedCount extends Ba
finally
{
CloseableUtils.closeQuietly(sharedCount);
- CloseableUtils.closeQuietly(curatorFramework);
+ TestCleanState.closeAndTestClean(curatorFramework);
}
}
+
+
+ @Test
+ public void testDisconnectReconnectWithMultipleClients() throws Exception
+ {
+ CuratorFramework curatorFramework1 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500));
+ CuratorFramework curatorFramework2 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500));
+
+ curatorFramework1.start();
+ curatorFramework1.blockUntilConnected();
+ curatorFramework2.start();
+ curatorFramework2.blockUntilConnected();
+
+ final String sharedCountPath = "/count";
+ final int initialCount = 10;
+ SharedCount sharedCount1 = new SharedCount(curatorFramework1, sharedCountPath, initialCount);
+ SharedCount sharedCountWithFaultyWatcher = createSharedCountWithFaultyWatcher(curatorFramework2, sharedCountPath, initialCount);
+
+ class MySharedCountListener implements SharedCountListener
+ {
+ final public Phaser gotSuspendEvent = new Phaser(1);
+ final public Phaser gotChangeEvent = new Phaser(1);
+ final public Phaser getReconnectEvent = new Phaser(1);
+ final public AtomicInteger numChangeEvents = new AtomicInteger(0);
+
+ @Override
+ public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception
+ {
+ numChangeEvents.incrementAndGet();
+ gotChangeEvent.arrive();
+ }
+
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ if (newState == ConnectionState.SUSPENDED) {
+ gotSuspendEvent.arrive();
+ } else if (newState == ConnectionState.RECONNECTED) {
+ getReconnectEvent.arrive();
+ }
+ }
+ }
+
+ MySharedCountListener listener1 = new MySharedCountListener();
+ sharedCount1.addListener(listener1);
+ sharedCount1.start();
+ MySharedCountListener listener2 = new MySharedCountListener();
+ sharedCountWithFaultyWatcher.addListener(listener2);
+
+ try
+ {
+ sharedCount1.setCount(12);
+ Assert.assertEquals(listener1.gotChangeEvent.awaitAdvanceInterruptibly(0, 2, TimeUnit.SECONDS), 1);
+ Assert.assertEquals(sharedCount1.getCount(), 12);
+
+ Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 10);
+ // new counter with faultyWatcher start
+ sharedCountWithFaultyWatcher.start();
+
+ for (int i = 0; i < 10; i++) {
+ sharedCount1.setCount(13 + i);
+ Assert.assertEquals(sharedCount1.getCount(), 13 + i);
+
+ server.restart();
+
+ Assert.assertEquals(listener2.getReconnectEvent.awaitAdvanceInterruptibly(i, 2, TimeUnit.SECONDS), i + 1);
+ // CURATOR-311 makes Curator's client read the server's shared count value
+ // when the client's state becomes ConnectionState.RECONNECTED. The following assertions verify that.
+ Assert.assertEquals(listener2.gotChangeEvent.awaitAdvanceInterruptibly(i, 2, TimeUnit.SECONDS), i + 1);
+ Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 13 + i);
+ }
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(sharedCount1);
+ CloseableUtils.closeQuietly(curatorFramework1);
+ CloseableUtils.closeQuietly(sharedCountWithFaultyWatcher);
+ CloseableUtils.closeQuietly(curatorFramework2);
+ }
+ }
+
+ private SharedCount createSharedCountWithFaultyWatcher(CuratorFramework curatorFramework, String path, int val) {
+
+ class FaultyCuratorWatcher implements CuratorWatcher {
+ @Override
+ public void process(WatchedEvent event) throws Exception {
+ // everything will be ignored
+ }
+ }
+
+ final FaultyCuratorWatcher fautlyWatcher = new FaultyCuratorWatcher();
+
+ class FaultySharedValue extends SharedValue {
+ public FaultySharedValue(CuratorFramework client, String path, byte[] seedValue) {
+ super(client, path, seedValue, fautlyWatcher);
+ }
+ };
+
+ final SharedValue faultySharedValue = new FaultySharedValue(curatorFramework, path, SharedCount.toBytes(val));
+ class FaultySharedCount extends SharedCount {
+ public FaultySharedCount(CuratorFramework client, String path, int val) {
+ super(client, path, faultySharedValue);
+ }
+ };
+ return new FaultySharedCount(curatorFramework, path, val);
+ }
+
+
}
[07/21] curator git commit: Merge branch 'master' into CURATOR-419
Posted by ra...@apache.org.
Merge branch 'master' into CURATOR-419
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/3b9d6062
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/3b9d6062
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/3b9d6062
Branch: refs/heads/CURATOR-419
Commit: 3b9d60626bd5f469dbe0be37a4f952ed2f7dd15e
Parents: 4ad1fb8 12cc7ce
Author: randgalt <ra...@apache.org>
Authored: Mon Jul 10 14:40:48 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jul 10 14:40:48 2017 -0500
----------------------------------------------------------------------
.../framework/recipes/shared/SharedCount.java | 5 +
.../framework/recipes/shared/SharedValue.java | 28 ++++-
.../recipes/shared/TestSharedCount.java | 117 ++++++++++++++++++-
3 files changed, 148 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
[21/21] curator git commit: tests cleanup was closing in wrong order
causing instability
Posted by ra...@apache.org.
tests cleanup was closing in wrong order causing instability
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/7d4f0623
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/7d4f0623
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/7d4f0623
Branch: refs/heads/CURATOR-419
Commit: 7d4f0623885c8c00a28819bcb823ad3d6a8732d1
Parents: 05d37e9
Author: randgalt <ra...@apache.org>
Authored: Sun Jul 16 16:05:52 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sun Jul 16 16:05:52 2017 -0500
----------------------------------------------------------------------
.../discovery/details/TestServiceDiscovery.java | 130 +++++++------------
1 file changed, 49 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/7d4f0623/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
index 989edaf..47c74d5 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
@@ -33,7 +33,6 @@ import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.testng.Assert;
import org.testng.annotations.Test;
-import java.io.Closeable;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
@@ -54,18 +53,18 @@ public class TestServiceDiscovery extends BaseClassForTests
@Test
public void testCrashedServerMultiInstances() throws Exception
{
- List<Closeable> closeables = Lists.newArrayList();
+ CuratorFramework client = null;
+ ServiceDiscovery<String> discovery = null;
try
{
Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- closeables.add(client);
+ client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
client.start();
final Semaphore semaphore = new Semaphore(0);
ServiceInstance<String> instance1 = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
ServiceInstance<String> instance2 = ServiceInstance.<String>builder().payload("thing").name("test").port(10065).build();
- ServiceDiscovery<String> discovery = new ServiceDiscoveryImpl<String>(client, "/test", new JsonInstanceSerializer<String>(String.class), instance1, false)
+ discovery = new ServiceDiscoveryImpl<String>(client, "/test", new JsonInstanceSerializer<String>(String.class), instance1, false)
{
@Override
protected void internalRegisterService(ServiceInstance<String> service) throws Exception
@@ -74,7 +73,6 @@ public class TestServiceDiscovery extends BaseClassForTests
semaphore.release();
}
};
- closeables.add(discovery);
discovery.start();
discovery.registerService(instance2);
@@ -85,34 +83,31 @@ public class TestServiceDiscovery extends BaseClassForTests
server.stop();
server.restart();
- closeables.add(server);
timing.acquireSemaphore(semaphore, 2);
Assert.assertEquals(discovery.queryForInstances("test").size(), 2);
}
finally
{
- for ( Closeable c : closeables )
- {
- CloseableUtils.closeQuietly(c);
- }
+ CloseableUtils.closeQuietly(discovery);
+ CloseableUtils.closeQuietly(client);
}
}
@Test
public void testCrashedServer() throws Exception
{
- List<Closeable> closeables = Lists.newArrayList();
+ CuratorFramework client = null;
+ ServiceDiscovery<String> discovery = null;
try
{
Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- closeables.add(client);
+ client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
client.start();
final Semaphore semaphore = new Semaphore(0);
ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
- ServiceDiscovery<String> discovery = new ServiceDiscoveryImpl<String>(client, "/test", new JsonInstanceSerializer<String>(String.class), instance, false)
+ discovery = new ServiceDiscoveryImpl<String>(client, "/test", new JsonInstanceSerializer<String>(String.class), instance, false)
{
@Override
protected void internalRegisterService(ServiceInstance<String> service) throws Exception
@@ -121,7 +116,6 @@ public class TestServiceDiscovery extends BaseClassForTests
semaphore.release();
}
};
- closeables.add(discovery);
discovery.start();
timing.acquireSemaphore(semaphore);
@@ -131,35 +125,31 @@ public class TestServiceDiscovery extends BaseClassForTests
server.stop();
server.restart();
- closeables.add(server);
timing.acquireSemaphore(semaphore);
Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
}
finally
{
- for ( Closeable c : closeables )
- {
- CloseableUtils.closeQuietly(c);
- }
+ CloseableUtils.closeQuietly(discovery);
+ CloseableUtils.closeQuietly(client);
}
}
@Test
public void testCrashedInstance() throws Exception
{
- List<Closeable> closeables = Lists.newArrayList();
+ CuratorFramework client = null;
+ ServiceDiscovery<String> discovery = null;
try
{
Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- closeables.add(client);
+ client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
client.start();
ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
- ServiceDiscovery<String> discovery = new ServiceDiscoveryImpl<String>(client, "/test", new JsonInstanceSerializer<String>(String.class), instance, false);
- closeables.add(discovery);
+ discovery = new ServiceDiscoveryImpl<String>(client, "/test", new JsonInstanceSerializer<String>(String.class), instance, false);
discovery.start();
Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
@@ -171,11 +161,8 @@ public class TestServiceDiscovery extends BaseClassForTests
}
finally
{
- Collections.reverse(closeables);
- for ( Closeable c : closeables )
- {
- CloseableUtils.closeQuietly(c);
- }
+ CloseableUtils.closeQuietly(discovery);
+ CloseableUtils.closeQuietly(client);
}
}
@@ -185,11 +172,11 @@ public class TestServiceDiscovery extends BaseClassForTests
final String SERVICE_ONE = "one";
final String SERVICE_TWO = "two";
- List<Closeable> closeables = Lists.newArrayList();
+ CuratorFramework client = null;
+ ServiceDiscovery<Void> discovery = null;
try
{
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
- closeables.add(client);
+ client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
client.start();
ServiceInstance<Void> s1_i1 = ServiceInstance.<Void>builder().name(SERVICE_ONE).build();
@@ -197,8 +184,7 @@ public class TestServiceDiscovery extends BaseClassForTests
ServiceInstance<Void> s2_i1 = ServiceInstance.<Void>builder().name(SERVICE_TWO).build();
ServiceInstance<Void> s2_i2 = ServiceInstance.<Void>builder().name(SERVICE_TWO).build();
- ServiceDiscovery<Void> discovery = ServiceDiscoveryBuilder.builder(Void.class).client(client).basePath("/test").build();
- closeables.add(discovery);
+ discovery = ServiceDiscoveryBuilder.builder(Void.class).client(client).basePath("/test").build();
discovery.start();
discovery.registerService(s1_i1);
@@ -227,27 +213,23 @@ public class TestServiceDiscovery extends BaseClassForTests
}
finally
{
- Collections.reverse(closeables);
- for ( Closeable c : closeables )
- {
- CloseableUtils.closeQuietly(c);
- }
+ CloseableUtils.closeQuietly(discovery);
+ CloseableUtils.closeQuietly(client);
}
}
@Test
public void testBasic() throws Exception
{
- List<Closeable> closeables = Lists.newArrayList();
+ CuratorFramework client = null;
+ ServiceDiscovery<String> discovery = null;
try
{
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
- closeables.add(client);
+ client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
client.start();
ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
- ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build();
- closeables.add(discovery);
+ discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build();
discovery.start();
Assert.assertEquals(discovery.queryForNames(), Collections.singletonList("test"));
@@ -258,11 +240,8 @@ public class TestServiceDiscovery extends BaseClassForTests
}
finally
{
- Collections.reverse(closeables);
- for ( Closeable c : closeables )
- {
- CloseableUtils.closeQuietly(c);
- }
+ CloseableUtils.closeQuietly(discovery);
+ CloseableUtils.closeQuietly(client);
}
}
@@ -271,16 +250,16 @@ public class TestServiceDiscovery extends BaseClassForTests
{
Timing timing = new Timing();
server.stop();
- List<Closeable> closeables = Lists.newArrayList();
+
+ CuratorFramework client = null;
+ ServiceDiscovery<String> discovery = null;
try
{
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
- closeables.add(client);
+ client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
client.start();
ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
- ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build();
- closeables.add(discovery);
+ discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build();
discovery.start();
server.restart();
@@ -293,11 +272,8 @@ public class TestServiceDiscovery extends BaseClassForTests
}
finally
{
- Collections.reverse(closeables);
- for ( Closeable c : closeables )
- {
- CloseableUtils.closeQuietly(c);
- }
+ CloseableUtils.closeQuietly(discovery);
+ CloseableUtils.closeQuietly(client);
}
}
@@ -308,7 +284,6 @@ public class TestServiceDiscovery extends BaseClassForTests
final String name = "name";
final CountDownLatch restartLatch = new CountDownLatch(1);
- List<Closeable> closeables = Lists.newArrayList();
InstanceSerializer<String> slowSerializer = new JsonInstanceSerializer<String>(String.class)
{
@@ -333,15 +308,15 @@ public class TestServiceDiscovery extends BaseClassForTests
}
};
+ CuratorFramework client = null;
+ ServiceDiscovery<String> discovery = null;
try
{
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
- closeables.add(client);
+ client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
client.start();
ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name(name).port(10064).build();
- ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).serializer(slowSerializer).watchInstances(true).build();
- closeables.add(discovery);
+ discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).serializer(slowSerializer).watchInstances(true).build();
discovery.start();
Assert.assertFalse(discovery.queryForInstances(name).isEmpty(), "Service should start registered.");
@@ -358,27 +333,23 @@ public class TestServiceDiscovery extends BaseClassForTests
}
finally
{
- Collections.reverse(closeables);
- for ( Closeable c : closeables )
- {
- CloseableUtils.closeQuietly(c);
- }
+ CloseableUtils.closeQuietly(discovery);
+ CloseableUtils.closeQuietly(client);
}
}
@Test
public void testCleaning() throws Exception
{
- List<Closeable> closeables = Lists.newArrayList();
+ CuratorFramework client = null;
+ ServiceDiscovery<String> discovery = null;
try
{
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
- closeables.add(client);
+ client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
client.start();
ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
- ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build();
- closeables.add(discovery);
+ discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build();
discovery.start();
discovery.unregisterService(instance);
@@ -386,11 +357,8 @@ public class TestServiceDiscovery extends BaseClassForTests
}
finally
{
- Collections.reverse(closeables);
- for ( Closeable c : closeables )
- {
- CloseableUtils.closeQuietly(c);
- }
+ CloseableUtils.closeQuietly(discovery);
+ CloseableUtils.closeQuietly(client);
}
}
}
[14/21] curator git commit: A few things for CURATOR-397
Posted by ra...@apache.org.
A few things for CURATOR-397
1. AsyncWrappers.asyncEnsureContainers was just wrong - this is a better implementation
2. Added raw serializer constant
3. Add ModeledOptions which can be expanded in the future. For now it just has ignoreMissingNodesForChildren
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/11be719b
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/11be719b
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/11be719b
Branch: refs/heads/CURATOR-419
Commit: 11be719b32bcf8879c44a0c2005ba5a2107986cb
Parents: 4a0e022
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 08:13:03 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 08:16:01 2017 -0500
----------------------------------------------------------------------
.../apache/curator/x/async/AsyncWrappers.java | 38 +++++++-------------
.../x/async/modeled/ModelSerializer.java | 18 ++++++++++
.../async/modeled/ModeledFrameworkBuilder.java | 20 ++++++++++-
.../curator/x/async/modeled/ModeledOptions.java | 29 +++++++++++++++
.../modeled/details/ModeledFrameworkImpl.java | 33 +++++++++++++----
5 files changed, 104 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/11be719b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
index e982cf2..7da82fc 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
@@ -20,7 +20,10 @@ package org.apache.curator.x.async;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.utils.ThreadUtils;
+import org.apache.curator.x.async.api.ExistsOption;
import org.apache.curator.x.async.modeled.ZPath;
+import java.util.Collections;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
@@ -67,38 +70,21 @@ public class AsyncWrappers
{
/**
* Asynchronously call {@link org.apache.curator.framework.CuratorFramework#createContainers(String)} using
- * the {@link java.util.concurrent.ForkJoinPool#commonPool()}.
- *
- * @param client client
- * @param path path to ensure
- * @return stage
- */
- public static CompletionStage<Void> asyncEnsureContainers(AsyncCuratorFramework client, ZPath path)
- {
- return asyncEnsureContainers(client, path, null);
- }
-
- /**
- * Asynchronously call {@link org.apache.curator.framework.CuratorFramework#createContainers(String)} using
* the given executor
*
* @param client client
* @param path path to ensure
* @return stage
*/
- public static CompletionStage<Void> asyncEnsureContainers(AsyncCuratorFramework client, ZPath path, Executor executor)
+ public static CompletionStage<Void> asyncEnsureContainers(AsyncCuratorFramework client, ZPath path)
{
- Runnable proc = () -> {
- try
- {
- client.unwrap().createContainers(path.fullPath());
- }
- catch ( Exception e )
- {
- throw new RuntimeException(e);
- }
- };
- return (executor != null) ? CompletableFuture.runAsync(proc, executor) : CompletableFuture.runAsync(proc);
+ Set<ExistsOption> options = Collections.singleton(ExistsOption.createParentsAsContainers);
+ return client
+ .checkExists()
+ .withOptions(options)
+ .forPath(path.child("foo").fullPath())
+ .thenApply(__ -> null)
+ ;
}
/**
@@ -284,7 +270,7 @@ public class AsyncWrappers
future.complete(null);
}
}
- catch ( Exception e )
+ catch ( Throwable e )
{
ThreadUtils.checkInterrupted(e);
future.completeExceptionally(e);
http://git-wip-us.apache.org/repos/asf/curator/blob/11be719b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModelSerializer.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModelSerializer.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModelSerializer.java
index 428096e..476f314 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModelSerializer.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModelSerializer.java
@@ -40,4 +40,22 @@ public interface ModelSerializer<T>
* @throws RuntimeException if <code>bytes</code> is invalid or there was an error deserializing
*/
T deserialize(byte[] bytes);
+
+ /**
+ * A pass through serializer
+ */
+ ModelSerializer<byte[]> raw = new ModelSerializer<byte[]>()
+ {
+ @Override
+ public byte[] serialize(byte[] model)
+ {
+ return model;
+ }
+
+ @Override
+ public byte[] deserialize(byte[] bytes)
+ {
+ return bytes;
+ }
+ };
}
http://git-wip-us.apache.org/repos/asf/curator/blob/11be719b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFrameworkBuilder.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFrameworkBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFrameworkBuilder.java
index 2e8bec3..1df68e6 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFrameworkBuilder.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFrameworkBuilder.java
@@ -18,13 +18,16 @@
*/
package org.apache.curator.x.async.modeled;
+import com.google.common.collect.ImmutableSet;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.x.async.AsyncCuratorFramework;
import org.apache.curator.x.async.WatchMode;
import org.apache.curator.x.async.modeled.details.ModeledFrameworkImpl;
import org.apache.zookeeper.WatchedEvent;
+import java.util.Collections;
import java.util.Objects;
+import java.util.Set;
import java.util.function.UnaryOperator;
public class ModeledFrameworkBuilder<T>
@@ -35,6 +38,7 @@ public class ModeledFrameworkBuilder<T>
private UnaryOperator<WatchedEvent> watcherFilter;
private UnhandledErrorListener unhandledErrorListener;
private UnaryOperator<CuratorEvent> resultFilter;
+ private Set<ModeledOptions> modeledOptions;
/**
* Build a new ModeledFramework instance
@@ -49,7 +53,8 @@ public class ModeledFrameworkBuilder<T>
watchMode,
watcherFilter,
unhandledErrorListener,
- resultFilter
+ resultFilter,
+ modeledOptions
);
}
@@ -142,6 +147,18 @@ public class ModeledFrameworkBuilder<T>
return this;
}
+ /**
+ * Change the modeled options. The given set is copied into an immutable set.
+ *
+ * @param modeledOptions new options set - cannot be null
+ * @return this for chaining
+ */
+ public ModeledFrameworkBuilder<T> withOptions(Set<ModeledOptions> modeledOptions)
+ {
+ this.modeledOptions = ImmutableSet.copyOf(Objects.requireNonNull(modeledOptions, "modeledOptions cannot be null"));
+ return this;
+ }
+
ModeledFrameworkBuilder()
{
}
@@ -150,5 +167,6 @@ public class ModeledFrameworkBuilder<T>
{
this.client = Objects.requireNonNull(client, "client cannot be null");
this.modelSpec = Objects.requireNonNull(modelSpec, "modelSpec cannot be null");
+ modeledOptions = Collections.singleton(ModeledOptions.ignoreMissingNodesForChildren);
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/11be719b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledOptions.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledOptions.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledOptions.java
new file mode 100644
index 0000000..434894b
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledOptions.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled;
+
+public enum ModeledOptions
+{
+ /**
+ * Causes {@link ModeledFramework#children()} and {@link ModeledFramework#childrenAsZNodes()}
+ * to ignore {@link org.apache.zookeeper.KeeperException.NoNodeException} and merely return
+ * an empty list
+ */
+ ignoreMissingNodesForChildren
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/11be719b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
index c1d19c4..44011ee 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
@@ -19,6 +19,8 @@
package org.apache.curator.x.async.modeled.details;
import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.UnhandledErrorListener;
@@ -36,13 +38,16 @@ import org.apache.curator.x.async.api.CreateOption;
import org.apache.curator.x.async.api.WatchableAsyncCuratorFramework;
import org.apache.curator.x.async.modeled.ModelSpec;
import org.apache.curator.x.async.modeled.ModeledFramework;
+import org.apache.curator.x.async.modeled.ModeledOptions;
import org.apache.curator.x.async.modeled.ZNode;
import org.apache.curator.x.async.modeled.ZPath;
import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
import org.apache.curator.x.async.modeled.versioned.VersionedModeledFramework;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
@@ -62,13 +67,15 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
private final UnaryOperator<CuratorEvent> resultFilter;
private final AsyncCuratorFrameworkDsl dslClient;
private final boolean isWatched;
+ private final Set<ModeledOptions> modeledOptions;
- public static <T> ModeledFrameworkImpl<T> build(AsyncCuratorFramework client, ModelSpec<T> model, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter)
+ public static <T> ModeledFrameworkImpl<T> build(AsyncCuratorFramework client, ModelSpec<T> model, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter, Set<ModeledOptions> modeledOptions)
{
boolean isWatched = (watchMode != null);
Objects.requireNonNull(client, "client cannot be null");
Objects.requireNonNull(model, "model cannot be null");
+ modeledOptions = ImmutableSet.copyOf(Objects.requireNonNull(modeledOptions, "modeledOptions cannot be null"));
watchMode = (watchMode != null) ? watchMode : WatchMode.stateChangeAndSuccess;
@@ -84,11 +91,12 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
watcherFilter,
unhandledErrorListener,
resultFilter,
- isWatched
+ isWatched,
+ modeledOptions
);
}
- private ModeledFrameworkImpl(AsyncCuratorFramework client, AsyncCuratorFrameworkDsl dslClient, WatchableAsyncCuratorFramework watchableClient, ModelSpec<T> modelSpec, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter, boolean isWatched)
+ private ModeledFrameworkImpl(AsyncCuratorFramework client, AsyncCuratorFrameworkDsl dslClient, WatchableAsyncCuratorFramework watchableClient, ModelSpec<T> modelSpec, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter, boolean isWatched, Set<ModeledOptions> modeledOptions)
{
this.client = client;
this.dslClient = dslClient;
@@ -99,6 +107,7 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
this.unhandledErrorListener = unhandledErrorListener;
this.resultFilter = resultFilter;
this.isWatched = isWatched;
+ this.modeledOptions = modeledOptions;
}
@Override
@@ -280,7 +289,14 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
asyncStage.whenComplete((children, e) -> {
if ( e != null )
{
- modelStage.completeExceptionally(e);
+ if ( modeledOptions.contains(ModeledOptions.ignoreMissingNodesForChildren) && (Throwables.getRootCause(e) instanceof KeeperException.NoNodeException) )
+ {
+ modelStage.complete(Collections.emptyList());
+ }
+ else
+ {
+ modelStage.completeExceptionally(e);
+ }
}
else
{
@@ -303,7 +319,8 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
watcherFilter,
unhandledErrorListener,
resultFilter,
- isWatched
+ isWatched,
+ modeledOptions
);
}
@@ -320,7 +337,8 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
watcherFilter,
unhandledErrorListener,
resultFilter,
- isWatched
+ isWatched,
+ modeledOptions
);
}
@@ -337,7 +355,8 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
watcherFilter,
unhandledErrorListener,
resultFilter,
- isWatched
+ isWatched,
+ modeledOptions
);
}
[08/21] curator git commit: Merge branch 'patch-1' of
github.com:nivs/curator into CURATOR-420
Posted by ra...@apache.org.
Merge branch 'patch-1' of github.com:nivs/curator into CURATOR-420
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/34be09a4
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/34be09a4
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/34be09a4
Branch: refs/heads/CURATOR-419
Commit: 34be09a44c7a941a95245a5bc31a11771e4f7e86
Parents: 3b9d606 b1ffac9
Author: randgalt <ra...@apache.org>
Authored: Tue Jul 11 08:56:51 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Jul 11 08:56:51 2017 -0500
----------------------------------------------------------------------
.../apache/curator/framework/recipes/shared/SharedCount.java | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/34be09a4/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
----------------------------------------------------------------------
[06/21] curator git commit: closes #200 - PR accepted and merged
Posted by ra...@apache.org.
closes #200 - PR accepted and merged
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/12cc7cec
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/12cc7cec
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/12cc7cec
Branch: refs/heads/CURATOR-419
Commit: 12cc7cec5dcb0cc512ec5900426fde74e3289614
Parents: 7442357
Author: randgalt <ra...@apache.org>
Authored: Mon Jul 10 12:10:46 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jul 10 12:10:46 2017 -0500
----------------------------------------------------------------------
----------------------------------------------------------------------
[12/21] curator git commit: license
Posted by ra...@apache.org.
license
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/716fb4aa
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/716fb4aa
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/716fb4aa
Branch: refs/heads/CURATOR-419
Commit: 716fb4aa44c27ef83f5c3f224197a9ff4c9e09b0
Parents: 1e06f40
Author: randgalt <ra...@apache.org>
Authored: Tue Jul 11 12:52:33 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Jul 11 12:52:33 2017 -0500
----------------------------------------------------------------------
.../curator/utils/ExceptionAccumulator.java | 18 ++++++++++++++++++
1 file changed, 18 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/716fb4aa/curator-client/src/main/java/org/apache/curator/utils/ExceptionAccumulator.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/ExceptionAccumulator.java b/curator-client/src/main/java/org/apache/curator/utils/ExceptionAccumulator.java
index 2be2ee8..ee93ea9 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/ExceptionAccumulator.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/ExceptionAccumulator.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.curator.utils;
import com.google.common.base.Throwables;
[19/21] curator git commit: Merge branch 'master' into CURATOR-419
Posted by ra...@apache.org.
Merge branch 'master' into CURATOR-419
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/666b1752
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/666b1752
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/666b1752
Branch: refs/heads/CURATOR-419
Commit: 666b175228d7e642d1b0bbf049119a628501900e
Parents: 7401345 c6f7aeb
Author: randgalt <ra...@apache.org>
Authored: Sun Jul 16 14:46:26 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sun Jul 16 14:46:26 2017 -0500
----------------------------------------------------------------------
.../org/apache/curator/framework/imps/GetDataBuilderImpl.java | 5 +++++
.../java/org/apache/curator/x/async/TestBasicOperations.java | 4 ++--
2 files changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
[17/21] curator git commit: wip for fixing tests
Posted by ra...@apache.org.
wip for fixing tests
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/74013456
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/74013456
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/74013456
Branch: refs/heads/CURATOR-419
Commit: 74013456da1260f9155a72fa651d5d6fa449d984
Parents: 123f2ec
Author: randgalt <ra...@apache.org>
Authored: Sun Jul 16 14:21:23 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sun Jul 16 14:21:23 2017 -0500
----------------------------------------------------------------------
.../org/apache/curator/x/async/TestBasicOperations.java | 10 ++++++++++
.../curator/x/async/modeled/TestModeledFramework.java | 8 ++++----
2 files changed, 14 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/74013456/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java b/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
index 3e980ec..12d3014 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
@@ -189,4 +189,14 @@ public class TestBasicOperations extends CompletableBaseClassForTests
Assert.assertEquals(v.getCode(), KeeperException.Code.CONNECTIONLOSS);
});
}
+
+ @Test
+ public void testGetDataWithStat()
+ {
+ complete(client.create().forPath("/test"));
+
+ Stat stat = new Stat();
+ complete(client.getData().storingStatIn(stat).forPath("/test"));
+ Assert.assertEquals(stat.getVersion(), 1);
+ }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/74013456/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java
index 42a9e63..53eb517 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java
@@ -102,10 +102,10 @@ public class TestModeledFramework extends TestModeledFrameworkBase
@Test
public void testBadNode()
{
- complete(async.create().forPath(modelSpec.path().fullPath(), "fubar".getBytes()));
+ complete(async.create().forPath(modelSpec.path().fullPath(), "fubar".getBytes()), (v, e) -> {}); // ignore error
ModeledFramework<TestModel> client = ModeledFramework.builder(async, modelSpec).watched().build();
- complete(client.read().whenComplete((model, e) -> Assert.assertTrue(e instanceof RuntimeException)));
+ complete(client.read(), (model, e) -> Assert.assertTrue(e instanceof KeeperException.NoNodeException));
}
@Test
@@ -138,13 +138,13 @@ public class TestModeledFramework extends TestModeledFrameworkBase
public void testVersioned()
{
ModeledFramework<TestModel> client = ModeledFramework.wrap(async, modelSpec);
- client.set(new TestModel("John", "Galt", "Galt's Gulch", 21, BigInteger.valueOf(1010101)));
+ complete(client.set(new TestModel("John", "Galt", "Galt's Gulch", 21, BigInteger.valueOf(1010101))));
VersionedModeledFramework<TestModel> versioned = client.versioned();
complete(versioned.read().whenComplete((v, e) -> {
Assert.assertNull(e);
Assert.assertTrue(v.version() > 0);
- }).thenCompose(versioned::set).whenComplete((s, e) -> Assert.assertNull(e))); // version is correct should succeed
+ }).thenCompose(versioned::set), (s, e) -> Assert.assertNull(e)); // version is correct should succeed
complete(versioned.read().whenComplete((v, e) -> {
Assert.assertNull(e);
[11/21] curator git commit: closes #225 - merged (with slight
modifications)
Posted by ra...@apache.org.
closes #225 - merged (with slight modifications)
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/1e06f406
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/1e06f406
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/1e06f406
Branch: refs/heads/CURATOR-419
Commit: 1e06f406281aa893163c8639eebf1baa0cad673b
Parents: 4909f57
Author: randgalt <ra...@apache.org>
Authored: Tue Jul 11 10:07:27 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Jul 11 10:07:27 2017 -0500
----------------------------------------------------------------------
----------------------------------------------------------------------
[05/21] curator git commit: fixed flakiness with TestSharedCount tests
Posted by ra...@apache.org.
fixed flakiness with TestSharedCount tests
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/7442357b
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/7442357b
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/7442357b
Branch: refs/heads/CURATOR-419
Commit: 7442357b7e6dc76ff76c6bc6a3d361b65815f626
Parents: 5de6b81
Author: randgalt <ra...@apache.org>
Authored: Mon Jul 10 12:00:16 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jul 10 12:00:16 2017 -0500
----------------------------------------------------------------------
.../framework/recipes/shared/TestSharedCount.java | 13 ++++++++-----
1 file changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/7442357b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
index 3123c7d..6a0b7c2 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
@@ -78,6 +78,7 @@ public class TestSharedCount extends BaseClassForTests
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
clients.add(client);
client.start();
+ client.checkExists().forPath("/"); // clear initial connect event
SharedCount count = new SharedCount(client, "/count", 10);
counts.add(count);
@@ -121,6 +122,7 @@ public class TestSharedCount extends BaseClassForTests
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
clients.add(client);
client.start();
+ client.checkExists().forPath("/"); // clear initial connect event
Assert.assertTrue(startLatch.await(10, TimeUnit.SECONDS));
@@ -438,13 +440,14 @@ public class TestSharedCount extends BaseClassForTests
@Test
public void testDisconnectReconnectWithMultipleClients() throws Exception
{
+ Timing timing = new Timing();
CuratorFramework curatorFramework1 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500));
CuratorFramework curatorFramework2 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500));
curatorFramework1.start();
- curatorFramework1.blockUntilConnected();
+ curatorFramework1.checkExists().forPath("/"); // clear initial connect events
curatorFramework2.start();
- curatorFramework2.blockUntilConnected();
+ curatorFramework2.checkExists().forPath("/"); // clear initial connect events
final String sharedCountPath = "/count";
final int initialCount = 10;
@@ -485,7 +488,7 @@ public class TestSharedCount extends BaseClassForTests
try
{
sharedCount1.setCount(12);
- Assert.assertEquals(listener1.gotChangeEvent.awaitAdvanceInterruptibly(0, 2, TimeUnit.SECONDS), 1);
+ Assert.assertEquals(listener1.gotChangeEvent.awaitAdvanceInterruptibly(0, timing.seconds(), TimeUnit.SECONDS), 1);
Assert.assertEquals(sharedCount1.getCount(), 12);
Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 10);
@@ -498,10 +501,10 @@ public class TestSharedCount extends BaseClassForTests
server.restart();
- Assert.assertEquals(listener2.getReconnectEvent.awaitAdvanceInterruptibly(i, 2, TimeUnit.SECONDS), i + 1);
+ Assert.assertEquals(listener2.getReconnectEvent.awaitAdvanceInterruptibly(i, timing.forWaiting().seconds(), TimeUnit.SECONDS), i + 1);
// CURATOR-311 introduces to Curator's client reading server's shared count value
// when client's state gets ConnectionState.RECONNECTED. Following tests ensures that.
- Assert.assertEquals(listener2.gotChangeEvent.awaitAdvanceInterruptibly(i, 2, TimeUnit.SECONDS), i + 1);
+ Assert.assertEquals(listener2.gotChangeEvent.awaitAdvanceInterruptibly(i, timing.forWaiting().seconds(), TimeUnit.SECONDS), i + 1);
Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 13 + i);
}
}
[09/21] curator git commit: close was double closing things. Also
introduce an ExceptionAccumulator
Posted by ra...@apache.org.
close was double closing things. Also introduce an ExceptionAccumulator
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/e158bbc7
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/e158bbc7
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/e158bbc7
Branch: refs/heads/CURATOR-419
Commit: e158bbc75affef29828b98373e864b2c460c1f4d
Parents: 34be09a
Author: randgalt <ra...@apache.org>
Authored: Tue Jul 11 09:57:54 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Jul 11 09:57:54 2017 -0500
----------------------------------------------------------------------
.../curator/utils/ExceptionAccumulator.java | 51 ++++++++++++++++++++
.../discovery/details/ServiceDiscoveryImpl.java | 21 +++++---
2 files changed, 64 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/e158bbc7/curator-client/src/main/java/org/apache/curator/utils/ExceptionAccumulator.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/ExceptionAccumulator.java b/curator-client/src/main/java/org/apache/curator/utils/ExceptionAccumulator.java
new file mode 100644
index 0000000..2be2ee8
--- /dev/null
+++ b/curator-client/src/main/java/org/apache/curator/utils/ExceptionAccumulator.java
@@ -0,0 +1,51 @@
+package org.apache.curator.utils;
+
+import com.google.common.base.Throwables;
+
+/**
+ * Utility to accumulate multiple potential exceptions into one that is thrown
+ * at the end: the first one added is thrown, later ones are attached as suppressed
+ */
+public class ExceptionAccumulator
+{
+ private volatile Throwable mainEx = null;
+
+ /**
+ * If there is an accumulated exception, throw it (via {@link Throwables#propagate(Throwable)})
+ */
+ public void propagate()
+ {
+ if ( mainEx != null )
+ {
+ Throwables.propagate(mainEx);
+ }
+ }
+
+ /**
+ * Add an exception into the accumulated exceptions. Note:
+ * if the exception is {@link java.lang.InterruptedException}
+ * then <code>Thread.currentThread().interrupt()</code> is called.
+ *
+ * @param e the exception
+ */
+ public void add(Throwable e)
+ {
+ if ( e instanceof InterruptedException )
+ {
+ // Restore the interrupt flag for the caller. The exception itself is
+ // recorded below like any other. Do not also attach mainEx to it:
+ // mainEx.addSuppressed(e) below would then make the two throwables
+ // suppress each other, i.e. a circular suppressed-exception graph.
+ Thread.currentThread().interrupt();
+ }
+
+ if ( mainEx == null )
+ {
+ mainEx = e;
+ }
+ else
+ {
+ mainEx.addSuppressed(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/e158bbc7/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
index 762c9a8..476705c 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
@@ -31,6 +31,7 @@ import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.ExceptionAccumulator;
import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.curator.x.discovery.ServiceCache;
@@ -39,7 +40,6 @@ import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceProvider;
import org.apache.curator.x.discovery.ServiceProviderBuilder;
-import org.apache.curator.x.discovery.ServiceType;
import org.apache.curator.x.discovery.strategies.RoundRobinStrategy;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -77,9 +77,12 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
log.debug("Re-registering due to reconnection");
reRegisterServices();
}
+ catch (InterruptedException ex)
+ {
+ Thread.currentThread().interrupt();
+ }
catch ( Exception e )
{
- ThreadUtils.checkInterrupted(e);
log.error("Could not re-register instances after reconnection", e);
}
}
@@ -140,10 +143,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@Override
public void close() throws IOException
{
- for ( ServiceCache<T> cache : Lists.newArrayList(caches) )
- {
- CloseableUtils.closeQuietly(cache);
- }
+ ExceptionAccumulator accumulator = new ExceptionAccumulator();
for ( ServiceProvider<T> provider : Lists.newArrayList(providers) )
{
CloseableUtils.closeQuietly(provider);
@@ -161,12 +161,13 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
}
catch ( Exception e )
{
- ThreadUtils.checkInterrupted(e);
+ accumulator.add(e);
log.error("Could not unregister instance: " + entry.service.getName(), e);
}
}
client.getConnectionStateListenable().removeListener(connectionStateListener);
+ accumulator.propagate();
}
/**
@@ -469,9 +470,13 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
{
nodeCache.start(true);
}
+ catch ( InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ return null;
+ }
catch ( Exception e )
{
- ThreadUtils.checkInterrupted(e);
log.error("Could not start node cache for: " + instance, e);
}
NodeCacheListener listener = new NodeCacheListener()
[02/21] curator git commit: SharedCount: fix removeListener
Posted by ra...@apache.org.
SharedCount: fix removeListener
`removeListener` only removed the listener from the SharedCount's `listeners` member, which had no effect.
This fix retrieves the `SharedValueListener` created for wrapping the `SharedCountListener` and removes it from the SharedValue's listeners.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/b1ffac91
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/b1ffac91
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/b1ffac91
Branch: refs/heads/CURATOR-419
Commit: b1ffac9145750e03c01c4ce6823f4a8536054a1e
Parents: 8b28b12
Author: Niv Singer <ni...@innerlogics.com>
Authored: Wed Jul 5 11:47:13 2017 +0300
Committer: GitHub <no...@github.com>
Committed: Wed Jul 5 11:47:13 2017 +0300
----------------------------------------------------------------------
.../apache/curator/framework/recipes/shared/SharedCount.java | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/b1ffac91/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
index 87fffdd..50a9f0e 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
@@ -141,7 +141,10 @@ public class SharedCount implements Closeable, SharedCountReader, Listenable<Sha
@Override
public void removeListener(SharedCountListener listener)
{
- listeners.remove(listener);
+ SharedValueListener valueListener = listeners.remove(listener);
+ if(valueListener != null) {
+ sharedValue.getListenable().removeListener(valueListener);
+ }
}
/**
[10/21] curator git commit: Add comment to close to make clear that
the provider should close its caches
Posted by ra...@apache.org.
Add comment to close to make clear that the provider should close its caches
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/4909f57c
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/4909f57c
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/4909f57c
Branch: refs/heads/CURATOR-419
Commit: 4909f57c172c3f90f9aedeac07772d228a9c948c
Parents: e158bbc
Author: randgalt <ra...@apache.org>
Authored: Tue Jul 11 09:59:36 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Jul 11 09:59:36 2017 -0500
----------------------------------------------------------------------
.../java/org/apache/curator/x/discovery/ServiceProvider.java | 7 +++++++
1 file changed, 7 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/4909f57c/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java
index d606649..f542ed3 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java
@@ -21,6 +21,7 @@ package org.apache.curator.x.discovery;
import org.apache.curator.x.discovery.details.InstanceProvider;
import java.io.Closeable;
+import java.io.IOException;
import java.util.Collection;
/**
@@ -61,4 +62,10 @@ public interface ServiceProvider<T> extends Closeable
* @param instance instance that had an error
*/
public void noteError(ServiceInstance<T> instance);
+
+ /**
+ * Close the provider. Note: it's the provider's responsibility to close any caches it manages
+ */
+ @Override
+ void close() throws IOException;
}
[13/21] curator git commit: Fixed create mode for
AsyncCuratorFramework.transactionOp().create(). It was being ignored. Added a
test as well
Posted by ra...@apache.org.
Fixed the create mode for AsyncCuratorFramework.transactionOp().create(); it was being ignored. Added a test as well.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/4a0e022c
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/4a0e022c
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/4a0e022c
Branch: refs/heads/CURATOR-419
Commit: 4a0e022ce361e21b9f1751434166c5a90c4a8845
Parents: 716fb4a
Author: randgalt <ra...@apache.org>
Authored: Thu Jul 13 23:51:43 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Thu Jul 13 23:51:43 2017 -0500
----------------------------------------------------------------------
.../x/async/details/AsyncTransactionOpImpl.java | 4 ++--
.../x/async/CompletableBaseClassForTests.java | 8 ++++++-
.../curator/x/async/TestBasicOperations.java | 22 ++++++++++++++------
3 files changed, 25 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/4a0e022c/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java
index 7b158f8..2a2c293 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java
@@ -18,7 +18,7 @@
*/
package org.apache.curator.x.async.details;
-import org.apache.curator.framework.api.ACLCreateModePathAndBytesable;
+import org.apache.curator.framework.api.ACLPathAndBytesable;
import org.apache.curator.framework.api.PathAndBytesable;
import org.apache.curator.framework.api.VersionPathAndBytesable;
import org.apache.curator.framework.api.transaction.CuratorOp;
@@ -115,7 +115,7 @@ class AsyncTransactionOpImpl implements AsyncTransactionOp
private CuratorOp internalForPath(String path, byte[] data, boolean useData)
{
TransactionCreateBuilder2<CuratorOp> builder1 = (ttl > 0) ? client.transactionOp().create().withTtl(ttl) : client.transactionOp().create();
- ACLCreateModePathAndBytesable<CuratorOp> builder2 = compressed ? builder1.compressed() : builder1;
+ ACLPathAndBytesable<CuratorOp> builder2 = compressed ? builder1.compressed().withMode(createMode) : builder1.withMode(createMode);
PathAndBytesable<CuratorOp> builder3 = builder2.withACL(aclList);
try
{
http://git-wip-us.apache.org/repos/asf/curator/blob/4a0e022c/curator-x-async/src/test/java/org/apache/curator/x/async/CompletableBaseClassForTests.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/CompletableBaseClassForTests.java b/curator-x-async/src/test/java/org/apache/curator/x/async/CompletableBaseClassForTests.java
index 232d301..4a964b1 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/CompletableBaseClassForTests.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/CompletableBaseClassForTests.java
@@ -18,6 +18,7 @@
*/
package org.apache.curator.x.async;
+import com.google.common.base.Throwables;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.Timing;
import org.testng.Assert;
@@ -33,7 +34,12 @@ public abstract class CompletableBaseClassForTests extends BaseClassForTests
protected <T, U> void complete(CompletionStage<T> stage)
{
- complete(stage, (v, e) -> {});
+ complete(stage, (v, e) -> {
+ if ( e != null )
+ {
+ Throwables.propagate(e);
+ }
+ });
}
protected <T, U> void complete(CompletionStage<T> stage, BiConsumer<? super T, Throwable> handler)
http://git-wip-us.apache.org/repos/asf/curator/blob/4a0e022c/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java b/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
index 0274413..78f37c2 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
@@ -20,10 +20,10 @@ package org.apache.curator.x.async;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.retry.RetryOneTime;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.test.Timing;
import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.x.async.modeled.ZPath;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
@@ -32,17 +32,15 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.io.IOException;
+import java.util.Arrays;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.function.BiConsumer;
import static java.util.EnumSet.of;
import static org.apache.curator.x.async.api.CreateOption.compress;
import static org.apache.curator.x.async.api.CreateOption.setDataIfExists;
import static org.apache.zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL;
+import static org.apache.zookeeper.CreateMode.PERSISTENT_SEQUENTIAL;
public class TestBasicOperations extends CompletableBaseClassForTests
{
@@ -69,6 +67,18 @@ public class TestBasicOperations extends CompletableBaseClassForTests
}
@Test
+ public void testCreateTransactionWithMode() throws Exception
+ {
+ complete(AsyncWrappers.asyncEnsureContainers(client, ZPath.parse("/test")));
+
+ CuratorOp op1 = client.transactionOp().create().withMode(PERSISTENT_SEQUENTIAL).forPath("/test/node-");
+ CuratorOp op2 = client.transactionOp().create().withMode(PERSISTENT_SEQUENTIAL).forPath("/test/node-");
+ complete(client.transaction().forOperations(Arrays.asList(op1, op2)));
+
+ Assert.assertEquals(client.unwrap().getChildren().forPath("/test").size(), 2);
+ }
+
+ @Test
public void testCrud()
{
AsyncStage<String> createStage = client.create().forPath("/test", "one".getBytes());