You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/02/09 18:36:28 UTC
[29/47] curator git commit: Needed a method to re-stage watchers that
triggered only for connection problems.
Needed a method to re-stage watchers that triggered only for connection problems.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/2fa1a69a
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/2fa1a69a
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/2fa1a69a
Branch: refs/heads/CURATOR-3.0
Commit: 2fa1a69afd9b1b9d4be6c756b643ca4d4ce1f810
Parents: b028098
Author: randgalt <ra...@apache.org>
Authored: Fri Jan 6 17:22:02 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jan 6 17:22:02 2017 -0500
----------------------------------------------------------------------
.../curator/x/async/AsyncCuratorFramework.java | 17 +++++-
.../curator/x/async/AsyncEventException.java | 46 +++++++++++++++
.../org/apache/curator/x/async/WatchMode.java | 42 ++++++++++++++
.../x/async/details/AsyncCreateBuilderImpl.java | 2 +-
.../details/AsyncCuratorFrameworkImpl.java | 32 +++++-----
.../x/async/details/AsyncDeleteBuilderImpl.java | 2 +-
.../x/async/details/AsyncExistsBuilderImpl.java | 9 +--
.../details/AsyncGetChildrenBuilderImpl.java | 9 +--
.../details/AsyncGetConfigBuilderImpl.java | 9 +--
.../async/details/AsyncGetDataBuilderImpl.java | 9 +--
.../async/details/AsyncReconfigBuilderImpl.java | 2 +-
.../details/AsyncRemoveWatchesBuilderImpl.java | 2 +-
.../x/async/details/AsyncSetACLBuilderImpl.java | 2 +-
.../async/details/AsyncSetDataBuilderImpl.java | 2 +-
.../curator/x/async/details/BuilderCommon.java | 10 +++-
.../x/async/details/InternalCallback.java | 2 +-
.../x/async/details/InternalWatcher.java | 61 ++++++++++++++------
.../curator/x/async/TestBasicOperations.java | 37 +++++++++++-
18 files changed, 238 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java
index 6dc6f3e..9b29918 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java
@@ -53,14 +53,29 @@ public interface AsyncCuratorFramework extends AsyncCuratorFrameworkDsl
CuratorFramework unwrap();
/**
+ * <p>
* Returns a facade that adds watching to any of the subsequently created builders. i.e. all
- * operations on the WatchedAsyncCuratorFramework facade will have watchers set.
+ * operations on the WatchableAsyncCuratorFramework facade will have watchers set. Also,
+ * the {@link org.apache.curator.x.async.AsyncStage} returned from these builders will
+ * have a loaded staged watcher that is accessed from {@link org.apache.curator.x.async.AsyncStage#event()}.
+ * </p>
+ *
+ * <p>
+ * {@link WatchMode#stateChangeAndSuccess} is used
+ * </p>
*
* @return watcher facade
*/
WatchableAsyncCuratorFramework watched();
/**
+ * Same as {@link #watched()} but allows specifying the {@link WatchMode} to use.
+ *
+ * @return watcher facade
+ */
+ WatchableAsyncCuratorFramework watched(WatchMode mode);
+
+ /**
* Returns a facade that adds the given UnhandledErrorListener to all background operations
*
* @param listener lister to use
http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncEventException.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncEventException.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncEventException.java
new file mode 100644
index 0000000..f863215
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncEventException.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import java.util.concurrent.CompletionStage;
+
+/**
+ * The exception type with which the completion stage of an async watcher is completed exceptionally.
+ */
+public abstract class AsyncEventException extends Exception
+{
+ /**
+ * Returns the error condition that temporarily triggered the watcher. NOTE: the watcher
+ * will most likely still be set. Use {@link #reset()} to stage on the successful trigger.
+ *
+ * @return the keeper state that triggered the watcher
+ */
+ public abstract Watcher.Event.KeeperState getKeeperState();
+
+ /**
+ * ZooKeeper temporarily triggers watchers when there is a connection event. However, the watcher
+ * stays set for the original operation. Use this method to reset with a new completion stage
+ * that allows waiting for a successful trigger.
+ *
+ * @return new completion stage for waiting on a successful trigger
+ */
+ public abstract CompletionStage<WatchedEvent> reset();
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/WatchMode.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/WatchMode.java b/curator-x-async/src/main/java/org/apache/curator/x/async/WatchMode.java
new file mode 100644
index 0000000..dbce8c1
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/WatchMode.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async;
+
+public enum WatchMode
+{
+ /**
+ * The {@link java.util.concurrent.CompletionStage}&lt;org.apache.zookeeper.WatchedEvent&gt; will only
+ * complete on a successful trigger, i.e. connection issues are ignored.
+ */
+ successOnly,
+
+ /**
+ * The {@link java.util.concurrent.CompletionStage}&lt;org.apache.zookeeper.WatchedEvent&gt; will only
+ * complete exceptionally; a successful trigger is ignored. Connection exceptions are
+ * of type {@link org.apache.curator.x.async.AsyncEventException}.
+ */
+ stateChangeOnly,
+
+ /**
+ * The {@link java.util.concurrent.CompletionStage}&lt;org.apache.zookeeper.WatchedEvent&gt; will
+ * complete for both a successful trigger and connection exceptions. Connection exceptions are
+ * of type {@link org.apache.curator.x.async.AsyncEventException}.
+ */
+ stateChangeAndSuccess
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
index ce5b31e..7723775 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
@@ -128,7 +128,7 @@ class AsyncCreateBuilderImpl implements AsyncCreateBuilder
private AsyncStage<String> internalForPath(String path, byte[] data, boolean useData)
{
- BuilderCommon<String> common = new BuilderCommon<>(unhandledErrorListener, false, nameProc);
+ BuilderCommon<String> common = new BuilderCommon<>(unhandledErrorListener, nameProc);
CreateBuilderImpl builder = new CreateBuilderImpl(client,
createMode,
common.backgrounding,
http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
index d502079..a6101f2 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
@@ -38,11 +38,11 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework
{
private final CuratorFrameworkImpl client;
private final UnhandledErrorListener unhandledErrorListener;
- private final boolean watched;
+ private final WatchMode watchMode;
public AsyncCuratorFrameworkImpl(CuratorFramework client)
{
- this(reveal(client), null, false);
+ this(reveal(client), null, null);
}
private static CuratorFrameworkImpl reveal(CuratorFramework client)
@@ -57,11 +57,11 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework
}
}
- public AsyncCuratorFrameworkImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener, boolean watched)
+ public AsyncCuratorFrameworkImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener, WatchMode watchMode)
{
this.client = client;
this.unhandledErrorListener = unhandledErrorListener;
- this.watched = watched;
+ this.watchMode = watchMode;
}
@Override
@@ -99,7 +99,7 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework
@Override
public AsyncStage<List<ACL>> forPath(String path)
{
- BuilderCommon<List<ACL>> common = new BuilderCommon<>(unhandledErrorListener, false, aclProc);
+ BuilderCommon<List<ACL>> common = new BuilderCommon<>(unhandledErrorListener, aclProc);
GetACLBuilderImpl builder = new GetACLBuilderImpl(client, common.backgrounding, stat);
return safeCall(common.internalCallback, () -> builder.forPath(path));
}
@@ -122,7 +122,7 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework
public AsyncMultiTransaction transaction()
{
return operations -> {
- BuilderCommon<List<CuratorTransactionResult>> common = new BuilderCommon<>(unhandledErrorListener, false, opResultsProc);
+ BuilderCommon<List<CuratorTransactionResult>> common = new BuilderCommon<>(unhandledErrorListener, opResultsProc);
CuratorMultiTransactionImpl builder = new CuratorMultiTransactionImpl(client, common.backgrounding);
return safeCall(common.internalCallback, () -> builder.forOperations(operations));
};
@@ -132,7 +132,7 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework
public AsyncSyncBuilder sync()
{
return path -> {
- BuilderCommon<Void> common = new BuilderCommon<>(unhandledErrorListener, false, ignoredProc);
+ BuilderCommon<Void> common = new BuilderCommon<>(unhandledErrorListener, ignoredProc);
SyncBuilderImpl builder = new SyncBuilderImpl(client, common.backgrounding);
return safeCall(common.internalCallback, () -> builder.forPath(path));
};
@@ -153,13 +153,19 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework
@Override
public WatchableAsyncCuratorFramework watched()
{
- return new AsyncCuratorFrameworkImpl(client, unhandledErrorListener, true);
+ return new AsyncCuratorFrameworkImpl(client, unhandledErrorListener, WatchMode.stateChangeAndSuccess);
+ }
+
+ @Override
+ public WatchableAsyncCuratorFramework watched(WatchMode mode)
+ {
+ return new AsyncCuratorFrameworkImpl(client, unhandledErrorListener, mode);
}
@Override
public AsyncCuratorFrameworkDsl withUnhandledErrorListener(UnhandledErrorListener listener)
{
- return new AsyncCuratorFrameworkImpl(client, listener, watched);
+ return new AsyncCuratorFrameworkImpl(client, listener, watchMode);
}
@Override
@@ -171,24 +177,24 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework
@Override
public AsyncExistsBuilder checkExists()
{
- return new AsyncExistsBuilderImpl(client, unhandledErrorListener, watched);
+ return new AsyncExistsBuilderImpl(client, unhandledErrorListener, watchMode);
}
@Override
public AsyncGetDataBuilder getData()
{
- return new AsyncGetDataBuilderImpl(client, unhandledErrorListener, watched);
+ return new AsyncGetDataBuilderImpl(client, unhandledErrorListener, watchMode);
}
@Override
public AsyncGetChildrenBuilder getChildren()
{
- return new AsyncGetChildrenBuilderImpl(client, unhandledErrorListener, watched);
+ return new AsyncGetChildrenBuilderImpl(client, unhandledErrorListener, watchMode);
}
@Override
public AsyncGetConfigBuilder getConfig()
{
- return new AsyncGetConfigBuilderImpl(client, unhandledErrorListener, watched);
+ return new AsyncGetConfigBuilderImpl(client, unhandledErrorListener, watchMode);
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncDeleteBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncDeleteBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncDeleteBuilderImpl.java
index 54073b0..243ea44 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncDeleteBuilderImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncDeleteBuilderImpl.java
@@ -69,7 +69,7 @@ class AsyncDeleteBuilderImpl implements AsyncDeleteBuilder
@Override
public AsyncStage<Void> forPath(String path)
{
- BuilderCommon<Void> common = new BuilderCommon<>(unhandledErrorListener, false, ignoredProc);
+ BuilderCommon<Void> common = new BuilderCommon<>(unhandledErrorListener, ignoredProc);
DeleteBuilderImpl builder = new DeleteBuilderImpl(client, version, common.backgrounding, options.contains(DeleteOption.deletingChildrenIfNeeded), options.contains(DeleteOption.guaranteed), options.contains(DeleteOption.quietly));
return safeCall(common.internalCallback, () -> builder.forPath(path));
}
http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java
index c77a0aa..d672047 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java
@@ -21,6 +21,7 @@ package org.apache.curator.x.async.details;
import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.imps.CuratorFrameworkImpl;
import org.apache.curator.framework.imps.ExistsBuilderImpl;
+import org.apache.curator.x.async.WatchMode;
import org.apache.curator.x.async.api.AsyncExistsBuilder;
import org.apache.curator.x.async.api.AsyncPathable;
import org.apache.curator.x.async.AsyncStage;
@@ -37,14 +38,14 @@ class AsyncExistsBuilderImpl implements AsyncExistsBuilder
{
private final CuratorFrameworkImpl client;
private final UnhandledErrorListener unhandledErrorListener;
- private final boolean watched;
+ private final WatchMode watchMode;
private Set<ExistsOption> options = Collections.emptySet();
- AsyncExistsBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener, boolean watched)
+ AsyncExistsBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener, WatchMode watchMode)
{
this.client = client;
this.unhandledErrorListener = unhandledErrorListener;
- this.watched = watched;
+ this.watchMode = watchMode;
}
@Override
@@ -57,7 +58,7 @@ class AsyncExistsBuilderImpl implements AsyncExistsBuilder
@Override
public AsyncStage<Stat> forPath(String path)
{
- BuilderCommon<Stat> common = new BuilderCommon<>(unhandledErrorListener, watched, safeStatProc);
+ BuilderCommon<Stat> common = new BuilderCommon<>(unhandledErrorListener, watchMode, safeStatProc);
ExistsBuilderImpl builder = new ExistsBuilderImpl(client, common.backgrounding, common.watcher, options.contains(ExistsOption.createParentsIfNeeded), options.contains(ExistsOption.createParentsAsContainers));
return safeCall(common.internalCallback, () -> builder.forPath(path));
}
http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetChildrenBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetChildrenBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetChildrenBuilderImpl.java
index b429c58..7258c6c 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetChildrenBuilderImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetChildrenBuilderImpl.java
@@ -21,6 +21,7 @@ package org.apache.curator.x.async.details;
import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.imps.CuratorFrameworkImpl;
import org.apache.curator.framework.imps.GetChildrenBuilderImpl;
+import org.apache.curator.x.async.WatchMode;
import org.apache.curator.x.async.api.AsyncGetChildrenBuilder;
import org.apache.curator.x.async.api.AsyncPathable;
import org.apache.curator.x.async.AsyncStage;
@@ -34,20 +35,20 @@ class AsyncGetChildrenBuilderImpl implements AsyncGetChildrenBuilder
{
private final CuratorFrameworkImpl client;
private final UnhandledErrorListener unhandledErrorListener;
- private final boolean watched;
+ private final WatchMode watchMode;
private Stat stat = null;
- AsyncGetChildrenBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener, boolean watched)
+ AsyncGetChildrenBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener, WatchMode watchMode)
{
this.client = client;
this.unhandledErrorListener = unhandledErrorListener;
- this.watched = watched;
+ this.watchMode = watchMode;
}
@Override
public AsyncStage<List<String>> forPath(String path)
{
- BuilderCommon<List<String>> common = new BuilderCommon<>(unhandledErrorListener, watched, childrenProc);
+ BuilderCommon<List<String>> common = new BuilderCommon<>(unhandledErrorListener, watchMode, childrenProc);
GetChildrenBuilderImpl builder = new GetChildrenBuilderImpl(client, common.watcher, common.backgrounding, stat);
return safeCall(common.internalCallback, () -> builder.forPath(path));
}
http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java
index 7ecb18a..273fba2 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java
@@ -21,6 +21,7 @@ package org.apache.curator.x.async.details;
import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.imps.CuratorFrameworkImpl;
import org.apache.curator.framework.imps.GetConfigBuilderImpl;
+import org.apache.curator.x.async.WatchMode;
import org.apache.curator.x.async.api.AsyncEnsemblable;
import org.apache.curator.x.async.api.AsyncGetConfigBuilder;
import org.apache.curator.x.async.AsyncStage;
@@ -33,14 +34,14 @@ class AsyncGetConfigBuilderImpl implements AsyncGetConfigBuilder
{
private final CuratorFrameworkImpl client;
private final UnhandledErrorListener unhandledErrorListener;
- private final boolean watched;
+ private final WatchMode watchMode;
private Stat stat = null;
- AsyncGetConfigBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener, boolean watched)
+ AsyncGetConfigBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener, WatchMode watchMode)
{
this.client = client;
this.unhandledErrorListener = unhandledErrorListener;
- this.watched = watched;
+ this.watchMode = watchMode;
}
@Override
@@ -53,7 +54,7 @@ class AsyncGetConfigBuilderImpl implements AsyncGetConfigBuilder
@Override
public AsyncStage<byte[]> forEnsemble()
{
- BuilderCommon<byte[]> common = new BuilderCommon<>(unhandledErrorListener, watched, dataProc);
+ BuilderCommon<byte[]> common = new BuilderCommon<>(unhandledErrorListener, watchMode, dataProc);
GetConfigBuilderImpl builder = new GetConfigBuilderImpl(client, common.backgrounding, common.watcher, stat);
return safeCall(common.internalCallback, builder::forEnsemble);
}
http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java
index 7214cd8..ac9df4c 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java
@@ -21,6 +21,7 @@ package org.apache.curator.x.async.details;
import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.imps.CuratorFrameworkImpl;
import org.apache.curator.framework.imps.GetDataBuilderImpl;
+import org.apache.curator.x.async.WatchMode;
import org.apache.curator.x.async.api.AsyncGetDataBuilder;
import org.apache.curator.x.async.api.AsyncPathable;
import org.apache.curator.x.async.AsyncStage;
@@ -33,15 +34,15 @@ class AsyncGetDataBuilderImpl implements AsyncGetDataBuilder
{
private final CuratorFrameworkImpl client;
private final UnhandledErrorListener unhandledErrorListener;
- private final boolean watched;
+ private final WatchMode watchMode;
private boolean decompressed = false;
private Stat stat = null;
- AsyncGetDataBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener, boolean watched)
+ AsyncGetDataBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener, WatchMode watchMode)
{
this.client = client;
this.unhandledErrorListener = unhandledErrorListener;
- this.watched = watched;
+ this.watchMode = watchMode;
}
@Override
@@ -69,7 +70,7 @@ class AsyncGetDataBuilderImpl implements AsyncGetDataBuilder
@Override
public AsyncStage<byte[]> forPath(String path)
{
- BuilderCommon<byte[]> common = new BuilderCommon<>(unhandledErrorListener, watched, dataProc);
+ BuilderCommon<byte[]> common = new BuilderCommon<>(unhandledErrorListener, watchMode, dataProc);
GetDataBuilderImpl builder = new GetDataBuilderImpl(client, stat, common.watcher, common.backgrounding, decompressed);
return safeCall(common.internalCallback, () -> builder.forPath(path));
}
http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java
index f6a097e..32b9eb5 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java
@@ -116,7 +116,7 @@ class AsyncReconfigBuilderImpl implements AsyncReconfigBuilder, AsyncEnsemblable
@Override
public AsyncStage<Void> forEnsemble()
{
- BuilderCommon<Void> common = new BuilderCommon<>(unhandledErrorListener, false, ignoredProc);
+ BuilderCommon<Void> common = new BuilderCommon<>(unhandledErrorListener, ignoredProc);
ReconfigBuilderImpl builder = new ReconfigBuilderImpl(client, common.backgrounding, stat, fromConfig, newMembers, joining, leaving);
return safeCall(common.internalCallback, () -> {
builder.forEnsemble();
http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java
index 7e9e091..98a8bbb 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java
@@ -160,7 +160,7 @@ class AsyncRemoveWatchesBuilderImpl implements AsyncRemoveWatchesBuilder, AsyncP
@Override
public AsyncStage<Void> forPath(String path)
{
- BuilderCommon<Void> common = new BuilderCommon<>(unhandledErrorListener, false, ignoredProc);
+ BuilderCommon<Void> common = new BuilderCommon<>(unhandledErrorListener, ignoredProc);
RemoveWatchesBuilderImpl builder = new RemoveWatchesBuilderImpl(client,
watcher,
curatorWatcher,
http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetACLBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetACLBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetACLBuilderImpl.java
index b5f5a06..8908de6 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetACLBuilderImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetACLBuilderImpl.java
@@ -62,7 +62,7 @@ class AsyncSetACLBuilderImpl implements AsyncSetACLBuilder, AsyncPathable<AsyncS
@Override
public AsyncStage<Stat> forPath(String path)
{
- BuilderCommon<Stat> common = new BuilderCommon<>(unhandledErrorListener, false, statProc);
+ BuilderCommon<Stat> common = new BuilderCommon<>(unhandledErrorListener, statProc);
SetACLBuilderImpl builder = new SetACLBuilderImpl(client, common.backgrounding, aclList, version);
return safeCall(common.internalCallback, () -> builder.forPath(path));
}
http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java
index 3df52b9..cf2a56e 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java
@@ -78,7 +78,7 @@ class AsyncSetDataBuilderImpl implements AsyncSetDataBuilder
private AsyncStage<Stat> internalForPath(String path, byte[] data, boolean useData)
{
- BuilderCommon<Stat> common = new BuilderCommon<>(unhandledErrorListener, false, statProc);
+ BuilderCommon<Stat> common = new BuilderCommon<>(unhandledErrorListener, statProc);
SetDataBuilderImpl builder = new SetDataBuilderImpl(client, common.backgrounding, version, compressed);
return safeCall(common.internalCallback, () -> useData ? builder.forPath(path, data) : builder.forPath(path));
}
http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java
index 56cd462..043b5b4 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java
@@ -20,6 +20,7 @@ package org.apache.curator.x.async.details;
import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.imps.Backgrounding;
+import org.apache.curator.x.async.WatchMode;
class BuilderCommon<T>
{
@@ -27,9 +28,14 @@ class BuilderCommon<T>
final Backgrounding backgrounding;
final InternalWatcher watcher;
- BuilderCommon(UnhandledErrorListener unhandledErrorListener, boolean watched, BackgroundProc<T> proc)
+ BuilderCommon(UnhandledErrorListener unhandledErrorListener, BackgroundProc<T> proc)
{
- watcher = watched ? new InternalWatcher() : null;
+ this(unhandledErrorListener, null, proc);
+ }
+
+ BuilderCommon(UnhandledErrorListener unhandledErrorListener, WatchMode watchMode, BackgroundProc<T> proc)
+ {
+ watcher = (watchMode != null) ? new InternalWatcher(watchMode) : null;
internalCallback = new InternalCallback<>(proc, watcher);
backgrounding = new Backgrounding(internalCallback, unhandledErrorListener);
}
http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalCallback.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalCallback.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalCallback.java
index a766380..505226f 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalCallback.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalCallback.java
@@ -40,7 +40,7 @@ class InternalCallback<T> extends CompletableFuture<T> implements BackgroundCall
@Override
public CompletionStage<WatchedEvent> event()
{
- return watcher;
+ return (watcher != null) ? watcher.getFuture() : null;
}
@Override
http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalWatcher.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalWatcher.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalWatcher.java
index b631748..2c7de9e 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalWatcher.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalWatcher.java
@@ -18,41 +18,68 @@
*/
package org.apache.curator.x.async.details;
-import org.apache.zookeeper.KeeperException;
+import org.apache.curator.x.async.AsyncEventException;
+import org.apache.curator.x.async.WatchMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
-class InternalWatcher extends CompletableFuture<WatchedEvent> implements Watcher
+class InternalWatcher implements Watcher
{
+ private final WatchMode watchMode;
+ private volatile CompletableFuture<WatchedEvent> future = new CompletableFuture<>();
+
+ InternalWatcher(WatchMode watchMode)
+ {
+ this.watchMode = watchMode;
+ }
+
+ CompletableFuture<WatchedEvent> getFuture()
+ {
+ return future;
+ }
+
@Override
public void process(WatchedEvent event)
{
switch ( event.getState() )
{
- case ConnectedReadOnly:
- case SyncConnected:
- case SaslAuthenticated:
+ default:
{
- complete(event);
+ if ( (watchMode != WatchMode.stateChangeOnly) && (event.getType() != Event.EventType.None) )
+ {
+ if ( !future.complete(event) )
+ {
+ future.obtrudeValue(event);
+ }
+ }
break;
}
case Disconnected:
- {
- completeExceptionally(KeeperException.create(KeeperException.Code.CONNECTIONLOSS));
- break;
- }
-
case AuthFailed:
- {
- completeExceptionally(KeeperException.create(KeeperException.Code.AUTHFAILED));
- break;
- }
-
case Expired:
{
- completeExceptionally(KeeperException.create(KeeperException.Code.SESSIONEXPIRED));
+ if ( watchMode != WatchMode.successOnly )
+ {
+ AsyncEventException exception = new AsyncEventException()
+ {
+ @Override
+ public Event.KeeperState getKeeperState()
+ {
+ return event.getState();
+ }
+
+ @Override
+ public CompletionStage<WatchedEvent> reset()
+ {
+ future = new CompletableFuture<>();
+ return future;
+ }
+ };
+ future.completeExceptionally(exception);
+ }
break;
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java b/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
index 2e0fb4d..d66db72 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
@@ -25,13 +25,17 @@ import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.Timing;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import java.io.IOException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import static java.util.EnumSet.of;
@@ -99,6 +103,33 @@ public class TestBasicOperations extends BaseClassForTests
Assert.assertTrue(timing.awaitLatch(latch));
}
+ @Test
+ public void testWatchingWithServerLoss() throws Exception
+ {
+ AsyncStage<Stat> stage = client.watched().checkExists().forPath("/test");
+ stage.thenRun(() -> {
+ try
+ {
+ server.stop();
+ }
+ catch ( IOException e )
+ {
+ // ignore
+ }
+ });
+
+ CountDownLatch latch = new CountDownLatch(1);
+ complete(stage.event(), (v, e) -> {
+ Assert.assertTrue(e instanceof AsyncEventException);
+ Assert.assertEquals(((AsyncEventException)e).getKeeperState(), Watcher.Event.KeeperState.Disconnected);
+ ((AsyncEventException)e).reset().thenRun(latch::countDown);
+ });
+
+ server.restart();
+ client.create().forPath("/test");
+ Assert.assertTrue(timing.awaitLatch(latch));
+ }
+
private <T, U> void complete(CompletionStage<T> stage)
{
complete(stage, (v, e) -> {});
@@ -111,7 +142,7 @@ public class TestBasicOperations extends BaseClassForTests
stage.handle((v, e) -> {
handler.accept(v, e);
return null;
- }).toCompletableFuture().get();
+ }).toCompletableFuture().get(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS);
}
catch ( InterruptedException e )
{
@@ -125,5 +156,9 @@ public class TestBasicOperations extends BaseClassForTests
}
Assert.fail("get() failed", e);
}
+ catch ( TimeoutException e )
+ {
+ Assert.fail("get() timed out");
+ }
}
}