You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ca...@apache.org on 2014/12/01 23:20:09 UTC

curator git commit: CURATOR-161 - Initial cut of remove watches functionality. This provides the ability to remove watches, but does not yet provide a framework for observers being notified when a watch has been removed.

Repository: curator
Updated Branches:
  refs/heads/CURATOR-161 [created] 9ff9ccd23


CURATOR-161 - Initial cut of remove watches functionality. This provides
the ability to remove watches, but does not yet provide a framework for
observers being notified when a watch has been removed.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/9ff9ccd2
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/9ff9ccd2
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/9ff9ccd2

Branch: refs/heads/CURATOR-161
Commit: 9ff9ccd23c8d033b2e7d72b83a0183d05f5dd685
Parents: d4883a8
Author: Cameron McKenzie <ca...@unico.com.au>
Authored: Tue Dec 2 09:16:40 2014 +1100
Committer: Cameron McKenzie <ca...@unico.com.au>
Committed: Tue Dec 2 09:18:40 2014 +1100

----------------------------------------------------------------------
 .../curator/framework/CuratorFramework.java     | 471 ++++++++++---------
 .../curator/framework/api/CuratorEventType.java | 165 +++----
 .../framework/api/RemoveWatchesBuilder.java     |  29 ++
 .../framework/api/RemoveWatchesLocal.java       |  18 +
 .../framework/api/RemoveWatchesType.java        |  19 +
 .../framework/imps/CuratorFrameworkImpl.java    |   8 +-
 .../imps/RemoveWatchesBuilderImpl.java          | 192 ++++++++
 .../framework/imps/TestRemoveWatches.java       | 218 +++++++++
 8 files changed, 806 insertions(+), 314 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/9ff9ccd2/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 9c23ddb..9d1039a 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -1,233 +1,238 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.curator.framework;
