You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2015/08/23 02:29:41 UTC

curator git commit: further refactoring. Abstracted old framework-level connection handling into ClassicInternalConnectionHandler. Probably more to do here

Repository: curator
Updated Branches:
  refs/heads/CURATOR-247 e23913701 -> 30bd7b655


further refactoring. Abstracted old framework-level connection handling into ClassicInternalConnectionHandler. Probably more to do here


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/30bd7b65
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/30bd7b65
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/30bd7b65

Branch: refs/heads/CURATOR-247
Commit: 30bd7b655d201762d8ff74062964621879ac7134
Parents: e239137
Author: randgalt <ra...@apache.org>
Authored: Sat Aug 22 19:29:36 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Aug 22 19:29:36 2015 -0500

----------------------------------------------------------------------
 .../imps/ClassicInternalConnectionHandler.java  | 58 ++++++++++++++++++
 .../framework/imps/CuratorFrameworkImpl.java    | 64 ++++++--------------
 .../imps/InternalConnectionHandler.java         | 10 +++
 .../imps/StandardInternalConnectionHandler.java | 22 +++++++
 .../framework/state/ConnectionStateManager.java |  8 ++-
 5 files changed, 112 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/30bd7b65/curator-framework/src/main/java/org/apache/curator/framework/imps/ClassicInternalConnectionHandler.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ClassicInternalConnectionHandler.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ClassicInternalConnectionHandler.java
new file mode 100644
index 0000000..1de6e80
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ClassicInternalConnectionHandler.java
@@ -0,0 +1,58 @@
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.framework.state.ConnectionState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ClassicInternalConnectionHandler implements InternalConnectionHandler
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Override
+    public void checkNewConnection(CuratorFrameworkImpl client)
+    {
+        // NOP
+    }
+
+    @Override
+    public boolean checkSessionExpirationEnabled()
+    {
+        return false;
+    }
+
+    @Override
+    public void suspendConnection(CuratorFrameworkImpl client)
+    {
+        if ( client.setToSuspended() )
+        {
+            doSyncForSuspendedConnection(client, client.getZookeeperClient().getInstanceIndex());
+        }
+    }
+
+    private void doSyncForSuspendedConnection(final CuratorFrameworkImpl client, final long instanceIndex)
+    {
+        // we appear to have disconnected, force a new ZK event and see if we can connect to another server
+        final BackgroundOperation<String> operation = new BackgroundSyncImpl(client, null);
+        OperationAndData.ErrorCallback<String> errorCallback = new OperationAndData.ErrorCallback<String>()
+        {
+            @Override
+            public void retriesExhausted(OperationAndData<String> operationAndData)
+            {
+                // if instanceIndex no longer matches the client's current instance index, the ZooKeeper instance
+                // was reset/reallocated, so the pending background sync is no longer valid - retry the sync once.
+                // if instanceIndex is -1, this is the second try to sync - punt and mark the connection lost
+                if ( (instanceIndex < 0) || (instanceIndex == client.getZookeeperClient().getInstanceIndex()) )
+                {
+                    client.addStateChange(ConnectionState.LOST);
+                }
+                else
+                {
+                    log.debug("suspendConnection() failure ignored as the ZooKeeper instance was reset. Retrying.");
+                    // send -1 to signal that if it happens again, punt and mark the connection lost
+                    doSyncForSuspendedConnection(client, -1);
+                }
+            }
+        };
+        client.performBackgroundOperation(new OperationAndData<String>(operation, "/", null, errorCallback, null));
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/30bd7b65/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 44a8ec6..b04987d 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -85,6 +85,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
     private final NamespaceWatcherMap namespaceWatcherMap = new NamespaceWatcherMap(this);
     private final boolean useContainerParentsIfAvailable;
     private final AtomicLong currentInstanceIndex = new AtomicLong(-1);
+    private final InternalConnectionHandler internalConnectionHandler;
 
     private volatile ExecutorService executorService;
     private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false);
@@ -125,13 +126,14 @@ public class CuratorFrameworkImpl implements CuratorFramework
                 builder.getConnectionHandlingPolicy()
             );
 
