You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/02/05 19:41:58 UTC

[04/12] git commit: Another edge case found by Evaristo. When SUSPENDED is set, a background sync is executed to detect LOST. If the ZK connection is reset while this command is processing, an incorrect LOST might get set. Instead, keep track of ZooKeeper instance resets.

Another edge case found by Evaristo. When SUSPENDED is set, a background sync is executed to detect LOST. If the ZK connection is reset while this command is processing, an incorrect LOST might get set. Instead, keep track of ZooKeeper instance resets. If there is a reset, ignore the background sync failure and re-submit the sync to test again.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/0359bc5a
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/0359bc5a
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/0359bc5a

Branch: refs/heads/master
Commit: 0359bc5ab683285f44523d1445ef2eb8116380c4
Parents: 75acb0d
Author: randgalt <ra...@apache.org>
Authored: Sun Jan 12 16:53:03 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sun Jan 12 16:54:17 2014 -0500

----------------------------------------------------------------------
 .../org/apache/curator/ConnectionState.java     |   9 ++
 .../apache/curator/CuratorZookeeperClient.java  |  11 ++
 .../framework/imps/CuratorFrameworkImpl.java    |  17 ++-
 ...estResetConnectionWithBackgroundFailure.java | 104 +++++++++++++++++++
 4 files changed, 139 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/0359bc5a/curator-client/src/main/java/org/apache/curator/ConnectionState.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/ConnectionState.java b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
index e02ee88..4978c3f 100644
--- a/curator-client/src/main/java/org/apache/curator/ConnectionState.java
+++ b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
@@ -34,6 +34,7 @@ import java.io.IOException;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 class ConnectionState implements Watcher, Closeable
@@ -49,6 +50,7 @@ class ConnectionState implements Watcher, Closeable
     private final AtomicReference<TracerDriver> tracer;
     private final Queue<Exception> backgroundExceptions = new ConcurrentLinkedQueue<Exception>();
     private final Queue<Watcher> parentWatchers = new ConcurrentLinkedQueue<Watcher>();
+    private final AtomicLong instanceIndex = new AtomicLong();
     private volatile long connectionStartMs = 0;
 
     ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly)
@@ -131,6 +133,11 @@ class ConnectionState implements Watcher, Closeable
         parentWatchers.remove(watcher);
     }
 
+    long getInstanceIndex()
+    {
+        return instanceIndex.get();
+    }
+
     @Override
     public void process(WatchedEvent event)
     {
@@ -204,6 +211,8 @@ class ConnectionState implements Watcher, Closeable
     {
         log.debug("reset");
 
+        instanceIndex.incrementAndGet();
+
         isConnected.set(false);
         connectionStartMs = System.currentTimeMillis();
         zooKeeper.closeAndReset();

http://git-wip-us.apache.org/repos/asf/curator/blob/0359bc5a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
index f4e56f9..f0a4ab3 100644
--- a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
+++ b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
@@ -279,6 +279,17 @@ public class CuratorZookeeperClient implements Closeable
         return connectionTimeoutMs;
     }
 
+    /**
+     * Every time a new {@link ZooKeeper} instance is allocated, the "instance index"
+     * is incremented.
+     *
+     * @return the current instance index
+     */
+    public long getInstanceIndex()
+    {
+        return state.getInstanceIndex();
+    }
+
     void        addParentWatcher(Watcher watcher)
     {
         state.addParentWatcher(watcher);

http://git-wip-us.apache.org/repos/asf/curator/blob/0359bc5a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 1b0ef3f..f1258ea 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -606,14 +606,27 @@ public class CuratorFrameworkImpl implements CuratorFramework
     {
         connectionStateManager.setToSuspended();
 
+        final long instanceIndex = client.getInstanceIndex();
+
         // we appear to have disconnected, force a new ZK event and see if we can connect to another server
-        BackgroundOperation<String> operation = new BackgroundSyncImpl(this, null);
+        final BackgroundOperation<String> operation = new BackgroundSyncImpl(this, null);
         OperationAndData.ErrorCallback<String> errorCallback = new OperationAndData.ErrorCallback<String>()
         {
             @Override
             public void retriesExhausted(OperationAndData<String> operationAndData)
             {
-                connectionStateManager.addStateChange(ConnectionState.LOST);
+                // if instanceIndex != newInstanceIndex, the ZooKeeper instance was reset/reallocated
+                // so the pending background sync is no longer valid
+                long newInstanceIndex = client.getInstanceIndex();
+                if ( instanceIndex == newInstanceIndex )
+                {
+                    connectionStateManager.addStateChange(ConnectionState.LOST);
+                }
+                else
+                {
+                    log.debug("suspendConnection() failure ignored as the ZooKeeper instance was reset. Retrying.");
+                    performBackgroundOperation(new OperationAndData<String>(operation, "/", null, this, null));
+                }
             }
         };
         performBackgroundOperation(new OperationAndData<String>(operation, "/", null, errorCallback, null));

http://git-wip-us.apache.org/repos/asf/curator/blob/0359bc5a/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java b/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java
new file mode 100644
index 0000000..e634a6d
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.client;
+
+import com.google.common.io.Closeables;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.BaseClassForTests;
+import org.apache.curator.framework.recipes.leader.LeaderSelector;
+import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
+import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.test.Timing;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestResetConnectionWithBackgroundFailure extends BaseClassForTests
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Test
+    public void testConnectionStateListener() throws Exception
+    {
+        server.close();
+
+        final StringBuilder listenerSequence = new StringBuilder();
+        LeaderSelector selector = null;
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(100));
+        try
+        {
+            client.start();
+            timing.sleepABit();
+
+            LeaderSelectorListener listenerLeader = new LeaderSelectorListenerAdapter()
+            {
+                @Override
+                public void takeLeadership(CuratorFramework client) throws Exception
+                {
+                    Thread.currentThread().join();
+                }
+            };
+            selector = new LeaderSelector(client, "/leader", listenerLeader);
+            selector.autoRequeue();
+            selector.start();
+
+            ConnectionStateListener listener1 = new ConnectionStateListener()
+            {
+                @Override
+                public void stateChanged(CuratorFramework client, ConnectionState newState)
+                {
+                    listenerSequence.append("-").append(newState);
+                }
+            };
+
+            client.getConnectionStateListenable().addListener(listener1);
+            log.debug("Starting ZK server");
+            server = new TestingServer(server.getPort());
+            timing.forWaiting().sleepABit();
+
+            log.debug("Stopping ZK server");
+            server.close();
+            timing.forWaiting().sleepABit();
+
+            log.debug("Starting ZK server");
+            server = new TestingServer(server.getPort());
+            timing.forWaiting().sleepABit();
+
+            log.debug("Stopping ZK server");
+            server.close();
+            timing.forWaiting().sleepABit();
+
+            Assert.assertEquals(listenerSequence.toString(), "-CONNECTED-SUSPENDED-LOST-RECONNECTED-SUSPENDED-LOST");
+        }
+        finally
+        {
+            Closeables.closeQuietly(selector);
+            Closeables.closeQuietly(client);
+        }
+    }
+
+}
\ No newline at end of file