You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by dr...@apache.org on 2015/08/19 01:18:31 UTC
[15/31] curator git commit: CURATOR-161 - Added support for
guaranteed removal of watches. This includes refactoring the
FailedDeleteManager code into a FailedOperationManager to allow subclassing.
CURATOR-161 - Added support for guaranteed removal of watches. This
includes refactoring the FailedDeleteManager code into a
FailedOperationManager to allow subclassing.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/22d034af
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/22d034af
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/22d034af
Branch: refs/heads/CURATOR-3.0
Commit: 22d034af90987940420649c5f320e8dc09910c8a
Parents: 04caf36
Author: Cameron McKenzie <ca...@unico.com.au>
Authored: Wed May 13 09:28:45 2015 +1000
Committer: Cameron McKenzie <ca...@unico.com.au>
Committed: Wed May 13 09:28:45 2015 +1000
----------------------------------------------------------------------
.../curator/framework/CuratorFramework.java | 4 +
.../curator/framework/api/DeleteBuilder.java | 2 +-
.../curator/framework/api/Guaranteeable.java | 20 +--
.../framework/api/GuaranteeableDelete.java | 39 ++++++
.../framework/api/RemoveWatchesType.java | 2 +-
.../framework/imps/CuratorFrameworkImpl.java | 8 ++
.../framework/imps/DeleteBuilderImpl.java | 4 +-
.../framework/imps/FailedDeleteManager.java | 39 +-----
.../framework/imps/FailedOperationManager.java | 65 ++++++++++
.../imps/FailedRemoveWatchManager.java | 56 ++++++++
.../framework/imps/NamespaceWatcherMap.java | 8 ++
.../imps/RemoveWatchesBuilderImpl.java | 56 ++++++--
.../framework/imps/TestFailedDeleteManager.java | 9 +-
.../framework/imps/TestRemoveWatches.java | 129 +++++++++++++++++++
14 files changed, 377 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 4b30fd4..2bce552 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -216,7 +216,11 @@ public interface CuratorFramework extends Closeable
* Call this method on watchers you are no longer interested in.
*
* @param watcher the watcher
+ *
+ * @deprecated As of ZooKeeper 3.5, Curator's recipes will handle removing watcher references
+ * when they are no longer used.
*/
+ @Deprecated
public void clearWatcherReferences(Watcher watcher);
/**
http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/api/DeleteBuilder.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/DeleteBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/DeleteBuilder.java
index 3a3faf7..893e825 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/DeleteBuilder.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/DeleteBuilder.java
@@ -18,6 +18,6 @@
*/
package org.apache.curator.framework.api;
-public interface DeleteBuilder extends Guaranteeable, ChildrenDeletable
+public interface DeleteBuilder extends GuaranteeableDelete, ChildrenDeletable
{
}
http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/api/Guaranteeable.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/Guaranteeable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/Guaranteeable.java
index 481911b..b43d6b0 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/Guaranteeable.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/Guaranteeable.java
@@ -18,23 +18,15 @@
*/
package org.apache.curator.framework.api;
-public interface Guaranteeable extends BackgroundVersionable
+public interface Guaranteeable<T>
{
/**
- * <p>
- * Solves this edge case: deleting a node can fail due to connection issues. Further,
- * if the node was ephemeral, the node will not get auto-deleted as the session is still valid.
- * This can wreak havoc with lock implementations.
- * </p>
- *
- * <p>
- * When <code>guaranteed</code> is set, Curator will record failed node deletions and
- * attempt to delete them in the background until successful. NOTE: you will still get an
- * exception when the deletion fails. But, you can be assured that as long as the
- * {@link org.apache.curator.framework.CuratorFramework} instance is open attempts will be made to delete the node.
- * </p>
+ * Solves edge cases where an operation may succeed on the server but a connection failure occurs before a
+ * response can be successfully returned to the client.
+ *
+ * @see org.apache.curator.framework.api.GuaranteeableDelete
*
* @return this
*/
- public ChildrenDeletable guaranteed();
+ public T guaranteed();
}
http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/api/GuaranteeableDelete.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/GuaranteeableDelete.java b/curator-framework/src/main/java/org/apache/curator/framework/api/GuaranteeableDelete.java
new file mode 100644
index 0000000..d04e7ea
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/GuaranteeableDelete.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+/**
+ * <p>
+ * Solves this edge case: deleting a node can fail due to connection issues. Further,
+ * if the node was ephemeral, the node will not get auto-deleted as the session is still valid.
+ * This can wreak havoc with lock implementations.
+ * </p>
+ *
+ * <p>
+ * When <code>guaranteed</code> is set, Curator will record failed node deletions and
+ * attempt to delete them in the background until successful. NOTE: you will still get an
+ * exception when the deletion fails. But, you can be assured that as long as the
+ * {@link org.apache.curator.framework.CuratorFramework} instance is open attempts will be made to delete the node.
+ * </p>
+ *
+ * @see Guaranteeable#guaranteed()
+ */
+public interface GuaranteeableDelete extends Guaranteeable<ChildrenDeletable>, BackgroundVersionable
+{
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java
index 1123afd..3112eac 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java
@@ -6,7 +6,7 @@ import org.apache.zookeeper.Watcher.WatcherType;
* Builder to allow the specification of whether it is acceptable to remove client side watch information
* in the case where ZK cannot be contacted.
*/
-public interface RemoveWatchesType extends RemoveWatchesLocal
+public interface RemoveWatchesType extends RemoveWatchesLocal, Guaranteeable<BackgroundPathableQuietly<Void>>
{
/**
http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 5caff7d..b4a1d93 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -72,6 +72,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
private final AtomicReference<AuthInfo> authInfo = new AtomicReference<AuthInfo>();
private final byte[] defaultData;
private final FailedDeleteManager failedDeleteManager;
+ private final FailedRemoveWatchManager failedRemoveWatcherManager;
private final CompressionProvider compressionProvider;
private final ACLProvider aclProvider;
private final NamespaceFacadeCache namespaceFacadeCache;
@@ -147,6 +148,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
failedDeleteManager = new FailedDeleteManager(this);
+ failedRemoveWatcherManager = new FailedRemoveWatchManager(this);
namespaceFacadeCache = new NamespaceFacadeCache(this);
}
@@ -190,6 +192,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
connectionStateManager = parent.connectionStateManager;
defaultData = parent.defaultData;
failedDeleteManager = parent.failedDeleteManager;
+ failedRemoveWatcherManager = parent.failedRemoveWatcherManager;
compressionProvider = parent.compressionProvider;
aclProvider = parent.aclProvider;
namespaceFacadeCache = parent.namespaceFacadeCache;
@@ -487,6 +490,11 @@ public class CuratorFrameworkImpl implements CuratorFramework
{
return failedDeleteManager;
}
+
+ FailedRemoveWatchManager getFailedRemoveWatcherManager()
+ {
+ return failedRemoveWatcherManager;
+ }
RetryLoop newRetryLoop()
{
http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
index 5d8b846..51691dd 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
@@ -203,7 +203,7 @@ class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String>
@Override
public void retriesExhausted(OperationAndData<String> operationAndData)
{
- client.getFailedDeleteManager().addFailedDelete(unfixedPath);
+ client.getFailedDeleteManager().addFailedOperation(unfixedPath);
}
};
}
@@ -253,7 +253,7 @@ class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String>
//Only retry a guaranteed delete if it's a retryable error
if( RetryLoop.isRetryException(e) && guaranteed )
{
- client.getFailedDeleteManager().addFailedDelete(unfixedPath);
+ client.getFailedDeleteManager().addFailedOperation(unfixedPath);
}
throw e;
}
http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedDeleteManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedDeleteManager.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedDeleteManager.java
index deb7f40..934ae40 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedDeleteManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedDeleteManager.java
@@ -19,45 +19,18 @@
package org.apache.curator.framework.imps;
import org.apache.curator.framework.CuratorFramework;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-class FailedDeleteManager
+class FailedDeleteManager extends FailedOperationManager<String>
{
- private final Logger log = LoggerFactory.getLogger(getClass());
- private final CuratorFramework client;
-
- volatile FailedDeleteManagerListener debugListener = null;
-
- interface FailedDeleteManagerListener
- {
- public void pathAddedForDelete(String path);
- }
-
FailedDeleteManager(CuratorFramework client)
{
- this.client = client;
+ super(client);
}
- void addFailedDelete(String path)
+ @Override
+ protected void executeGuaranteedOperationInBackground(String path)
+ throws Exception
{
- if ( debugListener != null )
- {
- debugListener.pathAddedForDelete(path);
- }
-
-
- if ( client.getState() == CuratorFrameworkState.STARTED )
- {
- log.debug("Path being added to guaranteed delete set: " + path);
- try
- {
- client.delete().guaranteed().inBackground().forPath(path);
- }
- catch ( Exception e )
- {
- addFailedDelete(path);
- }
- }
+ client.delete().guaranteed().inBackground().forPath(path);
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedOperationManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedOperationManager.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedOperationManager.java
new file mode 100644
index 0000000..a1efde2
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedOperationManager.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+abstract class FailedOperationManager<T>
+{
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ protected final CuratorFramework client;
+
+ volatile FailedOperationManagerListener<T> debugListener = null;
+
+ interface FailedOperationManagerListener<T>
+ {
+ public void pathAddedForGuaranteedOperation(T detail);
+ }
+
+ FailedOperationManager(CuratorFramework client)
+ {
+ this.client = client;
+ }
+
+ void addFailedOperation(T details)
+ {
+ if ( debugListener != null )
+ {
+ debugListener.pathAddedForGuaranteedOperation(details);
+ }
+
+
+ if ( client.getState() == CuratorFrameworkState.STARTED )
+ {
+ log.debug("Details being added to guaranteed operation set: " + details);
+ try
+ {
+ executeGuaranteedOperationInBackground(details);
+ }
+ catch ( Exception e )
+ {
+ addFailedOperation(details);
+ }
+ }
+ }
+
+ protected abstract void executeGuaranteedOperationInBackground(T details) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedRemoveWatchManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedRemoveWatchManager.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedRemoveWatchManager.java
new file mode 100644
index 0000000..f954e2a
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedRemoveWatchManager.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.Watcher;
+
+class FailedRemoveWatchManager extends FailedOperationManager<FailedRemoveWatchManager.FailedRemoveWatchDetails>
+{
+ FailedRemoveWatchManager(CuratorFramework client)
+ {
+ super(client);
+ }
+
+ @Override
+ protected void executeGuaranteedOperationInBackground(FailedRemoveWatchDetails details)
+ throws Exception
+ {
+ if(details.watcher == null)
+ {
+ client.watches().removeAll().guaranteed().inBackground().forPath(details.path);
+ }
+ else
+ {
+ client.watches().remove(details.watcher).guaranteed().inBackground().forPath(details.path);
+ }
+ }
+
+ static class FailedRemoveWatchDetails
+ {
+ public final String path;
+ public final Watcher watcher;
+
+ public FailedRemoveWatchDetails(String path, Watcher watcher)
+ {
+ this.path = path;
+ this.watcher = watcher;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java
index e5aecb2..f656ba1 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java
@@ -70,6 +70,14 @@ class NamespaceWatcherMap implements Closeable
{
return map.remove(key);
}
+
+ /**
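+ * Remove all watchers for a given path (not yet implemented; the method body is currently empty)
+ * @param path the path whose watchers should be removed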
+ */
+ void removeAllForPath(String path) {
+
+ }
@VisibleForTesting
boolean isEmpty()
http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
index c9868f4..27d05da 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
@@ -27,8 +27,9 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat
private CuratorFrameworkImpl client;
private Watcher watcher;
private WatcherType watcherType;
+ private boolean guaranteed;
private boolean local;
- private boolean quietly;
+ private boolean quietly;
private Backgrounding backgrounding;
public RemoveWatchesBuilderImpl(CuratorFrameworkImpl client)
@@ -36,6 +37,7 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat
this.client = client;
this.watcher = null;
this.watcherType = WatcherType.Any;
+ this.guaranteed = false;
this.local = false;
this.quietly = false;
this.backgrounding = new Backgrounding();
@@ -44,14 +46,26 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat
@Override
public RemoveWatchesType remove(Watcher watcher)
{
- this.watcher = watcher == null ? null : client.getNamespaceWatcherMap().getNamespaceWatcher(watcher);
+ if(watcher == null) {
+ this.watcher = null;
+ } else {
+ //Try to get the namespaced version of the watcher.
+ this.watcher = client.getNamespaceWatcherMap().get(watcher);
+
+ //If this is not present then default to the original watcher. This shouldn't happen in practice unless the user
+ //has added a watch directly to the ZK client rather than via the CuratorFramework.
+ if(this.watcher == null) {
+ this.watcher = watcher;
+ }
+ }
+
return this;
}
@Override
public RemoveWatchesType remove(CuratorWatcher watcher)
{
- this.watcher = watcher == null ? null : client.getNamespaceWatcherMap().getNamespaceWatcher(watcher);
+ this.watcher = watcher == null ? null : client.getNamespaceWatcherMap().get(watcher);
return this;
}
@@ -111,6 +125,13 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat
backgrounding = new Backgrounding(context);
return this;
}
+
+ @Override
+ public RemoveWatchesLocal guaranteed()
+ {
+ guaranteed = true;
+ return this;
+ }
@Override
public BackgroundPathableQuietly<Void> locally()
@@ -143,14 +164,23 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat
return null;
}
- private void pathInBackground(String path)
+ private void pathInBackground(final String path)
{
- OperationAndData.ErrorCallback<String> errorCallback = null;
+ OperationAndData.ErrorCallback<String> errorCallback = new OperationAndData.ErrorCallback<String>()
+ {
+ @Override
+ public void retriesExhausted(OperationAndData<String> operationAndData)
+ {
+ client.getFailedRemoveWatcherManager().addFailedOperation(new FailedRemoveWatchManager.FailedRemoveWatchDetails(path, watcher));
+ }
+ };
client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), errorCallback, backgrounding.getContext()), null);
}
private void pathInForeground(final String path) throws Exception
{
+ //For the local case we don't want to use the normal retry loop and we don't want to block until a connection is available.
+ //We just execute the removeWatch, and if it fails, ZK will just remove local watches.
if(local)
{
ZooKeeper zkClient = client.getZooKeeper();
@@ -184,11 +214,21 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat
zkClient.removeWatches(path, watcher, watcherType, local);
}
}
- catch(KeeperException.NoWatcherException e)
+ catch(Exception e)
{
- //Swallow this exception if the quietly flag is set, otherwise rethrow.
- if(!quietly)
+ if( RetryLoop.isRetryException(e) && guaranteed )
+ {
+ //Setup the guaranteed handler
+ client.getFailedRemoveWatcherManager().addFailedOperation(new FailedRemoveWatchManager.FailedRemoveWatchDetails(path, watcher));
+ throw e;
+ }
+ else if(e instanceof KeeperException.NoWatcherException && quietly)
+ {
+ //Ignore
+ }
+ else
{
+ //Rethrow
throw e;
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFailedDeleteManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFailedDeleteManager.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFailedDeleteManager.java
index 6599745..943529f 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFailedDeleteManager.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFailedDeleteManager.java
@@ -22,7 +22,6 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.imps.FailedDeleteManager.FailedDeleteManagerListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
@@ -291,11 +290,11 @@ public class TestFailedDeleteManager extends BaseClassForTests
final AtomicBoolean pathAdded = new AtomicBoolean(false);
- ((CuratorFrameworkImpl)client).getFailedDeleteManager().debugListener = new FailedDeleteManagerListener()
+ ((CuratorFrameworkImpl)client).getFailedDeleteManager().debugListener = new FailedOperationManager.FailedOperationManagerListener<String>()
{
@Override
- public void pathAddedForDelete(String path)
+ public void pathAddedForGuaranteedOperation(String path)
{
pathAdded.set(true);
}
@@ -325,11 +324,11 @@ public class TestFailedDeleteManager extends BaseClassForTests
final AtomicBoolean pathAdded = new AtomicBoolean(false);
- ((CuratorFrameworkImpl)client).getFailedDeleteManager().debugListener = new FailedDeleteManagerListener()
+ ((CuratorFrameworkImpl)client).getFailedDeleteManager().debugListener = new FailedOperationManager.FailedOperationManagerListener<String>()
{
@Override
- public void pathAddedForDelete(String path)
+ public void pathAddedForGuaranteedOperation(String path)
{
pathAdded.set(true);
}
http://git-wip-us.apache.org/repos/asf/curator/blob/22d034af/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
index 0912c70..518f13b 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
@@ -11,6 +11,10 @@ import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.imps.FailedRemoveWatchManager.FailedRemoveWatchDetails;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.Timing;
@@ -438,6 +442,131 @@ public class TestRemoveWatches extends BaseClassForTests
}
}
+ @Test
+ public void testGuaranteedRemoveWatch() throws Exception {
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.builder().
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+ try
+ {
+ client.start();
+
+ final CountDownLatch reconnectedLatch = new CountDownLatch(1);
+ final CountDownLatch suspendedLatch = new CountDownLatch(1);
+ client.getConnectionStateListenable().addListener(new ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ if(newState == ConnectionState.SUSPENDED)
+ {
+ suspendedLatch.countDown();
+ }
+ else if(newState == ConnectionState.RECONNECTED)
+ {
+ reconnectedLatch.countDown();
+ }
+ }
+ });
+
+ String path = "/";
+
+ CountDownLatch removeLatch = new CountDownLatch(1);
+
+ Watcher watcher = new CountDownWatcher(path, removeLatch, EventType.DataWatchRemoved);
+ client.checkExists().usingWatcher(watcher).forPath(path);
+
+ server.stop();
+ timing.awaitLatch(suspendedLatch);
+
+ //Remove the watch while we're not connected
+ try
+ {
+ client.watches().remove(watcher).guaranteed().forPath(path);
+ Assert.fail();
+ }
+ catch(KeeperException.ConnectionLossException e)
+ {
+ //Expected
+ }
+
+ server.restart();
+
+ timing.awaitLatch(removeLatch);
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testGuaranteedRemoveWatchInBackground() throws Exception {
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(),
+ new ExponentialBackoffRetry(100, 3));
+ try
+ {
+ client.start();
+
+ final CountDownLatch reconnectedLatch = new CountDownLatch(1);
+ final CountDownLatch suspendedLatch = new CountDownLatch(1);
+ client.getConnectionStateListenable().addListener(new ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ if(newState == ConnectionState.SUSPENDED)
+ {
+ suspendedLatch.countDown();
+ }
+ else if(newState == ConnectionState.RECONNECTED)
+ {
+ reconnectedLatch.countDown();
+ }
+ }
+ });
+
+ final CountDownLatch guaranteeAddedLatch = new CountDownLatch(1);
+
+ ((CuratorFrameworkImpl)client).getFailedRemoveWatcherManager().debugListener = new FailedOperationManager.FailedOperationManagerListener<FailedRemoveWatchManager.FailedRemoveWatchDetails>()
+ {
+
+ @Override
+ public void pathAddedForGuaranteedOperation(
+ FailedRemoveWatchDetails detail)
+ {
+ guaranteeAddedLatch.countDown();
+ }
+ };
+
+ String path = "/";
+
+ CountDownLatch removeLatch = new CountDownLatch(1);
+
+ Watcher watcher = new CountDownWatcher(path, removeLatch, EventType.DataWatchRemoved);
+ client.checkExists().usingWatcher(watcher).forPath(path);
+
+ server.stop();
+ timing.awaitLatch(suspendedLatch);
+
+ //Remove the watch while we're not connected
+ client.watches().remove(watcher).guaranteed().inBackground().forPath(path);
+
+ timing.awaitLatch(guaranteeAddedLatch);
+
+ server.restart();
+
+ timing.awaitLatch(removeLatch);
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
private static class CountDownWatcher implements Watcher {
private String path;
private EventType eventType;