You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/02/05 19:41:55 UTC

[01/12] git commit: Background connection errors would go straight to LOST which is different than foreground connection errors. Changed so that background connection errors go to SUSPENDED first just like foreground connection errors.

Updated Branches:
  refs/heads/master 09f04e9c9 -> 0a82f4eaa


Background connection errors would go straight to LOST which is different than foreground connection errors. Changed so that background connection errors go to SUSPENDED first just like foreground connection errors.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/a937dfac
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/a937dfac
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/a937dfac

Branch: refs/heads/master
Commit: a937dfacf68869b1f6a860ffd02993108da99382
Parents: 262d57e
Author: randgalt <ra...@apache.org>
Authored: Sun Nov 17 12:46:55 2013 -0800
Committer: randgalt <ra...@apache.org>
Committed: Sun Nov 17 12:46:55 2013 -0800

----------------------------------------------------------------------
 .../framework/imps/CuratorFrameworkImpl.java    | 11 ++-
 .../TestPersistentEphemeralNodeListener.java    | 94 ++++++++++++++++++++
 2 files changed, 102 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/a937dfac/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index d56c9a4..3aa1097 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -514,7 +514,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
 
         if ( e instanceof KeeperException.ConnectionLossException )
         {
-            connectionStateManager.addStateChange(ConnectionState.LOST);
+            handleKeeperStateDisconnected();
         }
 
         final String        localReason = reason;
@@ -745,8 +745,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         {
             if ( curatorEvent.getWatchedEvent().getState() == Watcher.Event.KeeperState.Disconnected )
             {
-                connectionStateManager.addStateChange(ConnectionState.SUSPENDED);
-                internalSync(this, "/", null);  // we appear to have disconnected, force a new ZK event and see if we can connect to another server
+                handleKeeperStateDisconnected();
             }
             else if ( curatorEvent.getWatchedEvent().getState() == Watcher.Event.KeeperState.Expired )
             {
@@ -762,4 +761,10 @@ public class CuratorFrameworkImpl implements CuratorFramework
             }
         }
     }
+
+    private void handleKeeperStateDisconnected()
+    {
+        connectionStateManager.addStateChange(ConnectionState.SUSPENDED);
+        internalSync(this, "/", null);  // we appear to have disconnected, force a new ZK event and see if we can connect to another server
+    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/a937dfac/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNodeListener.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNodeListener.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNodeListener.java
new file mode 100644
index 0000000..b97ba41
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNodeListener.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.nodes;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.BaseClassForTests;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.test.Timing;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class TestPersistentEphemeralNodeListener extends BaseClassForTests
+{
+    @Test
+    public void testListenersReconnectedIsOK() throws Exception
+    {
+        server.close();
+
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+            PersistentEphemeralNode node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, "/abc/node", "hello".getBytes());
+            node.start();
+
+            final CountDownLatch connectedLatch = new CountDownLatch(1);
+            final CountDownLatch reconnectedLatch = new CountDownLatch(1);
+            final AtomicReference<ConnectionState> lastState = new AtomicReference<ConnectionState>();
+            ConnectionStateListener listener = new ConnectionStateListener()
+            {
+                @Override
+                public void stateChanged(CuratorFramework client, ConnectionState newState)
+                {
+                    lastState.set(newState);
+                    if ( newState == ConnectionState.CONNECTED )
+                    {
+                        connectedLatch.countDown();
+                    }
+                    if ( newState == ConnectionState.RECONNECTED )
+                    {
+                        reconnectedLatch.countDown();
+                    }
+                    System.out.println("XXXX " + newState);
+                }
+            };
+            client.getConnectionStateListenable().addListener(listener);
+            timing.sleepABit();
+            server = new TestingServer(server.getPort());
+            Assert.assertTrue(timing.awaitLatch(connectedLatch));
+            timing.sleepABit();
+            Assert.assertTrue(node.waitForInitialCreate(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
+            server.close();
+            timing.sleepABit();
+            server = new TestingServer(server.getPort());
+            timing.sleepABit();
+            Assert.assertTrue(timing.awaitLatch(reconnectedLatch));
+            timing.sleepABit();
+            Assert.assertEquals(lastState.get(), ConnectionState.RECONNECTED);
+        }
+        finally
+        {
+            Closeables.closeQuietly(client);
+        }
+    }
+}
\ No newline at end of file


[03/12] git commit: Another edge case found by Evaristo. When the SUSPEND is set, a background sync is executed to detect LOST. If the ZK connection is reset while this command is processing an incorrect LOST might get set. Instead, keep track of ZooKeeper

Posted by ra...@apache.org.
Another edge case found by Evaristo. When the SUSPEND is set, a background sync is executed to detect LOST. If the ZK connection is reset while this command is processing an incorrect LOST might get set. Instead, keep track of ZooKeeper instance reset. If there is a reset, ignore the background sync failure and re-submit the sync to test again.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/332c20b3
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/332c20b3
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/332c20b3

Branch: refs/heads/master
Commit: 332c20b375510b81bbc2a15fcccfacb7d57ec93d
Parents: 75acb0d
Author: randgalt <ra...@apache.org>
Authored: Sun Jan 12 16:53:03 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sun Jan 12 16:53:03 2014 -0500

----------------------------------------------------------------------
 .../org/apache/curator/ConnectionState.java     |   9 ++
 .../apache/curator/CuratorZookeeperClient.java  |  11 ++
 .../framework/imps/CuratorFrameworkImpl.java    |  17 ++-
 ...estResetConnectionWithBackgroundFailure.java | 104 +++++++++++++++++++
 4 files changed, 139 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/332c20b3/curator-client/src/main/java/org/apache/curator/ConnectionState.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/ConnectionState.java b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
index e02ee88..4978c3f 100644
--- a/curator-client/src/main/java/org/apache/curator/ConnectionState.java
+++ b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
@@ -34,6 +34,7 @@ import java.io.IOException;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 class ConnectionState implements Watcher, Closeable
@@ -49,6 +50,7 @@ class ConnectionState implements Watcher, Closeable
     private final AtomicReference<TracerDriver> tracer;
     private final Queue<Exception> backgroundExceptions = new ConcurrentLinkedQueue<Exception>();
     private final Queue<Watcher> parentWatchers = new ConcurrentLinkedQueue<Watcher>();
+    private final AtomicLong instanceIndex = new AtomicLong();
     private volatile long connectionStartMs = 0;
 
     ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly)
