You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2015/08/22 00:36:04 UTC
[1/3] curator git commit: First pass at new (optional) definition of
state LOST
Repository: curator
Updated Branches:
refs/heads/CURATOR-247 [created] 62f3c33cd
First pass at new (optional) definition of state LOST
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/344634ac
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/344634ac
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/344634ac
Branch: refs/heads/CURATOR-247
Commit: 344634ac6e34e61bc0cc7b41923a1df4089c7948
Parents: 7d97259
Author: randgalt <ra...@apache.org>
Authored: Fri Aug 21 12:10:24 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Aug 21 12:10:24 2015 -0500
----------------------------------------------------------------------
.../framework/CuratorFrameworkFactory.java | 19 +++++
.../framework/api/UnhandledErrorListener.java | 4 +-
.../framework/imps/CuratorFrameworkImpl.java | 10 ++-
.../framework/state/ConnectionState.java | 35 +++++++--
.../framework/state/ConnectionStateManager.java | 75 +++++++++++++++-----
5 files changed, 113 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/344634ac/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
index dcb2ee6..6209b06 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
@@ -31,6 +31,7 @@ import org.apache.curator.framework.imps.CuratorFrameworkImpl;
import org.apache.curator.framework.imps.CuratorTempFrameworkImpl;
import org.apache.curator.framework.imps.DefaultACLProvider;
import org.apache.curator.framework.imps.GzipCompressionProvider;
+import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.utils.DefaultZookeeperFactory;
import org.apache.curator.utils.ZookeeperFactory;
import org.apache.zookeeper.CreateMode;
@@ -116,6 +117,7 @@ public class CuratorFrameworkFactory
private ACLProvider aclProvider = DEFAULT_ACL_PROVIDER;
private boolean canBeReadOnly = false;
private boolean useContainerParentsIfAvailable = true;
+ private boolean enableSessionExpiredState = false;
/**
* Apply the current values and build a new CuratorFramework
@@ -343,6 +345,18 @@ public class CuratorFrameworkFactory
return this;
}
+ /**
+ * Changes the meaning of {@link ConnectionState#LOST} from its pre-Curator 3.0.0 meaning
+ * to a true lost session state. See the {@link ConnectionState#LOST} doc for details.
+ *
+ * @return this
+ */
+ public Builder enableSessionExpiredState()
+ {
+ this.enableSessionExpiredState = true;
+ return this;
+ }
+
public ACLProvider getAclProvider()
{
return aclProvider;
@@ -398,6 +412,11 @@ public class CuratorFrameworkFactory
return useContainerParentsIfAvailable;
}
+ public boolean getEnableSessionExpiredState()
+ {
+ return enableSessionExpiredState;
+ }
+
@Deprecated
public String getAuthScheme()
{
http://git-wip-us.apache.org/repos/asf/curator/blob/344634ac/curator-framework/src/main/java/org/apache/curator/framework/api/UnhandledErrorListener.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/UnhandledErrorListener.java b/curator-framework/src/main/java/org/apache/curator/framework/api/UnhandledErrorListener.java
index b463af2..3721d4b 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/UnhandledErrorListener.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/UnhandledErrorListener.java
@@ -24,9 +24,7 @@ import org.apache.curator.framework.state.ConnectionStateListener;
public interface UnhandledErrorListener
{
/**
- * Called when an exception is caught in a background thread, handler, etc. Before this
- * listener is called, the error will have been logged and a {@link ConnectionState#LOST}
- * event will have been queued for any {@link ConnectionStateListener}s.
+ * Called when an exception is caught in a background thread, handler, etc.
*
* @param message Source message
* @param e exception
http://git-wip-us.apache.org/repos/asf/curator/blob/344634ac/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 41bb7cd..c64fb8f 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -83,6 +83,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
private final NamespaceFacadeCache namespaceFacadeCache;
private final NamespaceWatcherMap namespaceWatcherMap = new NamespaceWatcherMap(this);
private final boolean useContainerParentsIfAvailable;
+ private final boolean enableSessionExpiredState;
private volatile ExecutorService executorService;
private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false);
@@ -119,11 +120,12 @@ public class CuratorFrameworkImpl implements CuratorFramework
namespace = new NamespaceImpl(this, builder.getNamespace());
threadFactory = getThreadFactory(builder);
maxCloseWaitMs = builder.getMaxCloseWaitMs();
- connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory());
+ connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getEnableSessionExpiredState(), builder.getSessionTimeoutMs());
compressionProvider = builder.getCompressionProvider();
aclProvider = builder.getAclProvider();
state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
useContainerParentsIfAvailable = builder.useContainerParentsIfAvailable();
+ enableSessionExpiredState = builder.getEnableSessionExpiredState();
byte[] builderDefaultData = builder.getDefaultData();
defaultData = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0];
@@ -197,6 +199,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
state = parent.state;
authInfos = parent.authInfos;
useContainerParentsIfAvailable = parent.useContainerParentsIfAvailable;
+ enableSessionExpiredState = parent.enableSessionExpiredState;
}
@Override
@@ -722,7 +725,10 @@ public class CuratorFrameworkImpl implements CuratorFramework
return;
}
- doSyncForSuspendedConnection(client.getInstanceIndex());
+ if ( !enableSessionExpiredState )
+ {
+ doSyncForSuspendedConnection(client.getInstanceIndex());
+ }
}
private void doSyncForSuspendedConnection(final long instanceIndex)
http://git-wip-us.apache.org/repos/asf/curator/blob/344634ac/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
index 3ca1d66..49d0044 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
@@ -19,6 +19,8 @@
package org.apache.curator.framework.state;
import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
/**
* Represents state changes in the connection to ZK
@@ -39,8 +41,7 @@ public enum ConnectionState
/**
* There has been a loss of connection. Leaders, locks, etc. should suspend
- * until the connection is re-established. If the connection times-out you will
- * receive a {@link #LOST} notice
+ * until the connection is re-established.
*/
SUSPENDED
{
@@ -62,9 +63,29 @@ public enum ConnectionState
},
/**
- * The connection is confirmed to be lost. Close any locks, leaders, etc. and
- * attempt to re-create them. NOTE: it is possible to get a {@link #RECONNECTED}
- * state after this but you should still consider any locks, etc. as dirty/unstable
+ * <p>
+ * NOTE: the meaning of this state depends on how your CuratorFramework instance
+ * is created.
+ * </p>
+ *
+ * <p>
+ * The default meaning of LOST (and the only meaning prior to Curator 3.0.0) is:
+ * The connection is confirmed to be lost (i.e. the retry policy has given up). Close any locks, leaders, etc. and
+ * attempt to re-create them. NOTE: it is possible to get a {@link #RECONNECTED}
+ * state after this but you should still consider any locks, etc. as dirty/unstable
+ * </p>
+ *
+ * <p>
+ * <strong>Since 3.0.0</strong>, you can alter the meaning of LOST by calling
+ * {@link CuratorFrameworkFactory.Builder#enableSessionExpiredState()}. In this mode,
+ * Curator will set the LOST state only when it believes that the ZooKeeper session
+ * has expired. ZooKeeper connections have a session. When the session expires, clients must take appropriate
+ * action. In Curator, this is complicated by the fact that Curator internally manages the ZooKeeper
+ * connection. In this mode, Curator will set the LOST state when any of the following occurs:
+ * a) ZooKeeper returns a {@link Watcher.Event.KeeperState#Expired} or {@link KeeperException.Code#SESSIONEXPIRED};
+ * b) Curator closes the internally managed ZooKeeper instance; c) The configured session timeout
+ * elapses during a network partition.
+ * </p>
*/
LOST
{
@@ -87,7 +108,9 @@ public enum ConnectionState
{
return true;
}
- };
+ }
+
+ ;
/**
* Check if this state indicates that Curator has a connection to ZooKeeper
http://git-wip-us.apache.org/repos/asf/curator/blob/344634ac/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 67ff13d..c0feb84 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -35,6 +35,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -65,6 +66,8 @@ public class ConnectionStateManager implements Closeable
private final Logger log = LoggerFactory.getLogger(getClass());
private final BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
private final CuratorFramework client;
+ private final boolean enableSessionExpiredState;
+ private final int sessionTimeoutMs;
private final ListenerContainer<ConnectionStateListener> listeners = new ListenerContainer<ConnectionStateListener>();
private final AtomicBoolean initialConnectMessageSent = new AtomicBoolean(false);
private final ExecutorService service;
@@ -72,6 +75,8 @@ public class ConnectionStateManager implements Closeable
// guarded by sync
private ConnectionState currentConnectionState;
+ // guarded by sync
+ private long startOfSuspendedEpoch = 0;
private enum State
{
@@ -83,10 +88,14 @@ public class ConnectionStateManager implements Closeable
/**
* @param client the client
* @param threadFactory thread factory to use or null for a default
+ * @param enableSessionExpiredState if true, applies new meaning for LOST as described here: {@link ConnectionState#LOST}
+ * @param sessionTimeoutMs the ZK session timeout in milliseconds
*/
- public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory)
+ public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, boolean enableSessionExpiredState, int sessionTimeoutMs)
{
this.client = client;
+ this.enableSessionExpiredState = enableSessionExpiredState;
+ this.sessionTimeoutMs = sessionTimeoutMs;
if ( threadFactory == null )
{
threadFactory = ThreadUtils.newThreadFactory("ConnectionStateManager");
@@ -137,7 +146,7 @@ public class ConnectionStateManager implements Closeable
/**
* Change to {@link ConnectionState#SUSPENDED} only if not already suspended and not lost
- *
+ *
* @return true if connection is set to SUSPENDED
*/
public synchronized boolean setToSuspended()
@@ -152,7 +161,7 @@ public class ConnectionStateManager implements Closeable
return false;
}
- currentConnectionState = ConnectionState.SUSPENDED;
+ setCurrentConnectionState(ConnectionState.SUSPENDED);
postState(ConnectionState.SUSPENDED);
return true;
@@ -177,7 +186,7 @@ public class ConnectionStateManager implements Closeable
{
return false;
}
- currentConnectionState = newConnectionState;
+ setCurrentConnectionState(newConnectionState);
ConnectionState localState = newConnectionState;
boolean isNegativeMessage = ((newConnectionState == ConnectionState.LOST) || (newConnectionState == ConnectionState.SUSPENDED) || (newConnectionState == ConnectionState.READ_ONLY));
@@ -242,25 +251,34 @@ public class ConnectionStateManager implements Closeable
{
while ( !Thread.currentThread().isInterrupted() )
{
- final ConnectionState newState = eventQueue.take();
-
- if ( listeners.size() == 0 )
+ final ConnectionState newState = eventQueue.poll(sessionTimeoutMs, TimeUnit.MILLISECONDS);
+ if ( newState != null )
{
- log.warn("There are no ConnectionStateListeners registered.");
- }
+ if ( listeners.size() == 0 )
+ {
+ log.warn("There are no ConnectionStateListeners registered.");
+ }
- listeners.forEach
- (
- new Function<ConnectionStateListener, Void>()
- {
- @Override
- public Void apply(ConnectionStateListener listener)
+ listeners.forEach
+ (
+ new Function<ConnectionStateListener, Void>()
{
- listener.stateChanged(client, newState);
- return null;
+ @Override
+ public Void apply(ConnectionStateListener listener)
+ {
+ listener.stateChanged(client, newState);
+ return null;
+ }
}
- }
- );
+ );
+ }
+ else if ( enableSessionExpiredState )
+ {
+ synchronized(this)
+ {
+ checkSessionExpiration();
+ }
+ }
}
}
catch ( InterruptedException e )
@@ -268,4 +286,23 @@ public class ConnectionStateManager implements Closeable
Thread.currentThread().interrupt();
}
}
+
+ private void checkSessionExpiration()
+ {
+ if ( (currentConnectionState == ConnectionState.SUSPENDED) && (startOfSuspendedEpoch != 0) )
+ {
+ long elapsedMs = System.currentTimeMillis() - startOfSuspendedEpoch;
+ if ( elapsedMs >= sessionTimeoutMs )
+ {
+ log.info(String.format("Session timeout has elapsed while SUSPENDED. Posting LOST event. Elapsed ms: %d", elapsedMs));
+ addStateChange(ConnectionState.LOST);
+ }
+ }
+ }
+
+ private void setCurrentConnectionState(ConnectionState newConnectionState)
+ {
+ currentConnectionState = newConnectionState;
+ startOfSuspendedEpoch = (currentConnectionState == ConnectionState.SUSPENDED) ? System.currentTimeMillis() : 0;
+ }
}
[3/3] curator git commit: Continued work on new LOST behavior. Added
some tests. To get correct behavior it's necessary to not retry connection
failures. Retrying connection failures was never a good idea and here's a
good opportunity to fix it as this requires client action to enable
Posted by ra...@apache.org.
Continued work on new LOST behavior. Added some tests. To get correct behavior it's necessary to not retry connection failures. Retrying connection failures was never a good idea and here's a good opportunity to fix it as this requires client action to enable
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/62f3c33c
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/62f3c33c
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/62f3c33c
Branch: refs/heads/CURATOR-247
Commit: 62f3c33cdb556eccf6fe1cc87ee74b3458431777
Parents: 2343daf
Author: randgalt <ra...@apache.org>
Authored: Fri Aug 21 17:35:44 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Aug 21 17:35:44 2015 -0500
----------------------------------------------------------------------
.../org/apache/curator/ConnectionState.java | 24 ++---
.../apache/curator/CuratorZookeeperClient.java | 58 +++++++-----
.../main/java/org/apache/curator/RetryLoop.java | 12 +++
.../framework/CuratorFrameworkFactory.java | 2 +-
.../framework/imps/CuratorFrameworkImpl.java | 43 ++++++++-
.../framework/state/ConnectionState.java | 5 +
.../framework/state/ConnectionStateManager.java | 13 ++-
.../framework/imps/TestBlockUntilConnected.java | 1 +
.../imps/TestEnabledSessionExpiredState.java | 99 ++++++++++++++++++++
.../apache/curator/test/BaseClassForTests.java | 37 +++++++-
.../java/org/apache/curator/test/Timing.java | 2 +-
11 files changed, 253 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-client/src/main/java/org/apache/curator/ConnectionState.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/ConnectionState.java b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
index d3900a1..1dfdbef 100644
--- a/curator-client/src/main/java/org/apache/curator/ConnectionState.java
+++ b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
@@ -171,6 +171,18 @@ class ConnectionState implements Watcher, Closeable
return ensembleProvider;
}
+ synchronized void reset() throws Exception
+ {
+ log.debug("reset");
+
+ instanceIndex.incrementAndGet();
+
+ isConnected.set(false);
+ connectionStartMs = System.currentTimeMillis();
+ zooKeeper.closeAndReset();
+ zooKeeper.getZooKeeper(); // initiate connection
+ }
+
private synchronized void checkTimeouts() throws Exception
{
int minTimeout = Math.min(sessionTimeoutMs, connectionTimeoutMs);
@@ -206,18 +218,6 @@ class ConnectionState implements Watcher, Closeable
}
}
- private synchronized void reset() throws Exception
- {
- log.debug("reset");
-
- instanceIndex.incrementAndGet();
-
- isConnected.set(false);
- connectionStartMs = System.currentTimeMillis();
- zooKeeper.closeAndReset();
- zooKeeper.getZooKeeper(); // initiate connection
- }
-
private boolean checkState(Event.KeeperState state, boolean wasConnected)
{
boolean isConnected = wasConnected;
http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
index 09b28b2..fbb2f4c 100644
--- a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
+++ b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.curator;
import com.google.common.base.Preconditions;
@@ -43,12 +44,12 @@ import java.util.concurrent.atomic.AtomicReference;
@SuppressWarnings("UnusedDeclaration")
public class CuratorZookeeperClient implements Closeable
{
- private final Logger log = LoggerFactory.getLogger(getClass());
- private final ConnectionState state;
- private final AtomicReference<RetryPolicy> retryPolicy = new AtomicReference<RetryPolicy>();
- private final int connectionTimeoutMs;
- private final AtomicBoolean started = new AtomicBoolean(false);
- private final AtomicReference<TracerDriver> tracer = new AtomicReference<TracerDriver>(new DefaultTracerDriver());
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final ConnectionState state;
+ private final AtomicReference<RetryPolicy> retryPolicy = new AtomicReference<RetryPolicy>();
+ private final int connectionTimeoutMs;
+ private final AtomicBoolean started = new AtomicBoolean(false);
+ private final AtomicReference<TracerDriver> tracer = new AtomicReference<TracerDriver>(new DefaultTracerDriver());
/**
*
@@ -159,7 +160,7 @@ public class CuratorZookeeperClient implements Closeable
Preconditions.checkState(started.get(), "Client is not started");
log.debug("blockUntilConnectedOrTimedOut() start");
- TimeTrace trace = startTracer("blockUntilConnectedOrTimedOut");
+ TimeTrace trace = startTracer("blockUntilConnectedOrTimedOut");
internalBlockUntilConnectedOrTimedOut();
@@ -176,7 +177,7 @@ public class CuratorZookeeperClient implements Closeable
*
* @throws IOException errors
*/
- public void start() throws Exception
+ public void start() throws Exception
{
log.debug("Starting");
@@ -192,7 +193,7 @@ public class CuratorZookeeperClient implements Closeable
/**
* Close the client
*/
- public void close()
+ public void close()
{
log.debug("Closing");
@@ -212,7 +213,7 @@ public class CuratorZookeeperClient implements Closeable
*
* @param policy new policy
*/
- public void setRetryPolicy(RetryPolicy policy)
+ public void setRetryPolicy(RetryPolicy policy)
{
Preconditions.checkNotNull(policy, "policy cannot be null");
@@ -234,7 +235,7 @@ public class CuratorZookeeperClient implements Closeable
* @param name name of the event
* @return the new tracer ({@link TimeTrace#commit()} must be called)
*/
- public TimeTrace startTracer(String name)
+ public TimeTrace startTracer(String name)
{
return new TimeTrace(name, tracer.get());
}
@@ -244,7 +245,7 @@ public class CuratorZookeeperClient implements Closeable
*
* @return tracing driver
*/
- public TracerDriver getTracerDriver()
+ public TracerDriver getTracerDriver()
{
return tracer.get();
}
@@ -254,7 +255,7 @@ public class CuratorZookeeperClient implements Closeable
*
* @param tracer new tracing driver
*/
- public void setTracerDriver(TracerDriver tracer)
+ public void setTracerDriver(TracerDriver tracer)
{
this.tracer.set(tracer);
}
@@ -265,7 +266,7 @@ public class CuratorZookeeperClient implements Closeable
*
* @return connection string
*/
- public String getCurrentConnectionString()
+ public String getCurrentConnectionString()
{
return state.getEnsembleProvider().getConnectionString();
}
@@ -281,6 +282,16 @@ public class CuratorZookeeperClient implements Closeable
}
/**
+ * For internal use only - reset the internally managed ZK handle
+ *
+ * @throws Exception errors
+ */
+ public void reset() throws Exception
+ {
+ state.reset();
+ }
+
+ /**
* Every time a new {@link ZooKeeper} instance is allocated, the "instance index"
* is incremented.
*
@@ -291,22 +302,27 @@ public class CuratorZookeeperClient implements Closeable
return state.getInstanceIndex();
}
- void addParentWatcher(Watcher watcher)
+ public boolean retryConnectionTimeouts()
+ {
+ return true;
+ }
+
+ void addParentWatcher(Watcher watcher)
{
state.addParentWatcher(watcher);
}
- void removeParentWatcher(Watcher watcher)
+ void removeParentWatcher(Watcher watcher)
{
state.removeParentWatcher(watcher);
}
void internalBlockUntilConnectedOrTimedOut() throws InterruptedException
{
- long waitTimeMs = connectionTimeoutMs;
+ long waitTimeMs = connectionTimeoutMs;
while ( !state.isConnected() && (waitTimeMs > 0) )
{
- final CountDownLatch latch = new CountDownLatch(1);
+ final CountDownLatch latch = new CountDownLatch(1);
Watcher tempWatcher = new Watcher()
{
@Override
@@ -315,9 +331,9 @@ public class CuratorZookeeperClient implements Closeable
latch.countDown();
}
};
-
+
state.addParentWatcher(tempWatcher);
- long startTimeMs = System.currentTimeMillis();
+ long startTimeMs = System.currentTimeMillis();
try
{
latch.await(1, TimeUnit.SECONDS);
@@ -326,7 +342,7 @@ public class CuratorZookeeperClient implements Closeable
{
state.removeParentWatcher(tempWatcher);
}
- long elapsed = Math.max(1, System.currentTimeMillis() - startTimeMs);
+ long elapsed = Math.max(1, System.currentTimeMillis() - startTimeMs);
waitTimeMs -= elapsed;
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-client/src/main/java/org/apache/curator/RetryLoop.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/RetryLoop.java b/curator-client/src/main/java/org/apache/curator/RetryLoop.java
index 065ebef..8d77cf7 100644
--- a/curator-client/src/main/java/org/apache/curator/RetryLoop.java
+++ b/curator-client/src/main/java/org/apache/curator/RetryLoop.java
@@ -98,11 +98,17 @@ public class RetryLoop
{
T result = null;
RetryLoop retryLoop = client.newRetryLoop();
+ boolean connectionFailed = false;
while ( retryLoop.shouldContinue() )
{
try
{
client.internalBlockUntilConnectedOrTimedOut();
+ if ( !client.isConnected() && !client.retryConnectionTimeouts() )
+ {
+ connectionFailed = true;
+ break;
+ }
result = proc.call();
retryLoop.markComplete();
@@ -112,6 +118,12 @@ public class RetryLoop
retryLoop.takeException(e);
}
}
+
+ if ( connectionFailed )
+ {
+ throw new KeeperException.ConnectionLossException();
+ }
+
return result;
}
http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
index 6209b06..fad4fc2 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
@@ -117,7 +117,7 @@ public class CuratorFrameworkFactory
private ACLProvider aclProvider = DEFAULT_ACL_PROVIDER;
private boolean canBeReadOnly = false;
private boolean useContainerParentsIfAvailable = true;
- private boolean enableSessionExpiredState = false;
+ private boolean enableSessionExpiredState = Boolean.getBoolean("curator-enable-session-expired-state");
/**
* Apply the current values and build a new CuratorFramework
http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index c64fb8f..c359fdc 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -61,6 +61,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
public class CuratorFrameworkImpl implements CuratorFramework
@@ -84,6 +85,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
private final NamespaceWatcherMap namespaceWatcherMap = new NamespaceWatcherMap(this);
private final boolean useContainerParentsIfAvailable;
private final boolean enableSessionExpiredState;
+ private final AtomicLong currentInstanceIndex = new AtomicLong(-1);
private volatile ExecutorService executorService;
private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false);
@@ -104,15 +106,31 @@ public class CuratorFrameworkImpl implements CuratorFramework
public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
{
ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
- this.client = new CuratorZookeeperClient(localZookeeperFactory, builder.getEnsembleProvider(), builder.getSessionTimeoutMs(), builder.getConnectionTimeoutMs(), new Watcher()
+ this.client = new CuratorZookeeperClient
+ (
+ localZookeeperFactory,
+ builder.getEnsembleProvider(),
+ builder.getSessionTimeoutMs(),
+ builder.getConnectionTimeoutMs(),
+ new Watcher()
+ {
+ @Override
+ public void process(WatchedEvent watchedEvent)
+ {
+ CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
+ processEvent(event);
+ }
+ },
+ builder.getRetryPolicy(),
+ builder.canBeReadOnly()
+ )
{
@Override
- public void process(WatchedEvent watchedEvent)
+ public boolean retryConnectionTimeouts()
{
- CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
- processEvent(event);
+ return !enableSessionExpiredState;
}
- }, builder.getRetryPolicy(), builder.canBeReadOnly());
+ };
listeners = new ListenerContainer<CuratorListener>();
unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
@@ -675,14 +693,29 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
else if ( state == Watcher.Event.KeeperState.SyncConnected )
{
+ checkNewConnection();
connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
}
else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
{
+ checkNewConnection();
connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
}
}
+ private void checkNewConnection()
+ {
+ if ( enableSessionExpiredState )
+ {
+ long instanceIndex = client.getInstanceIndex();
+ long newInstanceIndex = currentInstanceIndex.getAndSet(instanceIndex);
+ if ( (newInstanceIndex >= 0) && (instanceIndex != newInstanceIndex) ) // currentInstanceIndex is initially -1 - ignore this
+ {
+ connectionStateManager.addStateChange(ConnectionState.LOST);
+ }
+ }
+ }
+
Watcher.Event.KeeperState codeToState(KeeperException.Code code)
{
switch ( code )
http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
index 49d0044..79f3b62 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
@@ -86,6 +86,11 @@ public enum ConnectionState
* b) Curator closes the internally managed ZooKeeper instance; c) The configured session timeout
* elapses during a network partition.
* </p>
+ *
+ * <p>
+ * NOTE: the new behavior for the LOST state can also be enabled via the Java system
+ * property "curator-enable-session-expired-state" (e.g. -Dcurator-enable-session-expired-state=true)
+ * </p>
*/
LOST
{
http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index c0feb84..553faac 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -251,7 +251,8 @@ public class ConnectionStateManager implements Closeable
{
while ( !Thread.currentThread().isInterrupted() )
{
- final ConnectionState newState = eventQueue.poll(sessionTimeoutMs, TimeUnit.MILLISECONDS);
+ int pollMaxMs = (sessionTimeoutMs * 2) / 3; // 2/3 of session timeout
+ final ConnectionState newState = eventQueue.poll(pollMaxMs, TimeUnit.MILLISECONDS);
if ( newState != null )
{
if ( listeners.size() == 0 )
@@ -294,7 +295,15 @@ public class ConnectionStateManager implements Closeable
long elapsedMs = System.currentTimeMillis() - startOfSuspendedEpoch;
if ( elapsedMs >= sessionTimeoutMs )
{
- log.info(String.format("Session timeout has elapsed while SUSPENDED. Posting LOST event. Elapsed ms: %d", elapsedMs));
+ log.info(String.format("Session timeout has elapsed while SUSPENDED. Posting LOST event and resetting the connection. Elapsed ms: %d", elapsedMs));
+ try
+ {
+ client.getZookeeperClient().reset();
+ }
+ catch ( Exception e )
+ {
+ log.error("Could not reset the connection", e);
+ }
addStateChange(ConnectionState.LOST);
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
index f649afb..eeec797 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
@@ -114,6 +114,7 @@ public class TestBlockUntilConnected extends BaseClassForTests
Timing timing = new Timing();
CuratorFramework client = CuratorFrameworkFactory.builder().
connectString(server.getConnectString()).
+ sessionTimeoutMs(timing.session()).
retryPolicy(new RetryOneTime(1)).
build();
http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
new file mode 100644
index 0000000..030a292
--- /dev/null
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
@@ -0,0 +1,99 @@
+package org.apache.curator.framework.imps;
+
+import com.google.common.collect.Queues;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.KillSession;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.KeeperException;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class TestEnabledSessionExpiredState extends BaseClassForTests // checks the order of connection states seen by a client built with enableSessionExpiredState()
+{
+ private final Timing timing = new Timing(); // supplies the session/connection timeouts and the poll wait used below
+
+ private CuratorFramework client; // rebuilt for every test method in setup()
+ private BlockingQueue<ConnectionState> states; // states in the order the listener received them
+
+ @BeforeMethod
+ @Override
+ public void setup() throws Exception // builds and starts the client, then registers a listener that records every state change
+ {
+ super.setup();
+
+ client = CuratorFrameworkFactory.builder()
+ .connectString(server.getConnectString())
+ .connectionTimeoutMs(timing.connection())
+ .sessionTimeoutMs(timing.session())
+ .enableSessionExpiredState()
+ .retryPolicy(new RetryOneTime(1))
+ .build();
+ client.start(); // NOTE(review): started before the listener below is registered - confirm the initial CONNECTED event cannot be delivered (and missed) first
+
+ states = Queues.newLinkedBlockingQueue();
+ ConnectionStateListener listener = new ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ states.add(newState); // record the change so the test can poll for it
+ }
+ };
+ client.getConnectionStateListenable().addListener(listener);
+ }
+
+ @AfterMethod
+ @Override
+ public void teardown() throws Exception // closes the client first, then runs the base-class teardown
+ {
+ CloseableUtils.closeQuietly(client);
+
+ super.teardown();
+ }
+
+ @Test
+ public void testKillSession() throws Exception // expects CONNECTED, then SUSPENDED -> LOST -> RECONNECTED after the session is killed
+ {
+ Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED);
+
+ KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString()); // presumably expires the client's current session - confirm against KillSession
+
+ Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED);
+ Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.LOST);
+ Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED);
+ }
+
+ @Test
+ public void testReconnectWithoutExpiration() throws Exception // expects SUSPENDED and then RECONNECTED as the next recorded state (no LOST in between) across a server stop/restart
+ {
+ Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED);
+ server.stop();
+ try
+ {
+ client.checkExists().forPath("/"); // any API call that will invoke the retry policy, etc.
+ }
+ catch ( KeeperException.ConnectionLossException ignore ) // tolerated: the server was stopped just above
+ {
+ }
+ Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED);
+ server.restart();
+ client.checkExists().forPath("/"); // issue a call against the restarted server before polling for the next state
+ Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED);
+ }
+
+ @Override
+ protected boolean enabledSessionExpiredStateAware() // overrides the BaseClassForTests hook
+ {
+ return true; // this test enables the session-expired state explicitly through the builder
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
----------------------------------------------------------------------
diff --git a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
index 6ef3bb0..c9f3524 100644
--- a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
+++ b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
@@ -16,10 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.curator.test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.IInvokedMethod;
+import org.testng.IInvokedMethodListener;
import org.testng.IRetryAnalyzer;
import org.testng.ITestContext;
+import org.testng.ITestNGListener;
import org.testng.ITestNGMethod;
import org.testng.ITestResult;
import org.testng.annotations.AfterMethod;
@@ -32,11 +38,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class BaseClassForTests
{
protected TestingServer server;
+ private final Logger log = LoggerFactory.getLogger(getClass());
- private static final int RETRY_WAIT_MS = 5000;
+ private static final int RETRY_WAIT_MS = 5000;
private static final String INTERNAL_PROPERTY_DONT_LOG_CONNECTION_ISSUES;
private static final String INTERNAL_PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND;
private static final String INTERNAL_RETRY_FAILED_TESTS;
+
static
{
String logConnectionIssues = null;
@@ -70,8 +78,30 @@ public class BaseClassForTests
@BeforeSuite(alwaysRun = true)
public void beforeSuite(ITestContext context)
{
+ if ( !enabledSessionExpiredStateAware() ) // tests that are not aware of the session-expired state get the system property toggled around each invocation
+ {
+ ITestNGListener listener = new IInvokedMethodListener()
+ {
+ @Override
+ public void beforeInvocation(IInvokedMethod method, ITestResult testResult)
+ {
+ int invocationCount = method.getTestMethod().getCurrentInvocationCount();
+ System.setProperty("curator-enable-session-expired-state", Boolean.toString(invocationCount == 1)); // "true" only when the current invocation count equals 1, otherwise "false"
+ log.info("curator-enable-session-expired-state: " + Boolean.toString(invocationCount == 1));
+ }
+
+ @Override
+ public void afterInvocation(IInvokedMethod method, ITestResult testResult)
+ {
+ System.clearProperty("curator-enable-session-expired-state"); // remove the property again once the invocation has finished
+ }
+ };
+ context.getSuite().addListener(listener);
+ }
+
for ( ITestNGMethod method : context.getAllTestMethods() )
{
+ method.setInvocationCount(enabledSessionExpiredStateAware() ? 1 : 2); // once if aware, twice otherwise; NOTE(review): applied to every method in the context based on this one instance - confirm intended
method.setRetryAnalyzer(new RetryTest());
}
}
@@ -117,6 +147,11 @@ public class BaseClassForTests
}
}
+ protected boolean enabledSessionExpiredStateAware() // hook for subclasses: return true when the test configures the session-expired state itself
+ {
+ return false;
+ }
+
private static class RetryTest implements IRetryAnalyzer
{
private final AtomicBoolean hasBeenRetried = new AtomicBoolean(!Boolean.getBoolean(INTERNAL_RETRY_FAILED_TESTS));
http://git-wip-us.apache.org/repos/asf/curator/blob/62f3c33c/curator-test/src/main/java/org/apache/curator/test/Timing.java
----------------------------------------------------------------------
diff --git a/curator-test/src/main/java/org/apache/curator/test/Timing.java b/curator-test/src/main/java/org/apache/curator/test/Timing.java
index f29b1c5..753d62d 100644
--- a/curator-test/src/main/java/org/apache/curator/test/Timing.java
+++ b/curator-test/src/main/java/org/apache/curator/test/Timing.java
@@ -34,7 +34,7 @@ public class Timing
private static final int DEFAULT_SECONDS = 10;
private static final int DEFAULT_WAITING_MULTIPLE = 5;
- private static final double SESSION_MULTIPLE = .25;
+ private static final double SESSION_MULTIPLE = 1.5;
/**
* Use the default base time
[2/3] curator git commit: Merge branch 'CURATOR-3.0' into CURATOR-247
Posted by ra...@apache.org.
Merge branch 'CURATOR-3.0' into CURATOR-247
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/2343daf2
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/2343daf2
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/2343daf2
Branch: refs/heads/CURATOR-247
Commit: 2343daf29388566b0efa0b0a2ad21574fb534a27
Parents: 344634a 2fc9e37
Author: randgalt <ra...@apache.org>
Authored: Fri Aug 21 15:11:59 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Aug 21 15:11:59 2015 -0500
----------------------------------------------------------------------
.../recipes/queue/DistributedIdQueue.java | 28 +++++++++++++++++++-
.../recipes/queue/DistributedQueue.java | 7 ++++-
.../recipes/queue/TestDistributedIdQueue.java | 2 +-
.../recipes/queue/TestDistributedQueue.java | 2 +-
4 files changed, 35 insertions(+), 4 deletions(-)
----------------------------------------------------------------------