You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/02/28 22:56:25 UTC

[02/12] curator git commit: Delay reconnect on session expired

Delay reconnect on session expired


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/651ac591
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/651ac591
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/651ac591

Branch: refs/heads/CURATOR-3.0
Commit: 651ac591725567bf85ec830a45b9b062d5dd7474
Parents: 6d8c0be
Author: Zoltan Szekeres <Zo...@morganstanley.com>
Authored: Thu Jan 26 09:03:20 2017 -0500
Committer: Zoltan Szekeres <Zo...@morganstanley.com>
Committed: Thu Jan 26 09:03:20 2017 -0500

----------------------------------------------------------------------
 .../org/apache/curator/ConnectionState.java     | 49 +++++++++----
 .../framework/imps/TestBlockUntilConnected.java | 75 ++++++++++++++++++++
 2 files changed, 110 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/651ac591/curator-client/src/main/java/org/apache/curator/ConnectionState.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/ConnectionState.java b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
index 7044ddf..bb4f08e 100644
--- a/curator-client/src/main/java/org/apache/curator/ConnectionState.java
+++ b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
@@ -149,7 +149,9 @@ class ConnectionState implements Watcher, Closeable
             log.debug("ConnectState watcher: " + event);
         }
 
-        if ( event.getType() == Watcher.Event.EventType.None )
+        final boolean eventTypeNone = event.getType() == Watcher.Event.EventType.None;
+
+        if ( eventTypeNone )
         {
             boolean wasConnected = isConnected.get();
             boolean newIsConnected = checkState(event.getState(), wasConnected);
@@ -160,13 +162,33 @@ class ConnectionState implements Watcher, Closeable
             }
         }
 
+        // only wait during tests
+        assert waitOnExpiredEvent(event.getState());
+
         for ( Watcher parentWatcher : parentWatchers )
         {
-
             OperationTrace trace = new OperationTrace("connection-state-parent-process", tracer.get(), getSessionId());
             parentWatcher.process(event);
             trace.commit();
         }
+
+        if (eventTypeNone) handleState(event.getState());
+    }
+
+    // only for testing
+    private boolean waitOnExpiredEvent(Event.KeeperState currentState)
+    {
+        if (currentState == Event.KeeperState.Expired)
+        {
+            log.debug("Waiting on Expired event for testing");
+            try
+            {
+                Thread.sleep(1000);
+            }
+            catch(InterruptedException e) {}
+            log.debug("Continue processing");
+        }
+        return true;
     }
 
     EnsembleProvider getEnsembleProvider()
@@ -240,11 +262,11 @@ class ConnectionState implements Watcher, Closeable
     private boolean checkState(Event.KeeperState state, boolean wasConnected)
     {
         boolean isConnected = wasConnected;
-        boolean checkNewConnectionString = true;
         switch ( state )
         {
         default:
         case Disconnected:
+        case Expired:
         {
             isConnected = false;
             break;
@@ -264,14 +286,6 @@ class ConnectionState implements Watcher, Closeable
             break;
         }
 
-        case Expired:
-        {
-            isConnected = false;
-            checkNewConnectionString = false;
-            handleExpiredSession();
-            break;
-        }
-
         case SaslAuthenticated:
         {
             // NOP
@@ -283,12 +297,19 @@ class ConnectionState implements Watcher, Closeable
             new EventTrace(state.toString(), tracer.get(), getSessionId()).commit();
         }
 
-        if ( checkNewConnectionString && zooKeeper.hasNewConnectionString() )
+        return isConnected;
+    }
+
+    private void handleState(Event.KeeperState state)
+    {
+        if (state == Event.KeeperState.Expired)
+        {
+            handleExpiredSession();
+        }
+        else if (zooKeeper.hasNewConnectionString())
         {
             handleNewConnectionString();
         }
-
-        return isConnected;
     }
 
     private void handleNewConnectionString()

http://git-wip-us.apache.org/repos/asf/curator/blob/651ac591/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
index a6dc7ab..7ea0849 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
@@ -21,6 +21,9 @@ package org.apache.curator.framework.imps;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.CuratorListener;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.RetryOneTime;
@@ -28,6 +31,7 @@ import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.Watcher;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.Timer;
@@ -256,4 +260,75 @@ public class TestBlockUntilConnected extends BaseClassForTests
             }
         }
     }
+
+    /**
+     * Test that we got disconnected before calling blockUntilConnected and we reconnect we receive a session expired event.
+     */
+    @Test
+    public void testBlockUntilConnectedSessionExpired() throws Exception
+    {
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.builder().
+                connectString(server.getConnectString()).
+                retryPolicy(new RetryOneTime(1)).
+                build();
+
+        final CountDownLatch lostLatch = new CountDownLatch(1);
+        client.getConnectionStateListenable().addListener(new ConnectionStateListener()
+        {
+
+            @Override
+            public void stateChanged(CuratorFramework client, ConnectionState newState)
+            {
+                if ( newState == ConnectionState.LOST )
+                {
+                    lostLatch.countDown();
+                }
+            }
+        });
+
+        final CountDownLatch expiredLatch = new CountDownLatch(1);
+        client.getCuratorListenable().addListener(new CuratorListener() {
+            @Override
+            public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
+                if (event.getType() == CuratorEventType.WATCHED && event.getWatchedEvent().getState() == Watcher.Event.KeeperState.Expired)
+                {
+                    expiredLatch.countDown();
+                }
+            }
+        });
+
+        try
+        {
+            client.start();
+
+            //Block until we're connected
+            Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Failed to connect");
+
+            final long sessionTimeoutMs = client.getZookeeperClient().getConnectionTimeoutMs();
+
+            //Kill the server
+            CloseableUtils.closeQuietly(server);
+
+            //Wait until we hit the lost state
+            Assert.assertTrue(timing.awaitLatch(lostLatch), "Failed to reach LOST state");
+
+            Thread.sleep(sessionTimeoutMs);
+
+            server = new TestingServer(server.getPort(), server.getTempDirectory());
+
+            //Wait until we get expired event
+            Assert.assertTrue(timing.awaitLatch(expiredLatch), "Failed to get Expired event");
+
+            Assert.assertTrue(client.blockUntilConnected(50, TimeUnit.SECONDS), "Not connected");
+        }
+        catch ( Exception e )
+        {
+            Assert.fail("Unexpected exception " + e);
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
 }