You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2013/09/10 22:45:50 UTC

[1/2] git commit: Added test from user Michael Morello that exposes a major retry hole in background operations: the getZooKeeper() method is not retried.

Updated Branches:
  refs/heads/CURATOR-52 aa174242c -> a561f9a6a


Added test from user Michael Morello that exposes a major retry hole in background operations: the getZooKeeper() method is not retried.


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/6d7f726b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/6d7f726b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/6d7f726b

Branch: refs/heads/CURATOR-52
Commit: 6d7f726bea4919562cf521d48a105f4ffc23c30b
Parents: aa17424
Author: jordan.zimmerman <jo...@riotgames.com>
Authored: Tue Sep 10 15:11:33 2013 -0500
Committer: jordan.zimmerman <jo...@riotgames.com>
Committed: Tue Sep 10 15:11:33 2013 -0500

----------------------------------------------------------------------
 .../framework/imps/TestFrameworkEdges.java      | 178 +++++++++++--------
 1 file changed, 104 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/6d7f726b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
index 35d67b3..25ddf04 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.imps;
 
 import com.google.common.collect.Queues;
@@ -52,16 +53,45 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class TestFrameworkEdges extends BaseClassForTests
 {
     @Test
-    public void     testReconnectAfterLoss() throws Exception
+    public void connectionLossWithBackgroundTest() throws Exception
     {
-        Timing                          timing = new Timing();
-        CuratorFramework                client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 1, new RetryOneTime(1));
+        try
+        {
+            final CountDownLatch latch = new CountDownLatch(1);
+            client.start();
+            client.getZookeeperClient().blockUntilConnectedOrTimedOut();
+            server.close();
+            client.getChildren().inBackground
+            (
+                new BackgroundCallback()
+                {
+                    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+                    {
+                        latch.countDown();
+                    }
+                }
+            ).forPath("/");
+            Assert.assertTrue(timing.awaitLatch(latch));
+        }
+        finally
+        {
+            client.close();
+        }
+    }
+
+    @Test
+    public void testReconnectAfterLoss() throws Exception
+    {
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
         try
         {
             client.start();
 
-            final CountDownLatch            lostLatch = new CountDownLatch(1);
-            ConnectionStateListener         listener = new ConnectionStateListener()
+            final CountDownLatch lostLatch = new CountDownLatch(1);
+            ConnectionStateListener listener = new ConnectionStateListener()
             {
                 @Override
                 public void stateChanged(CuratorFramework client, ConnectionState newState)
@@ -100,10 +130,10 @@ public class TestFrameworkEdges extends BaseClassForTests
     }
 
     @Test
-    public void     testGetAclNoStat() throws Exception
+    public void testGetAclNoStat() throws Exception
     {
 
-        CuratorFramework                client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
         client.start();
         try
         {
@@ -123,17 +153,17 @@ public class TestFrameworkEdges extends BaseClassForTests
     }
 
     @Test
-    public void     testMissedResponseOnBackgroundESCreate() throws Exception
+    public void testMissedResponseOnBackgroundESCreate() throws Exception
     {
-        CuratorFramework                client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
         client.start();
         try
         {
-            CreateBuilderImpl               createBuilder = (CreateBuilderImpl)client.create();
+            CreateBuilderImpl createBuilder = (CreateBuilderImpl)client.create();
             createBuilder.failNextCreateForTesting = true;
 
-            final BlockingQueue<String>     queue = Queues.newArrayBlockingQueue(1);
-            BackgroundCallback              callback = new BackgroundCallback()
+            final BlockingQueue<String> queue = Queues.newArrayBlockingQueue(1);
+            BackgroundCallback callback = new BackgroundCallback()
             {
                 @Override
                 public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
@@ -142,7 +172,7 @@ public class TestFrameworkEdges extends BaseClassForTests
                 }
             };
             createBuilder.withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath("/");
-            String          ourPath = queue.poll(10, TimeUnit.SECONDS);
+            String ourPath = queue.poll(10, TimeUnit.SECONDS);
             Assert.assertTrue(ourPath.startsWith(ZKPaths.makePath("/", CreateBuilderImpl.PROTECTED_PREFIX)));
             Assert.assertFalse(createBuilder.failNextCreateForTesting);
         }
@@ -153,15 +183,15 @@ public class TestFrameworkEdges extends BaseClassForTests
     }
 
     @Test
-    public void     testMissedResponseOnESCreate() throws Exception
+    public void testMissedResponseOnESCreate() throws Exception
     {
-        CuratorFramework                client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
         client.start();
         try
         {
-            CreateBuilderImpl               createBuilder = (CreateBuilderImpl)client.create();
+            CreateBuilderImpl createBuilder = (CreateBuilderImpl)client.create();
             createBuilder.failNextCreateForTesting = true;
-            String                          ourPath = createBuilder.withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/");
+            String ourPath = createBuilder.withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/");
             Assert.assertTrue(ourPath.startsWith(ZKPaths.makePath("/", CreateBuilderImpl.PROTECTED_PREFIX)));
             Assert.assertFalse(createBuilder.failNextCreateForTesting);
         }
@@ -172,7 +202,7 @@ public class TestFrameworkEdges extends BaseClassForTests
     }
 
     @Test
-    public void     testSessionKilled() throws Exception
+    public void testSessionKilled() throws Exception
     {
         CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
         client.start();
@@ -180,8 +210,8 @@ public class TestFrameworkEdges extends BaseClassForTests
         {
             client.create().forPath("/sessionTest");
 
-            final AtomicBoolean     sessionDied = new AtomicBoolean(false);
-            Watcher         watcher = new Watcher()
+            final AtomicBoolean sessionDied = new AtomicBoolean(false);
+            Watcher watcher = new Watcher()
             {
                 @Override
                 public void process(WatchedEvent event)
@@ -204,35 +234,35 @@ public class TestFrameworkEdges extends BaseClassForTests
     }
 
     @Test
-    public void         testNestedCalls() throws Exception
+    public void testNestedCalls() throws Exception
     {
         CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
         client.start();
         try
         {
             client.getCuratorListenable().addListener
-            (
-                new CuratorListener()
-                {
-                    @Override
-                    public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception
+                (
+                    new CuratorListener()
                     {
-                        if ( event.getType() == CuratorEventType.EXISTS )
+                        @Override
+                        public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception
                         {
-                            Stat    stat = client.checkExists().forPath("/yo/yo/yo");
-                            Assert.assertNull(stat);
+                            if ( event.getType() == CuratorEventType.EXISTS )
+                            {
+                                Stat stat = client.checkExists().forPath("/yo/yo/yo");
+                                Assert.assertNull(stat);
 
-                            client.create().inBackground(event.getContext()).forPath("/what");
-                        }
-                        else if ( event.getType() == CuratorEventType.CREATE )
-                        {
-                            ((CountDownLatch)event.getContext()).countDown();
+                                client.create().inBackground(event.getContext()).forPath("/what");
+                            }
+                            else if ( event.getType() == CuratorEventType.CREATE )
+                            {
+                                ((CountDownLatch)event.getContext()).countDown();
+                            }
                         }
                     }
-                }
-            );
+                );
 
-            CountDownLatch        latch = new CountDownLatch(1);
+            CountDownLatch latch = new CountDownLatch(1);
             client.checkExists().inBackground(latch).forPath("/hey");
             Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
         }
@@ -243,28 +273,28 @@ public class TestFrameworkEdges extends BaseClassForTests
     }
 
     @Test
-    public void         testBackgroundFailure() throws Exception
+    public void testBackgroundFailure() throws Exception
     {
-        Timing              timing = new Timing();
-        CuratorFramework    client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
         client.start();
         try
         {
-            final CountDownLatch        latch = new CountDownLatch(1);
+            final CountDownLatch latch = new CountDownLatch(1);
             client.getConnectionStateListenable().addListener
-            (
-                new ConnectionStateListener()
-                {
-                    @Override
-                    public void stateChanged(CuratorFramework client, ConnectionState newState)
+                (
+                    new ConnectionStateListener()
                     {
-                        if ( newState == ConnectionState.LOST )
+                        @Override
+                        public void stateChanged(CuratorFramework client, ConnectionState newState)
                         {
-                            latch.countDown();
+                            if ( newState == ConnectionState.LOST )
+                            {
+                                latch.countDown();
+                            }
                         }
                     }
-                }
-            );
+                );
 
             client.checkExists().forPath("/hey");
             client.checkExists().inBackground().forPath("/hey");
@@ -281,7 +311,7 @@ public class TestFrameworkEdges extends BaseClassForTests
     }
 
     @Test
-    public void         testFailure() throws Exception
+    public void testFailure() throws Exception
     {
         CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), 100, 100, new RetryOneTime(1));
         client.start();
@@ -306,40 +336,40 @@ public class TestFrameworkEdges extends BaseClassForTests
     }
 
     @Test
-    public void         testRetry() throws Exception
+    public void testRetry() throws Exception
     {
-        final int       MAX_RETRIES = 3;
-        final int       serverPort = server.getPort();
+        final int MAX_RETRIES = 3;
+        final int serverPort = server.getPort();
 
         final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), 1000, 1000, new RetryOneTime(10));
         client.start();
         try
         {
-            final AtomicInteger     retries = new AtomicInteger(0);
-            final Semaphore         semaphore = new Semaphore(0);
+            final AtomicInteger retries = new AtomicInteger(0);
+            final Semaphore semaphore = new Semaphore(0);
             client.getZookeeperClient().setRetryPolicy
-            (
-                new RetryPolicy()
-                {
-                    @Override
-                    public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
+                (
+                    new RetryPolicy()
                     {
-                        semaphore.release();
-                        if ( retries.incrementAndGet() == MAX_RETRIES )
+                        @Override
+                        public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
                         {
-                            try
+                            semaphore.release();
+                            if ( retries.incrementAndGet() == MAX_RETRIES )
                             {
-                                server = new TestingServer(serverPort);
-                            }
-                            catch ( Exception e )
-                            {
-                                throw new Error(e);
+                                try
+                                {
+                                    server = new TestingServer(serverPort);
+                                }
+                                catch ( Exception e )
+                                {
+                                    throw new Error(e);
+                                }
                             }
+                            return true;
                         }
-                        return true;
                     }
-                }
-            );
+                );
 
             server.stop();
 
@@ -367,7 +397,7 @@ public class TestFrameworkEdges extends BaseClassForTests
     }
 
     @Test
-    public void         testNotStarted() throws Exception
+    public void testNotStarted() throws Exception
     {
         CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
         try
@@ -386,7 +416,7 @@ public class TestFrameworkEdges extends BaseClassForTests
     }
 
     @Test
-    public void         testStopped() throws Exception
+    public void testStopped() throws Exception
     {
         CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
         try


[2/2] git commit: Fix edge case reported as CURATOR-52. ConnectionState.checkTimeouts() throws KeeperException.ConnectionLossException when the initial (or previously failed) connection cannot be re-established. This needs to be run through the retry policy and callbacks need to get invoked, etc.

Posted by ra...@apache.org.
Fix edge case reported as CURATOR-52. ConnectionState.checkTimeouts() throws KeeperException.ConnectionLossException
when the initial (or previously failed) connection cannot be re-established. This needs to be run through the retry policy
and callbacks need to get invoked, etc.


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/a561f9a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/a561f9a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/a561f9a6

Branch: refs/heads/CURATOR-52
Commit: a561f9a6a17c6fba82551b56ffadcc1b68df46cc
Parents: 6d7f726
Author: jordan.zimmerman <jo...@riotgames.com>
Authored: Tue Sep 10 15:44:20 2013 -0500
Committer: jordan.zimmerman <jo...@riotgames.com>
Committed: Tue Sep 10 15:44:20 2013 -0500

----------------------------------------------------------------------
 .../org/apache/curator/ConnectionState.java     |   2 +-
 .../curator/CuratorConnectionLossException.java |  11 ++
 .../framework/imps/CreateBuilderImpl.java       |   4 +-
 .../framework/imps/CuratorFrameworkImpl.java    | 106 ++++++++++++-------
 .../framework/imps/DeleteBuilderImpl.java       |   2 +-
 .../framework/imps/ExistsBuilderImpl.java       |   2 +-
 .../framework/imps/GetACLBuilderImpl.java       |   2 +-
 .../framework/imps/GetChildrenBuilderImpl.java  |   2 +-
 .../framework/imps/GetDataBuilderImpl.java      |   2 +-
 .../framework/imps/OperationAndData.java        |  32 +++---
 .../framework/imps/SetACLBuilderImpl.java       |   2 +-
 .../framework/imps/SetDataBuilderImpl.java      |   2 +-
 .../curator/framework/imps/SyncBuilderImpl.java |   2 +-
 13 files changed, 109 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/a561f9a6/curator-client/src/main/java/org/apache/curator/ConnectionState.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/ConnectionState.java b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
index bbb0588..e02ee88 100644
--- a/curator-client/src/main/java/org/apache/curator/ConnectionState.java
+++ b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
@@ -188,7 +188,7 @@ class ConnectionState implements Watcher, Closeable
                 }
                 else
                 {
-                    KeeperException.ConnectionLossException connectionLossException = new KeeperException.ConnectionLossException();
+                    KeeperException.ConnectionLossException connectionLossException = new CuratorConnectionLossException();
                     if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
                     {
                         log.error(String.format("Connection timed out for connection string (%s) and timeout (%d) / elapsed (%d)", zooKeeper.getConnectionString(), connectionTimeoutMs, elapsed), connectionLossException);

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/a561f9a6/curator-client/src/main/java/org/apache/curator/CuratorConnectionLossException.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/CuratorConnectionLossException.java b/curator-client/src/main/java/org/apache/curator/CuratorConnectionLossException.java
new file mode 100644
index 0000000..23eeb49
--- /dev/null
+++ b/curator-client/src/main/java/org/apache/curator/CuratorConnectionLossException.java
@@ -0,0 +1,11 @@
+package org.apache.curator;
+
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * This is needed to differentiate between ConnectionLossException thrown by ZooKeeper
+ * and ConnectionLossException thrown by {@link ConnectionState#checkTimeouts()}
+ */
+public class CuratorConnectionLossException extends KeeperException.ConnectionLossException
+{
+}

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/a561f9a6/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
index 6d7b261..ee99074 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
@@ -486,7 +486,7 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt
                 client.queueOperation(mainOperationAndData);
             }
         };
-        OperationAndData<PathAndBytes>        parentOperation = new OperationAndData<PathAndBytes>(operation, mainOperationAndData.getData(), null, null);
+        OperationAndData<PathAndBytes>        parentOperation = new OperationAndData<PathAndBytes>(operation, mainOperationAndData.getData(), null, null, backgrounding.getContext());
         client.queueOperation(parentOperation);
     }
 
@@ -558,7 +558,7 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt
     private void pathInBackground(final String path, final byte[] data, final String givenPath)
     {
         final AtomicBoolean firstTime = new AtomicBoolean(true);
-        OperationAndData<PathAndBytes> operationAndData = new OperationAndData<PathAndBytes>(this, new PathAndBytes(path, data), backgrounding.getCallback(), null)
+        OperationAndData<PathAndBytes> operationAndData = new OperationAndData<PathAndBytes>(this, new PathAndBytes(path, data), backgrounding.getCallback(), null, backgrounding.getContext())
         {
             @Override
             void callPerformBackgroundOperation() throws Exception

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/a561f9a6/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 146103c..3cccf45 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -20,6 +20,7 @@ package org.apache.curator.framework.imps;
 
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
+import org.apache.curator.CuratorConnectionLossException;
 import org.apache.curator.CuratorZookeeperClient;
 import org.apache.curator.RetryLoop;
 import org.apache.curator.TimeTrace;
@@ -420,7 +421,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
     protected void internalSync(CuratorFrameworkImpl impl, String path, Object context)
     {
         BackgroundOperation<String> operation = new BackgroundSyncImpl(impl, context);
-        performBackgroundOperation(new OperationAndData<String>(operation, path, null, null));
+        performBackgroundOperation(new OperationAndData<String>(operation, path, null, null, context));
     }
 
     @Override
@@ -460,7 +461,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
         return compressionProvider;
     }
 
-    @SuppressWarnings({"ThrowableResultOfMethodCallIgnored"})
     <DATA_TYPE> void processBackgroundOperation(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent event)
     {
         boolean     isInitialExecution = (event == null);
@@ -475,37 +475,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         {
             if ( RetryLoop.shouldRetry(event.getResultCode()) )
             {
-                if ( client.getRetryPolicy().allowRetry(operationAndData.getThenIncrementRetryCount(), operationAndData.getElapsedTimeMs(), operationAndData) )
-                {
-                    doQueueOperation = true;
-                }
-                else
-                {
-                    if ( operationAndData.getErrorCallback() != null )
-                    {
-                        operationAndData.getErrorCallback().retriesExhausted(operationAndData);
-                    }
-                    
-                    if ( operationAndData.getCallback() != null )
-                    {
-                        sendToBackgroundCallback(operationAndData, event);
-                    }
-
-                    KeeperException.Code    code = KeeperException.Code.get(event.getResultCode());
-                    Exception               e = null;
-                    try
-                    {
-                        e = (code != null) ? KeeperException.create(code) : null;
-                    }
-                    catch ( Throwable ignore )
-                    {
-                    }
-                    if ( e == null )
-                    {
-                        e = new Exception("Unknown result code: " + event.getResultCode());
-                    }
-                    logError("Background operation retry gave up", e);
-                }
+                doQueueOperation = checkBackgroundRetry(operationAndData, event);
                 break;
             }
 
@@ -586,6 +556,44 @@ public class CuratorFrameworkImpl implements CuratorFramework
         return namespaceWatcherMap;
     }
 
+    @SuppressWarnings({"ThrowableResultOfMethodCallIgnored"})
+    private <DATA_TYPE> boolean checkBackgroundRetry(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent event)
+    {
+        boolean doRetry = false;
+        if ( client.getRetryPolicy().allowRetry(operationAndData.getThenIncrementRetryCount(), operationAndData.getElapsedTimeMs(), operationAndData) )
+        {
+            doRetry = true;
+        }
+        else
+        {
+            if ( operationAndData.getErrorCallback() != null )
+            {
+                operationAndData.getErrorCallback().retriesExhausted(operationAndData);
+            }
+
+            if ( operationAndData.getCallback() != null )
+            {
+                sendToBackgroundCallback(operationAndData, event);
+            }
+
+            KeeperException.Code    code = KeeperException.Code.get(event.getResultCode());
+            Exception               e = null;
+            try
+            {
+                e = (code != null) ? KeeperException.create(code) : null;
+            }
+            catch ( Throwable ignore )
+            {
+            }
+            if ( e == null )
+            {
+                e = new Exception("Unknown result code: " + event.getResultCode());
+            }
+            logError("Background operation retry gave up", e);
+        }
+        return doRetry;
+    }
+
     private <DATA_TYPE> void sendToBackgroundCallback(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent event)
     {
         try
@@ -659,13 +667,33 @@ public class CuratorFrameworkImpl implements CuratorFramework
 
     private void performBackgroundOperation(OperationAndData<?> operationAndData)
     {
-        try
-        {
-            operationAndData.callPerformBackgroundOperation();
-        }
-        catch ( Throwable e )
+        boolean isDone = false;
+        while ( !isDone )
         {
-            handleBackgroundOperationException(operationAndData, e);
+            try
+            {
+                operationAndData.callPerformBackgroundOperation();
+                isDone = true;
+            }
+            catch ( Throwable e )
+            {
+                /**
+                 * Fix edge case reported as CURATOR-52. ConnectionState.checkTimeouts() throws KeeperException.ConnectionLossException
+                 * when the initial (or previously failed) connection cannot be re-established. This needs to be run through the retry policy
+                 * and callbacks need to get invoked, etc.
+                 */
+                if ( e instanceof CuratorConnectionLossException )
+                {
+                    WatchedEvent watchedEvent = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Disconnected, null);
+                    CuratorEvent event = new CuratorEventImpl(this, CuratorEventType.WATCHED, KeeperException.Code.CONNECTIONLOSS.intValue(), null, null, operationAndData.getContext(), null, null, null, watchedEvent, null);
+                    if ( checkBackgroundRetry(operationAndData, event) )
+                    {
+                        continue;
+                    }
+                }
+                handleBackgroundOperationException(operationAndData, e);
+                isDone = true;
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/a561f9a6/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
index 86dcc40..198f356 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
@@ -170,7 +170,7 @@ class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String>
                     }
                 };
             }
-            client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), errorCallback), null);
+            client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), errorCallback, backgrounding.getContext()), null);
         }
         else
         {

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/a561f9a6/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java
index be79d58..a1e2ee5 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java
@@ -141,7 +141,7 @@ class ExistsBuilderImpl implements ExistsBuilder, BackgroundOperation<String>
         Stat        returnStat = null;
         if ( backgrounding.inBackground() )
         {
-            client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null), null);
+            client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext()), null);
         }
         else
         {

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/a561f9a6/curator-framework/src/main/java/org/apache/curator/framework/imps/GetACLBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetACLBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetACLBuilderImpl.java
index 19dfa2c..250c2c8 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetACLBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetACLBuilderImpl.java
@@ -119,7 +119,7 @@ class GetACLBuilderImpl implements GetACLBuilder, BackgroundOperation<String>
         List<ACL>       result = null;
         if ( backgrounding.inBackground() )
         {
-            client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null), null);
+            client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext()), null);
         }
         else
         {

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/a561f9a6/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java
index 5ea6190..16f6d4b 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java
@@ -184,7 +184,7 @@ class GetChildrenBuilderImpl implements GetChildrenBuilder, BackgroundOperation<
         List<String>        children = null;
         if ( backgrounding.inBackground() )
         {
-            client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null), null);
+            client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext()), null);
         }
         else
         {

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/a561f9a6/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
index d9b3907..e994b03 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
@@ -272,7 +272,7 @@ class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<String>
         byte[]      responseData = null;
         if ( backgrounding.inBackground() )
         {
-            client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null), null);
+            client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext()), null);
         }
         else
         {

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/a561f9a6/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
index 8d39594..38f59a0 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.imps;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -28,28 +29,35 @@ import java.util.concurrent.atomic.AtomicLong;
 
 class OperationAndData<T> implements Delayed, RetrySleeper
 {
-    private static final AtomicLong     nextOrdinal = new AtomicLong();
-
-    private final BackgroundOperation<T>    operation;
-    private final T                         data;
-    private final BackgroundCallback        callback;
-    private final long                      startTimeMs = System.currentTimeMillis();
-    private final ErrorCallback<T>          errorCallback;
-    private final AtomicInteger             retryCount = new AtomicInteger(0);
-    private final AtomicLong                sleepUntilTimeMs = new AtomicLong(0);
-    private final long                      ordinal = nextOrdinal.getAndIncrement();
+    private static final AtomicLong nextOrdinal = new AtomicLong();
+
+    private final BackgroundOperation<T> operation;
+    private final T data;
+    private final BackgroundCallback callback;
+    private final long startTimeMs = System.currentTimeMillis();
+    private final ErrorCallback<T> errorCallback;
+    private final AtomicInteger retryCount = new AtomicInteger(0);
+    private final AtomicLong sleepUntilTimeMs = new AtomicLong(0);
+    private final long ordinal = nextOrdinal.getAndIncrement();
+    private final Object context;
 
     interface ErrorCallback<T>
     {
         void retriesExhausted(OperationAndData<T> operationAndData);
     }
 
-    OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback, ErrorCallback<T> errorCallback)
+    OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback, ErrorCallback<T> errorCallback, Object context)
     {
         this.operation = operation;
         this.data = data;
         this.callback = callback;
         this.errorCallback = errorCallback;
+        this.context = context;
+    }
+
+    Object getContext()
+    {
+        return context;
     }
 
     void callPerformBackgroundOperation() throws Exception