@@ -131,6 +133,11 @@ class ConnectionState implements Watcher, Closeable
         parentWatchers.remove(watcher);
     }
 
+    long getInstanceIndex()
+    {
+        return instanceIndex.get();
+    }
+
     @Override
     public void process(WatchedEvent event)
     {
@@ -204,6 +211,8 @@ class ConnectionState implements Watcher, Closeable
     {
         log.debug("reset");
 
+        instanceIndex.incrementAndGet();
+
         isConnected.set(false);
         connectionStartMs = System.currentTimeMillis();
         zooKeeper.closeAndReset();

http://git-wip-us.apache.org/repos/asf/curator/blob/332c20b3/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
index f4e56f9..f0a4ab3 100644
--- a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
+++ b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
@@ -279,6 +279,17 @@ public class CuratorZookeeperClient implements Closeable
         return connectionTimeoutMs;
     }
 
+    /**
+     * Every time a new {@link ZooKeeper} instance is allocated, the "instance index"
+     * is incremented.
+     *
+     * @return the current instance index
+     */
+    public long getInstanceIndex()
+    {
+        return state.getInstanceIndex();
+    }
+
     void        addParentWatcher(Watcher watcher)
     {
         state.addParentWatcher(watcher);

http://git-wip-us.apache.org/repos/asf/curator/blob/332c20b3/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 1b0ef3f..f1258ea 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -606,14 +606,27 @@ public class CuratorFrameworkImpl implements CuratorFramework
     {
         connectionStateManager.setToSuspended();
 
+        final long instanceIndex = client.getInstanceIndex();
+
         // we appear to have disconnected, force a new ZK event and see if we can connect to another server
-        BackgroundOperation<String> operation = new BackgroundSyncImpl(this, null);
+        final BackgroundOperation<String> operation = new BackgroundSyncImpl(this, null);
         OperationAndData.ErrorCallback<String> errorCallback = new OperationAndData.ErrorCallback<String>()
         {
             @Override
             public void retriesExhausted(OperationAndData<String> operationAndData)
             {
-                connectionStateManager.addStateChange(ConnectionState.LOST);
+                // if instanceIndex != newInstanceIndex, the ZooKeeper instance was reset/reallocated
+                // so the pending background sync is no longer valid
+                long newInstanceIndex = client.getInstanceIndex();
+                if ( instanceIndex == newInstanceIndex )
+                {
+                    connectionStateManager.addStateChange(ConnectionState.LOST);
+                }
+                else
+                {
+                    log.debug("suspendConnection() failure ignored as the ZooKeeper instance was reset. Retrying.");
+                    performBackgroundOperation(new OperationAndData<String>(operation, "/", null, this, null));
+                }
             }
         };
         performBackgroundOperation(new OperationAndData<String>(operation, "/", null, errorCallback, null));

http://git-wip-us.apache.org/repos/asf/curator/blob/332c20b3/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java b/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java
new file mode 100644
index 0000000..e634a6d
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.client;
+
+import com.google.common.io.Closeables;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.BaseClassForTests;
+import org.apache.curator.framework.recipes.leader.LeaderSelector;
+import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
+import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.test.Timing;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestResetConnectionWithBackgroundFailure extends BaseClassForTests
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Test
+    public void testConnectionStateListener() throws Exception
+    {
+        server.close();
+
+        final StringBuilder listenerSequence = new StringBuilder();
+        LeaderSelector selector = null;
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(100));
+        try
+        {
+            client.start();
+            timing.sleepABit();
+
+            LeaderSelectorListener listenerLeader = new LeaderSelectorListenerAdapter()
+            {
+                @Override
+                public void takeLeadership(CuratorFramework client) throws Exception
+                {
+                    Thread.currentThread().join();
+                }
+            };
+            selector = new LeaderSelector(client, "/leader", listenerLeader);
+            selector.autoRequeue();
+            selector.start();
+
+            ConnectionStateListener listener1 = new ConnectionStateListener()
+            {
+                @Override
+                public void stateChanged(CuratorFramework client, ConnectionState newState)
+                {
+                    listenerSequence.append("-").append(newState);
+                }
+            };
+
+            client.getConnectionStateListenable().addListener(listener1);
+            log.debug("Starting ZK server");
+            server = new TestingServer(server.getPort());
+            timing.forWaiting().sleepABit();
+
+            log.debug("Stopping ZK server");
+            server.close();
+            timing.forWaiting().sleepABit();
+
+            log.debug("Starting ZK server");
+            server = new TestingServer(server.getPort());
+            timing.forWaiting().sleepABit();
+
+            log.debug("Stopping ZK server");
+            server.close();
+            timing.forWaiting().sleepABit();
+
+            Assert.assertEquals(listenerSequence.toString(), "-CONNECTED-SUSPENDED-LOST-RECONNECTED-SUSPENDED-LOST");
+        }
+        finally
+        {
+            Closeables.closeQuietly(selector);
+            Closeables.closeQuietly(client);
+        }
+    }
+
+}
\ No newline at end of file


[09/12] git commit: don't need to hoist value anymore

Posted by ra...@apache.org.
don't need to hoist value anymore


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/22659054
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/22659054
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/22659054

Branch: refs/heads/master
Commit: 226590541f07ce4a611f71d389a43762d118397d
Parents: c40afd9
Author: randgalt <ra...@apache.org>
Authored: Mon Feb 3 13:49:45 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Feb 3 13:49:45 2014 -0500

----------------------------------------------------------------------
 .../org/apache/curator/framework/imps/CuratorFrameworkImpl.java   | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/22659054/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index b480d9b..0f98495 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -623,8 +623,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
             {
                 // if instanceIndex != newInstanceIndex, the ZooKeeper instance was reset/reallocated
                 // so the pending background sync is no longer valid
-                long newInstanceIndex = client.getInstanceIndex();
-                if ( (instanceIndex < 0) || (instanceIndex == newInstanceIndex) )
+                if ( (instanceIndex < 0) || (instanceIndex == client.getInstanceIndex()) )
                 {
                     connectionStateManager.addStateChange(ConnectionState.LOST);
                 }


[11/12] git commit: Merge branch 'master' into CURATOR-72

Posted by ra...@apache.org.
Merge branch 'master' into CURATOR-72


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/777a5f13
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/777a5f13
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/777a5f13

Branch: refs/heads/master
Commit: 777a5f13ca1055849d4ffa9c3811c0e3283b2168
Parents: 5238d5f 863eaee
Author: randgalt <ra...@apache.org>
Authored: Mon Feb 3 13:57:36 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Feb 3 13:57:36 2014 -0500

----------------------------------------------------------------------
 .../org/apache/curator/ConnectionState.java     |  4 +-
 .../exhibitor/DefaultExhibitorRestClient.java   |  4 +-
 .../apache/curator/utils/CloseableUtils.java    | 70 ++++++++++++++++++++
 .../curator/TestSessionFailRetryLoop.java       | 10 +--
 .../TestExhibitorEnsembleProvider.java          |  6 +-
 .../src/main/java/cache/PathCacheExample.java   | 10 +--
 .../main/java/discovery/DiscoveryExample.java   | 16 ++---
 .../src/main/java/discovery/ExampleServer.java  |  4 +-
 .../main/java/leader/LeaderSelectorExample.java |  8 +--
 .../src/main/java/locking/LockingExample.java   |  6 +-
 .../imps/CuratorTempFrameworkImpl.java          |  4 +-
 .../curator/framework/imps/TestCompression.java |  8 +--
 .../framework/imps/TestFailedDeleteManager.java | 10 +--
 .../curator/framework/imps/TestFramework.java   |  6 +-
 .../framework/imps/TestFrameworkBackground.java |  8 +--
 .../framework/imps/TestFrameworkEdges.java      | 22 +++---
 .../curator/framework/imps/TestMultiClient.java |  6 +-
 .../framework/imps/TestNamespaceFacade.java     | 12 ++--
 .../framework/imps/TestNeverConnected.java      |  4 +-
 .../curator/framework/imps/TestReadOnly.java    |  6 +-
 .../framework/imps/TestTempFramework.java       |  6 +-
 .../framework/imps/TestWatcherIdentity.java     | 10 +--
 .../curator/framework/imps/TestWithCluster.java | 10 +--
 .../framework/recipes/leader/LeaderLatch.java   |  2 +-
 .../framework/recipes/locks/ChildReaper.java    |  4 +-
 .../recipes/locks/InterProcessSemaphore.java    |  6 +-
 .../recipes/locks/InterProcessSemaphoreV2.java  |  6 +-
 .../recipes/queue/DistributedQueue.java         |  4 +-
 .../framework/recipes/queue/QueueSharder.java   |  4 +-
 .../barriers/TestDistributedBarrier.java        | 10 +--
 .../barriers/TestDistributedDoubleBarrier.java  |  8 +--
 .../framework/recipes/cache/TestNodeCache.java  | 18 ++---
 .../recipes/cache/TestPathChildrenCache.java    | 24 +++----
 .../cache/TestPathChildrenCacheInCluster.java   |  8 +--
 .../recipes/leader/TestLeaderLatch.java         | 28 ++++----
 .../recipes/leader/TestLeaderLatchCluster.java  |  8 +--
 .../recipes/leader/TestLeaderSelector.java      | 22 +++---
 .../leader/TestLeaderSelectorCluster.java       | 10 +--
 .../leader/TestLeaderSelectorParticipants.java  | 10 +--
 .../leader/TestLeaderSelectorWithExecutor.java  |  6 +-
 .../recipes/locks/SemaphoreClient.java          |  4 +-
 .../recipes/locks/TestChildReaper.java          | 14 ++--
 .../locks/TestInterProcessMutexBase.java        |  8 +--
 .../locks/TestInterProcessReadWriteLock.java    | 12 ++--
 .../locks/TestInterProcessSemaphore.java        | 10 +--
 .../locks/TestInterProcessSemaphoreCluster.java | 14 ++--
 .../framework/recipes/locks/TestLockACLs.java   |  6 +-
 .../locks/TestLockCleanlinessWithFaults.java    |  4 +-
 .../framework/recipes/locks/TestReaper.java     | 38 +++++------
 .../nodes/TestPersistentEphemeralNode.java      |  8 +--
 .../queue/TestBoundedDistributedQueue.java      | 12 ++--
 .../queue/TestDistributedDelayQueue.java        | 14 ++--
 .../recipes/queue/TestDistributedIdQueue.java   | 10 +--
 .../queue/TestDistributedPriorityQueue.java     | 18 ++---
 .../recipes/queue/TestDistributedQueue.java     | 50 +++++++-------
 .../recipes/queue/TestQueueSharder.java         | 16 ++---
 .../queue/TestSimpleDistributedQueue.java       |  4 +-
 .../recipes/shared/TestSharedCount.java         |  4 +-
 .../discovery/server/rest/InstanceCleanup.java  |  2 +-
 .../src/site/confluence/index.confluence        |  8 +--
 .../x/discovery/details/ServiceCacheImpl.java   |  4 +-
 .../discovery/details/ServiceDiscoveryImpl.java |  6 +-
 .../curator/x/discovery/TestServiceCache.java   | 10 +--
 .../x/discovery/TestServiceDiscovery.java       | 12 ++--
 64 files changed, 393 insertions(+), 323 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/777a5f13/curator-client/src/main/java/org/apache/curator/ConnectionState.java
----------------------------------------------------------------------


[10/12] git commit: added a comment

Posted by ra...@apache.org.
added a comment


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/5238d5f0
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/5238d5f0
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/5238d5f0

Branch: refs/heads/master
Commit: 5238d5f083f0c5b9c28e4c73d834873737dab936
Parents: 2265905
Author: randgalt <ra...@apache.org>
Authored: Mon Feb 3 13:50:25 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Feb 3 13:50:25 2014 -0500

----------------------------------------------------------------------
 .../org/apache/curator/framework/imps/CuratorFrameworkImpl.java   | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/5238d5f0/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 0f98495..7854308 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -622,7 +622,8 @@ public class CuratorFrameworkImpl implements CuratorFramework
             public void retriesExhausted(OperationAndData<String> operationAndData)
             {
                 // if instanceIndex != newInstanceIndex, the ZooKeeper instance was reset/reallocated
-                // so the pending background sync is no longer valid
+                // so the pending background sync is no longer valid.
+                // if instanceIndex is -1, this is the second try to sync - punt and mark the connection lost
                 if ( (instanceIndex < 0) || (instanceIndex == client.getInstanceIndex()) )
                 {
                     connectionStateManager.addStateChange(ConnectionState.LOST);


[06/12] git commit: Previous version was incorrectly re-using the instanceIndex. Need to use latest value

Posted by ra...@apache.org.
Previous version was incorrectly re-using the instanceIndex. Need to use latest value


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/8fbe4b7d
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/8fbe4b7d
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/8fbe4b7d

Branch: refs/heads/master
Commit: 8fbe4b7db66d157c951268f1ce07e08cf176bd77
Parents: 4f47e7b
Author: randgalt <ra...@apache.org>
Authored: Mon Jan 13 17:59:37 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jan 13 17:59:37 2014 -0500

----------------------------------------------------------------------
 .../apache/curator/framework/imps/CuratorFrameworkImpl.java   | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/8fbe4b7d/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index f1258ea..d516f48 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -606,8 +606,11 @@ public class CuratorFrameworkImpl implements CuratorFramework
     {
         connectionStateManager.setToSuspended();
 
-        final long instanceIndex = client.getInstanceIndex();
+        doSyncForSuspendedConnection(client.getInstanceIndex());
+    }
 
+    private void doSyncForSuspendedConnection(final long instanceIndex)
+    {
         // we appear to have disconnected, force a new ZK event and see if we can connect to another server
         final BackgroundOperation<String> operation = new BackgroundSyncImpl(this, null);
         OperationAndData.ErrorCallback<String> errorCallback = new OperationAndData.ErrorCallback<String>()
@@ -625,7 +628,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
                 else
                 {
                     log.debug("suspendConnection() failure ignored as the ZooKeeper instance was reset. Retrying.");
-                    performBackgroundOperation(new OperationAndData<String>(operation, "/", null, this, null));
+                    doSyncForSuspendedConnection(newInstanceIndex);
                 }
             }
         };


[04/12] git commit: Another edge case found by Evaristo. When SUSPENDED is set, a background sync is executed to detect LOST. If the ZK connection is reset while this sync is being processed, an incorrect LOST might get set. Instead, keep track of ZooKeeper instance resets.

Posted by ra...@apache.org.
Another edge case found by Evaristo. When SUSPENDED is set, a background sync is executed to detect LOST. If the ZK connection is reset while this sync is being processed, an incorrect LOST might get set. Instead, keep track of ZooKeeper instance resets. If there is a reset, ignore the background sync failure and re-submit the sync to test again.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/0359bc5a
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/0359bc5a
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/0359bc5a

Branch: refs/heads/master
Commit: 0359bc5ab683285f44523d1445ef2eb8116380c4
Parents: 75acb0d
Author: randgalt <ra...@apache.org>
Authored: Sun Jan 12 16:53:03 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sun Jan 12 16:54:17 2014 -0500

----------------------------------------------------------------------
 .../org/apache/curator/ConnectionState.java     |   9 ++
 .../apache/curator/CuratorZookeeperClient.java  |  11 ++
 .../framework/imps/CuratorFrameworkImpl.java    |  17 ++-
 ...estResetConnectionWithBackgroundFailure.java | 104 +++++++++++++++++++
 4 files changed, 139 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/0359bc5a/curator-client/src/main/java/org/apache/curator/ConnectionState.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/ConnectionState.java b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
index e02ee88..4978c3f 100644
--- a/curator-client/src/main/java/org/apache/curator/ConnectionState.java
+++ b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
@@ -34,6 +34,7 @@ import java.io.IOException;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 class ConnectionState implements Watcher, Closeable
@@ -49,6 +50,7 @@ class ConnectionState implements Watcher, Closeable
     private final AtomicReference<TracerDriver> tracer;
     private final Queue<Exception> backgroundExceptions = new ConcurrentLinkedQueue<Exception>();
     private final Queue<Watcher> parentWatchers = new ConcurrentLinkedQueue<Watcher>();
+    private final AtomicLong instanceIndex = new AtomicLong();
     private volatile long connectionStartMs = 0;
 
     ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly)
@@ -131,6 +133,11 @@ class ConnectionState implements Watcher, Closeable
         parentWatchers.remove(watcher);
     }
 
+    long getInstanceIndex()
+    {
+        return instanceIndex.get();
+    }
+
     @Override
     public void process(WatchedEvent event)
     {
@@ -204,6 +211,8 @@ class ConnectionState implements Watcher, Closeable
     {
         log.debug("reset");
 
+        instanceIndex.incrementAndGet();
+
         isConnected.set(false);
         connectionStartMs = System.currentTimeMillis();
         zooKeeper.closeAndReset();

http://git-wip-us.apache.org/repos/asf/curator/blob/0359bc5a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
index f4e56f9..f0a4ab3 100644
--- a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
+++ b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
@@ -279,6 +279,17 @@ public class CuratorZookeeperClient implements Closeable
         return connectionTimeoutMs;
     }
 
+    /**
+     * Every time a new {@link ZooKeeper} instance is allocated, the "instance index"
+     * is incremented.
+     *
+     * @return the current instance index
+     */
+    public long getInstanceIndex()
+    {
+        return state.getInstanceIndex();
+    }
+
     void        addParentWatcher(Watcher watcher)
     {
         state.addParentWatcher(watcher);

http://git-wip-us.apache.org/repos/asf/curator/blob/0359bc5a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 1b0ef3f..f1258ea 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -606,14 +606,27 @@ public class CuratorFrameworkImpl implements CuratorFramework
     {
         connectionStateManager.setToSuspended();
 
+        final long instanceIndex = client.getInstanceIndex();
+
         // we appear to have disconnected, force a new ZK event and see if we can connect to another server
-        BackgroundOperation<String> operation = new BackgroundSyncImpl(this, null);
+        final BackgroundOperation<String> operation = new BackgroundSyncImpl(this, null);
         OperationAndData.ErrorCallback<String> errorCallback = new OperationAndData.ErrorCallback<String>()
         {
             @Override
             public void retriesExhausted(OperationAndData<String> operationAndData)
             {
-                connectionStateManager.addStateChange(ConnectionState.LOST);
+                // if instanceIndex != newInstanceIndex, the ZooKeeper instance was reset/reallocated
+                // so the pending background sync is no longer valid
+                long newInstanceIndex = client.getInstanceIndex();
+                if ( instanceIndex == newInstanceIndex )
+                {
+                    connectionStateManager.addStateChange(ConnectionState.LOST);
+                }
+                else
+                {
+                    log.debug("suspendConnection() failure ignored as the ZooKeeper instance was reset. Retrying.");
+                    performBackgroundOperation(new OperationAndData<String>(operation, "/", null, this, null));
+                }
             }
         };
         performBackgroundOperation(new OperationAndData<String>(operation, "/", null, errorCallback, null));

http://git-wip-us.apache.org/repos/asf/curator/blob/0359bc5a/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java b/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java
new file mode 100644
index 0000000..e634a6d
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.client;
+
+import com.google.common.io.Closeables;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.BaseClassForTests;
+import org.apache.curator.framework.recipes.leader.LeaderSelector;
+import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
+import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.test.Timing;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestResetConnectionWithBackgroundFailure extends BaseClassForTests
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Test
+    public void testConnectionStateListener() throws Exception
+    {
+        server.close();
+
+        final StringBuilder listenerSequence = new StringBuilder();
+        LeaderSelector selector = null;
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(100));
+        try
+        {
+            client.start();
+            timing.sleepABit();
+
+            LeaderSelectorListener listenerLeader = new LeaderSelectorListenerAdapter()
+            {
+                @Override
+                public void takeLeadership(CuratorFramework client) throws Exception
+                {
+                    Thread.currentThread().join();
+                }
+            };
+            selector = new LeaderSelector(client, "/leader", listenerLeader);
+            selector.autoRequeue();
+            selector.start();
+
+            ConnectionStateListener listener1 = new ConnectionStateListener()
+            {
+                @Override
+                public void stateChanged(CuratorFramework client, ConnectionState newState)
+                {
+                    listenerSequence.append("-").append(newState);
+                }
+            };
+
+            client.getConnectionStateListenable().addListener(listener1);
+            log.debug("Starting ZK server");
+            server = new TestingServer(server.getPort());
+            timing.forWaiting().sleepABit();
+
+            log.debug("Stopping ZK server");
+            server.close();
+            timing.forWaiting().sleepABit();
+
+            log.debug("Starting ZK server");
+            server = new TestingServer(server.getPort());
+            timing.forWaiting().sleepABit();
+
+            log.debug("Stopping ZK server");
+            server.close();
+            timing.forWaiting().sleepABit();
+
+            Assert.assertEquals(listenerSequence.toString(), "-CONNECTED-SUSPENDED-LOST-RECONNECTED-SUSPENDED-LOST");
+        }
+        finally
+        {
+            Closeables.closeQuietly(selector);
+            Closeables.closeQuietly(client);
+        }
+    }
+
+}
\ No newline at end of file


[12/12] git commit: Merge branch 'CURATOR-72'

Posted by ra...@apache.org.
Merge branch 'CURATOR-72'


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/0a82f4ea
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/0a82f4ea
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/0a82f4ea

Branch: refs/heads/master
Commit: 0a82f4eaa518ae41097fefdce332099dd2983286
Parents: 09f04e9 777a5f1
Author: randgalt <ra...@apache.org>
Authored: Wed Feb 5 13:41:33 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Wed Feb 5 13:41:33 2014 -0500

----------------------------------------------------------------------
 .../org/apache/curator/ConnectionState.java     |   9 ++
 .../apache/curator/CuratorZookeeperClient.java  |  11 ++
 .../java/org/apache/curator/HandleHolder.java   |   2 +-
 .../curator/framework/imps/Backgrounding.java   |   5 +
 .../framework/imps/CuratorFrameworkImpl.java    | 145 +++++++++++++------
 .../framework/state/ConnectionStateManager.java |  47 +++++-
 .../framework/client/TestBackgroundStates.java  | 142 ++++++++++++++++++
 ...estResetConnectionWithBackgroundFailure.java | 104 +++++++++++++
 .../TestPersistentEphemeralNodeListener.java    |  94 ++++++++++++
 9 files changed, 510 insertions(+), 49 deletions(-)
----------------------------------------------------------------------



[02/12] git commit: Continued re-work on the connection state transitions involving background/async APIs.

Posted by ra...@apache.org.
Continued re-work on the connection state transitions involving background/async APIs.

ConnectionStateManager now uses synchronization. This shouldn't hurt performance but rationalizes state changes from foreground/background ops.

Background errors now go through same code as foreground errors. Transition to LOST is handled specifically instead of generally in logError().


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/75acb0d9
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/75acb0d9
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/75acb0d9

Branch: refs/heads/master
Commit: 75acb0d9222c3a54c9e15edff319acedcb26bbf5
Parents: a937dfa
Author: randgalt <ra...@apache.org>
Authored: Tue Dec 24 17:54:55 2013 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Dec 24 17:54:55 2013 -0500

----------------------------------------------------------------------
 .../java/org/apache/curator/HandleHolder.java   |   2 +-
 .../curator/framework/imps/Backgrounding.java   |   5 +
 .../framework/imps/CuratorFrameworkImpl.java    | 128 +++++++++++------
 .../framework/state/ConnectionStateManager.java |  44 +++++-
 .../framework/client/TestBackgroundStates.java  | 142 +++++++++++++++++++
 5 files changed, 268 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/75acb0d9/curator-client/src/main/java/org/apache/curator/HandleHolder.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/HandleHolder.java b/curator-client/src/main/java/org/apache/curator/HandleHolder.java
index 4922688..1f7cd91 100644
--- a/curator-client/src/main/java/org/apache/curator/HandleHolder.java
+++ b/curator-client/src/main/java/org/apache/curator/HandleHolder.java
@@ -52,7 +52,7 @@ class HandleHolder
 
     ZooKeeper getZooKeeper() throws Exception
     {
-        return helper.getZooKeeper();
+        return (helper != null) ? helper.getZooKeeper() : null;
     }
 
     String  getConnectionString()

http://git-wip-us.apache.org/repos/asf/curator/blob/75acb0d9/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java
index 6ae9151..262b2a8 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java
@@ -21,6 +21,7 @@ package org.apache.curator.framework.imps;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.KeeperException;
 import java.util.concurrent.Executor;
 
 class Backgrounding
@@ -109,6 +110,10 @@ class Backgrounding
                             }
                             catch ( Exception e )
                             {
+                                if ( e instanceof KeeperException )
+                                {
+                                    client.validateConnection(client.codeToState(((KeeperException)e).code()));
+                                }
                                 client.logError("Background operation result handling threw exception", e);
                             }
                         }

