You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/02/28 22:56:25 UTC
[02/12] curator git commit: Delay reconnect on session expired
Delay reconnect on session expired
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/651ac591
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/651ac591
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/651ac591
Branch: refs/heads/CURATOR-3.0
Commit: 651ac591725567bf85ec830a45b9b062d5dd7474
Parents: 6d8c0be
Author: Zoltan Szekeres <Zo...@morganstanley.com>
Authored: Thu Jan 26 09:03:20 2017 -0500
Committer: Zoltan Szekeres <Zo...@morganstanley.com>
Committed: Thu Jan 26 09:03:20 2017 -0500
----------------------------------------------------------------------
.../org/apache/curator/ConnectionState.java | 49 +++++++++----
.../framework/imps/TestBlockUntilConnected.java | 75 ++++++++++++++++++++
2 files changed, 110 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/651ac591/curator-client/src/main/java/org/apache/curator/ConnectionState.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/ConnectionState.java b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
index 7044ddf..bb4f08e 100644
--- a/curator-client/src/main/java/org/apache/curator/ConnectionState.java
+++ b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
@@ -149,7 +149,9 @@ class ConnectionState implements Watcher, Closeable
log.debug("ConnectState watcher: " + event);
}
- if ( event.getType() == Watcher.Event.EventType.None )
+ final boolean eventTypeNone = event.getType() == Watcher.Event.EventType.None;
+
+ if ( eventTypeNone )
{
boolean wasConnected = isConnected.get();
boolean newIsConnected = checkState(event.getState(), wasConnected);
@@ -160,13 +162,33 @@ class ConnectionState implements Watcher, Closeable
}
}
+ // only wait during tests
+ assert waitOnExpiredEvent(event.getState());
+
for ( Watcher parentWatcher : parentWatchers )
{
-
OperationTrace trace = new OperationTrace("connection-state-parent-process", tracer.get(), getSessionId());
parentWatcher.process(event);
trace.commit();
}
+
+ if (eventTypeNone) handleState(event.getState());
+ }
+
+ // only for testing
+ private boolean waitOnExpiredEvent(Event.KeeperState currentState)
+ {
+ if (currentState == Event.KeeperState.Expired)
+ {
+ log.debug("Waiting on Expired event for testing");
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch(InterruptedException e) {}
+ log.debug("Continue processing");
+ }
+ return true;
}
EnsembleProvider getEnsembleProvider()
@@ -240,11 +262,11 @@ class ConnectionState implements Watcher, Closeable
private boolean checkState(Event.KeeperState state, boolean wasConnected)
{
boolean isConnected = wasConnected;
- boolean checkNewConnectionString = true;
switch ( state )
{
default:
case Disconnected:
+ case Expired:
{
isConnected = false;
break;
@@ -264,14 +286,6 @@ class ConnectionState implements Watcher, Closeable
break;
}
- case Expired:
- {
- isConnected = false;
- checkNewConnectionString = false;
- handleExpiredSession();
- break;
- }
-
case SaslAuthenticated:
{
// NOP
@@ -283,12 +297,19 @@ class ConnectionState implements Watcher, Closeable
new EventTrace(state.toString(), tracer.get(), getSessionId()).commit();
}
- if ( checkNewConnectionString && zooKeeper.hasNewConnectionString() )
+ return isConnected;
+ }
+
+ private void handleState(Event.KeeperState state)
+ {
+ if (state == Event.KeeperState.Expired)
+ {
+ handleExpiredSession();
+ }
+ else if (zooKeeper.hasNewConnectionString())
{
handleNewConnectionString();
}
-
- return isConnected;
}
private void handleNewConnectionString()
http://git-wip-us.apache.org/repos/asf/curator/blob/651ac591/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
index a6dc7ab..7ea0849 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
@@ -21,6 +21,9 @@ package org.apache.curator.framework.imps;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryOneTime;
@@ -28,6 +31,7 @@ import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.TestingServer;
import org.apache.curator.test.Timing;
import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.Watcher;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.Timer;
@@ -256,4 +260,75 @@ public class TestBlockUntilConnected extends BaseClassForTests
}
}
}
+
+ /**
+ * Test that we got disconnected before calling blockUntilConnected and we reconnect we receive a session expired event.
+ */
+ @Test
+ public void testBlockUntilConnectedSessionExpired() throws Exception
+ {
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.builder().
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+
+ final CountDownLatch lostLatch = new CountDownLatch(1);
+ client.getConnectionStateListenable().addListener(new ConnectionStateListener()
+ {
+
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ if ( newState == ConnectionState.LOST )
+ {
+ lostLatch.countDown();
+ }
+ }
+ });
+
+ final CountDownLatch expiredLatch = new CountDownLatch(1);
+ client.getCuratorListenable().addListener(new CuratorListener() {
+ @Override
+ public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
+ if (event.getType() == CuratorEventType.WATCHED && event.getWatchedEvent().getState() == Watcher.Event.KeeperState.Expired)
+ {
+ expiredLatch.countDown();
+ }
+ }
+ });
+
+ try
+ {
+ client.start();
+
+ //Block until we're connected
+ Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Failed to connect");
+
+ final long sessionTimeoutMs = client.getZookeeperClient().getConnectionTimeoutMs();
+
+ //Kill the server
+ CloseableUtils.closeQuietly(server);
+
+ //Wait until we hit the lost state
+ Assert.assertTrue(timing.awaitLatch(lostLatch), "Failed to reach LOST state");
+
+ Thread.sleep(sessionTimeoutMs);
+
+ server = new TestingServer(server.getPort(), server.getTempDirectory());
+
+ //Wait until we get expired event
+ Assert.assertTrue(timing.awaitLatch(expiredLatch), "Failed to get Expired event");
+
+ Assert.assertTrue(client.blockUntilConnected(50, TimeUnit.SECONDS), "Not connected");
+ }
+ catch ( Exception e )
+ {
+ Assert.fail("Unexpected exception " + e);
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
}