You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2015/08/22 00:36:06 UTC
[3/3] curator git commit: Continued work on new LOST behavior. Added
some tests. To get correct behavior it's necessary to not retry connection
failures. Retrying connection failures was never a good idea and here's a
good opportunity to fix it as this requires client action to enable.
Continued work on new LOST behavior. Added some tests. To get correct behavior it's necessary to not retry connection failures. Retrying connection failures was never a good idea, and here's a good opportunity to fix it, as this requires client action to enable.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/62f3c33c
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/62f3c33c
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/62f3c33c
Branch: refs/heads/CURATOR-247
Commit: 62f3c33cdb556eccf6fe1cc87ee74b3458431777
Parents: 2343daf
Author: randgalt <ra...@apache.org>
Authored: Fri Aug 21 17:35:44 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Aug 21 17:35:44 2015 -0500
----------------------------------------------------------------------
.../org/apache/curator/ConnectionState.java | 24 ++---
.../apache/curator/CuratorZookeeperClient.java | 58 +++++++-----
.../main/java/org/apache/curator/RetryLoop.java | 12 +++
.../framework/CuratorFrameworkFactory.java | 2 +-
.../framework/imps/CuratorFrameworkImpl.java | 43 ++++++++-
.../framework/state/ConnectionState.java | 5 +
.../framework/state/ConnectionStateManager.java | 13 ++-
.../framework/imps/TestBlockUntilConnected.java | 1 +
.../imps/TestEnabledSessionExpiredState.java | 99 ++++++++++++++++++++
.../apache/curator/test/BaseClassForTests.java | 37 +++++++-
.../java/org/apache/curator/test/Timing.java | 2 +-
11 files changed, 253 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-client/src/main/java/org/apache/curator/ConnectionState.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/ConnectionState.java b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
index d3900a1..1dfdbef 100644
--- a/curator-client/src/main/java/org/apache/curator/ConnectionState.java
+++ b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
@@ -171,6 +171,18 @@ class ConnectionState implements Watcher, Closeable
return ensembleProvider;
}
+ synchronized void reset() throws Exception
+ {
+ log.debug("reset");
+
+ instanceIndex.incrementAndGet();
+
+ isConnected.set(false);
+ connectionStartMs = System.currentTimeMillis();
+ zooKeeper.closeAndReset();
+ zooKeeper.getZooKeeper(); // initiate connection
+ }
+
private synchronized void checkTimeouts() throws Exception
{
int minTimeout = Math.min(sessionTimeoutMs, connectionTimeoutMs);
@@ -206,18 +218,6 @@ class ConnectionState implements Watcher, Closeable
}
}
- private synchronized void reset() throws Exception
- {
- log.debug("reset");
-
- instanceIndex.incrementAndGet();
-
- isConnected.set(false);
- connectionStartMs = System.currentTimeMillis();
- zooKeeper.closeAndReset();
- zooKeeper.getZooKeeper(); // initiate connection
- }
-
private boolean checkState(Event.KeeperState state, boolean wasConnected)
{
boolean isConnected = wasConnected;
http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
index 09b28b2..fbb2f4c 100644
--- a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
+++ b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.curator;
import com.google.common.base.Preconditions;
@@ -43,12 +44,12 @@ import java.util.concurrent.atomic.AtomicReference;
@SuppressWarnings("UnusedDeclaration")
public class CuratorZookeeperClient implements Closeable
{
- private final Logger log = LoggerFactory.getLogger(getClass());
- private final ConnectionState state;
- private final AtomicReference<RetryPolicy> retryPolicy = new AtomicReference<RetryPolicy>();
- private final int connectionTimeoutMs;
- private final AtomicBoolean started = new AtomicBoolean(false);
- private final AtomicReference<TracerDriver> tracer = new AtomicReference<TracerDriver>(new DefaultTracerDriver());
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final ConnectionState state;
+ private final AtomicReference<RetryPolicy> retryPolicy = new AtomicReference<RetryPolicy>();
+ private final int connectionTimeoutMs;
+ private final AtomicBoolean started = new AtomicBoolean(false);
+ private final AtomicReference<TracerDriver> tracer = new AtomicReference<TracerDriver>(new DefaultTracerDriver());
/**
*
@@ -159,7 +160,7 @@ public class CuratorZookeeperClient implements Closeable
Preconditions.checkState(started.get(), "Client is not started");
log.debug("blockUntilConnectedOrTimedOut() start");
- TimeTrace trace = startTracer("blockUntilConnectedOrTimedOut");
+ TimeTrace trace = startTracer("blockUntilConnectedOrTimedOut");
internalBlockUntilConnectedOrTimedOut();
@@ -176,7 +177,7 @@ public class CuratorZookeeperClient implements Closeable
*
* @throws IOException errors
*/
- public void start() throws Exception
+ public void start() throws Exception
{
log.debug("Starting");
@@ -192,7 +193,7 @@ public class CuratorZookeeperClient implements Closeable
/**
* Close the client
*/
- public void close()
+ public void close()
{
log.debug("Closing");
@@ -212,7 +213,7 @@ public class CuratorZookeeperClient implements Closeable
*
* @param policy new policy
*/
- public void setRetryPolicy(RetryPolicy policy)
+ public void setRetryPolicy(RetryPolicy policy)
{
Preconditions.checkNotNull(policy, "policy cannot be null");
@@ -234,7 +235,7 @@ public class CuratorZookeeperClient implements Closeable
* @param name name of the event
* @return the new tracer ({@link TimeTrace#commit()} must be called)
*/
- public TimeTrace startTracer(String name)
+ public TimeTrace startTracer(String name)
{
return new TimeTrace(name, tracer.get());
}
@@ -244,7 +245,7 @@ public class CuratorZookeeperClient implements Closeable
*
* @return tracing driver
*/
- public TracerDriver getTracerDriver()
+ public TracerDriver getTracerDriver()
{
return tracer.get();
}
@@ -254,7 +255,7 @@ public class CuratorZookeeperClient implements Closeable
*
* @param tracer new tracing driver
*/
- public void setTracerDriver(TracerDriver tracer)
+ public void setTracerDriver(TracerDriver tracer)
{
this.tracer.set(tracer);
}
@@ -265,7 +266,7 @@ public class CuratorZookeeperClient implements Closeable
*
* @return connection string
*/
- public String getCurrentConnectionString()
+ public String getCurrentConnectionString()
{
return state.getEnsembleProvider().getConnectionString();
}
@@ -281,6 +282,16 @@ public class CuratorZookeeperClient implements Closeable
}
/**
+ * For internal use only - reset the internally managed ZK handle
+ *
+ * @throws Exception errors
+ */
+ public void reset() throws Exception
+ {
+ state.reset();
+ }
+
+ /**
* Every time a new {@link ZooKeeper} instance is allocated, the "instance index"
* is incremented.
*
@@ -291,22 +302,27 @@ public class CuratorZookeeperClient implements Closeable
return state.getInstanceIndex();
}
- void addParentWatcher(Watcher watcher)
+ public boolean retryConnectionTimeouts()
+ {
+ return true;
+ }
+
+ void addParentWatcher(Watcher watcher)
{
state.addParentWatcher(watcher);
}
- void removeParentWatcher(Watcher watcher)
+ void removeParentWatcher(Watcher watcher)
{
state.removeParentWatcher(watcher);
}
void internalBlockUntilConnectedOrTimedOut() throws InterruptedException
{
- long waitTimeMs = connectionTimeoutMs;
+ long waitTimeMs = connectionTimeoutMs;
while ( !state.isConnected() && (waitTimeMs > 0) )
{
- final CountDownLatch latch = new CountDownLatch(1);
+ final CountDownLatch latch = new CountDownLatch(1);
Watcher tempWatcher = new Watcher()
{
@Override
@@ -315,9 +331,9 @@ public class CuratorZookeeperClient implements Closeable
latch.countDown();
}
};
-
+
state.addParentWatcher(tempWatcher);
- long startTimeMs = System.currentTimeMillis();
+ long startTimeMs = System.currentTimeMillis();
try
{
latch.await(1, TimeUnit.SECONDS);
@@ -326,7 +342,7 @@ public class CuratorZookeeperClient implements Closeable
{
state.removeParentWatcher(tempWatcher);
}
- long elapsed = Math.max(1, System.currentTimeMillis() - startTimeMs);
+ long elapsed = Math.max(1, System.currentTimeMillis() - startTimeMs);
waitTimeMs -= elapsed;
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-client/src/main/java/org/apache/curator/RetryLoop.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/RetryLoop.java b/curator-client/src/main/java/org/apache/curator/RetryLoop.java
index 065ebef..8d77cf7 100644
--- a/curator-client/src/main/java/org/apache/curator/RetryLoop.java
+++ b/curator-client/src/main/java/org/apache/curator/RetryLoop.java
@@ -98,11 +98,17 @@ public class RetryLoop
{
T result = null;
RetryLoop retryLoop = client.newRetryLoop();
+ boolean connectionFailed = false;
while ( retryLoop.shouldContinue() )
{
try
{
client.internalBlockUntilConnectedOrTimedOut();
+ if ( !client.isConnected() && !client.retryConnectionTimeouts() )
+ {
+ connectionFailed = true;
+ break;
+ }
result = proc.call();
retryLoop.markComplete();
@@ -112,6 +118,12 @@ public class RetryLoop
retryLoop.takeException(e);
}
}
+
+ if ( connectionFailed )
+ {
+ throw new KeeperException.ConnectionLossException();
+ }
+
return result;
}
http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
index 6209b06..fad4fc2 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
@@ -117,7 +117,7 @@ public class CuratorFrameworkFactory
private ACLProvider aclProvider = DEFAULT_ACL_PROVIDER;
private boolean canBeReadOnly = false;
private boolean useContainerParentsIfAvailable = true;
- private boolean enableSessionExpiredState = false;
+ private boolean enableSessionExpiredState = Boolean.getBoolean("curator-enable-session-expired-state");
/**
* Apply the current values and build a new CuratorFramework
http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index c64fb8f..c359fdc 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -61,6 +61,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
public class CuratorFrameworkImpl implements CuratorFramework
@@ -84,6 +85,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
private final NamespaceWatcherMap namespaceWatcherMap = new NamespaceWatcherMap(this);
private final boolean useContainerParentsIfAvailable;
private final boolean enableSessionExpiredState;
+ private final AtomicLong currentInstanceIndex = new AtomicLong(-1);
private volatile ExecutorService executorService;
private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false);
@@ -104,15 +106,31 @@ public class CuratorFrameworkImpl implements CuratorFramework
public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
{
ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
- this.client = new CuratorZookeeperClient(localZookeeperFactory, builder.getEnsembleProvider(), builder.getSessionTimeoutMs(), builder.getConnectionTimeoutMs(), new Watcher()
+ this.client = new CuratorZookeeperClient
+ (
+ localZookeeperFactory,
+ builder.getEnsembleProvider(),
+ builder.getSessionTimeoutMs(),
+ builder.getConnectionTimeoutMs(),
+ new Watcher()
+ {
+ @Override
+ public void process(WatchedEvent watchedEvent)
+ {
+ CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
+ processEvent(event);
+ }
+ },
+ builder.getRetryPolicy(),
+ builder.canBeReadOnly()
+ )
{
@Override
- public void process(WatchedEvent watchedEvent)
+ public boolean retryConnectionTimeouts()
{
- CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
- processEvent(event);
+ return !enableSessionExpiredState;
}
- }, builder.getRetryPolicy(), builder.canBeReadOnly());
+ };
listeners = new ListenerContainer<CuratorListener>();
unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
@@ -675,14 +693,29 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
else if ( state == Watcher.Event.KeeperState.SyncConnected )
{
+ checkNewConnection();
connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
}
else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
{
+ checkNewConnection();
connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
}
}
+ private void checkNewConnection()
+ {
+ if ( enableSessionExpiredState )
+ {
+ long instanceIndex = client.getInstanceIndex();
+ long newInstanceIndex = currentInstanceIndex.getAndSet(instanceIndex);
+ if ( (newInstanceIndex >= 0) && (instanceIndex != newInstanceIndex) ) // currentInstanceIndex is initially -1 - ignore this
+ {
+ connectionStateManager.addStateChange(ConnectionState.LOST);
+ }
+ }
+ }
+
Watcher.Event.KeeperState codeToState(KeeperException.Code code)
{
switch ( code )
http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
index 49d0044..79f3b62 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
@@ -86,6 +86,11 @@ public enum ConnectionState
* b) Curator closes the internally managed ZooKeeper instance; c) The configured session timeout
* elapses during a network partition.
* </p>
+ *
+ * <p>
+ * NOTE: the new behavior for the LOST state can also be enabled via the command line
+ * property "curator-enable-session-expired-state" (e.g. -Dcurator-enable-session-expired-state=true)
+ * </p>
*/
LOST
{
http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index c0feb84..553faac 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -251,7 +251,8 @@ public class ConnectionStateManager implements Closeable
{
while ( !Thread.currentThread().isInterrupted() )
{
- final ConnectionState newState = eventQueue.poll(sessionTimeoutMs, TimeUnit.MILLISECONDS);
+ int pollMaxMs = (sessionTimeoutMs * 2) / 3; // 2/3 of session timeout
+ final ConnectionState newState = eventQueue.poll(pollMaxMs, TimeUnit.MILLISECONDS);
if ( newState != null )
{
if ( listeners.size() == 0 )
@@ -294,7 +295,15 @@ public class ConnectionStateManager implements Closeable
long elapsedMs = System.currentTimeMillis() - startOfSuspendedEpoch;
if ( elapsedMs >= sessionTimeoutMs )
{
- log.info(String.format("Session timeout has elapsed while SUSPENDED. Posting LOST event. Elapsed ms: %d", elapsedMs));
+ log.info(String.format("Session timeout has elapsed while SUSPENDED. Posting LOST event and resetting the connection. Elapsed ms: %d", elapsedMs));
+ try
+ {
+ client.getZookeeperClient().reset();
+ }
+ catch ( Exception e )
+ {
+ log.error("Could not reset the connection", e);
+ }
addStateChange(ConnectionState.LOST);
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
index f649afb..eeec797 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
@@ -114,6 +114,7 @@ public class TestBlockUntilConnected extends BaseClassForTests
Timing timing = new Timing();
CuratorFramework client = CuratorFrameworkFactory.builder().
connectString(server.getConnectString()).
+ sessionTimeoutMs(timing.session()).
retryPolicy(new RetryOneTime(1)).
build();
http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
new file mode 100644
index 0000000..030a292
--- /dev/null
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
@@ -0,0 +1,99 @@
+package org.apache.curator.framework.imps;
+
+import com.google.common.collect.Queues;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.KillSession;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.KeeperException;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class TestEnabledSessionExpiredState extends BaseClassForTests
+{
+ private final Timing timing = new Timing();
+
+ private CuratorFramework client;
+ private BlockingQueue<ConnectionState> states;
+
+ @BeforeMethod
+ @Override
+ public void setup() throws Exception
+ {
+ super.setup();
+
+ client = CuratorFrameworkFactory.builder()
+ .connectString(server.getConnectString())
+ .connectionTimeoutMs(timing.connection())
+ .sessionTimeoutMs(timing.session())
+ .enableSessionExpiredState()
+ .retryPolicy(new RetryOneTime(1))
+ .build();
+ client.start();
+
+ states = Queues.newLinkedBlockingQueue();
+ ConnectionStateListener listener = new ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ states.add(newState);
+ }
+ };
+ client.getConnectionStateListenable().addListener(listener);
+ }
+
+ @AfterMethod
+ @Override
+ public void teardown() throws Exception
+ {
+ CloseableUtils.closeQuietly(client);
+
+ super.teardown();
+ }
+
+ @Test
+ public void testKillSession() throws Exception
+ {
+ Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED);
+
+ KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString());
+
+ Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED);
+ Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.LOST);
+ Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED);
+ }
+
+ @Test
+ public void testReconnectWithoutExpiration() throws Exception
+ {
+ Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED);
+ server.stop();
+ try
+ {
+ client.checkExists().forPath("/"); // any API call that will invoke the retry policy, etc.
+ }
+ catch ( KeeperException.ConnectionLossException ignore )
+ {
+ }
+ Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED);
+ server.restart();
+ client.checkExists().forPath("/");
+ Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED);
+ }
+
+ @Override
+ protected boolean enabledSessionExpiredStateAware()
+ {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
----------------------------------------------------------------------
diff --git a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
index 6ef3bb0..c9f3524 100644
--- a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
+++ b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
@@ -16,10 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.curator.test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.IInvokedMethod;
+import org.testng.IInvokedMethodListener;
import org.testng.IRetryAnalyzer;
import org.testng.ITestContext;
+import org.testng.ITestNGListener;
import org.testng.ITestNGMethod;
import org.testng.ITestResult;
import org.testng.annotations.AfterMethod;
@@ -32,11 +38,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class BaseClassForTests
{
protected TestingServer server;
+ private final Logger log = LoggerFactory.getLogger(getClass());
- private static final int RETRY_WAIT_MS = 5000;
+ private static final int RETRY_WAIT_MS = 5000;
private static final String INTERNAL_PROPERTY_DONT_LOG_CONNECTION_ISSUES;
private static final String INTERNAL_PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND;
private static final String INTERNAL_RETRY_FAILED_TESTS;
+
static
{
String logConnectionIssues = null;
@@ -70,8 +78,30 @@ public class BaseClassForTests
@BeforeSuite(alwaysRun = true)
public void beforeSuite(ITestContext context)
{
+ if ( !enabledSessionExpiredStateAware() )
+ {
+ ITestNGListener listener = new IInvokedMethodListener()
+ {
+ @Override
+ public void beforeInvocation(IInvokedMethod method, ITestResult testResult)
+ {
+ int invocationCount = method.getTestMethod().getCurrentInvocationCount();
+ System.setProperty("curator-enable-session-expired-state", Boolean.toString(invocationCount == 1));
+ log.info("curator-enable-session-expired-state: " + Boolean.toString(invocationCount == 1));
+ }
+
+ @Override
+ public void afterInvocation(IInvokedMethod method, ITestResult testResult)
+ {
+ System.clearProperty("curator-enable-session-expired-state");
+ }
+ };
+ context.getSuite().addListener(listener);
+ }
+
for ( ITestNGMethod method : context.getAllTestMethods() )
{
+ method.setInvocationCount(enabledSessionExpiredStateAware() ? 1 : 2);
method.setRetryAnalyzer(new RetryTest());
}
}
@@ -117,6 +147,11 @@ public class BaseClassForTests
}
}
+ protected boolean enabledSessionExpiredStateAware()
+ {
+ return false;
+ }
+
private static class RetryTest implements IRetryAnalyzer
{
private final AtomicBoolean hasBeenRetried = new AtomicBoolean(!Boolean.getBoolean(INTERNAL_RETRY_FAILED_TESTS));
http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-test/src/main/java/org/apache/curator/test/Timing.java
----------------------------------------------------------------------
diff --git a/curator-test/src/main/java/org/apache/curator/test/Timing.java b/curator-test/src/main/java/org/apache/curator/test/Timing.java
index f29b1c5..753d62d 100644
--- a/curator-test/src/main/java/org/apache/curator/test/Timing.java
+++ b/curator-test/src/main/java/org/apache/curator/test/Timing.java
@@ -34,7 +34,7 @@ public class Timing
private static final int DEFAULT_SECONDS = 10;
private static final int DEFAULT_WAITING_MULTIPLE = 5;
- private static final double SESSION_MULTIPLE = .25;
+ private static final double SESSION_MULTIPLE = 1.5;
/**
* Use the default base time