http://git-wip-us.apache.org/repos/asf/curator/blob/75acb0d9/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 3aa1097..1b0ef3f 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -512,24 +512,19 @@ public class CuratorFrameworkImpl implements CuratorFramework
             log.error(reason, e);
         }
 
-        if ( e instanceof KeeperException.ConnectionLossException )
-        {
-            handleKeeperStateDisconnected();
-        }
-
         final String        localReason = reason;
         unhandledErrorListeners.forEach
-        (
-            new Function<UnhandledErrorListener, Void>()
-            {
-                @Override
-                public Void apply(UnhandledErrorListener listener)
+            (
+                new Function<UnhandledErrorListener, Void>()
                 {
-                    listener.unhandledError(localReason, e);
-                    return null;
+                    @Override
+                    public Void apply(UnhandledErrorListener listener)
+                    {
+                        listener.unhandledError(localReason, e);
+                        return null;
+                    }
                 }
-            }
-        );
+            );
     }
 
     String    unfixForNamespace(String path)
@@ -557,6 +552,73 @@ public class CuratorFrameworkImpl implements CuratorFramework
         return namespaceWatcherMap;
     }
 
+    void validateConnection(Watcher.Event.KeeperState state)
+    {
+        if ( state == Watcher.Event.KeeperState.Disconnected )
+        {
+            suspendConnection();
+        }
+        else if ( state == Watcher.Event.KeeperState.Expired )
+        {
+            connectionStateManager.addStateChange(ConnectionState.LOST);
+        }
+        else if ( state == Watcher.Event.KeeperState.SyncConnected )
+        {
+            connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
+        }
+        else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
+        {
+            connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
+        }
+    }
+
+    Watcher.Event.KeeperState codeToState(KeeperException.Code code)
+    {
+        switch ( code )
+        {
+        case AUTHFAILED:
+        case NOAUTH:
+        {
+            return Watcher.Event.KeeperState.AuthFailed;
+        }
+
+        case CONNECTIONLOSS:
+        case OPERATIONTIMEOUT:
+        {
+            return Watcher.Event.KeeperState.Disconnected;
+        }
+
+        case SESSIONEXPIRED:
+        {
+            return Watcher.Event.KeeperState.Expired;
+        }
+
+        case OK:
+        case SESSIONMOVED:
+        {
+            return Watcher.Event.KeeperState.SyncConnected;
+        }
+        }
+        return Watcher.Event.KeeperState.fromInt(-1);
+    }
+
+    private void suspendConnection()
+    {
+        connectionStateManager.setToSuspended();
+
+        // we appear to have disconnected, force a new ZK event and see if we can connect to another server
+        BackgroundOperation<String> operation = new BackgroundSyncImpl(this, null);
+        OperationAndData.ErrorCallback<String> errorCallback = new OperationAndData.ErrorCallback<String>()
+        {
+            @Override
+            public void retriesExhausted(OperationAndData<String> operationAndData)
+            {
+                connectionStateManager.addStateChange(ConnectionState.LOST);
+            }
+        };
+        performBackgroundOperation(new OperationAndData<String>(operation, "/", null, errorCallback, null));
+    }
+
     @SuppressWarnings({"ThrowableResultOfMethodCallIgnored"})
     private <DATA_TYPE> boolean checkBackgroundRetry(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent event)
     {
@@ -588,8 +650,10 @@ public class CuratorFrameworkImpl implements CuratorFramework
             }
             if ( e == null )
             {
-                e = new Exception("Unknown result code: " + event.getResultCode());
+                e = new Exception("Unknown result codegetResultCode()");
             }
