You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2013/05/10 02:18:42 UTC
git commit: CURATOR-24 Improve the handling of hung ZooKeeper connections
Updated Branches:
refs/heads/CURATOR-24 [created] 11ae23adc
CURATOR-24
Improve the handling of hung ZooKeeper connections
Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/11ae23ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/11ae23ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/11ae23ad
Branch: refs/heads/CURATOR-24
Commit: 11ae23adc013fa6526ac001bce2f5b2967a1d9b5
Parents: 38f28b5
Author: randgalt <ra...@apache.org>
Authored: Thu May 9 17:18:09 2013 -0700
Committer: randgalt <ra...@apache.org>
Committed: Thu May 9 17:18:09 2013 -0700
----------------------------------------------------------------------
.../java/org/apache/curator/ConnectionState.java | 169 +++++++--------
.../org/apache/curator/CuratorZookeeperClient.java | 9 -
.../framework/state/ConnectionStateManager.java | 5 -
3 files changed, 84 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/11ae23ad/curator-client/src/main/java/org/apache/curator/ConnectionState.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/ConnectionState.java b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
index 8de3e27..bbb0588 100644
--- a/curator-client/src/main/java/org/apache/curator/ConnectionState.java
+++ b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
@@ -38,24 +38,23 @@ import java.util.concurrent.atomic.AtomicReference;
class ConnectionState implements Watcher, Closeable
{
- private volatile long connectionStartMs = 0;
-
- private final Logger log = LoggerFactory.getLogger(getClass());
- private final HandleHolder zooKeeper;
- private final AtomicBoolean isConnected = new AtomicBoolean(false);
- private final AtomicBoolean lost = new AtomicBoolean(false);
- private final EnsembleProvider ensembleProvider;
- private final int connectionTimeoutMs;
+ private static final int MAX_BACKGROUND_EXCEPTIONS = 10;
+ private static final boolean LOG_EVENTS = Boolean.getBoolean(DebugUtils.PROPERTY_LOG_EVENTS);
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final HandleHolder zooKeeper;
+ private final AtomicBoolean isConnected = new AtomicBoolean(false);
+ private final EnsembleProvider ensembleProvider;
+ private final int sessionTimeoutMs;
+ private final int connectionTimeoutMs;
private final AtomicReference<TracerDriver> tracer;
- private final Queue<Exception> backgroundExceptions = new ConcurrentLinkedQueue<Exception>();
- private final Queue<Watcher> parentWatchers = new ConcurrentLinkedQueue<Watcher>();
-
- private static final int MAX_BACKGROUND_EXCEPTIONS = 10;
- private static final boolean LOG_EVENTS = Boolean.getBoolean(DebugUtils.PROPERTY_LOG_EVENTS);
+ private final Queue<Exception> backgroundExceptions = new ConcurrentLinkedQueue<Exception>();
+ private final Queue<Watcher> parentWatchers = new ConcurrentLinkedQueue<Watcher>();
+ private volatile long connectionStartMs = 0;
ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly)
{
this.ensembleProvider = ensembleProvider;
+ this.sessionTimeoutMs = sessionTimeoutMs;
this.connectionTimeoutMs = connectionTimeoutMs;
this.tracer = tracer;
if ( parentWatcher != null )
@@ -73,12 +72,6 @@ class ConnectionState implements Watcher, Closeable
throw new SessionFailRetryLoop.SessionFailedException();
}
- if ( lost.compareAndSet(true, false) )
- {
- log.info("resetting after loss");
- reset();
- }
-
Exception exception = backgroundExceptions.poll();
if ( exception != null )
{
@@ -90,24 +83,7 @@ class ConnectionState implements Watcher, Closeable
boolean localIsConnected = isConnected.get();
if ( !localIsConnected )
{
- long elapsed = System.currentTimeMillis() - connectionStartMs;
- if ( elapsed >= connectionTimeoutMs )
- {
- if ( zooKeeper.hasNewConnectionString() )
- {
- handleNewConnectionString();
- }
- else
- {
- KeeperException.ConnectionLossException connectionLossException = new KeeperException.ConnectionLossException();
- if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
- {
- log.error(String.format("Connection timed out for connection string (%s) and timeout (%d) / elapsed (%d)", zooKeeper.getConnectionString(), connectionTimeoutMs, elapsed), connectionLossException);
- }
- tracer.get().addCount("connections-timed-out", 1);
- throw connectionLossException;
- }
- }
+ checkTimeouts();
}
return zooKeeper.getZooKeeper();
@@ -118,7 +94,7 @@ class ConnectionState implements Watcher, Closeable
return isConnected.get();
}
- void start() throws Exception
+ void start() throws Exception
{
log.debug("Starting");
ensembleProvider.start();
@@ -126,7 +102,7 @@ class ConnectionState implements Watcher, Closeable
}
@Override
- public void close() throws IOException
+ public void close() throws IOException
{
log.debug("Closing");
@@ -142,27 +118,19 @@ class ConnectionState implements Watcher, Closeable
finally
{
isConnected.set(false);
- lost.set(false);
}
}
- void addParentWatcher(Watcher watcher)
+ void addParentWatcher(Watcher watcher)
{
parentWatchers.offer(watcher);
}
- void removeParentWatcher(Watcher watcher)
+ void removeParentWatcher(Watcher watcher)
{
parentWatchers.remove(watcher);
}
- void markLost()
- {
- log.info("lost marked");
-
- lost.set(true);
- }
-
@Override
public void process(WatchedEvent event)
{
@@ -188,10 +156,6 @@ class ConnectionState implements Watcher, Closeable
if ( newIsConnected != wasConnected )
{
isConnected.set(newIsConnected);
- if ( newIsConnected )
- {
- lost.set(false);
- }
connectionStartMs = System.currentTimeMillis();
}
}
@@ -201,6 +165,41 @@ class ConnectionState implements Watcher, Closeable
return ensembleProvider;
}
+ private synchronized void checkTimeouts() throws Exception
+ {
+ int minTimeout = Math.min(sessionTimeoutMs, connectionTimeoutMs);
+ long elapsed = System.currentTimeMillis() - connectionStartMs;
+ if ( elapsed >= minTimeout )
+ {
+ if ( zooKeeper.hasNewConnectionString() )
+ {
+ handleNewConnectionString();
+ }
+ else
+ {
+ int maxTimeout = Math.max(sessionTimeoutMs, connectionTimeoutMs);
+ if ( elapsed > maxTimeout )
+ {
+ if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
+ {
+ log.warn(String.format("Connection attempt unsuccessful after %d (greater than max timeout of %d). Resetting connection and trying again with a new connection.", elapsed, maxTimeout));
+ }
+ reset();
+ }
+ else
+ {
+ KeeperException.ConnectionLossException connectionLossException = new KeeperException.ConnectionLossException();
+ if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
+ {
+ log.error(String.format("Connection timed out for connection string (%s) and timeout (%d) / elapsed (%d)", zooKeeper.getConnectionString(), connectionTimeoutMs, elapsed), connectionLossException);
+ }
+ tracer.get().addCount("connections-timed-out", 1);
+ throw connectionLossException;
+ }
+ }
+ }
+ }
+
private synchronized void reset() throws Exception
{
log.debug("reset");
@@ -213,44 +212,44 @@ class ConnectionState implements Watcher, Closeable
private boolean checkState(Event.KeeperState state, boolean wasConnected)
{
- boolean isConnected = wasConnected;
- boolean checkNewConnectionString = true;
+ boolean isConnected = wasConnected;
+ boolean checkNewConnectionString = true;
switch ( state )
{
- default:
- case Disconnected:
- {
- isConnected = false;
- break;
- }
+ default:
+ case Disconnected:
+ {
+ isConnected = false;
+ break;
+ }
- case SyncConnected:
- case ConnectedReadOnly:
- {
- isConnected = true;
- break;
- }
+ case SyncConnected:
+ case ConnectedReadOnly:
+ {
+ isConnected = true;
+ break;
+ }
- case AuthFailed:
- {
- isConnected = false;
- log.error("Authentication failed");
- break;
- }
+ case AuthFailed:
+ {
+ isConnected = false;
+ log.error("Authentication failed");
+ break;
+ }
- case Expired:
- {
- isConnected = false;
- checkNewConnectionString = false;
- handleExpiredSession();
- break;
- }
+ case Expired:
+ {
+ isConnected = false;
+ checkNewConnectionString = false;
+ handleExpiredSession();
+ break;
+ }
- case SaslAuthenticated:
- {
- // NOP
- break;
- }
+ case SaslAuthenticated:
+ {
+ // NOP
+ break;
+ }
}
if ( checkNewConnectionString && zooKeeper.hasNewConnectionString() )
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/11ae23ad/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
index be63a9b..f633fe1 100644
--- a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
+++ b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
@@ -184,15 +184,6 @@ public class CuratorZookeeperClient implements Closeable
}
/**
- * Mark the connection as lost. The next time {@link #getZooKeeper()} is called,
- * a new connection will be created.
- */
- public void markLost()
- {
- state.markLost();
- }
-
- /**
* Close the client
*/
public void close()
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/11ae23ad/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 8f5d9ff..2ed60c1 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -141,11 +141,6 @@ public class ConnectionStateManager implements Closeable
return;
}
- if ( newState == ConnectionState.LOST )
- {
- client.getZookeeperClient().markLost();
- }
-
ConnectionState previousState = currentState.getAndSet(newState);
if ( previousState == newState )
{