You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by dr...@apache.org on 2017/05/02 20:06:13 UTC
[05/50] curator git commit: Added a filtering feature plus some
refactoring
Added a filtering feature plus some refactoring
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/ee4031de
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/ee4031de
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/ee4031de
Branch: refs/heads/master
Commit: ee4031de78ab06494013e47911896e4e689ba131
Parents: 3876d3e
Author: randgalt <ra...@apache.org>
Authored: Sat Jan 7 15:25:54 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Jan 7 15:25:54 2017 -0500
----------------------------------------------------------------------
.../curator/x/async/AsyncCuratorFramework.java | 29 +++++++++-
.../x/async/details/AsyncCreateBuilderImpl.java | 11 ++--
.../details/AsyncCuratorFrameworkImpl.java | 59 ++++++++++++--------
.../x/async/details/AsyncDeleteBuilderImpl.java | 11 ++--
.../x/async/details/AsyncExistsBuilderImpl.java | 11 ++--
.../details/AsyncGetChildrenBuilderImpl.java | 11 ++--
.../details/AsyncGetConfigBuilderImpl.java | 11 ++--
.../async/details/AsyncGetDataBuilderImpl.java | 11 ++--
.../async/details/AsyncReconfigBuilderImpl.java | 11 ++--
.../details/AsyncRemoveWatchesBuilderImpl.java | 11 ++--
.../x/async/details/AsyncSetACLBuilderImpl.java | 11 ++--
.../async/details/AsyncSetDataBuilderImpl.java | 11 ++--
.../curator/x/async/details/BuilderCommon.java | 13 ++---
.../apache/curator/x/async/details/Filters.java | 53 ++++++++++++++++++
.../x/async/details/InternalCallback.java | 6 +-
.../x/async/details/InternalWatcher.java | 16 ++++--
.../framework/imps/TestFrameworkBackground.java | 2 +-
17 files changed, 189 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java
index 91784b0..183a5eb 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java
@@ -19,10 +19,12 @@
package org.apache.curator.x.async;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.x.async.api.AsyncCuratorFrameworkDsl;
-import org.apache.curator.x.async.api.WatchableAsyncCuratorFramework;
import org.apache.curator.x.async.details.AsyncCuratorFrameworkImpl;
+import org.apache.zookeeper.WatchedEvent;
+import java.util.function.UnaryOperator;
/**
* Zookeeper framework-style client that returns composable async operations
@@ -58,5 +60,28 @@ public interface AsyncCuratorFramework extends AsyncCuratorFrameworkDsl
* @param listener lister to use
* @return facade
*/
- AsyncCuratorFrameworkDsl withUnhandledErrorListener(UnhandledErrorListener listener);
+ AsyncCuratorFrameworkDsl with(UnhandledErrorListener listener);
+
+ /**
+ * Returns a facade that adds the the given filters to all background operations and watchers.
+ * <code>resultFilter</code> will get called for every background callback. <code>watcherFilter</code>
+ * will get called for every watcher. The filters can return new versions or unchanged versions
+ * of the arguments.
+ *
+ * @param resultFilter filter to use or <code>null</code>
+ * @param watcherFilter filter to use or <code>null</code>
+ * @return facade
+ */
+ AsyncCuratorFrameworkDsl with(UnaryOperator<CuratorEvent> resultFilter, UnaryOperator<WatchedEvent> watcherFilter);
+
+ /**
+ * Set any combination of listener or filters
+ *
+ * @param resultFilter filter to use or <code>null</code>
+ * @param watcherFilter filter to use or <code>null</code>
+ * @see #with(java.util.function.UnaryOperator, java.util.function.UnaryOperator)
+ * @see #with(org.apache.curator.framework.api.UnhandledErrorListener)
+ * @return facade
+ */
+ AsyncCuratorFrameworkDsl with(UnhandledErrorListener listener, UnaryOperator<CuratorEvent> resultFilter, UnaryOperator<WatchedEvent> watcherFilter);
}
http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
index b3c91b3..b2b9000 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
@@ -18,12 +18,11 @@
*/
package org.apache.curator.x.async.details;
-import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.imps.CreateBuilderImpl;
import org.apache.curator.framework.imps.CuratorFrameworkImpl;
+import org.apache.curator.x.async.AsyncStage;
import org.apache.curator.x.async.api.AsyncCreateBuilder;
import org.apache.curator.x.async.api.AsyncPathAndBytesable;
-import org.apache.curator.x.async.AsyncStage;
import org.apache.curator.x.async.api.CreateOption;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.ACL;
@@ -39,16 +38,16 @@ import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
class AsyncCreateBuilderImpl implements AsyncCreateBuilder
{
private final CuratorFrameworkImpl client;
- private final UnhandledErrorListener unhandledErrorListener;
+ private final Filters filters;
private CreateMode createMode = CreateMode.PERSISTENT;
private List<ACL> aclList = null;
private Set<CreateOption> options = Collections.emptySet();
private Stat stat = null;
- AsyncCreateBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener)
+ AsyncCreateBuilderImpl(CuratorFrameworkImpl client, Filters filters)
{
this.client = client;
- this.unhandledErrorListener = unhandledErrorListener;
+ this.filters = filters;
}
@Override
@@ -128,7 +127,7 @@ class AsyncCreateBuilderImpl implements AsyncCreateBuilder
private AsyncStage<String> internalForPath(String path, byte[] data, boolean useData)
{
- BuilderCommon<String> common = new BuilderCommon<>(unhandledErrorListener, nameProc);
+ BuilderCommon<String> common = new BuilderCommon<>(filters, nameProc);
CreateBuilderImpl builder = new CreateBuilderImpl(client,
createMode,
common.backgrounding,
http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
index a6101f2..aa82644 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
@@ -19,6 +19,7 @@
package org.apache.curator.x.async.details;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
import org.apache.curator.framework.imps.CuratorFrameworkImpl;
@@ -27,22 +28,24 @@ import org.apache.curator.framework.imps.GetACLBuilderImpl;
import org.apache.curator.framework.imps.SyncBuilderImpl;
import org.apache.curator.x.async.*;
import org.apache.curator.x.async.api.*;
+import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import java.util.List;
import java.util.Objects;
+import java.util.function.UnaryOperator;
import static org.apache.curator.x.async.details.BackgroundProcs.*;
public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework
{
private final CuratorFrameworkImpl client;
- private final UnhandledErrorListener unhandledErrorListener;
+ private final Filters filters;
private final WatchMode watchMode;
public AsyncCuratorFrameworkImpl(CuratorFramework client)
{
- this(reveal(client), null, null);
+ this(reveal(client), new Filters(null, null, null), null);
}
private static CuratorFrameworkImpl reveal(CuratorFramework client)
@@ -57,29 +60,29 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework
}
}
- public AsyncCuratorFrameworkImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener, WatchMode watchMode)
+ public AsyncCuratorFrameworkImpl(CuratorFrameworkImpl client, Filters filters, WatchMode watchMode)
{
- this.client = client;
- this.unhandledErrorListener = unhandledErrorListener;
+ this.client = Objects.requireNonNull(client, "client cannot be null");
+ this.filters = Objects.requireNonNull(filters, "filters cannot be null");
this.watchMode = watchMode;
}
@Override
public AsyncCreateBuilder create()
{
- return new AsyncCreateBuilderImpl(client, unhandledErrorListener);
+ return new AsyncCreateBuilderImpl(client, filters);
}
@Override
public AsyncDeleteBuilder delete()
{
- return new AsyncDeleteBuilderImpl(client, unhandledErrorListener);
+ return new AsyncDeleteBuilderImpl(client, filters);
}
@Override
public AsyncSetDataBuilder setData()
{
- return new AsyncSetDataBuilderImpl(client, unhandledErrorListener);
+ return new AsyncSetDataBuilderImpl(client, filters);
}
@Override
@@ -99,7 +102,7 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework
@Override
public AsyncStage<List<ACL>> forPath(String path)
{
- BuilderCommon<List<ACL>> common = new BuilderCommon<>(unhandledErrorListener, aclProc);
+ BuilderCommon<List<ACL>> common = new BuilderCommon<>(filters, aclProc);
GetACLBuilderImpl builder = new GetACLBuilderImpl(client, common.backgrounding, stat);
return safeCall(common.internalCallback, () -> builder.forPath(path));
}
@@ -109,20 +112,20 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework
@Override
public AsyncSetACLBuilder setACL()
{
- return new AsyncSetACLBuilderImpl(client, unhandledErrorListener);
+ return new AsyncSetACLBuilderImpl(client, filters);
}
@Override
public AsyncReconfigBuilder reconfig()
{
- return new AsyncReconfigBuilderImpl(client, unhandledErrorListener);
+ return new AsyncReconfigBuilderImpl(client, filters);
}
@Override
public AsyncMultiTransaction transaction()
{
return operations -> {
- BuilderCommon<List<CuratorTransactionResult>> common = new BuilderCommon<>(unhandledErrorListener, opResultsProc);
+ BuilderCommon<List<CuratorTransactionResult>> common = new BuilderCommon<>(filters, opResultsProc);
CuratorMultiTransactionImpl builder = new CuratorMultiTransactionImpl(client, common.backgrounding);
return safeCall(common.internalCallback, () -> builder.forOperations(operations));
};
@@ -132,7 +135,7 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework
public AsyncSyncBuilder sync()
{
return path -> {
- BuilderCommon<Void> common = new BuilderCommon<>(unhandledErrorListener, ignoredProc);
+ BuilderCommon<Void> common = new BuilderCommon<>(filters, ignoredProc);
SyncBuilderImpl builder = new SyncBuilderImpl(client, common.backgrounding);
return safeCall(common.internalCallback, () -> builder.forPath(path));
};
@@ -141,7 +144,7 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework
@Override
public AsyncRemoveWatchesBuilder removeWatches()
{
- return new AsyncRemoveWatchesBuilderImpl(client, unhandledErrorListener);
+ return new AsyncRemoveWatchesBuilderImpl(client, filters);
}
@Override
@@ -153,19 +156,31 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework
@Override
public WatchableAsyncCuratorFramework watched()
{
- return new AsyncCuratorFrameworkImpl(client, unhandledErrorListener, WatchMode.stateChangeAndSuccess);
+ return new AsyncCuratorFrameworkImpl(client, filters, WatchMode.stateChangeAndSuccess);
}
@Override
public WatchableAsyncCuratorFramework watched(WatchMode mode)
{
- return new AsyncCuratorFrameworkImpl(client, unhandledErrorListener, mode);
+ return new AsyncCuratorFrameworkImpl(client, filters, mode);
}
@Override
- public AsyncCuratorFrameworkDsl withUnhandledErrorListener(UnhandledErrorListener listener)
+ public AsyncCuratorFrameworkDsl with(UnhandledErrorListener listener)
{
- return new AsyncCuratorFrameworkImpl(client, listener, watchMode);
+ return new AsyncCuratorFrameworkImpl(client, new Filters(listener, filters.getResultFilter(), filters.getWatcherFilter()), watchMode);
+ }
+
+ @Override
+ public AsyncCuratorFrameworkDsl with(UnaryOperator<CuratorEvent> resultFilter, UnaryOperator<WatchedEvent> watcherFilter)
+ {
+ return new AsyncCuratorFrameworkImpl(client, new Filters(filters.getListener(), resultFilter, watcherFilter), watchMode);
+ }
+
+ @Override
+ public AsyncCuratorFrameworkDsl with(UnhandledErrorListener listener, UnaryOperator<CuratorEvent> resultFilter, UnaryOperator<WatchedEvent> watcherFilter)
+ {
+ return new AsyncCuratorFrameworkImpl(client, new Filters(listener, resultFilter, watcherFilter), watchMode);
}
@Override
@@ -177,24 +192,24 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework
@Override
public AsyncExistsBuilder checkExists()
{
- return new AsyncExistsBuilderImpl(client, unhandledErrorListener, watchMode);
+ return new AsyncExistsBuilderImpl(client, filters, watchMode);
}
@Override
public AsyncGetDataBuilder getData()
{
- return new AsyncGetDataBuilderImpl(client, unhandledErrorListener, watchMode);
+ return new AsyncGetDataBuilderImpl(client, filters, watchMode);
}
@Override
public AsyncGetChildrenBuilder getChildren()
{
- return new AsyncGetChildrenBuilderImpl(client, unhandledErrorListener, watchMode);
+ return new AsyncGetChildrenBuilderImpl(client, filters, watchMode);
}
@Override
public AsyncGetConfigBuilder getConfig()
{
- return new AsyncGetConfigBuilderImpl(client, unhandledErrorListener, watchMode);
+ return new AsyncGetConfigBuilderImpl(client, filters, watchMode);
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncDeleteBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncDeleteBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncDeleteBuilderImpl.java
index 243ea44..e9efb90 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncDeleteBuilderImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncDeleteBuilderImpl.java
@@ -18,12 +18,11 @@
*/
package org.apache.curator.x.async.details;
-import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.imps.CuratorFrameworkImpl;
import org.apache.curator.framework.imps.DeleteBuilderImpl;
+import org.apache.curator.x.async.AsyncStage;
import org.apache.curator.x.async.api.AsyncDeleteBuilder;
import org.apache.curator.x.async.api.AsyncPathable;
-import org.apache.curator.x.async.AsyncStage;
import org.apache.curator.x.async.api.DeleteOption;
import java.util.Collections;
import java.util.Objects;
@@ -35,14 +34,14 @@ import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
class AsyncDeleteBuilderImpl implements AsyncDeleteBuilder
{
private final CuratorFrameworkImpl client;
- private final UnhandledErrorListener unhandledErrorListener;
+ private final Filters filters;
private Set<DeleteOption> options = Collections.emptySet();
private int version = -1;
- AsyncDeleteBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener)
+ AsyncDeleteBuilderImpl(CuratorFrameworkImpl client, Filters filters)
{
this.client = client;
- this.unhandledErrorListener = unhandledErrorListener;
+ this.filters = filters;
}
@Override
@@ -69,7 +68,7 @@ class AsyncDeleteBuilderImpl implements AsyncDeleteBuilder
@Override
public AsyncStage<Void> forPath(String path)
{
- BuilderCommon<Void> common = new BuilderCommon<>(unhandledErrorListener, ignoredProc);
+ BuilderCommon<Void> common = new BuilderCommon<>(filters, ignoredProc);
DeleteBuilderImpl builder = new DeleteBuilderImpl(client, version, common.backgrounding, options.contains(DeleteOption.deletingChildrenIfNeeded), options.contains(DeleteOption.guaranteed), options.contains(DeleteOption.quietly));
return safeCall(common.internalCallback, () -> builder.forPath(path));
}
http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java
index d3bb8ed..7a3385b 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java
@@ -18,13 +18,12 @@
*/
package org.apache.curator.x.async.details;
-import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.imps.CuratorFrameworkImpl;
import org.apache.curator.framework.imps.ExistsBuilderImpl;
+import org.apache.curator.x.async.AsyncStage;
import org.apache.curator.x.async.WatchMode;
import org.apache.curator.x.async.api.AsyncExistsBuilder;
import org.apache.curator.x.async.api.AsyncPathable;
-import org.apache.curator.x.async.AsyncStage;
import org.apache.curator.x.async.api.ExistsOption;
import org.apache.zookeeper.data.Stat;
import java.util.Collections;
@@ -37,14 +36,14 @@ import static org.apache.curator.x.async.details.BackgroundProcs.safeStatProc;
class AsyncExistsBuilderImpl implements AsyncExistsBuilder
{
private final CuratorFrameworkImpl client;
- private final UnhandledErrorListener unhandledErrorListener;
+ private final Filters filters;
private final WatchMode watchMode;
private Set<ExistsOption> options = Collections.emptySet();
- AsyncExistsBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener, WatchMode watchMode)
+ AsyncExistsBuilderImpl(CuratorFrameworkImpl client, Filters filters, WatchMode watchMode)
{
this.client = client;
- this.unhandledErrorListener = unhandledErrorListener;
+ this.filters = filters;
this.watchMode = watchMode;
}
@@ -58,7 +57,7 @@ class AsyncExistsBuilderImpl implements AsyncExistsBuilder
@Override
public AsyncStage<Stat> forPath(String path)
{
- BuilderCommon<Stat> common = new BuilderCommon<>(unhandledErrorListener, watchMode, safeStatProc);
+ BuilderCommon<Stat> common = new BuilderCommon<>(filters, watchMode, safeStatProc);
ExistsBuilderImpl builder = new ExistsBuilderImpl(client,
common.backgrounding,
common.watcher,
http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetChildrenBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetChildrenBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetChildrenBuilderImpl.java
index 7258c6c..b98323f 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetChildrenBuilderImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetChildrenBuilderImpl.java
@@ -18,13 +18,12 @@
*/
package org.apache.curator.x.async.details;
-import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.imps.CuratorFrameworkImpl;
import org.apache.curator.framework.imps.GetChildrenBuilderImpl;
+import org.apache.curator.x.async.AsyncStage;
import org.apache.curator.x.async.WatchMode;
import org.apache.curator.x.async.api.AsyncGetChildrenBuilder;
import org.apache.curator.x.async.api.AsyncPathable;
-import org.apache.curator.x.async.AsyncStage;
import org.apache.zookeeper.data.Stat;
import java.util.List;
@@ -34,21 +33,21 @@ import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
class AsyncGetChildrenBuilderImpl implements AsyncGetChildrenBuilder
{
private final CuratorFrameworkImpl client;
- private final UnhandledErrorListener unhandledErrorListener;
+ private final Filters filters;
private final WatchMode watchMode;
private Stat stat = null;
- AsyncGetChildrenBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener, WatchMode watchMode)
+ AsyncGetChildrenBuilderImpl(CuratorFrameworkImpl client, Filters filters, WatchMode watchMode)
{
this.client = client;
- this.unhandledErrorListener = unhandledErrorListener;
+ this.filters = filters;
this.watchMode = watchMode;
}
@Override
public AsyncStage<List<String>> forPath(String path)
{
- BuilderCommon<List<String>> common = new BuilderCommon<>(unhandledErrorListener, watchMode, childrenProc);
+ BuilderCommon<List<String>> common = new BuilderCommon<>(filters, watchMode, childrenProc);
GetChildrenBuilderImpl builder = new GetChildrenBuilderImpl(client, common.watcher, common.backgrounding, stat);
return safeCall(common.internalCallback, () -> builder.forPath(path));
}
http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java
index 273fba2..62272a7 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java
@@ -18,13 +18,12 @@
*/
package org.apache.curator.x.async.details;
-import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.imps.CuratorFrameworkImpl;
import org.apache.curator.framework.imps.GetConfigBuilderImpl;
+import org.apache.curator.x.async.AsyncStage;
import org.apache.curator.x.async.WatchMode;
import org.apache.curator.x.async.api.AsyncEnsemblable;
import org.apache.curator.x.async.api.AsyncGetConfigBuilder;
-import org.apache.curator.x.async.AsyncStage;
import org.apache.zookeeper.data.Stat;
import static org.apache.curator.x.async.details.BackgroundProcs.dataProc;
@@ -33,14 +32,14 @@ import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
class AsyncGetConfigBuilderImpl implements AsyncGetConfigBuilder
{
private final CuratorFrameworkImpl client;
- private final UnhandledErrorListener unhandledErrorListener;
+ private final Filters filters;
private final WatchMode watchMode;
private Stat stat = null;
- AsyncGetConfigBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener, WatchMode watchMode)
+ AsyncGetConfigBuilderImpl(CuratorFrameworkImpl client, Filters filters, WatchMode watchMode)
{
this.client = client;
- this.unhandledErrorListener = unhandledErrorListener;
+ this.filters = filters;
this.watchMode = watchMode;
}
@@ -54,7 +53,7 @@ class AsyncGetConfigBuilderImpl implements AsyncGetConfigBuilder
@Override
public AsyncStage<byte[]> forEnsemble()
{
- BuilderCommon<byte[]> common = new BuilderCommon<>(unhandledErrorListener, watchMode, dataProc);
+ BuilderCommon<byte[]> common = new BuilderCommon<>(filters, watchMode, dataProc);
GetConfigBuilderImpl builder = new GetConfigBuilderImpl(client, common.backgrounding, common.watcher, stat);
return safeCall(common.internalCallback, builder::forEnsemble);
}
http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java
index ac9df4c..deca49a 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java
@@ -18,13 +18,12 @@
*/
package org.apache.curator.x.async.details;
-import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.imps.CuratorFrameworkImpl;
import org.apache.curator.framework.imps.GetDataBuilderImpl;
+import org.apache.curator.x.async.AsyncStage;
import org.apache.curator.x.async.WatchMode;
import org.apache.curator.x.async.api.AsyncGetDataBuilder;
import org.apache.curator.x.async.api.AsyncPathable;
-import org.apache.curator.x.async.AsyncStage;
import org.apache.zookeeper.data.Stat;
import static org.apache.curator.x.async.details.BackgroundProcs.dataProc;
@@ -33,15 +32,15 @@ import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
class AsyncGetDataBuilderImpl implements AsyncGetDataBuilder
{
private final CuratorFrameworkImpl client;
- private final UnhandledErrorListener unhandledErrorListener;
+ private final Filters filters;
private final WatchMode watchMode;
private boolean decompressed = false;
private Stat stat = null;
- AsyncGetDataBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener, WatchMode watchMode)
+ AsyncGetDataBuilderImpl(CuratorFrameworkImpl client, Filters filters, WatchMode watchMode)
{
this.client = client;
- this.unhandledErrorListener = unhandledErrorListener;
+ this.filters = filters;
this.watchMode = watchMode;
}
@@ -70,7 +69,7 @@ class AsyncGetDataBuilderImpl implements AsyncGetDataBuilder
@Override
public AsyncStage<byte[]> forPath(String path)
{
- BuilderCommon<byte[]> common = new BuilderCommon<>(unhandledErrorListener, watchMode, dataProc);
+ BuilderCommon<byte[]> common = new BuilderCommon<>(filters, watchMode, dataProc);
GetDataBuilderImpl builder = new GetDataBuilderImpl(client, stat, common.watcher, common.backgrounding, decompressed);
return safeCall(common.internalCallback, () -> builder.forPath(path));
}
http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java
index 32b9eb5..6114159 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java
@@ -18,12 +18,11 @@
*/
package org.apache.curator.x.async.details;
-import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.imps.CuratorFrameworkImpl;
import org.apache.curator.framework.imps.ReconfigBuilderImpl;
+import org.apache.curator.x.async.AsyncStage;
import org.apache.curator.x.async.api.AsyncEnsemblable;
import org.apache.curator.x.async.api.AsyncReconfigBuilder;
-import org.apache.curator.x.async.AsyncStage;
import org.apache.zookeeper.data.Stat;
import java.util.List;
@@ -33,17 +32,17 @@ import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
class AsyncReconfigBuilderImpl implements AsyncReconfigBuilder, AsyncEnsemblable<AsyncStage<Void>>
{
private final CuratorFrameworkImpl client;
- private final UnhandledErrorListener unhandledErrorListener;
+ private final Filters filters;
private Stat stat = null;
private long fromConfig = -1;
private List<String> newMembers = null;
private List<String> joining = null;
private List<String> leaving = null;
- AsyncReconfigBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener)
+ AsyncReconfigBuilderImpl(CuratorFrameworkImpl client, Filters filters)
{
this.client = client;
- this.unhandledErrorListener = unhandledErrorListener;
+ this.filters = filters;
}
@Override
@@ -116,7 +115,7 @@ class AsyncReconfigBuilderImpl implements AsyncReconfigBuilder, AsyncEnsemblable
@Override
public AsyncStage<Void> forEnsemble()
{
- BuilderCommon<Void> common = new BuilderCommon<>(unhandledErrorListener, ignoredProc);
+ BuilderCommon<Void> common = new BuilderCommon<>(filters, ignoredProc);
ReconfigBuilderImpl builder = new ReconfigBuilderImpl(client, common.backgrounding, stat, fromConfig, newMembers, joining, leaving);
return safeCall(common.internalCallback, () -> {
builder.forEnsemble();
http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java
index 98a8bbb..1f3ad79 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java
@@ -19,12 +19,11 @@
package org.apache.curator.x.async.details;
import org.apache.curator.framework.api.CuratorWatcher;
-import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.imps.CuratorFrameworkImpl;
import org.apache.curator.framework.imps.RemoveWatchesBuilderImpl;
+import org.apache.curator.x.async.AsyncStage;
import org.apache.curator.x.async.api.AsyncPathable;
import org.apache.curator.x.async.api.AsyncRemoveWatchesBuilder;
-import org.apache.curator.x.async.AsyncStage;
import org.apache.curator.x.async.api.RemoveWatcherOption;
import org.apache.zookeeper.Watcher;
import java.util.Collections;
@@ -37,16 +36,16 @@ import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
class AsyncRemoveWatchesBuilderImpl implements AsyncRemoveWatchesBuilder, AsyncPathable<AsyncStage<Void>>
{
private final CuratorFrameworkImpl client;
- private final UnhandledErrorListener unhandledErrorListener;
+ private final Filters filters;
private Watcher.WatcherType watcherType = Watcher.WatcherType.Any;
private Set<RemoveWatcherOption> options = Collections.emptySet();
private Watcher watcher = null;
private CuratorWatcher curatorWatcher = null;
- AsyncRemoveWatchesBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener)
+ AsyncRemoveWatchesBuilderImpl(CuratorFrameworkImpl client, Filters filters)
{
this.client = client;
- this.unhandledErrorListener = unhandledErrorListener;
+ this.filters = filters;
}
@Override
@@ -160,7 +159,7 @@ class AsyncRemoveWatchesBuilderImpl implements AsyncRemoveWatchesBuilder, AsyncP
@Override
public AsyncStage<Void> forPath(String path)
{
- BuilderCommon<Void> common = new BuilderCommon<>(unhandledErrorListener, ignoredProc);
+ BuilderCommon<Void> common = new BuilderCommon<>(filters, ignoredProc);
RemoveWatchesBuilderImpl builder = new RemoveWatchesBuilderImpl(client,
watcher,
curatorWatcher,
http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetACLBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetACLBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetACLBuilderImpl.java
index 8908de6..e639b9e 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetACLBuilderImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetACLBuilderImpl.java
@@ -18,12 +18,11 @@
*/
package org.apache.curator.x.async.details;
-import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.imps.CuratorFrameworkImpl;
import org.apache.curator.framework.imps.SetACLBuilderImpl;
+import org.apache.curator.x.async.AsyncStage;
import org.apache.curator.x.async.api.AsyncPathable;
import org.apache.curator.x.async.api.AsyncSetACLBuilder;
-import org.apache.curator.x.async.AsyncStage;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import java.util.List;
@@ -34,14 +33,14 @@ import static org.apache.curator.x.async.details.BackgroundProcs.statProc;
class AsyncSetACLBuilderImpl implements AsyncSetACLBuilder, AsyncPathable<AsyncStage<Stat>>
{
private final CuratorFrameworkImpl client;
- private final UnhandledErrorListener unhandledErrorListener;
+ private final Filters filters;
private int version = -1;
private List<ACL> aclList = null;
- AsyncSetACLBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener)
+ AsyncSetACLBuilderImpl(CuratorFrameworkImpl client, Filters filters)
{
this.client = client;
- this.unhandledErrorListener = unhandledErrorListener;
+ this.filters = filters;
}
@Override
@@ -62,7 +61,7 @@ class AsyncSetACLBuilderImpl implements AsyncSetACLBuilder, AsyncPathable<AsyncS
@Override
public AsyncStage<Stat> forPath(String path)
{
- BuilderCommon<Stat> common = new BuilderCommon<>(unhandledErrorListener, statProc);
+ BuilderCommon<Stat> common = new BuilderCommon<>(filters, statProc);
SetACLBuilderImpl builder = new SetACLBuilderImpl(client, common.backgrounding, aclList, version);
return safeCall(common.internalCallback, () -> builder.forPath(path));
}
http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java
index cf2a56e..750fd59 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java
@@ -18,12 +18,11 @@
*/
package org.apache.curator.x.async.details;
-import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.imps.CuratorFrameworkImpl;
import org.apache.curator.framework.imps.SetDataBuilderImpl;
+import org.apache.curator.x.async.AsyncStage;
import org.apache.curator.x.async.api.AsyncPathAndBytesable;
import org.apache.curator.x.async.api.AsyncSetDataBuilder;
-import org.apache.curator.x.async.AsyncStage;
import org.apache.zookeeper.data.Stat;
import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
@@ -32,14 +31,14 @@ import static org.apache.curator.x.async.details.BackgroundProcs.statProc;
class AsyncSetDataBuilderImpl implements AsyncSetDataBuilder
{
private final CuratorFrameworkImpl client;
- private final UnhandledErrorListener unhandledErrorListener;
+ private final Filters filters;
private boolean compressed = false;
private int version = -1;
- AsyncSetDataBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener)
+ AsyncSetDataBuilderImpl(CuratorFrameworkImpl client, Filters filters)
{
this.client = client;
- this.unhandledErrorListener = unhandledErrorListener;
+ this.filters = filters;
}
@Override
@@ -78,7 +77,7 @@ class AsyncSetDataBuilderImpl implements AsyncSetDataBuilder
private AsyncStage<Stat> internalForPath(String path, byte[] data, boolean useData)
{
- BuilderCommon<Stat> common = new BuilderCommon<>(unhandledErrorListener, statProc);
+ BuilderCommon<Stat> common = new BuilderCommon<>(filters, statProc);
SetDataBuilderImpl builder = new SetDataBuilderImpl(client, common.backgrounding, version, compressed);
return safeCall(common.internalCallback, () -> useData ? builder.forPath(path, data) : builder.forPath(path));
}
http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java
index 043b5b4..82cd244 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java
@@ -18,7 +18,6 @@
*/
package org.apache.curator.x.async.details;
-import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.imps.Backgrounding;
import org.apache.curator.x.async.WatchMode;
@@ -28,15 +27,15 @@ class BuilderCommon<T>
final Backgrounding backgrounding;
final InternalWatcher watcher;
- BuilderCommon(UnhandledErrorListener unhandledErrorListener, BackgroundProc<T> proc)
+ BuilderCommon(Filters filters, BackgroundProc<T> proc)
{
- this(unhandledErrorListener, null, proc);
+ this(filters,null, proc);
}
- BuilderCommon(UnhandledErrorListener unhandledErrorListener, WatchMode watchMode, BackgroundProc<T> proc)
+ BuilderCommon(Filters filters, WatchMode watchMode, BackgroundProc<T> proc)
{
- watcher = (watchMode != null) ? new InternalWatcher(watchMode) : null;
- internalCallback = new InternalCallback<>(proc, watcher);
- backgrounding = new Backgrounding(internalCallback, unhandledErrorListener);
+ watcher = (watchMode != null) ? new InternalWatcher(watchMode, filters.getWatcherFilter()) : null;
+ internalCallback = new InternalCallback<>(proc, watcher, filters.getResultFilter());
+ backgrounding = new Backgrounding(internalCallback, filters.getListener());
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/Filters.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/Filters.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/Filters.java
new file mode 100644
index 0000000..ab46590
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/Filters.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.details;
+
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.zookeeper.WatchedEvent;
+import java.util.function.UnaryOperator;
+
+public class Filters
+{
+ private final UnhandledErrorListener listener;
+ private final UnaryOperator<CuratorEvent> resultFilter;
+ private final UnaryOperator<WatchedEvent> watcherFilter;
+
+ public Filters(UnhandledErrorListener listener, UnaryOperator<CuratorEvent> resultFilter, UnaryOperator<WatchedEvent> watcherFilter)
+ {
+ this.listener = listener;
+ this.resultFilter = resultFilter;
+ this.watcherFilter = watcherFilter;
+ }
+
+ public UnhandledErrorListener getListener()
+ {
+ return listener;
+ }
+
+ public UnaryOperator<CuratorEvent> getResultFilter()
+ {
+ return resultFilter;
+ }
+
+ public UnaryOperator<WatchedEvent> getWatcherFilter()
+ {
+ return watcherFilter;
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalCallback.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalCallback.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalCallback.java
index 505226f..d25c736 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalCallback.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalCallback.java
@@ -25,16 +25,19 @@ import org.apache.curator.x.async.AsyncStage;
import org.apache.zookeeper.WatchedEvent;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
+import java.util.function.UnaryOperator;
class InternalCallback<T> extends CompletableFuture<T> implements BackgroundCallback, AsyncStage<T>
{
private final BackgroundProc<T> resultFunction;
private final InternalWatcher watcher;
+ private final UnaryOperator<CuratorEvent> resultFilter;
- InternalCallback(BackgroundProc<T> resultFunction, InternalWatcher watcher)
+ InternalCallback(BackgroundProc<T> resultFunction, InternalWatcher watcher, UnaryOperator<CuratorEvent> resultFilter)
{
this.resultFunction = resultFunction;
this.watcher = watcher;
+ this.resultFilter = resultFilter;
}
@Override
@@ -46,6 +49,7 @@ class InternalCallback<T> extends CompletableFuture<T> implements BackgroundCall
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
+ event = (resultFilter != null) ? resultFilter.apply(event) : event;
resultFunction.apply(event, this);
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalWatcher.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalWatcher.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalWatcher.java
index 2c7de9e..7578083 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalWatcher.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalWatcher.java
@@ -24,15 +24,18 @@ import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
+import java.util.function.UnaryOperator;
class InternalWatcher implements Watcher
{
private final WatchMode watchMode;
+ private final UnaryOperator<WatchedEvent> watcherFilter;
private volatile CompletableFuture<WatchedEvent> future = new CompletableFuture<>();
- InternalWatcher(WatchMode watchMode)
+ InternalWatcher(WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter)
{
this.watchMode = watchMode;
+ this.watcherFilter = watcherFilter;
}
CompletableFuture<WatchedEvent> getFuture()
@@ -43,15 +46,16 @@ class InternalWatcher implements Watcher
@Override
public void process(WatchedEvent event)
{
- switch ( event.getState() )
+ final WatchedEvent localEvent = (watcherFilter != null) ? watcherFilter.apply(event) : event;
+ switch ( localEvent.getState() )
{
default:
{
- if ( (watchMode != WatchMode.stateChangeOnly) && (event.getType() != Event.EventType.None) )
+ if ( (watchMode != WatchMode.stateChangeOnly) && (localEvent.getType() != Event.EventType.None) )
{
- if ( !future.complete(event) )
+ if ( !future.complete(localEvent) )
{
- future.obtrudeValue(event);
+ future.obtrudeValue(localEvent);
}
}
break;
@@ -68,7 +72,7 @@ class InternalWatcher implements Watcher
@Override
public Event.KeeperState getKeeperState()
{
- return event.getState();
+ return localEvent.getState();
}
@Override
http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
index 52c3faa..c00febd 100644
--- a/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
+++ b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
@@ -101,7 +101,7 @@ public class TestFrameworkBackground extends BaseClassForTests
errorLatch.countDown();
}
};
- async.withUnhandledErrorListener(listener).create().forPath("/foo");
+ async.with(listener).create().forPath("/foo");
Assert.assertTrue(new Timing().awaitLatch(errorLatch));
}
finally