+
+            validateConnection(codeToState(code));
             logError("Background operation retry gave up", e);
         }
         return doRetry;
@@ -714,7 +778,10 @@ public class CuratorFrameworkImpl implements CuratorFramework
 
     private void processEvent(final CuratorEvent curatorEvent)
     {
-        validateConnection(curatorEvent);
+        if ( curatorEvent.getType() == CuratorEventType.WATCHED )
+        {
+            validateConnection(curatorEvent.getWatchedEvent().getState());
+        }
 
         listeners.forEach
         (
@@ -738,33 +805,4 @@ public class CuratorFrameworkImpl implements CuratorFramework
             }
         );
     }
-
-    private void validateConnection(CuratorEvent curatorEvent)
-    {
-        if ( curatorEvent.getType() == CuratorEventType.WATCHED )
-        {
-            if ( curatorEvent.getWatchedEvent().getState() == Watcher.Event.KeeperState.Disconnected )
-            {
-                handleKeeperStateDisconnected();
-            }
-            else if ( curatorEvent.getWatchedEvent().getState() == Watcher.Event.KeeperState.Expired )
-            {
-                connectionStateManager.addStateChange(ConnectionState.LOST);
-            }
-            else if ( curatorEvent.getWatchedEvent().getState() == Watcher.Event.KeeperState.SyncConnected )
-            {
-                connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
-            }
-            else if ( curatorEvent.getWatchedEvent().getState() == Watcher.Event.KeeperState.ConnectedReadOnly )
-            {
-                connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
-            }
-        }
-    }
-
-    private void handleKeeperStateDisconnected()
-    {
-        connectionStateManager.addStateChange(ConnectionState.SUSPENDED);
-        internalSync(this, "/", null);  // we appear to have disconnected, force a new ZK event and see if we can connect to another server
-    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/75acb0d9/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index fe5f18a..a2cfa60 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -65,11 +65,13 @@ public class ConnectionStateManager implements Closeable
     private final BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
     private final CuratorFramework client;
     private final ListenerContainer<ConnectionStateListener> listeners = new ListenerContainer<ConnectionStateListener>();
-    private final AtomicReference<ConnectionState> currentState = new AtomicReference<ConnectionState>();
     private final AtomicBoolean initialConnectMessageSent = new AtomicBoolean(false);
     private final ExecutorService service;
     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
 
+    // guarded by sync
+    private ConnectionState currentConnectionState;
+
     private enum State
     {
         LATENT,
@@ -133,23 +135,44 @@ public class ConnectionStateManager implements Closeable
     }
 
     /**
+     * Change to {@link ConnectionState#SUSPENDED} only if not already suspended and not lost
+     */
+    public synchronized void setToSuspended()
+    {
+        if ( state.get() != State.STARTED )
+        {
+            return;
+        }
+
+        if ( (currentConnectionState == ConnectionState.LOST) || (currentConnectionState == ConnectionState.SUSPENDED) )
+        {
+            return;
+        }
+
+        currentConnectionState = ConnectionState.SUSPENDED;
+        postState(ConnectionState.SUSPENDED);
+    }
+
+    /**
      * Post a state change. If the manager is already in that state the change
      * is ignored. Otherwise the change is queued for listeners.
      *
      * @param newConnectionState new state
+     * @return true if the state actually changed, false if it was already at that state
      */
-    public void addStateChange(ConnectionState newConnectionState)
+    public synchronized boolean addStateChange(ConnectionState newConnectionState)
     {
         if ( state.get() != State.STARTED )
         {
-            return;
+            return false;
         }
 
-        ConnectionState previousState = currentState.getAndSet(newConnectionState);
+        ConnectionState previousState = currentConnectionState;
         if ( previousState == newConnectionState )
         {
-            return;
+            return false;
         }
+        currentConnectionState = newConnectionState;
 
         ConnectionState localState = newConnectionState;
         boolean isNegativeMessage = ((newConnectionState == ConnectionState.LOST) || (newConnectionState == ConnectionState.SUSPENDED));
@@ -158,8 +181,15 @@ public class ConnectionStateManager implements Closeable
             localState = ConnectionState.CONNECTED;
         }
 
-        log.info("State change: " + localState);
-        while ( !eventQueue.offer(localState) )
+        postState(localState);
+
+        return true;
+    }
+
+    private void postState(ConnectionState state)
+    {
+        log.info("State change: " + state);
+        while ( !eventQueue.offer(state) )
         {
             eventQueue.poll();
             log.warn("ConnectionStateManager queue full - dropping events to make room");

http://git-wip-us.apache.org/repos/asf/curator/blob/75acb0d9/curator-recipes/src/test/java/org/apache/curator/framework/client/TestBackgroundStates.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/client/TestBackgroundStates.java b/curator-recipes/src/test/java/org/apache/curator/framework/client/TestBackgroundStates.java
new file mode 100644
index 0000000..b1c382f
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/client/TestBackgroundStates.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.client;
+
+import com.google.common.collect.Queues;
+import com.google.common.io.Closeables;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.BaseClassForTests;
+import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.DebugUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+// NOTE: these framework client tests live in the curator-recipes module as they use the PersistentEphemeralNode recipe
+
+public class TestBackgroundStates extends BaseClassForTests
+{
+    @Test
+    public void testListenersReconnectedIsOK() throws Exception
+    {
+        server.close();
+
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+            PersistentEphemeralNode node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, "/abc/node", "hello".getBytes());
+            node.start();
+
+            final CountDownLatch connectedLatch = new CountDownLatch(1);
+            final CountDownLatch reconnectedLatch = new CountDownLatch(1);
+            final AtomicReference<ConnectionState> lastState = new AtomicReference<ConnectionState>();
+            ConnectionStateListener listener = new ConnectionStateListener()
+            {
+                @Override
+                public void stateChanged(CuratorFramework client, ConnectionState newState)
+                {
+                    lastState.set(newState);
+                    if ( newState == ConnectionState.CONNECTED )
+                    {
+                        connectedLatch.countDown();
+                    }
+                    if ( newState == ConnectionState.RECONNECTED )
+                    {
+                        reconnectedLatch.countDown();
+                    }
+                }
+            };
+            client.getConnectionStateListenable().addListener(listener);
+            timing.sleepABit();
+            server = new TestingServer(server.getPort());
+            Assert.assertTrue(timing.awaitLatch(connectedLatch));
+            timing.sleepABit();
+            Assert.assertTrue(node.waitForInitialCreate(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
+            server.close();
+            timing.sleepABit();
+            server = new TestingServer(server.getPort());
+            timing.sleepABit();
+            Assert.assertTrue(timing.awaitLatch(reconnectedLatch));
+            timing.sleepABit();
+            Assert.assertEquals(lastState.get(), ConnectionState.RECONNECTED);
+        }
+        finally
+        {
+            Closeables.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testConnectionStateListener() throws Exception
+    {
+        System.setProperty(DebugUtils.PROPERTY_LOG_EVENTS, "true");
+
+        server.close();
+
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(timing.milliseconds()));
+        try
+        {
+            client.start();
+            PersistentEphemeralNode node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, "/abc/node", "hello".getBytes());
+            node.start();
+
+            final BlockingQueue<ConnectionState> stateVector = Queues.newLinkedBlockingQueue(1);
+            ConnectionStateListener listener = new ConnectionStateListener()
+            {
+                @Override
+                public void stateChanged(CuratorFramework client, ConnectionState newState)
+                {
+                    stateVector.offer(newState);
+                }
+            };
+
+            Timing waitingTiming = timing.forWaiting();
+
+            client.getConnectionStateListenable().addListener(listener);
+            server = new TestingServer(server.getPort());
+            Assert.assertEquals(stateVector.poll(waitingTiming.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED);
+            server.close();
+            Assert.assertEquals(stateVector.poll(waitingTiming.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED);
+            Assert.assertEquals(stateVector.poll(waitingTiming.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.LOST);
+            server = new TestingServer(server.getPort());
+            Assert.assertEquals(stateVector.poll(waitingTiming.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED);
+            server.close();
+            Assert.assertEquals(stateVector.poll(waitingTiming.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED);
+            Assert.assertEquals(stateVector.poll(waitingTiming.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.LOST);
+        }
+        finally
+        {
+            Closeables.closeQuietly(client);
+        }
+    }
+
+}
\ No newline at end of file


[05/12] git commit: Merge branch 'CURATOR-72' of https://git-wip-us.apache.org/repos/asf/curator into CURATOR-72

Posted by ra...@apache.org.
Merge branch 'CURATOR-72' of https://git-wip-us.apache.org/repos/asf/curator into CURATOR-72


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/4f47e7b1
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/4f47e7b1
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/4f47e7b1

Branch: refs/heads/master
Commit: 4f47e7b11cfcbd6b2ae80d51c1130e1752b40318
Parents: 0359bc5 332c20b
Author: randgalt <ra...@apache.org>
Authored: Sun Jan 12 16:54:39 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sun Jan 12 16:54:39 2014 -0500

----------------------------------------------------------------------

----------------------------------------------------------------------



[07/12] git commit: patch from Evaristo to possibly finalize this change

Posted by ra...@apache.org.
patch from Evaristo to possibly finalize this change


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/f0fc51f7
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/f0fc51f7
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/f0fc51f7

Branch: refs/heads/master
Commit: f0fc51f766f608bbb0df7ac506f592407eb9be83
Parents: 8fbe4b7
Author: randgalt <ra...@apache.org>
Authored: Mon Feb 3 09:23:03 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Feb 3 09:23:03 2014 -0500

----------------------------------------------------------------------
 .../apache/curator/framework/imps/CuratorFrameworkImpl.java | 7 +++++--
 .../curator/framework/state/ConnectionStateManager.java     | 9 ++++++---
 2 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/f0fc51f7/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index d516f48..3a2a48f 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -604,7 +604,10 @@ public class CuratorFrameworkImpl implements CuratorFramework
 
     private void suspendConnection()
     {
-        connectionStateManager.setToSuspended();
+        if ( !connectionStateManager.setToSuspended() )
+        {
+            return;
+        }
 
         doSyncForSuspendedConnection(client.getInstanceIndex());
     }
@@ -782,7 +785,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
                 }
                 else
                 {
-                    handleBackgroundOperationException(operationAndData, e);
+                	logError("Background retry gave up", e);
                 }
             }
             else

http://git-wip-us.apache.org/repos/asf/curator/blob/f0fc51f7/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index a2cfa60..42804b8 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -136,21 +136,24 @@ public class ConnectionStateManager implements Closeable
 
     /**
      * Change to {@link ConnectionState#SUSPENDED} only if not already suspended and not lost
+     * 
+     * @return true if connection is set to SUSPENDED
      */
-    public synchronized void setToSuspended()
+    public synchronized boolean setToSuspended()
     {
         if ( state.get() != State.STARTED )
         {
-            return;
+            return false;
         }
 
         if ( (currentConnectionState == ConnectionState.LOST) || (currentConnectionState == ConnectionState.SUSPENDED) )
         {
-            return;
+            return false;
         }
 
         currentConnectionState = ConnectionState.SUSPENDED;
         postState(ConnectionState.SUSPENDED);
+        return true;
     }
 
     /**


[08/12] git commit: if doSyncForSuspendedConnection repeatedly fails there may be an infinite loop of connection resets. So, after the first reset (signaled by getInstanceIndex value changing) give up and mark the connection LOST

Posted by ra...@apache.org.
If doSyncForSuspendedConnection repeatedly fails, there may be an infinite loop of connection resets. So, after the first reset (signaled by the getInstanceIndex value changing), give up and mark the connection LOST.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c40afd96
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c40afd96
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c40afd96

Branch: refs/heads/master
Commit: c40afd96a0121ac45b340679174320f7b4f02d26
Parents: f0fc51f
Author: randgalt <ra...@apache.org>
Authored: Mon Feb 3 13:46:28 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Feb 3 13:46:28 2014 -0500

----------------------------------------------------------------------
 .../org/apache/curator/framework/imps/CuratorFrameworkImpl.java | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/c40afd96/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 3a2a48f..b480d9b 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -624,14 +624,15 @@ public class CuratorFrameworkImpl implements CuratorFramework
                 // if instanceIndex != newInstanceIndex, the ZooKeeper instance was reset/reallocated
                 // so the pending background sync is no longer valid
                 long newInstanceIndex = client.getInstanceIndex();
-                if ( instanceIndex == newInstanceIndex )
+                if ( (instanceIndex < 0) || (instanceIndex == newInstanceIndex) )
                 {
                     connectionStateManager.addStateChange(ConnectionState.LOST);
                 }
                 else
                 {
                     log.debug("suspendConnection() failure ignored as the ZooKeeper instance was reset. Retrying.");
-                    doSyncForSuspendedConnection(newInstanceIndex);
+                    // send -1 to signal that if it happens again, punt and mark the connection lost
+                    doSyncForSuspendedConnection(-1);
                 }
             }
         };