+        internalConnectionHandler = builder.getConnectionHandlingPolicy().isEmulatingClassicHandling() ? new ClassicInternalConnectionHandler() : new StandardInternalConnectionHandler();
         listeners = new ListenerContainer<CuratorListener>();
         unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
         backgroundOperations = new DelayQueue<OperationAndData<?>>();
         namespace = new NamespaceImpl(this, builder.getNamespace());
         threadFactory = getThreadFactory(builder);
         maxCloseWaitMs = builder.getMaxCloseWaitMs();
-        connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs());
+        connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), internalConnectionHandler.checkSessionExpirationEnabled());
         compressionProvider = builder.getCompressionProvider();
         aclProvider = builder.getAclProvider();
         state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
@@ -209,6 +211,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         state = parent.state;
         authInfos = parent.authInfos;
         useContainerParentsIfAvailable = parent.useContainerParentsIfAvailable;
+        internalConnectionHandler = parent.internalConnectionHandler;
     }
 
     @Override
@@ -676,7 +679,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
     {
         if ( state == Watcher.Event.KeeperState.Disconnected )
         {
-            suspendConnection();
+            internalConnectionHandler.suspendConnection(this);
         }
         else if ( state == Watcher.Event.KeeperState.Expired )
         {
@@ -684,26 +687,23 @@ public class CuratorFrameworkImpl implements CuratorFramework
         }
         else if ( state == Watcher.Event.KeeperState.SyncConnected )
         {
-            checkNewConnection();
+            internalConnectionHandler.checkNewConnection(this);
             connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
         }
         else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
         {
-            checkNewConnection();
+            internalConnectionHandler.checkNewConnection(this);
             connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
         }
     }
 
-    private void checkNewConnection()
+    void checkInstanceIndex()
     {
-        if ( !client.getConnectionHandlingPolicy().isEmulatingClassicHandling() )
+        long instanceIndex = client.getInstanceIndex();
+        long newInstanceIndex = currentInstanceIndex.getAndSet(instanceIndex);
+        if ( (newInstanceIndex >= 0) && (instanceIndex != newInstanceIndex) )   // currentInstanceIndex is initially -1 - ignore this
         {
-            long instanceIndex = client.getInstanceIndex();
-            long newInstanceIndex = currentInstanceIndex.getAndSet(instanceIndex);
-            if ( (newInstanceIndex >= 0) && (instanceIndex != newInstanceIndex) )   // currentInstanceIndex is initially -1 - ignore this
-            {
-                connectionStateManager.addStateChange(ConnectionState.LOST);
-            }
+            connectionStateManager.addStateChange(ConnectionState.LOST);
         }
     }
 
@@ -742,44 +742,14 @@ public class CuratorFrameworkImpl implements CuratorFramework
         return null;
     }
 
-    private void suspendConnection()
+    boolean setToSuspended()
     {
-        if ( !connectionStateManager.setToSuspended() )
-        {
-            return;
-        }
-
-        if ( client.getConnectionHandlingPolicy().isEmulatingClassicHandling() )
-        {
-            doSyncForSuspendedConnection(client.getInstanceIndex());
-        }
+        return connectionStateManager.setToSuspended();
     }
 
-    private void doSyncForSuspendedConnection(final long instanceIndex)
+    void addStateChange(ConnectionState newConnectionState)
     {
-        // we appear to have disconnected, force a new ZK event and see if we can connect to another server
-        final BackgroundOperation<String> operation = new BackgroundSyncImpl(this, null);
-        OperationAndData.ErrorCallback<String> errorCallback = new OperationAndData.ErrorCallback<String>()
-        {
-            @Override
-            public void retriesExhausted(OperationAndData<String> operationAndData)
-            {
-                // if instanceIndex != newInstanceIndex, the ZooKeeper instance was reset/reallocated
-                // so the pending background sync is no longer valid.
-                // if instanceIndex is -1, this is the second try to sync - punt and mark the connection lost
-                if ( (instanceIndex < 0) || (instanceIndex == client.getInstanceIndex()) )
-                {
-                    connectionStateManager.addStateChange(ConnectionState.LOST);
-                }
-                else
-                {
-                    log.debug("suspendConnection() failure ignored as the ZooKeeper instance was reset. Retrying.");
-                    // send -1 to signal that if it happens again, punt and mark the connection lost
-                    doSyncForSuspendedConnection(-1);
-                }
-            }
-        };
-        performBackgroundOperation(new OperationAndData<String>(operation, "/", null, errorCallback, null));
+        connectionStateManager.addStateChange(newConnectionState);
     }
 
     @SuppressWarnings({"ThrowableResultOfMethodCallIgnored"})
