You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ca...@apache.org on 2014/12/01 23:20:09 UTC
curator git commit: CURATOR-161 - Initial cut of remove watches
functionality. This provides the ability to remove watches,
but does not yet provide a framework for observers being notified when a watch
has been removed.
Repository: curator
Updated Branches:
refs/heads/CURATOR-161 [created] 9ff9ccd23
CURATOR-161 - Initial cut of remove watches functionality. This provides
the ability to remove watches, but does not yet provide a framework for
observers being notified when a watch has been removed.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/9ff9ccd2
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/9ff9ccd2
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/9ff9ccd2
Branch: refs/heads/CURATOR-161
Commit: 9ff9ccd23c8d033b2e7d72b83a0183d05f5dd685
Parents: d4883a8
Author: Cameron McKenzie <ca...@unico.com.au>
Authored: Tue Dec 2 09:16:40 2014 +1100
Committer: Cameron McKenzie <ca...@unico.com.au>
Committed: Tue Dec 2 09:18:40 2014 +1100
----------------------------------------------------------------------
.../curator/framework/CuratorFramework.java | 471 ++++++++++---------
.../curator/framework/api/CuratorEventType.java | 165 +++----
.../framework/api/RemoveWatchesBuilder.java | 29 ++
.../framework/api/RemoveWatchesLocal.java | 18 +
.../framework/api/RemoveWatchesType.java | 19 +
.../framework/imps/CuratorFrameworkImpl.java | 8 +-
.../imps/RemoveWatchesBuilderImpl.java | 192 ++++++++
.../framework/imps/TestRemoveWatches.java | 218 +++++++++
8 files changed, 806 insertions(+), 314 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/9ff9ccd2/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 9c23ddb..9d1039a 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -1,233 +1,238 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.curator.framework;
-
-import org.apache.curator.CuratorZookeeperClient;
-import org.apache.curator.framework.api.*;
-import org.apache.curator.framework.api.transaction.CuratorTransaction;
-import org.apache.curator.framework.imps.CuratorFrameworkState;
-import org.apache.curator.framework.listen.Listenable;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.curator.utils.EnsurePath;
-import org.apache.zookeeper.Watcher;
-
-import java.io.Closeable;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Zookeeper framework-style client
- */
-public interface CuratorFramework extends Closeable
-{
- /**
- * Start the client. Most mutator methods will not work until the client is started
- */
- public void start();
-
- /**
- * Stop the client
- */
- public void close();
-
- /**
- * Returns the state of this instance
- *
- * @return state
- */
- public CuratorFrameworkState getState();
-
- /**
- * Return true if the client is started, not closed, etc.
- *
- * @return true/false
- * @deprecated use {@link #getState()} instead
- */
- public boolean isStarted();
-
- /**
- * Start a create builder
- *
- * @return builder object
- */
- public CreateBuilder create();
-
- /**
- * Start a delete builder
- *
- * @return builder object
- */
- public DeleteBuilder delete();
-
- /**
- * Start an exists builder
- * <p>
- * The builder will return a Stat object as if org.apache.zookeeper.ZooKeeper.exists() were called. Thus, a null
- * means that it does not exist and an actual Stat object means it does exist.
- *
- * @return builder object
- */
- public ExistsBuilder checkExists();
-
- /**
- * Start a get data builder
- *
- * @return builder object
- */
- public GetDataBuilder getData();
-
- /**
- * Start a set data builder
- *
- * @return builder object
- */
- public SetDataBuilder setData();
-
- /**
- * Start a get children builder
- *
- * @return builder object
- */
- public GetChildrenBuilder getChildren();
-
- /**
- * Start a get ACL builder
- *
- * @return builder object
- */
- public GetACLBuilder getACL();
-
- /**
- * Start a set ACL builder
- *
- * @return builder object
- */
- public SetACLBuilder setACL();
-
- /**
- * Start a transaction builder
- *
- * @return builder object
- */
- public CuratorTransaction inTransaction();
-
- /**
- * Perform a sync on the given path - syncs are always in the background
- *
- * @param path the path
- * @param backgroundContextObject optional context
- * @deprecated use {@link #sync()} instead
- */
- public void sync(String path, Object backgroundContextObject);
-
- /**
- * Start a sync builder. Note: sync is ALWAYS in the background even
- * if you don't use one of the background() methods
- *
- * @return builder object
- */
- public SyncBuilder sync();
-
- /**
- * Returns the listenable interface for the Connect State
- *
- * @return listenable
- */
- public Listenable<ConnectionStateListener> getConnectionStateListenable();
-
- /**
- * Returns the listenable interface for events
- *
- * @return listenable
- */
- public Listenable<CuratorListener> getCuratorListenable();
-
- /**
- * Returns the listenable interface for unhandled errors
- *
- * @return listenable
- */
- public Listenable<UnhandledErrorListener> getUnhandledErrorListenable();
-
- /**
- * Returns a facade of the current instance that does _not_ automatically
- * pre-pend the namespace to all paths
- *
- * @return facade
- * @deprecated use {@link #usingNamespace} passing <code>null</code>
- */
- public CuratorFramework nonNamespaceView();
-
- /**
- * Returns a facade of the current instance that uses the specified namespace
- * or no namespace if <code>newNamespace</code> is <code>null</code>.
- *
- * @param newNamespace the new namespace or null for none
- * @return facade
- */
- public CuratorFramework usingNamespace(String newNamespace);
-
- /**
- * Return the current namespace or "" if none
- *
- * @return namespace
- */
- public String getNamespace();
-
- /**
- * Return the managed zookeeper client
- *
- * @return client
- */
- public CuratorZookeeperClient getZookeeperClient();
-
- /**
- * Allocates an ensure path instance that is namespace aware
- *
- * @param path path to ensure
- * @return new EnsurePath instance
- */
- public EnsurePath newNamespaceAwareEnsurePath(String path);
-
- /**
- * Curator can hold internal references to watchers that may inhibit garbage collection.
- * Call this method on watchers you are no longer interested in.
- *
- * @param watcher the watcher
- */
- public void clearWatcherReferences(Watcher watcher);
-
- /**
- * Block until a connection to ZooKeeper is available or the maxWaitTime has been exceeded
- * @param maxWaitTime The maximum wait time. Specify a value <= 0 to wait indefinitely
- * @param units The time units for the maximum wait time.
- * @return True if connection has been established, false otherwise.
- * @throws InterruptedException If interrupted while waiting
- */
- public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException;
-
- /**
- * Block until a connection to ZooKeeper is available. This method will not return until a
- * connection is available or it is interrupted, in which case an InterruptedException will
- * be thrown
- * @throws InterruptedException If interrupted while waiting
- */
- public void blockUntilConnected() throws InterruptedException;
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework;
+
+import org.apache.curator.CuratorZookeeperClient;
+import org.apache.curator.framework.api.*;
+import org.apache.curator.framework.api.transaction.CuratorTransaction;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.EnsurePath;
+import org.apache.zookeeper.Watcher;
+
+import java.io.Closeable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Zookeeper framework-style client
+ */
+public interface CuratorFramework extends Closeable
+{
+ /**
+ * Start the client. Most mutator methods will not work until the client is started
+ */
+ public void start();
+
+ /**
+ * Stop the client
+ */
+ public void close();
+
+ /**
+ * Returns the state of this instance
+ *
+ * @return state
+ */
+ public CuratorFrameworkState getState();
+
+ /**
+ * Return true if the client is started, not closed, etc.
+ *
+ * @return true/false
+ * @deprecated use {@link #getState()} instead
+ */
+ public boolean isStarted();
+
+ /**
+ * Start a create builder
+ *
+ * @return builder object
+ */
+ public CreateBuilder create();
+
+ /**
+ * Start a delete builder
+ *
+ * @return builder object
+ */
+ public DeleteBuilder delete();
+
+ /**
+ * Start an exists builder
+ * <p>
+ * The builder will return a Stat object as if org.apache.zookeeper.ZooKeeper.exists() were called. Thus, a null
+ * means that it does not exist and an actual Stat object means it does exist.
+ *
+ * @return builder object
+ */
+ public ExistsBuilder checkExists();
+
+ /**
+ * Start a get data builder
+ *
+ * @return builder object
+ */
+ public GetDataBuilder getData();
+
+ /**
+ * Start a set data builder
+ *
+ * @return builder object
+ */
+ public SetDataBuilder setData();
+
+ /**
+ * Start a get children builder
+ *
+ * @return builder object
+ */
+ public GetChildrenBuilder getChildren();
+
+ /**
+ * Start a get ACL builder
+ *
+ * @return builder object
+ */
+ public GetACLBuilder getACL();
+
+ /**
+ * Start a set ACL builder
+ *
+ * @return builder object
+ */
+ public SetACLBuilder setACL();
+
+ /**
+ * Start a transaction builder
+ *
+ * @return builder object
+ */
+ public CuratorTransaction inTransaction();
+
+ /**
+ * Perform a sync on the given path - syncs are always in the background
+ *
+ * @param path the path
+ * @param backgroundContextObject optional context
+ * @deprecated use {@link #sync()} instead
+ */
+ public void sync(String path, Object backgroundContextObject);
+
+ /**
+ * Start a sync builder. Note: sync is ALWAYS in the background even
+ * if you don't use one of the background() methods
+ *
+ * @return builder object
+ */
+ public SyncBuilder sync();
+
+ /**
+ * Start a remove watches builder.
+ * @return builder object
+ */
+ public RemoveWatchesBuilder removeWatches();
+
+ /**
+ * Returns the listenable interface for the Connect State
+ *
+ * @return listenable
+ */
+ public Listenable<ConnectionStateListener> getConnectionStateListenable();
+
+ /**
+ * Returns the listenable interface for events
+ *
+ * @return listenable
+ */
+ public Listenable<CuratorListener> getCuratorListenable();
+
+ /**
+ * Returns the listenable interface for unhandled errors
+ *
+ * @return listenable
+ */
+ public Listenable<UnhandledErrorListener> getUnhandledErrorListenable();
+
+ /**
+ * Returns a facade of the current instance that does _not_ automatically
+ * pre-pend the namespace to all paths
+ *
+ * @return facade
+ * @deprecated use {@link #usingNamespace} passing <code>null</code>
+ */
+ public CuratorFramework nonNamespaceView();
+
+ /**
+ * Returns a facade of the current instance that uses the specified namespace
+ * or no namespace if <code>newNamespace</code> is <code>null</code>.
+ *
+ * @param newNamespace the new namespace or null for none
+ * @return facade
+ */
+ public CuratorFramework usingNamespace(String newNamespace);
+
+ /**
+ * Return the current namespace or "" if none
+ *
+ * @return namespace
+ */
+ public String getNamespace();
+
+ /**
+ * Return the managed zookeeper client
+ *
+ * @return client
+ */
+ public CuratorZookeeperClient getZookeeperClient();
+
+ /**
+ * Allocates an ensure path instance that is namespace aware
+ *
+ * @param path path to ensure
+ * @return new EnsurePath instance
+ */
+ public EnsurePath newNamespaceAwareEnsurePath(String path);
+
+ /**
+ * Curator can hold internal references to watchers that may inhibit garbage collection.
+ * Call this method on watchers you are no longer interested in.
+ *
+ * @param watcher the watcher
+ */
+ public void clearWatcherReferences(Watcher watcher);
+
+ /**
+ * Block until a connection to ZooKeeper is available or the maxWaitTime has been exceeded
+ * @param maxWaitTime The maximum wait time. Specify a value <= 0 to wait indefinitely
+ * @param units The time units for the maximum wait time.
+ * @return True if connection has been established, false otherwise.
+ * @throws InterruptedException If interrupted while waiting
+ */
+ public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException;
+
+ /**
+ * Block until a connection to ZooKeeper is available. This method will not return until a
+ * connection is available or it is interrupted, in which case an InterruptedException will
+ * be thrown
+ * @throws InterruptedException If interrupted while waiting
+ */
+ public void blockUntilConnected() throws InterruptedException;
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/9ff9ccd2/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
index 684d11b..480d5ec 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
@@ -1,80 +1,85 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.api;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.zookeeper.Watcher;
-
-public enum CuratorEventType
-{
- /**
- * Corresponds to {@link CuratorFramework#create()}
- */
- CREATE,
-
- /**
- * Corresponds to {@link CuratorFramework#delete()}
- */
- DELETE,
-
- /**
- * Corresponds to {@link CuratorFramework#checkExists()}
- */
- EXISTS,
-
- /**
- * Corresponds to {@link CuratorFramework#getData()}
- */
- GET_DATA,
-
- /**
- * Corresponds to {@link CuratorFramework#setData()}
- */
- SET_DATA,
-
- /**
- * Corresponds to {@link CuratorFramework#getChildren()}
- */
- CHILDREN,
-
- /**
- * Corresponds to {@link CuratorFramework#sync(String, Object)}
- */
- SYNC,
-
- /**
- * Corresponds to {@link CuratorFramework#getACL()}
- */
- GET_ACL,
-
- /**
- * Corresponds to {@link CuratorFramework#setACL()}
- */
- SET_ACL,
-
- /**
- * Corresponds to {@link Watchable#usingWatcher(Watcher)} or {@link Watchable#watched()}
- */
- WATCHED,
-
- /**
- * Event sent when client is being closed
- */
- CLOSING
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.Watcher;
+
+public enum CuratorEventType
+{
+ /**
+ * Corresponds to {@link CuratorFramework#create()}
+ */
+ CREATE,
+
+ /**
+ * Corresponds to {@link CuratorFramework#delete()}
+ */
+ DELETE,
+
+ /**
+ * Corresponds to {@link CuratorFramework#checkExists()}
+ */
+ EXISTS,
+
+ /**
+ * Corresponds to {@link CuratorFramework#getData()}
+ */
+ GET_DATA,
+
+ /**
+ * Corresponds to {@link CuratorFramework#setData()}
+ */
+ SET_DATA,
+
+ /**
+ * Corresponds to {@link CuratorFramework#getChildren()}
+ */
+ CHILDREN,
+
+ /**
+ * Corresponds to {@link CuratorFramework#sync(String, Object)}
+ */
+ SYNC,
+
+ /**
+ * Corresponds to {@link CuratorFramework#getACL()}
+ */
+ GET_ACL,
+
+ /**
+ * Corresponds to {@link CuratorFramework#setACL()}
+ */
+ SET_ACL,
+
+ /**
+ * Corresponds to {@link Watchable#usingWatcher(Watcher)} or {@link Watchable#watched()}
+ */
+ WATCHED,
+
+ /**
+ * Corresponds to {@link CuratorFramework#removeWatches()}
+ */
+ REMOVE_WATCHES,
+
+ /**
+ * Event sent when client is being closed
+ */
+ CLOSING
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/9ff9ccd2/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesBuilder.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesBuilder.java
new file mode 100644
index 0000000..2ed3c05
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesBuilder.java
@@ -0,0 +1,29 @@
+package org.apache.curator.framework.api;
+
+import org.apache.zookeeper.Watcher;
+
+/**
+ * Builder to allow watches to be removed
+ */
+public interface RemoveWatchesBuilder
+{
+ /**
+ * Specify the watcher to be removed
+ * @param watcher the ZooKeeper watcher to be removed
+ * @return builder used to specify the type of watcher to be removed
+ */
+ public RemoveWatchesType watcher(Watcher watcher);
+
+ /**
+ * Specify the watcher to be removed
+ * @param watcher the Curator watcher to be removed
+ * @return builder used to specify the type of watcher to be removed
+ */
+ public RemoveWatchesType watcher(CuratorWatcher watcher);
+
+ /**
+ * Specify that all watches should be removed
+ * @return builder used to specify the type of watcher to be removed
+ */
+ public RemoveWatchesType allWatches();
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/9ff9ccd2/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesLocal.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesLocal.java b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesLocal.java
new file mode 100644
index 0000000..d54638c
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesLocal.java
@@ -0,0 +1,18 @@
+package org.apache.curator.framework.api;
+
+/**
+ * Builder to allow the specification of whether it is acceptable to remove client side watch information
+ * in the case where ZK cannot be contacted.
+ */
+public interface RemoveWatchesLocal extends BackgroundPathable<Void>
+{
+
+ /**
+ * Specify if the client should just remove client side watches if a connection to ZK
+ * is not available.
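+ * @param local true if only the client side watches should be removed when ZK is not available
+ * @return builder used to specify backgrounding and the path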
+ */
+ public BackgroundPathable<Void> local(boolean local);
+
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/9ff9ccd2/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java
new file mode 100644
index 0000000..3c58b7b
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java
@@ -0,0 +1,19 @@
+package org.apache.curator.framework.api;
+
+import org.apache.zookeeper.Watcher.WatcherType;
+
+/**
+ * Builder to allow the specification of the type of watcher
+ * (see {@link WatcherType}) that is to be removed.
+ */
+public interface RemoveWatchesType
+{
+
+ /**
+ * Specify the type of watcher to be removed.
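+ * @param watcherType the type of watcher to be removed
+ * @return builder used to specify local removal, backgrounding and the path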
+ */
+ public RemoveWatchesLocal ofType(WatcherType watcherType);
+
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/9ff9ccd2/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index cf38e21..b9614ee 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -453,6 +453,12 @@ public class CuratorFrameworkImpl implements CuratorFramework
{
return new SyncBuilderImpl(this);
}
+
+ @Override
+ public RemoveWatchesBuilder removeWatches()
+ {
+ return new RemoveWatchesBuilderImpl(this);
+ }
protected void internalSync(CuratorFrameworkImpl impl, String path, Object context)
{
@@ -471,7 +477,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
{
return namespace.newNamespaceAwareEnsurePath(path);
}
-
+
ACLProvider getAclProvider()
{
return aclProvider;
http://git-wip-us.apache.org/repos/asf/curator/blob/9ff9ccd2/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
new file mode 100644
index 0000000..08f0791
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
@@ -0,0 +1,192 @@
+package org.apache.curator.framework.imps;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+
+import org.apache.curator.RetryLoop;
+import org.apache.curator.TimeTrace;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.BackgroundPathable;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.api.Pathable;
+import org.apache.curator.framework.api.RemoveWatchesLocal;
+import org.apache.curator.framework.api.RemoveWatchesBuilder;
+import org.apache.curator.framework.api.RemoveWatchesType;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.WatcherType;
+import org.apache.zookeeper.ZooKeeper;
+
+
+public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWatchesType, RemoveWatchesLocal, BackgroundOperation<String>
+{
+ private CuratorFrameworkImpl client;
+ private Watcher watcher;
+ private WatcherType watcherType;
+ private boolean local;
+ private Backgrounding backgrounding;
+
+ public RemoveWatchesBuilderImpl(CuratorFrameworkImpl client)
+ {
+ this.client = client;
+ this.watcher = null;
+ this.watcherType = null;
+ this.local = false;
+ this.backgrounding = new Backgrounding();
+ }
+
+ @Override
+ public RemoveWatchesType watcher(Watcher watcher)
+ {
+ this.watcher = watcher == null ? null : client.getNamespaceWatcherMap().getNamespaceWatcher(watcher);
+ return this;
+ }
+
+ @Override
+ public RemoveWatchesType watcher(CuratorWatcher watcher)
+ {
+ this.watcher = watcher == null ? null : client.getNamespaceWatcherMap().getNamespaceWatcher(watcher);
+ return this;
+ }
+
+ @Override
+ public RemoveWatchesType allWatches()
+ {
+ this.watcher = null;
+ return this;
+ }
+
+ @Override
+ public RemoveWatchesLocal ofType(WatcherType watcherType)
+ {
+ this.watcherType = watcherType;
+
+ return this;
+ }
+
+ @Override
+ public Pathable<Void> inBackground(BackgroundCallback callback, Object context)
+ {
+ backgrounding = new Backgrounding(callback, context);
+ return this;
+ }
+
+ @Override
+ public Pathable<Void> inBackground(BackgroundCallback callback, Object context, Executor executor)
+ {
+ backgrounding = new Backgrounding(client, callback, context, executor);
+ return this;
+ }
+
+ @Override
+ public Pathable<Void> inBackground(BackgroundCallback callback)
+ {
+ backgrounding = new Backgrounding(callback);
+ return this;
+ }
+
+ @Override
+ public Pathable<Void> inBackground(BackgroundCallback callback, Executor executor)
+ {
+ backgrounding = new Backgrounding(client, callback, executor);
+ return this;
+ }
+
+ @Override
+ public Pathable<Void> inBackground()
+ {
+ backgrounding = new Backgrounding(true);
+ return this;
+ }
+
+ @Override
+ public Pathable<Void> inBackground(Object context)
+ {
+ backgrounding = new Backgrounding(context);
+ return this;
+ }
+
+ @Override
+ public BackgroundPathable<Void> local(boolean local)
+ {
+ this.local = local;
+ return this;
+ }
+
+ @Override
+ public Void forPath(String path) throws Exception
+ {
+ final String adjustedPath = client.fixForNamespace(path);
+
+ if(backgrounding.inBackground())
+ {
+ pathInBackground(adjustedPath);
+ }
+ else
+ {
+ pathInForeground(adjustedPath);
+ }
+
+ return null;
+ }
+
+ private void pathInBackground(String path)
+ {
+ OperationAndData.ErrorCallback<String> errorCallback = null;
+ client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), errorCallback, backgrounding.getContext()), null);
+ }
+
+ private void pathInForeground(final String path) throws Exception
+ {
+ RetryLoop.callWithRetry(client.getZookeeperClient(),
+ new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ ZooKeeper zkClient = client.getZooKeeper();
+ if(watcher == null)
+ {
+ zkClient.removeAllWatches(path, watcherType, local);
+ }
+ else
+ {
+ zkClient.removeWatches(path, watcher, watcherType, local);
+ }
+
+ return null;
+ }
+ });
+ }
+
+ @Override
+ public void performBackgroundOperation(final OperationAndData<String> operationAndData)
+ throws Exception
+ {
+ final TimeTrace trace = client.getZookeeperClient().startTracer("RemoteWatches-Background");
+
+ AsyncCallback.VoidCallback callback = new AsyncCallback.VoidCallback()
+ {
+ @Override
+ public void processResult(int rc, String path, Object ctx)
+ {
+ trace.commit();
+ CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.REMOVE_WATCHES, rc, path, null, ctx, null, null, null, null, null);
+ client.processBackgroundOperation(operationAndData, event);
+ }
+ };
+
+ ZooKeeper zkClient = client.getZooKeeper();
+ if(watcher == null)
+ {
+ zkClient.removeAllWatches(operationAndData.getData(), watcherType, local, callback, operationAndData.getContext());
+ }
+ else
+ {
+ zkClient.removeWatches(operationAndData.getData(), watcher, watcherType, local, callback, operationAndData.getContext());
+ }
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/curator/blob/9ff9ccd2/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
new file mode 100644
index 0000000..d7e8886
--- /dev/null
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
@@ -0,0 +1,218 @@
+package org.apache.curator.framework.imps;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorListener;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.WatcherType;
+import org.apache.zookeeper.ZooKeeper;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestRemoveWatches extends BaseClassForTests
+{
+ @Test
+ public void testRemoveCuratorWatch() throws Exception
+ {
+ CuratorFramework client = CuratorFrameworkFactory.builder().
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+ try
+ {
+ client.start();
+
+ CuratorWatcher watcher = new CuratorWatcher()
+ {
+
+ @Override
+ public void process(WatchedEvent event) throws Exception
+ {
+ // Intentionally empty - this watcher is only registered and then removed
+
+ }
+ };
+
+ String path = "/";
+ client.checkExists().usingWatcher(watcher).forPath(path);
+
+ client.removeWatches().watcher(watcher).ofType(WatcherType.Any).forPath(path);
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testRemoveWatch() throws Exception
+ {
+ CuratorFramework client = CuratorFrameworkFactory.builder().
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+ try
+ {
+ client.start();
+
+ Watcher watcher = new Watcher()
+ {
+ @Override
+ public void process(WatchedEvent event)
+ {
+ }
+ };
+
+ String path = "/";
+ client.checkExists().usingWatcher(watcher).forPath(path);
+
+ client.removeWatches().watcher(watcher).ofType(WatcherType.Any).forPath(path);
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testRemoveWatchInBackgroundWithCallback() throws Exception
+ {
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.builder().
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+ try
+ {
+ client.start();
+
+ Watcher watcher = new Watcher()
+ {
+ @Override
+ public void process(WatchedEvent event)
+ {
+ }
+ };
+
+ final CountDownLatch removedLatch = new CountDownLatch(1);
+ BackgroundCallback callback = new BackgroundCallback()
+ {
+
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event)
+ throws Exception
+ {
+ removedLatch.countDown();
+ }
+ };
+
+ String path = "/";
+ client.checkExists().usingWatcher(watcher).forPath(path);
+
+ client.removeWatches().watcher(watcher).ofType(WatcherType.Any).inBackground(callback).forPath(path);
+
+ Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
+
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testRemoveWatchInBackgroundWithNoCallback() throws Exception
+ {
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.builder().
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+ try
+ {
+ client.start();
+
+ Watcher watcher = new Watcher()
+ {
+ @Override
+ public void process(WatchedEvent event)
+ {
+ }
+ };
+
+ final CountDownLatch removedLatch = new CountDownLatch(1);
+ client.getCuratorListenable().addListener(new CuratorListener()
+ {
+
+ @Override
+ public void eventReceived(CuratorFramework client, CuratorEvent event)
+ throws Exception
+ {
+ removedLatch.countDown();
+ }
+ });
+
+ String path = "/";
+ client.checkExists().usingWatcher(watcher).forPath(path);
+
+ client.removeWatches().watcher(watcher).ofType(WatcherType.Any).inBackground().forPath(path);
+
+ Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
+
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testRemoveAllWatches() throws Exception
+ {
+ CuratorFramework client = CuratorFrameworkFactory.builder().
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+ try
+ {
+ client.start();
+
+ Watcher watcher1 = new Watcher()
+ {
+ @Override
+ public void process(WatchedEvent event)
+ {
+ }
+ };
+
+ Watcher watcher2 = new Watcher()
+ {
+ @Override
+ public void process(WatchedEvent event)
+ {
+ }
+ };
+
+ String path = "/";
+ client.checkExists().usingWatcher(watcher1).forPath(path);
+ client.checkExists().usingWatcher(watcher2).forPath(path);
+
+ client.removeWatches().allWatches().ofType(WatcherType.Any).forPath(path);
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+}