You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2015/08/23 02:29:41 UTC
curator git commit: further refactoring. Abstracted old
framework-level connection handling into ClassicInternalConnectionHandler.
Probably more to do here
Repository: curator
Updated Branches:
refs/heads/CURATOR-247 e23913701 -> 30bd7b655
further refactoring. Abstracted old framework-level connection handling into ClassicInternalConnectionHandler. Probably more to do here
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/30bd7b65
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/30bd7b65
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/30bd7b65
Branch: refs/heads/CURATOR-247
Commit: 30bd7b655d201762d8ff74062964621879ac7134
Parents: e239137
Author: randgalt <ra...@apache.org>
Authored: Sat Aug 22 19:29:36 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Aug 22 19:29:36 2015 -0500
----------------------------------------------------------------------
.../imps/ClassicInternalConnectionHandler.java | 58 ++++++++++++++++++
.../framework/imps/CuratorFrameworkImpl.java | 64 ++++++--------------
.../imps/InternalConnectionHandler.java | 10 +++
.../imps/StandardInternalConnectionHandler.java | 22 +++++++
.../framework/state/ConnectionStateManager.java | 8 ++-
5 files changed, 112 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/30bd7b65/curator-framework/src/main/java/org/apache/curator/framework/imps/ClassicInternalConnectionHandler.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ClassicInternalConnectionHandler.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ClassicInternalConnectionHandler.java
new file mode 100644
index 0000000..1de6e80
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ClassicInternalConnectionHandler.java
@@ -0,0 +1,58 @@
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.framework.state.ConnectionState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/** Handler chosen by CuratorFrameworkImpl when isEmulatingClassicHandling() is true: a suspended connection triggers a background sync. */
+class ClassicInternalConnectionHandler implements InternalConnectionHandler
+{
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ @Override
+ public void checkNewConnection(CuratorFrameworkImpl client)
+ {
+ // NOP - this handler takes no extra action when a connection is (re)established
+ }
+
+ @Override
+ public boolean checkSessionExpirationEnabled()
+ {
+ return false; // handed to the ConnectionStateManager constructor as its checkSessionExpiration flag
+ }
+
+ @Override
+ public void suspendConnection(CuratorFrameworkImpl client)
+ {
+ if ( client.setToSuspended() ) // the sync is only started when setToSuspended() returns true
+ {
+ doSyncForSuspendedConnection(client, client.getZookeeperClient().getInstanceIndex());
+ }
+ }
+
+ private void doSyncForSuspendedConnection(final CuratorFrameworkImpl client, final long instanceIndex)
+ {
+ // we appear to have disconnected, force a new ZK event and see if we can connect to another server
+ final BackgroundOperation<String> operation = new BackgroundSyncImpl(client, null);
+ OperationAndData.ErrorCallback<String> errorCallback = new OperationAndData.ErrorCallback<String>()
+ {
+ @Override
+ public void retriesExhausted(OperationAndData<String> operationAndData)
+ {
+ // if instanceIndex differs from the client's current instance index, the ZooKeeper instance was
+ // reset/reallocated, so the pending background sync is no longer valid and is retried once.
+ // if instanceIndex is -1, this is the second try to sync - punt and mark the connection lost
+ if ( (instanceIndex < 0) || (instanceIndex == client.getZookeeperClient().getInstanceIndex()) )
+ {
+ client.addStateChange(ConnectionState.LOST);
+ }
+ else
+ {
+ log.debug("suspendConnection() failure ignored as the ZooKeeper instance was reset. Retrying.");
+ // send -1 to signal that if it happens again, punt and mark the connection lost
+ doSyncForSuspendedConnection(client, -1);
+ }
+ }
+ };
+ client.performBackgroundOperation(new OperationAndData<String>(operation, "/", null, errorCallback, null)); // sync on the root path "/"
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/30bd7b65/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 44a8ec6..b04987d 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -85,6 +85,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
private final NamespaceWatcherMap namespaceWatcherMap = new NamespaceWatcherMap(this);
private final boolean useContainerParentsIfAvailable;
private final AtomicLong currentInstanceIndex = new AtomicLong(-1);
+ private final InternalConnectionHandler internalConnectionHandler;
private volatile ExecutorService executorService;
private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false);
@@ -125,13 +126,14 @@ public class CuratorFrameworkImpl implements CuratorFramework
builder.getConnectionHandlingPolicy()
);
+ internalConnectionHandler = builder.getConnectionHandlingPolicy().isEmulatingClassicHandling() ? new ClassicInternalConnectionHandler() : new StandardInternalConnectionHandler();
listeners = new ListenerContainer<CuratorListener>();
unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
backgroundOperations = new DelayQueue<OperationAndData<?>>();
namespace = new NamespaceImpl(this, builder.getNamespace());
threadFactory = getThreadFactory(builder);
maxCloseWaitMs = builder.getMaxCloseWaitMs();
- connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs());
+ connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), internalConnectionHandler.checkSessionExpirationEnabled());
compressionProvider = builder.getCompressionProvider();
aclProvider = builder.getAclProvider();
state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
@@ -209,6 +211,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
state = parent.state;
authInfos = parent.authInfos;
useContainerParentsIfAvailable = parent.useContainerParentsIfAvailable;
+ internalConnectionHandler = parent.internalConnectionHandler;
}
@Override
@@ -676,7 +679,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
{
if ( state == Watcher.Event.KeeperState.Disconnected )
{
- suspendConnection();
+ internalConnectionHandler.suspendConnection(this);
}
else if ( state == Watcher.Event.KeeperState.Expired )
{
@@ -684,26 +687,23 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
else if ( state == Watcher.Event.KeeperState.SyncConnected )
{
- checkNewConnection();
+ internalConnectionHandler.checkNewConnection(this);
connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
}
else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
{
- checkNewConnection();
+ internalConnectionHandler.checkNewConnection(this);
connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
}
}
- private void checkNewConnection()
+ void checkInstanceIndex()
{
- if ( !client.getConnectionHandlingPolicy().isEmulatingClassicHandling() )
+ long instanceIndex = client.getInstanceIndex();
+ long newInstanceIndex = currentInstanceIndex.getAndSet(instanceIndex);
+ if ( (newInstanceIndex >= 0) && (instanceIndex != newInstanceIndex) ) // currentInstanceIndex is initially -1 - ignore this
{
- long instanceIndex = client.getInstanceIndex();
- long newInstanceIndex = currentInstanceIndex.getAndSet(instanceIndex);
- if ( (newInstanceIndex >= 0) && (instanceIndex != newInstanceIndex) ) // currentInstanceIndex is initially -1 - ignore this
- {
- connectionStateManager.addStateChange(ConnectionState.LOST);
- }
+ connectionStateManager.addStateChange(ConnectionState.LOST);
}
}
@@ -742,44 +742,14 @@ public class CuratorFrameworkImpl implements CuratorFramework
return null;
}
- private void suspendConnection()
+ boolean setToSuspended()
{
- if ( !connectionStateManager.setToSuspended() )
- {
- return;
- }
-
- if ( client.getConnectionHandlingPolicy().isEmulatingClassicHandling() )
- {
- doSyncForSuspendedConnection(client.getInstanceIndex());
- }
+ return connectionStateManager.setToSuspended();
}
- private void doSyncForSuspendedConnection(final long instanceIndex)
+ void addStateChange(ConnectionState newConnectionState)
{
- // we appear to have disconnected, force a new ZK event and see if we can connect to another server
- final BackgroundOperation<String> operation = new BackgroundSyncImpl(this, null);
- OperationAndData.ErrorCallback<String> errorCallback = new OperationAndData.ErrorCallback<String>()
- {
- @Override
- public void retriesExhausted(OperationAndData<String> operationAndData)
- {
- // if instanceIndex != newInstanceIndex, the ZooKeeper instance was reset/reallocated
- // so the pending background sync is no longer valid.
- // if instanceIndex is -1, this is the second try to sync - punt and mark the connection lost
- if ( (instanceIndex < 0) || (instanceIndex == client.getInstanceIndex()) )
- {
- connectionStateManager.addStateChange(ConnectionState.LOST);
- }
- else
- {
- log.debug("suspendConnection() failure ignored as the ZooKeeper instance was reset. Retrying.");
- // send -1 to signal that if it happens again, punt and mark the connection lost
- doSyncForSuspendedConnection(-1);
- }
- }
- };
- performBackgroundOperation(new OperationAndData<String>(operation, "/", null, errorCallback, null));
+ connectionStateManager.addStateChange(newConnectionState);
}
@SuppressWarnings({"ThrowableResultOfMethodCallIgnored"})
@@ -894,7 +864,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
}
- private void performBackgroundOperation(OperationAndData<?> operationAndData)
+ void performBackgroundOperation(OperationAndData<?> operationAndData)
{
try
{
http://git-wip-us.apache.org/repos/asf/curator/blob/30bd7b65/curator-framework/src/main/java/org/apache/curator/framework/imps/InternalConnectionHandler.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/InternalConnectionHandler.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/InternalConnectionHandler.java
new file mode 100644
index 0000000..e9798d7
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/InternalConnectionHandler.java
@@ -0,0 +1,10 @@
+package org.apache.curator.framework.imps;
+/** Internal strategy for reacting to ZooKeeper connection events; CuratorFrameworkImpl selects an implementation via isEmulatingClassicHandling(). */
+interface InternalConnectionHandler
+{
+ void checkNewConnection(CuratorFrameworkImpl client); // invoked by CuratorFrameworkImpl on SyncConnected / ConnectedReadOnly
+
+ void suspendConnection(CuratorFrameworkImpl client); // invoked by CuratorFrameworkImpl on Disconnected
+
+ boolean checkSessionExpirationEnabled(); // result is passed to the ConnectionStateManager constructor as checkSessionExpiration
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/30bd7b65/curator-framework/src/main/java/org/apache/curator/framework/imps/StandardInternalConnectionHandler.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/StandardInternalConnectionHandler.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/StandardInternalConnectionHandler.java
new file mode 100644
index 0000000..b0452c6
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/StandardInternalConnectionHandler.java
@@ -0,0 +1,22 @@
+package org.apache.curator.framework.imps;
+/** Handler chosen by CuratorFrameworkImpl when isEmulatingClassicHandling() is false. */
+class StandardInternalConnectionHandler implements InternalConnectionHandler
+{
+ @Override
+ public void suspendConnection(CuratorFrameworkImpl client)
+ {
+ client.setToSuspended(); // return value is not used; no background sync is started here
+ }
+
+ @Override
+ public boolean checkSessionExpirationEnabled()
+ {
+ return true; // handed to the ConnectionStateManager constructor as its checkSessionExpiration flag
+ }
+
+ @Override
+ public void checkNewConnection(CuratorFrameworkImpl client)
+ {
+ client.checkInstanceIndex(); // adds a LOST state change when a previously recorded instance index differs from the current one
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/30bd7b65/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 2e7492f..406099d 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -21,7 +21,6 @@ package org.apache.curator.framework.state;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
-import org.apache.curator.connection.ConnectionHandlingPolicyStyle;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.listen.ListenerContainer;
import org.apache.curator.utils.ThreadUtils;
@@ -67,6 +66,7 @@ public class ConnectionStateManager implements Closeable
private final BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
private final CuratorFramework client;
private final int sessionTimeoutMs;
+ private final boolean checkSessionExpiration;
private final ListenerContainer<ConnectionStateListener> listeners = new ListenerContainer<ConnectionStateListener>();
private final AtomicBoolean initialConnectMessageSent = new AtomicBoolean(false);
private final ExecutorService service;
@@ -88,11 +88,13 @@ public class ConnectionStateManager implements Closeable
* @param client the client
* @param threadFactory thread factory to use or null for a default
* @param sessionTimeoutMs the ZK session timeout in milliseconds
+ * @param checkSessionExpiration if true, check for session timeouts, etc., as in the new connection handling method
*/
- public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs)
+ public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs, boolean checkSessionExpiration)
{
this.client = client;
this.sessionTimeoutMs = sessionTimeoutMs;
+ this.checkSessionExpiration = checkSessionExpiration;
if ( threadFactory == null )
{
threadFactory = ThreadUtils.newThreadFactory("ConnectionStateManager");
@@ -270,7 +272,7 @@ public class ConnectionStateManager implements Closeable
}
);
}
- else if ( !client.getZookeeperClient().getConnectionHandlingPolicy().isEmulatingClassicHandling() )
+ else if ( checkSessionExpiration )
{
synchronized(this)
{