You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by dr...@apache.org on 2015/08/19 01:18:32 UTC
[16/31] curator git commit: CURATOR-161 - Modified the background
processing framework to allow operations to request that a live connection is
not necessary to execute (this is needed to run the remove watches with
'local' set to true. Cleaned up some u
CURATOR-161 - Modified the background processing framework to allow
operations to request that a live connection is not necessary to execute
(this is needed to run the remove watches with 'local' set to true.
Cleaned up some unit tests.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/ba4da2c3
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/ba4da2c3
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/ba4da2c3
Branch: refs/heads/CURATOR-3.0
Commit: ba4da2c3c7048ea249f18e7b4c815db76f0b1ad0
Parents: 22d034a
Author: Cameron McKenzie <ca...@unico.com.au>
Authored: Thu May 14 09:19:09 2015 +1000
Committer: Cameron McKenzie <ca...@unico.com.au>
Committed: Thu May 14 09:19:09 2015 +1000
----------------------------------------------------------------------
.../framework/imps/CuratorFrameworkImpl.java | 2 +-
.../framework/imps/OperationAndData.java | 16 ++++-
.../imps/RemoveWatchesBuilderImpl.java | 22 ++++--
.../framework/imps/TestRemoveWatches.java | 73 ++++++++------------
4 files changed, 58 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/ba4da2c3/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index b4a1d93..c82f984 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -821,7 +821,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
{
try
{
- if ( client.isConnected() )
+ if ( !operationAndData.isConnectionRequired() || client.isConnected() )
{
operationAndData.callPerformBackgroundOperation();
}
http://git-wip-us.apache.org/repos/asf/curator/blob/ba4da2c3/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
index 38f59a0..b46cddb 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
@@ -40,25 +40,37 @@ class OperationAndData<T> implements Delayed, RetrySleeper
private final AtomicLong sleepUntilTimeMs = new AtomicLong(0);
private final long ordinal = nextOrdinal.getAndIncrement();
private final Object context;
+ private final boolean connectionRequired;
interface ErrorCallback<T>
{
void retriesExhausted(OperationAndData<T> operationAndData);
}
-
- OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback, ErrorCallback<T> errorCallback, Object context)
+
+ OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback, ErrorCallback<T> errorCallback, Object context, boolean connectionRequired)
{
this.operation = operation;
this.data = data;
this.callback = callback;
this.errorCallback = errorCallback;
this.context = context;
+ this.connectionRequired = connectionRequired;
+ }
+
+ OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback, ErrorCallback<T> errorCallback, Object context)
+ {
+ this(operation, data, callback, errorCallback, context, true);
}
Object getContext()
{
return context;
}
+
+ boolean isConnectionRequired()
+ {
+ return connectionRequired;
+ }
void callPerformBackgroundOperation() throws Exception
{
http://git-wip-us.apache.org/repos/asf/curator/blob/ba4da2c3/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
index 27d05da..932706b 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
@@ -166,15 +166,23 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat
private void pathInBackground(final String path)
{
- OperationAndData.ErrorCallback<String> errorCallback = new OperationAndData.ErrorCallback<String>()
+ OperationAndData.ErrorCallback<String> errorCallback = null;
+
+ //Only need an error callback if we're in guaranteed mode
+ if(guaranteed)
{
- @Override
- public void retriesExhausted(OperationAndData<String> operationAndData)
+ errorCallback = new OperationAndData.ErrorCallback<String>()
{
- client.getFailedRemoveWatcherManager().addFailedOperation(new FailedRemoveWatchManager.FailedRemoveWatchDetails(path, watcher));
- }
- };
- client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), errorCallback, backgrounding.getContext()), null);
+ @Override
+ public void retriesExhausted(OperationAndData<String> operationAndData)
+ {
+ client.getFailedRemoveWatcherManager().addFailedOperation(new FailedRemoveWatchManager.FailedRemoveWatchDetails(path, watcher));
+ }
+ };
+ }
+
+ client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(),
+ errorCallback, backgrounding.getContext(), !local), null);
}
private void pathInForeground(final String path) throws Exception
http://git-wip-us.apache.org/repos/asf/curator/blob/ba4da2c3/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
index 518f13b..fc15f0c 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
@@ -1,8 +1,9 @@
package org.apache.curator.framework.imps;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -24,13 +25,30 @@ import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.WatcherType;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
import org.testng.Assert;
import org.testng.annotations.Test;
public class TestRemoveWatches extends BaseClassForTests
{
+ private boolean blockUntilDesiredConnectionState(CuratorFramework client, Timing timing, final ConnectionState desiredState)
+ {
+ final CountDownLatch latch = new CountDownLatch(1);
+ client.getConnectionStateListenable().addListener(new ConnectionStateListener()
+ {
+
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ if(newState == desiredState)
+ {
+ latch.countDown();
+ }
+ }
+ });
+
+ return timing.awaitLatch(latch);
+ }
+
@Test
public void testRemoveCuratorDefaultWatcher() throws Exception
{
@@ -330,7 +348,7 @@ public class TestRemoveWatches extends BaseClassForTests
//Stop the server so we can check if we can remove watches locally when offline
server.stop();
- timing.sleepABit();
+ blockUntilDesiredConnectionState(client, timing, ConnectionState.SUSPENDED);
client.watches().removeAll().locally().forPath(path);
@@ -364,7 +382,7 @@ public class TestRemoveWatches extends BaseClassForTests
//Stop the server so we can check if we can remove watches locally when offline
server.stop();
- timing.sleepABit();
+ blockUntilDesiredConnectionState(client, timing, ConnectionState.SUSPENDED);
client.watches().removeAll().locally().inBackground().forPath(path);
@@ -452,25 +470,7 @@ public class TestRemoveWatches extends BaseClassForTests
try
{
client.start();
-
- final CountDownLatch reconnectedLatch = new CountDownLatch(1);
- final CountDownLatch suspendedLatch = new CountDownLatch(1);
- client.getConnectionStateListenable().addListener(new ConnectionStateListener()
- {
- @Override
- public void stateChanged(CuratorFramework client, ConnectionState newState)
- {
- if(newState == ConnectionState.SUSPENDED)
- {
- suspendedLatch.countDown();
- }
- else if(newState == ConnectionState.RECONNECTED)
- {
- reconnectedLatch.countDown();
- }
- }
- });
-
+
String path = "/";
CountDownLatch removeLatch = new CountDownLatch(1);
@@ -479,7 +479,8 @@ public class TestRemoveWatches extends BaseClassForTests
client.checkExists().usingWatcher(watcher).forPath(path);
server.stop();
- timing.awaitLatch(suspendedLatch);
+
+ blockUntilDesiredConnectionState(client, timing, ConnectionState.SUSPENDED);
//Remove the watch while we're not connected
try
@@ -510,25 +511,7 @@ public class TestRemoveWatches extends BaseClassForTests
try
{
client.start();
-
- final CountDownLatch reconnectedLatch = new CountDownLatch(1);
- final CountDownLatch suspendedLatch = new CountDownLatch(1);
- client.getConnectionStateListenable().addListener(new ConnectionStateListener()
- {
- @Override
- public void stateChanged(CuratorFramework client, ConnectionState newState)
- {
- if(newState == ConnectionState.SUSPENDED)
- {
- suspendedLatch.countDown();
- }
- else if(newState == ConnectionState.RECONNECTED)
- {
- reconnectedLatch.countDown();
- }
- }
- });
-
+
final CountDownLatch guaranteeAddedLatch = new CountDownLatch(1);
((CuratorFrameworkImpl)client).getFailedRemoveWatcherManager().debugListener = new FailedOperationManager.FailedOperationManagerListener<FailedRemoveWatchManager.FailedRemoveWatchDetails>()
@@ -550,7 +533,7 @@ public class TestRemoveWatches extends BaseClassForTests
client.checkExists().usingWatcher(watcher).forPath(path);
server.stop();
- timing.awaitLatch(suspendedLatch);
+ blockUntilDesiredConnectionState(client, timing, ConnectionState.SUSPENDED);
//Remove the watch while we're not connected
client.watches().remove(watcher).guaranteed().inBackground().forPath(path);