-
-import org.apache.curator.CuratorZookeeperClient;
-import org.apache.curator.framework.api.*;
-import org.apache.curator.framework.api.transaction.CuratorTransaction;
-import org.apache.curator.framework.imps.CuratorFrameworkState;
-import org.apache.curator.framework.listen.Listenable;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.curator.utils.EnsurePath;
-import org.apache.zookeeper.Watcher;
-
-import java.io.Closeable;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Zookeeper framework-style client
- */
-public interface CuratorFramework extends Closeable
-{
-    /**
-     * Start the client. Most mutator methods will not work until the client is started
-     */
-    public void start();
-
-    /**
-     * Stop the client
-     */
-    public void close();
-
-    /**
-     * Returns the state of this instance
-     *
-     * @return state
-     */
-    public CuratorFrameworkState getState();
-
-    /**
-     * Return true if the client is started, not closed, etc.
-     *
-     * @return true/false
-     * @deprecated use {@link #getState()} instead
-     */
-    public boolean isStarted();
-
-    /**
-     * Start a create builder
-     *
-     * @return builder object
-     */
-    public CreateBuilder create();
-
-    /**
-     * Start a delete builder
-     *
-     * @return builder object
-     */
-    public DeleteBuilder delete();
-
-    /**
-     * Start an exists builder
-     * <p>
-     * The builder will return a Stat object as if org.apache.zookeeper.ZooKeeper.exists() were called.  Thus, a null
-     * means that it does not exist and an actual Stat object means it does exist.
-     *
-     * @return builder object
-     */
-    public ExistsBuilder checkExists();
-
-    /**
-     * Start a get data builder
-     *
-     * @return builder object
-     */
-    public GetDataBuilder getData();
-
-    /**
-     * Start a set data builder
-     *
-     * @return builder object
-     */
-    public SetDataBuilder setData();
-
-    /**
-     * Start a get children builder
-     *
-     * @return builder object
-     */
-    public GetChildrenBuilder getChildren();
-
-    /**
-     * Start a get ACL builder
-     *
-     * @return builder object
-     */
-    public GetACLBuilder getACL();
-
-    /**
-     * Start a set ACL builder
-     *
-     * @return builder object
-     */
-    public SetACLBuilder setACL();
-
-    /**
-     * Start a transaction builder
-     *
-     * @return builder object
-     */
-    public CuratorTransaction inTransaction();
-
-    /**
-     * Perform a sync on the given path - syncs are always in the background
-     *
-     * @param path                    the path
-     * @param backgroundContextObject optional context
-     * @deprecated use {@link #sync()} instead
-     */
-    public void sync(String path, Object backgroundContextObject);
-
-    /**
-     * Start a sync builder. Note: sync is ALWAYS in the background even
-     * if you don't use one of the background() methods
-     *
-     * @return builder object
-     */
-    public SyncBuilder sync();
-
-    /**
-     * Returns the listenable interface for the Connect State
-     *
-     * @return listenable
-     */
-    public Listenable<ConnectionStateListener> getConnectionStateListenable();
-
-    /**
-     * Returns the listenable interface for events
-     *
-     * @return listenable
-     */
-    public Listenable<CuratorListener> getCuratorListenable();
-
-    /**
-     * Returns the listenable interface for unhandled errors
-     *
-     * @return listenable
-     */
-    public Listenable<UnhandledErrorListener> getUnhandledErrorListenable();
-
-    /**
-     * Returns a facade of the current instance that does _not_ automatically
-     * pre-pend the namespace to all paths
-     *
-     * @return facade
-     * @deprecated use {@link #usingNamespace} passing <code>null</code>
-     */
-    public CuratorFramework nonNamespaceView();
-
-    /**
-     * Returns a facade of the current instance that uses the specified namespace
-     * or no namespace if <code>newNamespace</code> is <code>null</code>.
-     *
-     * @param newNamespace the new namespace or null for none
-     * @return facade
-     */
-    public CuratorFramework usingNamespace(String newNamespace);
-
-    /**
-     * Return the current namespace or "" if none
-     *
-     * @return namespace
-     */
-    public String getNamespace();
-
-    /**
-     * Return the managed zookeeper client
-     *
-     * @return client
-     */
-    public CuratorZookeeperClient getZookeeperClient();
-
-    /**
-     * Allocates an ensure path instance that is namespace aware
-     *
-     * @param path path to ensure
-     * @return new EnsurePath instance
-     */
-    public EnsurePath newNamespaceAwareEnsurePath(String path);
-
-    /**
-     * Curator can hold internal references to watchers that may inhibit garbage collection.
-     * Call this method on watchers you are no longer interested in.
-     *
-     * @param watcher the watcher
-     */
-    public void clearWatcherReferences(Watcher watcher);
-        
-    /**
-     * Block until a connection to ZooKeeper is available or the maxWaitTime has been exceeded
-     * @param maxWaitTime The maximum wait time. Specify a value &lt;= 0 to wait indefinitely
-     * @param units The time units for the maximum wait time.
-     * @return True if connection has been established, false otherwise.
-     * @throws InterruptedException If interrupted while waiting
-     */
-    public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException;
-    
-    /**
-     * Block until a connection to ZooKeeper is available. This method will not return until a
-     * connection is available or it is interrupted, in which case an InterruptedException will
-     * be thrown
-     * @throws InterruptedException If interrupted while waiting
-     */
-    public void blockUntilConnected() throws InterruptedException;
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework;
+
+import org.apache.curator.CuratorZookeeperClient;
+import org.apache.curator.framework.api.*;
+import org.apache.curator.framework.api.transaction.CuratorTransaction;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.EnsurePath;
+import org.apache.zookeeper.Watcher;
+
+import java.io.Closeable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Zookeeper framework-style client
+ */
+public interface CuratorFramework extends Closeable
+{
+    /**
+     * Start the client. Most mutator methods will not work until the client is started
+     */
+    public void start();
+
+    /**
+     * Stop the client
+     */
+    public void close();
+
+    /**
+     * Returns the state of this instance
+     *
+     * @return state
+     */
+    public CuratorFrameworkState getState();
+
+    /**
+     * Return true if the client is started, not closed, etc.
+     *
+     * @return true/false
+     * @deprecated use {@link #getState()} instead
+     */
+    public boolean isStarted();
+
+    /**
+     * Start a create builder
+     *
+     * @return builder object
+     */
+    public CreateBuilder create();
+
+    /**
+     * Start a delete builder
+     *
+     * @return builder object
+     */
+    public DeleteBuilder delete();
+
+    /**
+     * Start an exists builder
+     * <p>
+     * The builder will return a Stat object as if org.apache.zookeeper.ZooKeeper.exists() were called.  Thus, a null
+     * means that it does not exist and an actual Stat object means it does exist.
+     *
+     * @return builder object
+     */
+    public ExistsBuilder checkExists();
+
+    /**
+     * Start a get data builder
+     *
+     * @return builder object
+     */
+    public GetDataBuilder getData();
+
+    /**
+     * Start a set data builder
+     *
+     * @return builder object
+     */
+    public SetDataBuilder setData();
+
+    /**
+     * Start a get children builder
+     *
+     * @return builder object
+     */
+    public GetChildrenBuilder getChildren();
+
+    /**
+     * Start a get ACL builder
+     *
+     * @return builder object
+     */
+    public GetACLBuilder getACL();
+
+    /**
+     * Start a set ACL builder
+     *
+     * @return builder object
+     */
+    public SetACLBuilder setACL();
+
+    /**
+     * Start a transaction builder
+     *
+     * @return builder object
+     */
+    public CuratorTransaction inTransaction();
+
+    /**
+     * Perform a sync on the given path - syncs are always in the background
+     *
+     * @param path                    the path
+     * @param backgroundContextObject optional context
+     * @deprecated use {@link #sync()} instead
+     */
+    public void sync(String path, Object backgroundContextObject);
+
+    /**
+     * Start a sync builder. Note: sync is ALWAYS in the background even
+     * if you don't use one of the background() methods
+     *
+     * @return builder object
+     */
+    public SyncBuilder sync();
+
+    /**
+     * Start a remove watches builder.
+     * @return builder object
+     */
+    public RemoveWatchesBuilder removeWatches();    
+
+    /**
+     * Returns the listenable interface for the Connect State
+     *
+     * @return listenable
+     */
+    public Listenable<ConnectionStateListener> getConnectionStateListenable();
+
+    /**
+     * Returns the listenable interface for events
+     *
+     * @return listenable
+     */
+    public Listenable<CuratorListener> getCuratorListenable();
+
+    /**
+     * Returns the listenable interface for unhandled errors
+     *
+     * @return listenable
+     */
+    public Listenable<UnhandledErrorListener> getUnhandledErrorListenable();
+
+    /**
+     * Returns a facade of the current instance that does _not_ automatically
+     * pre-pend the namespace to all paths
+     *
+     * @return facade
+     * @deprecated use {@link #usingNamespace} passing <code>null</code>
+     */
+    public CuratorFramework nonNamespaceView();
+
+    /**
+     * Returns a facade of the current instance that uses the specified namespace
+     * or no namespace if <code>newNamespace</code> is <code>null</code>.
+     *
+     * @param newNamespace the new namespace or null for none
+     * @return facade
+     */
+    public CuratorFramework usingNamespace(String newNamespace);
+
+    /**
+     * Return the current namespace or "" if none
+     *
+     * @return namespace
+     */
+    public String getNamespace();
+
+    /**
+     * Return the managed zookeeper client
+     *
+     * @return client
+     */
+    public CuratorZookeeperClient getZookeeperClient();
+
+    /**
+     * Allocates an ensure path instance that is namespace aware
+     *
+     * @param path path to ensure
+     * @return new EnsurePath instance
+     */
+    public EnsurePath newNamespaceAwareEnsurePath(String path);
+
+    /**
+     * Curator can hold internal references to watchers that may inhibit garbage collection.
+     * Call this method on watchers you are no longer interested in.
+     *
+     * @param watcher the watcher
+     */
+    public void clearWatcherReferences(Watcher watcher);
+        
+    /**
+     * Block until a connection to ZooKeeper is available or the maxWaitTime has been exceeded
+     * @param maxWaitTime The maximum wait time. Specify a value &lt;= 0 to wait indefinitely
+     * @param units The time units for the maximum wait time.
+     * @return True if connection has been established, false otherwise.
+     * @throws InterruptedException If interrupted while waiting
+     */
+    public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException;
+    
+    /**
+     * Block until a connection to ZooKeeper is available. This method will not return until a
+     * connection is available or it is interrupted, in which case an InterruptedException will
+     * be thrown
+     * @throws InterruptedException If interrupted while waiting
+     */
+    public void blockUntilConnected() throws InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/9ff9ccd2/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
index 684d11b..480d5ec 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
@@ -1,80 +1,85 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.api;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.zookeeper.Watcher;
-
-public enum CuratorEventType
-{
-    /**
-     * Corresponds to {@link CuratorFramework#create()}
-     */
-    CREATE,
-
-    /**
-     * Corresponds to {@link CuratorFramework#delete()}
-     */
-    DELETE,
-
-    /**
-     * Corresponds to {@link CuratorFramework#checkExists()}
-     */
-    EXISTS,
-
-    /**
-     * Corresponds to {@link CuratorFramework#getData()}
-     */
-    GET_DATA,
-
-    /**
-     * Corresponds to {@link CuratorFramework#setData()}
-     */
-    SET_DATA,
-
-    /**
-     * Corresponds to {@link CuratorFramework#getChildren()}
-     */
-    CHILDREN,
-
-    /**
-     * Corresponds to {@link CuratorFramework#sync(String, Object)}
-     */
-    SYNC,
-
-    /**
-     * Corresponds to {@link CuratorFramework#getACL()}
-     */
-    GET_ACL,
-
-    /**
-     * Corresponds to {@link CuratorFramework#setACL()}
-     */
-    SET_ACL,
-
-    /**
-     * Corresponds to {@link Watchable#usingWatcher(Watcher)} or {@link Watchable#watched()}
-     */
-    WATCHED,
-
-    /**
-     * Event sent when client is being closed
-     */
-    CLOSING
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.Watcher;
+
+public enum CuratorEventType
+{
+    /**
+     * Corresponds to {@link CuratorFramework#create()}
+     */
+    CREATE,
+
+    /**
+     * Corresponds to {@link CuratorFramework#delete()}
+     */
+    DELETE,
+
+    /**
+     * Corresponds to {@link CuratorFramework#checkExists()}
+     */
+    EXISTS,
+
+    /**
+     * Corresponds to {@link CuratorFramework#getData()}
+     */
+    GET_DATA,
+
+    /**
+     * Corresponds to {@link CuratorFramework#setData()}
+     */
+    SET_DATA,
+
+    /**
+     * Corresponds to {@link CuratorFramework#getChildren()}
+     */
+    CHILDREN,
+
+    /**
+     * Corresponds to {@link CuratorFramework#sync(String, Object)}
+     */
+    SYNC,
+
+    /**
+     * Corresponds to {@link CuratorFramework#getACL()}
+     */
+    GET_ACL,
+
+    /**
+     * Corresponds to {@link CuratorFramework#setACL()}
+     */
+    SET_ACL,
+
+    /**
+     * Corresponds to {@link Watchable#usingWatcher(Watcher)} or {@link Watchable#watched()}
+     */
+    WATCHED,
+    
+    /**
+     * Corresponds to {@link CuratorFramework#removeWatches()}
+     */
+    REMOVE_WATCHES,
+
+    /**
+     * Event sent when client is being closed
+     */
+    CLOSING
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/9ff9ccd2/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesBuilder.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesBuilder.java
new file mode 100644
index 0000000..2ed3c05
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesBuilder.java
@@ -0,0 +1,29 @@
+package org.apache.curator.framework.api;
+
+import org.apache.zookeeper.Watcher;
+
+/**
+ * Builder to allow watches to be removed 
+ */
+public interface RemoveWatchesBuilder
+{
+    /**
+     * Specify the watcher to be removed
+     * @param watcher
+     * @return
+     */
+    public RemoveWatchesType watcher(Watcher watcher);
+    
+    /**
+     * Specify the watcher to be removed
+     * @param watcher
+     * @return
+     */
+    public RemoveWatchesType watcher(CuratorWatcher watcher);
+    
+    /**
+     * Specify that all watches should be removed
+     * @return
+     */
+    public RemoveWatchesType allWatches();
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/9ff9ccd2/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesLocal.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesLocal.java b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesLocal.java
new file mode 100644
index 0000000..d54638c
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesLocal.java
@@ -0,0 +1,18 @@
+package org.apache.curator.framework.api;
+
+/**
+ * Builder to allow the specification of whether it is acceptable to remove client side watch information
+ * in the case where ZK cannot be contacted. 
+ */
+public interface RemoveWatchesLocal extends BackgroundPathable<Void>
+{
+   
+    /**
+     * Specify if the client should just remove client side watches if a connection to ZK
+     * is not available.
+     * @param local
+     * @return
+     */
+    public BackgroundPathable<Void> local(boolean local);
+    
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/9ff9ccd2/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java
new file mode 100644
index 0000000..3c58b7b
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java
@@ -0,0 +1,19 @@
+package org.apache.curator.framework.api;
+
+import org.apache.zookeeper.Watcher.WatcherType;
+
+/**
+ * Builder to allow the specification of the type of watcher
+ * that is to be removed.
+ */
+public interface RemoveWatchesType
+{
+   
+    /**
+     * Specify the type of watcher to be removed.
+     * @param watcherType
+     * @return
+     */
+    public RemoveWatchesLocal ofType(WatcherType watcherType);
+    
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/9ff9ccd2/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index cf38e21..b9614ee 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -453,6 +453,12 @@ public class CuratorFrameworkImpl implements CuratorFramework
     {
         return new SyncBuilderImpl(this);
     }
+    
+    @Override
+    public RemoveWatchesBuilder removeWatches()
+    {
+        return new RemoveWatchesBuilderImpl(this);
+    }
 
     protected void internalSync(CuratorFrameworkImpl impl, String path, Object context)
     {
@@ -471,7 +477,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
     {
         return namespace.newNamespaceAwareEnsurePath(path);
     }
-
+    
     ACLProvider getAclProvider()
     {
         return aclProvider;

http://git-wip-us.apache.org/repos/asf/curator/blob/9ff9ccd2/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
new file mode 100644
index 0000000..08f0791
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
@@ -0,0 +1,192 @@
+package org.apache.curator.framework.imps;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+
+import org.apache.curator.RetryLoop;
+import org.apache.curator.TimeTrace;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.BackgroundPathable;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.api.Pathable;
+import org.apache.curator.framework.api.RemoveWatchesLocal;
+import org.apache.curator.framework.api.RemoveWatchesBuilder;
+import org.apache.curator.framework.api.RemoveWatchesType;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.WatcherType;
+import org.apache.zookeeper.ZooKeeper;
+
+
+public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWatchesType, RemoveWatchesLocal, BackgroundOperation<String>
+{
+    private CuratorFrameworkImpl client;
+    private Watcher watcher;
+    private WatcherType watcherType;
+    private boolean local;    
+    private Backgrounding backgrounding;
+    
+    public RemoveWatchesBuilderImpl(CuratorFrameworkImpl client)
+    {
+        this.client = client;
+        this.watcher = null;
+        this.watcherType = null;
+        this.local = false;
+        this.backgrounding = new Backgrounding();
+    }
+    
+    @Override
+    public RemoveWatchesType watcher(Watcher watcher)
+    {
+        this.watcher = watcher == null ? null : client.getNamespaceWatcherMap().getNamespaceWatcher(watcher);
+        return this;
+    }
+    
+    @Override
+    public RemoveWatchesType watcher(CuratorWatcher watcher)
+    {
+        this.watcher = watcher == null ? null : client.getNamespaceWatcherMap().getNamespaceWatcher(watcher);
+        return this;
+    }    
+
+    @Override
+    public RemoveWatchesType allWatches()
+    {
+        this.watcher = null;
+        return this;
+    }
+
+    @Override
+    public RemoveWatchesLocal ofType(WatcherType watcherType)
+    {
+        this.watcherType = watcherType;
+        
+        return this;
+    }
+
+    @Override
+    public Pathable<Void> inBackground(BackgroundCallback callback, Object context)
+    {
+        backgrounding = new Backgrounding(callback, context);
+        return this;
+    }
+
+    @Override
+    public Pathable<Void> inBackground(BackgroundCallback callback, Object context, Executor executor)
+    {
+        backgrounding = new Backgrounding(client, callback, context, executor);
+        return this;
+    }
+
+    @Override
+    public Pathable<Void> inBackground(BackgroundCallback callback)
+    {
+        backgrounding = new Backgrounding(callback);
+        return this;
+    }
+
+    @Override
+    public Pathable<Void> inBackground(BackgroundCallback callback, Executor executor)
+    {
+        backgrounding = new Backgrounding(client, callback, executor);
+        return this;
+    }
+
+    @Override
+    public Pathable<Void> inBackground()
+    {
+        backgrounding = new Backgrounding(true);
+        return this;
+    }
+
+    @Override
+    public Pathable<Void> inBackground(Object context)
+    {
+        backgrounding = new Backgrounding(context);
+        return this;
+    }
+
+    @Override
+    public BackgroundPathable<Void> local(boolean local)
+    {
+        this.local = local;
+        return this;
+    }
+    
+    @Override
+    public Void forPath(String path) throws Exception
+    {
+        final String adjustedPath = client.fixForNamespace(path);
+        
+        if(backgrounding.inBackground())
+        {
+            pathInBackground(adjustedPath);
+        }
+        else
+        {
+            pathInForeground(adjustedPath);
+        }        
+        
+        return null;
+    }    
+    
+    private void pathInBackground(String path)
+    {
+        OperationAndData.ErrorCallback<String>  errorCallback = null;        
+        client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), errorCallback, backgrounding.getContext()), null);
+    }
+    
+    private void pathInForeground(final String path) throws Exception
+    {
+        RetryLoop.callWithRetry(client.getZookeeperClient(), 
+                new Callable<Void>()
+                {
+                    @Override
+                    public Void call() throws Exception
+                    {
+                        ZooKeeper zkClient = client.getZooKeeper();
+                        if(watcher == null)
+                        {
+                            zkClient.removeAllWatches(path, watcherType, local);    
+                        }
+                        else
+                        {
+                            zkClient.removeWatches(path, watcher, watcherType, local);
+                        }
+                        
+                        return null;
+                    }
+                });
+    }
+    
+    @Override
+    public void performBackgroundOperation(final OperationAndData<String> operationAndData)
+            throws Exception
+    {
+        final TimeTrace   trace = client.getZookeeperClient().startTracer("RemoteWatches-Background");
+        
+        AsyncCallback.VoidCallback callback = new AsyncCallback.VoidCallback()
+        {
+            @Override
+            public void processResult(int rc, String path, Object ctx)
+            {
+                trace.commit();
+                CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.REMOVE_WATCHES, rc, path, null, ctx, null, null, null, null, null);
+                client.processBackgroundOperation(operationAndData, event);                
+            }
+        };
+        
+        ZooKeeper zkClient = client.getZooKeeper();
+        if(watcher == null)
+        {
+            zkClient.removeAllWatches(operationAndData.getData(), watcherType, local, callback, operationAndData.getContext());    
+        }
+        else
+        {
+            zkClient.removeWatches(operationAndData.getData(), watcher, watcherType, local, callback, operationAndData.getContext());
+        }
+        
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/curator/blob/9ff9ccd2/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
new file mode 100644
index 0000000..d7e8886
--- /dev/null
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
@@ -0,0 +1,218 @@
+package org.apache.curator.framework.imps;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.CuratorListener;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.WatcherType;
+import org.apache.zookeeper.ZooKeeper;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Exercises the remove-watches builder obtained from
+ * {@code CuratorFramework.removeWatches()}.
+ * <p>
+ * Foreground tests only verify that the removal call completes without throwing.
+ * Background tests verify that a {@link CuratorEventType#REMOVE_WATCHES} completion
+ * event is delivered. None of the tests asserts that a removed watcher stops firing.
+ */
+public class TestRemoveWatches extends BaseClassForTests
+{
+    /** Path every test registers its watch on and removes it from. */
+    private static final String PATH = "/";
+
+    /**
+     * Removes a single {@link CuratorWatcher} in the foreground.
+     */
+    @Test
+    public void testRemoveCuratorWatch() throws Exception
+    {
+        CuratorFramework client = newClient();
+        try
+        {
+            client.start();
+
+            CuratorWatcher watcher = new CuratorWatcher()
+            {
+                @Override
+                public void process(WatchedEvent event) throws Exception
+                {
+                    // Intentionally empty: the watcher only has to exist so it can be removed.
+                }
+            };
+
+            client.checkExists().usingWatcher(watcher).forPath(PATH);
+
+            client.removeWatches().watcher(watcher).ofType(WatcherType.Any).forPath(PATH);
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    /**
+     * Removes a single ZooKeeper {@link Watcher} in the foreground.
+     */
+    @Test
+    public void testRemoveWatch() throws Exception
+    {
+        CuratorFramework client = newClient();
+        try
+        {
+            client.start();
+
+            Watcher watcher = newNoOpWatcher();
+
+            client.checkExists().usingWatcher(watcher).forPath(PATH);
+
+            client.removeWatches().watcher(watcher).ofType(WatcherType.Any).forPath(PATH);
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    /**
+     * Removes a watcher in the background and waits for the supplied
+     * {@link BackgroundCallback} to receive the completion event.
+     */
+    @Test
+    public void testRemoveWatchInBackgroundWithCallback() throws Exception
+    {
+        Timing timing = new Timing();
+        CuratorFramework client = newClient();
+        try
+        {
+            client.start();
+
+            Watcher watcher = newNoOpWatcher();
+
+            final CountDownLatch removedLatch = new CountDownLatch(1);
+            BackgroundCallback callback = new BackgroundCallback()
+            {
+                @Override
+                public void processResult(CuratorFramework client, CuratorEvent event)
+                        throws Exception
+                {
+                    // Only the removal's own completion event may release the latch.
+                    if ( event.getType() == CuratorEventType.REMOVE_WATCHES )
+                    {
+                        removedLatch.countDown();
+                    }
+                }
+            };
+
+            client.checkExists().usingWatcher(watcher).forPath(PATH);
+
+            client.removeWatches().watcher(watcher).ofType(WatcherType.Any).inBackground(callback).forPath(PATH);
+
+            Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    /**
+     * Removes a watcher in the background without a callback and waits for the
+     * completion event to reach a registered {@link CuratorListener}.
+     */
+    @Test
+    public void testRemoveWatchInBackgroundWithNoCallback() throws Exception
+    {
+        Timing timing = new Timing();
+        CuratorFramework client = newClient();
+        try
+        {
+            client.start();
+
+            Watcher watcher = newNoOpWatcher();
+
+            final CountDownLatch removedLatch = new CountDownLatch(1);
+            client.getCuratorListenable().addListener(new CuratorListener()
+            {
+                @Override
+                public void eventReceived(CuratorFramework client, CuratorEvent event)
+                        throws Exception
+                {
+                    // The listener sees every event the client publishes; counting down on
+                    // anything other than REMOVE_WATCHES would let an unrelated event
+                    // satisfy the assertion below.
+                    if ( event.getType() == CuratorEventType.REMOVE_WATCHES )
+                    {
+                        removedLatch.countDown();
+                    }
+                }
+            });
+
+            client.checkExists().usingWatcher(watcher).forPath(PATH);
+
+            client.removeWatches().watcher(watcher).ofType(WatcherType.Any).inBackground().forPath(PATH);
+
+            Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    /**
+     * Registers two distinct watchers on the same path and removes them with a
+     * single {@code allWatches()} call in the foreground.
+     */
+    @Test
+    public void testRemoveAllWatches() throws Exception
+    {
+        CuratorFramework client = newClient();
+        try
+        {
+            client.start();
+
+            Watcher watcher1 = newNoOpWatcher();
+            Watcher watcher2 = newNoOpWatcher();
+
+            client.checkExists().usingWatcher(watcher1).forPath(PATH);
+            client.checkExists().usingWatcher(watcher2).forPath(PATH);
+
+            client.removeWatches().allWatches().ofType(WatcherType.Any).forPath(PATH);
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    /**
+     * Builds an unstarted client connected to the test server, retrying once.
+     */
+    private CuratorFramework newClient()
+    {
+        return CuratorFrameworkFactory.builder().
+                connectString(server.getConnectString()).
+                retryPolicy(new RetryOneTime(1)).
+                build();
+    }
+
+    /**
+     * Creates a new watcher instance that ignores every event it receives.
+     */
+    private static Watcher newNoOpWatcher()
+    {
+        return new Watcher()
+        {
+            @Override
+            public void process(WatchedEvent event)
+            {
+                // Intentionally empty: the watcher only has to exist so it can be removed.
+            }
+        };
+    }
+}