You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2013/09/10 22:45:50 UTC
[1/2] git commit: Added test from user Michael Morello that exposes
major retry hole in background operations. The getZooKeeper() method is not
retried
Updated Branches:
refs/heads/CURATOR-52 aa174242c -> a561f9a6a
Added test from user Michael Morello that exposes major retry hole in background operations. The getZooKeeper() method is not retried
Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/6d7f726b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/6d7f726b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/6d7f726b
Branch: refs/heads/CURATOR-52
Commit: 6d7f726bea4919562cf521d48a105f4ffc23c30b
Parents: aa17424
Author: jordan.zimmerman <jo...@riotgames.com>
Authored: Tue Sep 10 15:11:33 2013 -0500
Committer: jordan.zimmerman <jo...@riotgames.com>
Committed: Tue Sep 10 15:11:33 2013 -0500
----------------------------------------------------------------------
.../framework/imps/TestFrameworkEdges.java | 178 +++++++++++--------
1 file changed, 104 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/6d7f726b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
index 35d67b3..25ddf04 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.curator.framework.imps;
import com.google.common.collect.Queues;
@@ -52,16 +53,45 @@ import java.util.concurrent.atomic.AtomicInteger;
public class TestFrameworkEdges extends BaseClassForTests
{
@Test
- public void testReconnectAfterLoss() throws Exception
+ public void connectionLossWithBackgroundTest() throws Exception
{
- Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 1, new RetryOneTime(1));
+ try
+ {
+ final CountDownLatch latch = new CountDownLatch(1);
+ client.start();
+ client.getZookeeperClient().blockUntilConnectedOrTimedOut();
+ server.close();
+ client.getChildren().inBackground
+ (
+ new BackgroundCallback()
+ {
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+ {
+ latch.countDown();
+ }
+ }
+ ).forPath("/");
+ Assert.assertTrue(timing.awaitLatch(latch));
+ }
+ finally
+ {
+ client.close();
+ }
+ }
+
+ @Test
+ public void testReconnectAfterLoss() throws Exception
+ {
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
try
{
client.start();
- final CountDownLatch lostLatch = new CountDownLatch(1);
- ConnectionStateListener listener = new ConnectionStateListener()
+ final CountDownLatch lostLatch = new CountDownLatch(1);
+ ConnectionStateListener listener = new ConnectionStateListener()
{
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
@@ -100,10 +130,10 @@ public class TestFrameworkEdges extends BaseClassForTests
}
@Test
- public void testGetAclNoStat() throws Exception
+ public void testGetAclNoStat() throws Exception
{
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
client.start();
try
{
@@ -123,17 +153,17 @@ public class TestFrameworkEdges extends BaseClassForTests
}
@Test
- public void testMissedResponseOnBackgroundESCreate() throws Exception
+ public void testMissedResponseOnBackgroundESCreate() throws Exception
{
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
client.start();
try
{
- CreateBuilderImpl createBuilder = (CreateBuilderImpl)client.create();
+ CreateBuilderImpl createBuilder = (CreateBuilderImpl)client.create();
createBuilder.failNextCreateForTesting = true;
- final BlockingQueue<String> queue = Queues.newArrayBlockingQueue(1);
- BackgroundCallback callback = new BackgroundCallback()
+ final BlockingQueue<String> queue = Queues.newArrayBlockingQueue(1);
+ BackgroundCallback callback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
@@ -142,7 +172,7 @@ public class TestFrameworkEdges extends BaseClassForTests
}
};
createBuilder.withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath("/");
- String ourPath = queue.poll(10, TimeUnit.SECONDS);
+ String ourPath = queue.poll(10, TimeUnit.SECONDS);
Assert.assertTrue(ourPath.startsWith(ZKPaths.makePath("/", CreateBuilderImpl.PROTECTED_PREFIX)));
Assert.assertFalse(createBuilder.failNextCreateForTesting);
}
@@ -153,15 +183,15 @@ public class TestFrameworkEdges extends BaseClassForTests
}
@Test
- public void testMissedResponseOnESCreate() throws Exception
+ public void testMissedResponseOnESCreate() throws Exception
{
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
client.start();
try
{
- CreateBuilderImpl createBuilder = (CreateBuilderImpl)client.create();
+ CreateBuilderImpl createBuilder = (CreateBuilderImpl)client.create();
createBuilder.failNextCreateForTesting = true;
- String ourPath = createBuilder.withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/");
+ String ourPath = createBuilder.withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/");
Assert.assertTrue(ourPath.startsWith(ZKPaths.makePath("/", CreateBuilderImpl.PROTECTED_PREFIX)));
Assert.assertFalse(createBuilder.failNextCreateForTesting);
}
@@ -172,7 +202,7 @@ public class TestFrameworkEdges extends BaseClassForTests
}
@Test
- public void testSessionKilled() throws Exception
+ public void testSessionKilled() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
client.start();
@@ -180,8 +210,8 @@ public class TestFrameworkEdges extends BaseClassForTests
{
client.create().forPath("/sessionTest");
- final AtomicBoolean sessionDied = new AtomicBoolean(false);
- Watcher watcher = new Watcher()
+ final AtomicBoolean sessionDied = new AtomicBoolean(false);
+ Watcher watcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
@@ -204,35 +234,35 @@ public class TestFrameworkEdges extends BaseClassForTests
}
@Test
- public void testNestedCalls() throws Exception
+ public void testNestedCalls() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
client.start();
try
{
client.getCuratorListenable().addListener
- (
- new CuratorListener()
- {
- @Override
- public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception
+ (
+ new CuratorListener()
{
- if ( event.getType() == CuratorEventType.EXISTS )
+ @Override
+ public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception
{
- Stat stat = client.checkExists().forPath("/yo/yo/yo");
- Assert.assertNull(stat);
+ if ( event.getType() == CuratorEventType.EXISTS )
+ {
+ Stat stat = client.checkExists().forPath("/yo/yo/yo");
+ Assert.assertNull(stat);
- client.create().inBackground(event.getContext()).forPath("/what");
- }
- else if ( event.getType() == CuratorEventType.CREATE )
- {
- ((CountDownLatch)event.getContext()).countDown();
+ client.create().inBackground(event.getContext()).forPath("/what");
+ }
+ else if ( event.getType() == CuratorEventType.CREATE )
+ {
+ ((CountDownLatch)event.getContext()).countDown();
+ }
}
}
- }
- );
+ );
- CountDownLatch latch = new CountDownLatch(1);
+ CountDownLatch latch = new CountDownLatch(1);
client.checkExists().inBackground(latch).forPath("/hey");
Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
}
@@ -243,28 +273,28 @@ public class TestFrameworkEdges extends BaseClassForTests
}
@Test
- public void testBackgroundFailure() throws Exception
+ public void testBackgroundFailure() throws Exception
{
- Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
client.start();
try
{
- final CountDownLatch latch = new CountDownLatch(1);
+ final CountDownLatch latch = new CountDownLatch(1);
client.getConnectionStateListenable().addListener
- (
- new ConnectionStateListener()
- {
- @Override
- public void stateChanged(CuratorFramework client, ConnectionState newState)
+ (
+ new ConnectionStateListener()
{
- if ( newState == ConnectionState.LOST )
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
{
- latch.countDown();
+ if ( newState == ConnectionState.LOST )
+ {
+ latch.countDown();
+ }
}
}
- }
- );
+ );
client.checkExists().forPath("/hey");
client.checkExists().inBackground().forPath("/hey");
@@ -281,7 +311,7 @@ public class TestFrameworkEdges extends BaseClassForTests
}
@Test
- public void testFailure() throws Exception
+ public void testFailure() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), 100, 100, new RetryOneTime(1));
client.start();
@@ -306,40 +336,40 @@ public class TestFrameworkEdges extends BaseClassForTests
}
@Test
- public void testRetry() throws Exception
+ public void testRetry() throws Exception
{
- final int MAX_RETRIES = 3;
- final int serverPort = server.getPort();
+ final int MAX_RETRIES = 3;
+ final int serverPort = server.getPort();
final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), 1000, 1000, new RetryOneTime(10));
client.start();
try
{
- final AtomicInteger retries = new AtomicInteger(0);
- final Semaphore semaphore = new Semaphore(0);
+ final AtomicInteger retries = new AtomicInteger(0);
+ final Semaphore semaphore = new Semaphore(0);
client.getZookeeperClient().setRetryPolicy
- (
- new RetryPolicy()
- {
- @Override
- public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
+ (
+ new RetryPolicy()
{
- semaphore.release();
- if ( retries.incrementAndGet() == MAX_RETRIES )
+ @Override
+ public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
{
- try
+ semaphore.release();
+ if ( retries.incrementAndGet() == MAX_RETRIES )
{
- server = new TestingServer(serverPort);
- }
- catch ( Exception e )
- {
- throw new Error(e);
+ try
+ {
+ server = new TestingServer(serverPort);
+ }
+ catch ( Exception e )
+ {
+ throw new Error(e);
+ }
}
+ return true;
}
- return true;
}
- }
- );
+ );
server.stop();
@@ -367,7 +397,7 @@ public class TestFrameworkEdges extends BaseClassForTests
}
@Test
- public void testNotStarted() throws Exception
+ public void testNotStarted() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
try
@@ -386,7 +416,7 @@ public class TestFrameworkEdges extends BaseClassForTests
}
@Test
- public void testStopped() throws Exception
+ public void testStopped() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
try
[2/2] git commit: Fix edge case reported as CURATOR-52.
ConnectionState.checkTimeouts() throws
KeeperException.ConnectionLossException when the initial (or previously
failed) connection cannot be re-established. This needs to be run through the
retry pol
Posted by ra...@apache.org.
Fix edge case reported as CURATOR-52. ConnectionState.checkTimeouts() throws KeeperException.ConnectionLossException
when the initial (or previously failed) connection cannot be re-established. This needs to be run through the retry policy
and callbacks need to get invoked, etc.
Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/a561f9a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/a561f9a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/a561f9a6
Branch: refs/heads/CURATOR-52
Commit: a561f9a6a17c6fba82551b56ffadcc1b68df46cc
Parents: 6d7f726
Author: jordan.zimmerman <jo...@riotgames.com>
Authored: Tue Sep 10 15:44:20 2013 -0500
Committer: jordan.zimmerman <jo...@riotgames.com>
Committed: Tue Sep 10 15:44:20 2013 -0500
----------------------------------------------------------------------
.../org/apache/curator/ConnectionState.java | 2 +-
.../curator/CuratorConnectionLossException.java | 11 ++
.../framework/imps/CreateBuilderImpl.java | 4 +-
.../framework/imps/CuratorFrameworkImpl.java | 106 ++++++++++++-------
.../framework/imps/DeleteBuilderImpl.java | 2 +-
.../framework/imps/ExistsBuilderImpl.java | 2 +-
.../framework/imps/GetACLBuilderImpl.java | 2 +-
.../framework/imps/GetChildrenBuilderImpl.java | 2 +-
.../framework/imps/GetDataBuilderImpl.java | 2 +-
.../framework/imps/OperationAndData.java | 32 +++---
.../framework/imps/SetACLBuilderImpl.java | 2 +-
.../framework/imps/SetDataBuilderImpl.java | 2 +-
.../curator/framework/imps/SyncBuilderImpl.java | 2 +-
13 files changed, 109 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/a561f9a6/curator-client/src/main/java/org/apache/curator/ConnectionState.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/ConnectionState.java b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
index bbb0588..e02ee88 100644
--- a/curator-client/src/main/java/org/apache/curator/ConnectionState.java
+++ b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
@@ -188,7 +188,7 @@ class ConnectionState implements Watcher, Closeable
}
else
{
- KeeperException.ConnectionLossException connectionLossException = new KeeperException.ConnectionLossException();
+ KeeperException.ConnectionLossException connectionLossException = new CuratorConnectionLossException();
if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
{
log.error(String.format("Connection timed out for connection string (%s) and timeout (%d) / elapsed (%d)", zooKeeper.getConnectionString(), connectionTimeoutMs, elapsed), connectionLossException);
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/a561f9a6/curator-client/src/main/java/org/apache/curator/CuratorConnectionLossException.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/CuratorConnectionLossException.java b/curator-client/src/main/java/org/apache/curator/CuratorConnectionLossException.java
new file mode 100644
index 0000000..23eeb49
--- /dev/null
+++ b/curator-client/src/main/java/org/apache/curator/CuratorConnectionLossException.java
@@ -0,0 +1,11 @@
+package org.apache.curator;
+
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * This is needed to differentiate between ConnectionLossException thrown by ZooKeeper
+ * and ConnectionLossException thrown by {@link ConnectionState#checkTimeouts()}
+ */
+public class CuratorConnectionLossException extends KeeperException.ConnectionLossException
+{
+}
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/a561f9a6/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
index 6d7b261..ee99074 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
@@ -486,7 +486,7 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt
client.queueOperation(mainOperationAndData);
}
};
- OperationAndData<PathAndBytes> parentOperation = new OperationAndData<PathAndBytes>(operation, mainOperationAndData.getData(), null, null);
+ OperationAndData<PathAndBytes> parentOperation = new OperationAndData<PathAndBytes>(operation, mainOperationAndData.getData(), null, null, backgrounding.getContext());
client.queueOperation(parentOperation);
}
@@ -558,7 +558,7 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt
private void pathInBackground(final String path, final byte[] data, final String givenPath)
{
final AtomicBoolean firstTime = new AtomicBoolean(true);
- OperationAndData<PathAndBytes> operationAndData = new OperationAndData<PathAndBytes>(this, new PathAndBytes(path, data), backgrounding.getCallback(), null)
+ OperationAndData<PathAndBytes> operationAndData = new OperationAndData<PathAndBytes>(this, new PathAndBytes(path, data), backgrounding.getCallback(), null, backgrounding.getContext())
{
@Override
void callPerformBackgroundOperation() throws Exception
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/a561f9a6/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 146103c..3cccf45 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -20,6 +20,7 @@ package org.apache.curator.framework.imps;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
+import org.apache.curator.CuratorConnectionLossException;
import org.apache.curator.CuratorZookeeperClient;
import org.apache.curator.RetryLoop;
import org.apache.curator.TimeTrace;
@@ -420,7 +421,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
protected void internalSync(CuratorFrameworkImpl impl, String path, Object context)
{
BackgroundOperation<String> operation = new BackgroundSyncImpl(impl, context);
- performBackgroundOperation(new OperationAndData<String>(operation, path, null, null));
+ performBackgroundOperation(new OperationAndData<String>(operation, path, null, null, context));
}
@Override
@@ -460,7 +461,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
return compressionProvider;
}
- @SuppressWarnings({"ThrowableResultOfMethodCallIgnored"})
<DATA_TYPE> void processBackgroundOperation(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent event)
{
boolean isInitialExecution = (event == null);
@@ -475,37 +475,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
{
if ( RetryLoop.shouldRetry(event.getResultCode()) )
{
- if ( client.getRetryPolicy().allowRetry(operationAndData.getThenIncrementRetryCount(), operationAndData.getElapsedTimeMs(), operationAndData) )
- {
- doQueueOperation = true;
- }
- else
- {
- if ( operationAndData.getErrorCallback() != null )
- {
- operationAndData.getErrorCallback().retriesExhausted(operationAndData);
- }
-
- if ( operationAndData.getCallback() != null )
- {
- sendToBackgroundCallback(operationAndData, event);
- }
-
- KeeperException.Code code = KeeperException.Code.get(event.getResultCode());
- Exception e = null;
- try
- {
- e = (code != null) ? KeeperException.create(code) : null;
- }
- catch ( Throwable ignore )
- {
- }
- if ( e == null )
- {
- e = new Exception("Unknown result code: " + event.getResultCode());
- }
- logError("Background operation retry gave up", e);
- }
+ doQueueOperation = checkBackgroundRetry(operationAndData, event);
break;
}
@@ -586,6 +556,44 @@ public class CuratorFrameworkImpl implements CuratorFramework
return namespaceWatcherMap;
}
+ @SuppressWarnings({"ThrowableResultOfMethodCallIgnored"})
+ private <DATA_TYPE> boolean checkBackgroundRetry(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent event)
+ {
+ boolean doRetry = false;
+ if ( client.getRetryPolicy().allowRetry(operationAndData.getThenIncrementRetryCount(), operationAndData.getElapsedTimeMs(), operationAndData) )
+ {
+ doRetry = true;
+ }
+ else
+ {
+ if ( operationAndData.getErrorCallback() != null )
+ {
+ operationAndData.getErrorCallback().retriesExhausted(operationAndData);
+ }
+
+ if ( operationAndData.getCallback() != null )
+ {
+ sendToBackgroundCallback(operationAndData, event);
+ }
+
+ KeeperException.Code code = KeeperException.Code.get(event.getResultCode());
+ Exception e = null;
+ try
+ {
+ e = (code != null) ? KeeperException.create(code) : null;
+ }
+ catch ( Throwable ignore )
+ {
+ }
+ if ( e == null )
+ {
+ e = new Exception("Unknown result code: " + event.getResultCode());
+ }
+ logError("Background operation retry gave up", e);
+ }
+ return doRetry;
+ }
+
private <DATA_TYPE> void sendToBackgroundCallback(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent event)
{
try
@@ -659,13 +667,33 @@ public class CuratorFrameworkImpl implements CuratorFramework
private void performBackgroundOperation(OperationAndData<?> operationAndData)
{
- try
- {
- operationAndData.callPerformBackgroundOperation();
- }
- catch ( Throwable e )
+ boolean isDone = false;
+ while ( !isDone )
{
- handleBackgroundOperationException(operationAndData, e);
+ try
+ {
+ operationAndData.callPerformBackgroundOperation();
+ isDone = true;
+ }
+ catch ( Throwable e )
+ {
+ /**
+ * Fix edge case reported as CURATOR-52. ConnectionState.checkTimeouts() throws KeeperException.ConnectionLossException
+ * when the initial (or previously failed) connection cannot be re-established. This needs to be run through the retry policy
+ * and callbacks need to get invoked, etc.
+ */
+ if ( e instanceof CuratorConnectionLossException )
+ {
+ WatchedEvent watchedEvent = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Disconnected, null);
+ CuratorEvent event = new CuratorEventImpl(this, CuratorEventType.WATCHED, KeeperException.Code.CONNECTIONLOSS.intValue(), null, null, operationAndData.getContext(), null, null, null, watchedEvent, null);
+ if ( checkBackgroundRetry(operationAndData, event) )
+ {
+ continue;
+ }
+ }
+ handleBackgroundOperationException(operationAndData, e);
+ isDone = true;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/a561f9a6/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
index 86dcc40..198f356 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
@@ -170,7 +170,7 @@ class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String>
}
};
}
- client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), errorCallback), null);
+ client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), errorCallback, backgrounding.getContext()), null);
}
else
{
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/a561f9a6/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java
index be79d58..a1e2ee5 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java
@@ -141,7 +141,7 @@ class ExistsBuilderImpl implements ExistsBuilder, BackgroundOperation<String>
Stat returnStat = null;
if ( backgrounding.inBackground() )
{
- client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null), null);
+ client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext()), null);
}
else
{
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/a561f9a6/curator-framework/src/main/java/org/apache/curator/framework/imps/GetACLBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetACLBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetACLBuilderImpl.java
index 19dfa2c..250c2c8 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetACLBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetACLBuilderImpl.java
@@ -119,7 +119,7 @@ class GetACLBuilderImpl implements GetACLBuilder, BackgroundOperation<String>
List<ACL> result = null;
if ( backgrounding.inBackground() )
{
- client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null), null);
+ client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext()), null);
}
else
{
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/a561f9a6/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java
index 5ea6190..16f6d4b 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java
@@ -184,7 +184,7 @@ class GetChildrenBuilderImpl implements GetChildrenBuilder, BackgroundOperation<
List<String> children = null;
if ( backgrounding.inBackground() )
{
- client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null), null);
+ client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext()), null);
}
else
{
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/a561f9a6/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
index d9b3907..e994b03 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
@@ -272,7 +272,7 @@ class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<String>
byte[] responseData = null;
if ( backgrounding.inBackground() )
{
- client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null), null);
+ client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext()), null);
}
else
{
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/a561f9a6/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
index 8d39594..38f59a0 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.curator.framework.imps;
import com.google.common.annotations.VisibleForTesting;
@@ -28,28 +29,35 @@ import java.util.concurrent.atomic.AtomicLong;
class OperationAndData<T> implements Delayed, RetrySleeper
{
- private static final AtomicLong nextOrdinal = new AtomicLong();
-
- private final BackgroundOperation<T> operation;
- private final T data;
- private final BackgroundCallback callback;
- private final long startTimeMs = System.currentTimeMillis();
- private final ErrorCallback<T> errorCallback;
- private final AtomicInteger retryCount = new AtomicInteger(0);
- private final AtomicLong sleepUntilTimeMs = new AtomicLong(0);
- private final long ordinal = nextOrdinal.getAndIncrement();
+ private static final AtomicLong nextOrdinal = new AtomicLong();
+
+ private final BackgroundOperation<T> operation;
+ private final T data;
+ private final BackgroundCallback callback;
+ private final long startTimeMs = System.currentTimeMillis();
+ private final ErrorCallback<T> errorCallback;
+ private final AtomicInteger retryCount = new AtomicInteger(0);
+ private final AtomicLong sleepUntilTimeMs = new AtomicLong(0);
+ private final long ordinal = nextOrdinal.getAndIncrement();
+ private final Object context;
interface ErrorCallback<T>
{
void retriesExhausted(OperationAndData<T> operationAndData);
}
- OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback, ErrorCallback<T> errorCallback)
+ OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback, ErrorCallback<T> errorCallback, Object context)
{
this.operation = operation;
this.data = data;
this.callback = callback;
this.errorCallback = errorCallback;
+ this.context = context;
+ }
+
+ Object getContext()
+ {
+ return context;
}
void callPerformBackgroundOperation() throws Exception
@@ -108,7 +116,7 @@ class OperationAndData<T> implements Delayed, RetrySleeper
return 0;
}
- long diff = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
+ long diff = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
if ( diff == 0 )
{
if ( o instanceof OperationAndData )
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/a561f9a6/curator-framework/src/main/java/org/apache/curator/framework/imps/SetACLBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetACLBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetACLBuilderImpl.java
index 20d4b29..7a71d54 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetACLBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetACLBuilderImpl.java
@@ -112,7 +112,7 @@ class SetACLBuilderImpl implements SetACLBuilder, BackgroundPathable<Stat>, Back
Stat resultStat = null;
if ( backgrounding.inBackground() )
{
- client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null), null);
+ client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext()), null);
}
else
{
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/a561f9a6/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java
index 978d12b..c88ea55 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java
@@ -232,7 +232,7 @@ class SetDataBuilderImpl implements SetDataBuilder, BackgroundOperation<PathAndB
Stat resultStat = null;
if ( backgrounding.inBackground() )
{
- client.processBackgroundOperation(new OperationAndData<PathAndBytes>(this, new PathAndBytes(path, data), backgrounding.getCallback(), null), null);
+ client.processBackgroundOperation(new OperationAndData<PathAndBytes>(this, new PathAndBytes(path, data), backgrounding.getCallback(), null, backgrounding.getContext()), null);
}
else
{
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/a561f9a6/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java
index e168d76..2d3e9c0 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java
@@ -103,7 +103,7 @@ public class SyncBuilderImpl implements SyncBuilder, BackgroundOperation<String>
@Override
public Void forPath(String path) throws Exception
{
- OperationAndData<String> operationAndData = new OperationAndData<String>(this, path, backgrounding.getCallback(), null);
+ OperationAndData<String> operationAndData = new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext());
client.processBackgroundOperation(operationAndData, null);
return null;
}