@@ -108,7 +116,7 @@ class OperationAndData<T> implements Delayed, RetrySleeper
             return 0;
         }
 
-        long        diff = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
+        long diff = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
         if ( diff == 0 )
         {
             if ( o instanceof OperationAndData )

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/a561f9a6/curator-framework/src/main/java/org/apache/curator/framework/imps/SetACLBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetACLBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetACLBuilderImpl.java
index 20d4b29..7a71d54 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetACLBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetACLBuilderImpl.java
@@ -112,7 +112,7 @@ class SetACLBuilderImpl implements SetACLBuilder, BackgroundPathable<Stat>, Back
         Stat        resultStat = null;
         if ( backgrounding.inBackground()  )
         {
-            client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null), null);
+            client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext()), null);
         }
         else
         {

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/a561f9a6/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java
index 978d12b..c88ea55 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java
@@ -232,7 +232,7 @@ class SetDataBuilderImpl implements SetDataBuilder, BackgroundOperation<PathAndB
         Stat        resultStat = null;
         if ( backgrounding.inBackground()  )
         {
-            client.processBackgroundOperation(new OperationAndData<PathAndBytes>(this, new PathAndBytes(path, data), backgrounding.getCallback(), null), null);
+            client.processBackgroundOperation(new OperationAndData<PathAndBytes>(this, new PathAndBytes(path, data), backgrounding.getCallback(), null, backgrounding.getContext()), null);
         }
         else
         {

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/a561f9a6/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java
index e168d76..2d3e9c0 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java
@@ -103,7 +103,7 @@ public class SyncBuilderImpl implements SyncBuilder, BackgroundOperation<String>
     @Override
     public Void forPath(String path) throws Exception
     {
-        OperationAndData<String> operationAndData = new OperationAndData<String>(this, path, backgrounding.getCallback(), null);
+        OperationAndData<String> operationAndData = new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext());
         client.processBackgroundOperation(operationAndData, null);
         return null;
     }