You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2015/08/22 17:47:11 UTC
[1/2] curator git commit: minor reformat
Repository: curator
Updated Branches:
refs/heads/CURATOR-247 d31700997 -> 847cc0d24
minor reformat
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/b8d4c3d7
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/b8d4c3d7
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/b8d4c3d7
Branch: refs/heads/CURATOR-247
Commit: b8d4c3d77de029917820634fa4ed21be19bbcf2c
Parents: d317009
Author: randgalt <ra...@apache.org>
Authored: Fri Aug 21 17:59:07 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Aug 21 17:59:07 2015 -0500
----------------------------------------------------------------------
.../src/test/java/org/apache/curator/TestEnsurePath.java | 2 ++
1 file changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/b8d4c3d7/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java
----------------------------------------------------------------------
diff --git a/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java b/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java
index 4fe9561..871e4af 100644
--- a/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java
+++ b/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java
@@ -51,6 +51,7 @@ public class TestEnsurePath
CuratorZookeeperClient curator = mock(CuratorZookeeperClient.class);
RetryPolicy retryPolicy = new RetryOneTime(1);
RetryLoop retryLoop = new RetryLoop(retryPolicy, null);
+ when(curator.retryConnectionTimeouts()).thenReturn(true);
when(curator.getZooKeeper()).thenReturn(client);
when(curator.getRetryPolicy()).thenReturn(retryPolicy);
when(curator.newRetryLoop()).thenReturn(retryLoop);
@@ -76,6 +77,7 @@ public class TestEnsurePath
RetryPolicy retryPolicy = new RetryOneTime(1);
RetryLoop retryLoop = new RetryLoop(retryPolicy, null);
final CuratorZookeeperClient curator = mock(CuratorZookeeperClient.class);
+ when(curator.retryConnectionTimeouts()).thenReturn(true);
when(curator.getZooKeeper()).thenReturn(client);
when(curator.getRetryPolicy()).thenReturn(retryPolicy);
when(curator.newRetryLoop()).thenReturn(retryLoop);
[2/2] curator git commit: wip
Posted by ra...@apache.org.
wip
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/847cc0d2
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/847cc0d2
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/847cc0d2
Branch: refs/heads/CURATOR-247
Commit: 847cc0d2415f59c2943d4a2734564119ffb38bb1
Parents: b8d4c3d
Author: randgalt <ra...@apache.org>
Authored: Sat Aug 22 10:47:01 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Aug 22 10:47:01 2015 -0500
----------------------------------------------------------------------
.../org/apache/curator/ConnectionState.java | 15 ++++++--
.../apache/curator/CuratorZookeeperClient.java | 36 ++++++++++++++++++--
.../framework/imps/CuratorFrameworkImpl.java | 12 ++-----
.../framework/state/ConnectionStateManager.java | 2 +-
.../imps/TestEnabledSessionExpiredState.java | 2 +-
...estResetConnectionWithBackgroundFailure.java | 19 +++++++----
.../java/org/apache/curator/test/Timing.java | 21 ++++++++++++
7 files changed, 84 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/847cc0d2/curator-client/src/main/java/org/apache/curator/ConnectionState.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/ConnectionState.java b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
index 1dfdbef..c3d6921 100644
--- a/curator-client/src/main/java/org/apache/curator/ConnectionState.java
+++ b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
@@ -52,6 +52,7 @@ class ConnectionState implements Watcher, Closeable
private final Queue<Watcher> parentWatchers = new ConcurrentLinkedQueue<Watcher>();
private final AtomicLong instanceIndex = new AtomicLong();
private volatile long connectionStartMs = 0;
+ private final AtomicBoolean enableTimeoutChecks = new AtomicBoolean(true);
ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly)
{
@@ -67,6 +68,11 @@ class ConnectionState implements Watcher, Closeable
zooKeeper = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);
}
+ void disableTimeoutChecks()
+ {
+ enableTimeoutChecks.set(false);
+ }
+
ZooKeeper getZooKeeper() throws Exception
{
if ( SessionFailRetryLoop.sessionForThreadHasFailed() )
@@ -81,10 +87,13 @@ class ConnectionState implements Watcher, Closeable
throw exception;
}
- boolean localIsConnected = isConnected.get();
- if ( !localIsConnected )
+ if ( enableTimeoutChecks.get() )
{
- checkTimeouts();
+ boolean localIsConnected = isConnected.get();
+ if ( !localIsConnected )
+ {
+ checkTimeouts();
+ }
}
return zooKeeper.getZooKeeper();
http://git-wip-us.apache.org/repos/asf/curator/blob/847cc0d2/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
index fbb2f4c..ce6e9d3 100644
--- a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
+++ b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
@@ -50,6 +50,7 @@ public class CuratorZookeeperClient implements Closeable
private final int connectionTimeoutMs;
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicReference<TracerDriver> tracer = new AtomicReference<TracerDriver>(new DefaultTracerDriver());
+ private final boolean manageTimeouts;
/**
*
@@ -61,7 +62,7 @@ public class CuratorZookeeperClient implements Closeable
*/
public CuratorZookeeperClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy)
{
- this(new DefaultZookeeperFactory(), new FixedEnsembleProvider(connectString), sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false);
+ this(new DefaultZookeeperFactory(), new FixedEnsembleProvider(connectString), sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false, true);
}
/**
@@ -73,7 +74,7 @@ public class CuratorZookeeperClient implements Closeable
*/
public CuratorZookeeperClient(EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy)
{
- this(new DefaultZookeeperFactory(), ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false);
+ this(new DefaultZookeeperFactory(), ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false, true);
}
/**
@@ -90,6 +91,25 @@ public class CuratorZookeeperClient implements Closeable
*/
public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean canBeReadOnly)
{
+ this(new DefaultZookeeperFactory(), ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, canBeReadOnly, true);
+ }
+
+ /**
+ * @param zookeeperFactory factory for creating {@link ZooKeeper} instances
+ * @param ensembleProvider the ensemble provider
+ * @param sessionTimeoutMs session timeout
+ * @param connectionTimeoutMs connection timeout
+ * @param watcher default watcher or null
+ * @param retryPolicy the retry policy to use
+ * @param canBeReadOnly if true, allow ZooKeeper client to enter
+ * read only mode in case of a network partition. See
+ * {@link ZooKeeper#ZooKeeper(String, int, Watcher, long, byte[], boolean)}
+ * for details
+ * @param manageTimeouts in general, Curator clients try to manage session/connection timeouts. If this is false, that management is turned off
+ */
+ public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean canBeReadOnly, boolean manageTimeouts)
+ {
+ this.manageTimeouts = manageTimeouts;
if ( sessionTimeoutMs < connectionTimeoutMs )
{
log.warn(String.format("session timeout [%d] is less than connection timeout [%d]", sessionTimeoutMs, connectionTimeoutMs));
@@ -100,6 +120,10 @@ public class CuratorZookeeperClient implements Closeable
this.connectionTimeoutMs = connectionTimeoutMs;
state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly);
+ if ( !manageTimeouts )
+ {
+ state.disableTimeoutChecks();
+ }
setRetryPolicy(retryPolicy);
}
@@ -302,9 +326,15 @@ public class CuratorZookeeperClient implements Closeable
return state.getInstanceIndex();
}
+ /**
+ * Returns true if connection timeouts should cause the retry policy to be checked. If false
+ * is returned, throw a connection exception without retrying
+ *
+ * @return true/false
+ */
public boolean retryConnectionTimeouts()
{
- return true;
+ return manageTimeouts;
}
void addParentWatcher(Watcher watcher)
http://git-wip-us.apache.org/repos/asf/curator/blob/847cc0d2/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index c359fdc..bcbeecd 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -122,15 +122,9 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
},
builder.getRetryPolicy(),
- builder.canBeReadOnly()
- )
- {
- @Override
- public boolean retryConnectionTimeouts()
- {
- return !enableSessionExpiredState;
- }
- };
+ builder.canBeReadOnly(),
+ !builder.getEnableSessionExpiredState() // inverse is correct here. By default, CuratorZookeeperClient manages timeouts. The new SessionExpiredState needs this disabled.
+ );
listeners = new ListenerContainer<CuratorListener>();
unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
http://git-wip-us.apache.org/repos/asf/curator/blob/847cc0d2/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 553faac..52e0d07 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -295,7 +295,7 @@ public class ConnectionStateManager implements Closeable
long elapsedMs = System.currentTimeMillis() - startOfSuspendedEpoch;
if ( elapsedMs >= sessionTimeoutMs )
{
- log.info(String.format("Session timeout has elapsed while SUSPENDED. Posting LOST event and resetting the connection. Elapsed ms: %d", elapsedMs));
+ log.warn(String.format("Session timeout has elapsed while SUSPENDED. Posting LOST event and resetting the connection. Elapsed ms: %d", elapsedMs));
try
{
client.getZookeeperClient().reset();
http://git-wip-us.apache.org/repos/asf/curator/blob/847cc0d2/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
index 150eb50..cd415b1 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
@@ -123,7 +123,7 @@ public class TestEnabledSessionExpiredState extends BaseClassForTests
{
Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED);
server.stop();
- Thread.sleep(timing.multiple(1.2).session());
+ timing.sleepForSession();
Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED);
Assert.assertEquals(states.poll(timing.multiple(2).session(), TimeUnit.MILLISECONDS), ConnectionState.LOST);
server.restart();
http://git-wip-us.apache.org/repos/asf/curator/blob/847cc0d2/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java b/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java
index 7d2cb89..b90311b 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java
@@ -19,6 +19,7 @@
package org.apache.curator.framework.client;
+import com.google.common.collect.Queues;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
@@ -36,6 +37,8 @@ import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
public class TestResetConnectionWithBackgroundFailure extends BaseClassForTests
{
@@ -53,7 +56,6 @@ public class TestResetConnectionWithBackgroundFailure extends BaseClassForTests
{
server.stop();
- final StringBuilder listenerSequence = new StringBuilder();
LeaderSelector selector = null;
Timing timing = new Timing();
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(100));
@@ -74,12 +76,13 @@ public class TestResetConnectionWithBackgroundFailure extends BaseClassForTests
selector.autoRequeue();
selector.start();
+ final BlockingQueue<ConnectionState> states = Queues.newLinkedBlockingQueue();
ConnectionStateListener listener1 = new ConnectionStateListener()
{
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
- listenerSequence.append("-").append(newState);
+ states.add(newState);
}
};
@@ -90,17 +93,21 @@ public class TestResetConnectionWithBackgroundFailure extends BaseClassForTests
log.debug("Stopping ZK server");
server.stop();
- timing.forWaiting().sleepABit();
+ timing.sleepForSession();
log.debug("Starting ZK server");
server.restart();
- timing.forWaiting().sleepABit();
+
+ Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED);
+ Assert.assertEquals(states.poll(timing.sessionSleep(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED);
+ Assert.assertEquals(states.poll(timing.sessionSleep(), TimeUnit.MILLISECONDS), ConnectionState.LOST);
+ Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED);
log.debug("Stopping ZK server");
server.close();
- timing.forWaiting().sleepABit();
- Assert.assertEquals(listenerSequence.toString(), "-CONNECTED-SUSPENDED-LOST-RECONNECTED-SUSPENDED-LOST");
+ Assert.assertEquals(states.poll(timing.sessionSleep(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED);
+ Assert.assertEquals(states.poll(timing.sessionSleep(), TimeUnit.MILLISECONDS), ConnectionState.LOST);
}
finally
{
http://git-wip-us.apache.org/repos/asf/curator/blob/847cc0d2/curator-test/src/main/java/org/apache/curator/test/Timing.java
----------------------------------------------------------------------
diff --git a/curator-test/src/main/java/org/apache/curator/test/Timing.java b/curator-test/src/main/java/org/apache/curator/test/Timing.java
index 753d62d..fc4b314 100644
--- a/curator-test/src/main/java/org/apache/curator/test/Timing.java
+++ b/curator-test/src/main/java/org/apache/curator/test/Timing.java
@@ -35,6 +35,7 @@ public class Timing
private static final int DEFAULT_SECONDS = 10;
private static final int DEFAULT_WAITING_MULTIPLE = 5;
private static final double SESSION_MULTIPLE = 1.5;
+ private static final double SESSION_SLEEP_MULTIPLE = 1.75; // has to be at least session + 2/3 of a session to account for missed heartbeat then session expiration
/**
* Use the default base time
@@ -200,6 +201,26 @@ public class Timing
}
/**
+ * Sleep enough so that the session should expire
+ *
+ * @throws InterruptedException if interrupted
+ */
+ public void sleepForSession() throws InterruptedException
+ {
+ TimeUnit.MILLISECONDS.sleep(sessionSleep());
+ }
+
+ /**
+ * Return the value to sleep to ensure a ZK session timeout
+ *
+ * @return session sleep timeout
+ */
+ public int sessionSleep()
+ {
+ return multiple(SESSION_SLEEP_MULTIPLE).session();
+ }
+
+ /**
* Return the value to use for ZK session timeout
*
* @return session timeout