@@ -894,7 +864,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         }
     }
 
-    private void performBackgroundOperation(OperationAndData<?> operationAndData)
+    void performBackgroundOperation(OperationAndData<?> operationAndData)
     {
         try
         {

http://git-wip-us.apache.org/repos/asf/curator/blob/30bd7b65/curator-framework/src/main/java/org/apache/curator/framework/imps/InternalConnectionHandler.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/InternalConnectionHandler.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/InternalConnectionHandler.java
new file mode 100644
index 0000000..e9798d7
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/InternalConnectionHandler.java
@@ -0,0 +1,10 @@
+package org.apache.curator.framework.imps;
+
+interface InternalConnectionHandler
+{
+    void checkNewConnection(CuratorFrameworkImpl client);
+
+    void suspendConnection(CuratorFrameworkImpl client);
+
+    boolean checkSessionExpirationEnabled();
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/30bd7b65/curator-framework/src/main/java/org/apache/curator/framework/imps/StandardInternalConnectionHandler.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/StandardInternalConnectionHandler.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/StandardInternalConnectionHandler.java
new file mode 100644
index 0000000..b0452c6
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/StandardInternalConnectionHandler.java
@@ -0,0 +1,22 @@
+package org.apache.curator.framework.imps;
+
+class StandardInternalConnectionHandler implements InternalConnectionHandler
+{
+    @Override
+    public void suspendConnection(CuratorFrameworkImpl client)
+    {
+        client.setToSuspended();
+    }
+
+    @Override
+    public boolean checkSessionExpirationEnabled()
+    {
+        return true;
+    }
+
+    @Override
+    public void checkNewConnection(CuratorFrameworkImpl client)
+    {
+        client.checkInstanceIndex();
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/30bd7b65/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 2e7492f..406099d 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -21,7 +21,6 @@ package org.apache.curator.framework.state;
 
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
-import org.apache.curator.connection.ConnectionHandlingPolicyStyle;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.listen.ListenerContainer;
 import org.apache.curator.utils.ThreadUtils;
@@ -67,6 +66,7 @@ public class ConnectionStateManager implements Closeable
     private final BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
     private final CuratorFramework client;
     private final int sessionTimeoutMs;
+    private final boolean checkSessionExpiration;
     private final ListenerContainer<ConnectionStateListener> listeners = new ListenerContainer<ConnectionStateListener>();
     private final AtomicBoolean initialConnectMessageSent = new AtomicBoolean(false);
     private final ExecutorService service;
@@ -88,11 +88,13 @@ public class ConnectionStateManager implements Closeable
      * @param client        the client
      * @param threadFactory thread factory to use or null for a default
      * @param sessionTimeoutMs the ZK session timeout in milliseconds
+     * @param checkSessionExpiration if true, check for session timeouts, etc., as done by the new (non-classic) connection handling
      */
-    public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs)
+    public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs, boolean checkSessionExpiration)
     {
         this.client = client;
         this.sessionTimeoutMs = sessionTimeoutMs;
+        this.checkSessionExpiration = checkSessionExpiration;
         if ( threadFactory == null )
         {
             threadFactory = ThreadUtils.newThreadFactory("ConnectionStateManager");
@@ -270,7 +272,7 @@ public class ConnectionStateManager implements Closeable
                             }
                         );
                 }
-                else if ( !client.getZookeeperClient().getConnectionHandlingPolicy().isEmulatingClassicHandling() )
+                else if ( checkSessionExpiration )
                 {
                     synchronized(this)
                     {