You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/06/18 01:03:13 UTC
[09/18] git commit: CURATOR-110 - Moved the 'wait until connection
established' logic into the ExecuteAfterConnectionEstablished utility class.
Cleaned up the blockUntilConnected() logic in the CuratorFrameworkImpl
CURATOR-110 - Moved the 'wait until connection established' logic into
the ExecuteAfterConnectionEstablished utility class. Cleaned up the
blockUntilConnected() logic in the CuratorFrameworkImpl
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/e8138ed9
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/e8138ed9
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/e8138ed9
Branch: refs/heads/master
Commit: e8138ed9768e99e98e308845626e36aa55afadb0
Parents: 63d0401
Author: Cameron McKenzie <mc...@gmail.com>
Authored: Mon Jun 16 15:59:56 2014 +1000
Committer: Cameron McKenzie <mc...@gmail.com>
Committed: Mon Jun 16 15:59:56 2014 +1000
----------------------------------------------------------------------
.../curator/framework/CuratorFramework.java | 9 +---
.../framework/imps/CuratorFrameworkImpl.java | 22 +++-----
.../framework/state/ConnectionStateManager.java | 5 ++
.../ExecuteAfterConnectionEstablished.java | 53 ++++++++++++++++++++
.../framework/recipes/leader/LeaderLatch.java | 44 ++++++----------
5 files changed, 81 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/e8138ed9/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 1df3fa5..13cff30 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -213,14 +213,7 @@ public interface CuratorFramework extends Closeable
* @param watcher the watcher
*/
public void clearWatcherReferences(Watcher watcher);
-
- /**
- * Get the current connection state. The connection state will have a value of 0 until
- * the first connection related event is received.
- * @return The current connection state, or null if it is unknown
- */
- public ConnectionState getCurrentConnectionState();
-
+
/**
* Block until a connection to ZooKeeper is available or the maxWaitTime has been exceeded
* @param maxWaitTime The maximum wait time. Specify a value <= 0 to wait indefinitely
http://git-wip-us.apache.org/repos/asf/curator/blob/e8138ed9/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 33e260d..d1de29f 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -67,7 +67,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
private final BlockingQueue<OperationAndData<?>> backgroundOperations;
private final NamespaceImpl namespace;
private final ConnectionStateManager connectionStateManager;
- private final AtomicReference<ConnectionState> connectionState;
private final AtomicReference<AuthInfo> authInfo = new AtomicReference<AuthInfo>();
private final byte[] defaultData;
private final FailedDeleteManager failedDeleteManager;
@@ -75,6 +74,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
private final ACLProvider aclProvider;
private final NamespaceFacadeCache namespaceFacadeCache;
private final NamespaceWatcherMap namespaceWatcherMap = new NamespaceWatcherMap(this);
+ private final Object connectionLock = new Object();
private volatile ExecutorService executorService;
private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false);
@@ -151,7 +151,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
namespace = new NamespaceImpl(this, builder.getNamespace());
threadFactory = getThreadFactory(builder);
connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory());
- connectionState = new AtomicReference<ConnectionState>(null);
compressionProvider = builder.getCompressionProvider();
aclProvider = builder.getAclProvider();
state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
@@ -174,10 +173,9 @@ public class CuratorFrameworkImpl implements CuratorFramework
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
- connectionState.set(newState);
- synchronized(connectionState)
+ synchronized(connectionLock)
{
- connectionState.notifyAll();
+ connectionLock.notifyAll();
}
}
});
@@ -220,7 +218,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
threadFactory = parent.threadFactory;
backgroundOperations = parent.backgroundOperations;
connectionStateManager = parent.connectionStateManager;
- connectionState = parent.connectionState;
defaultData = parent.defaultData;
failedDeleteManager = parent.failedDeleteManager;
compressionProvider = parent.compressionProvider;
@@ -890,16 +887,11 @@ public class CuratorFrameworkImpl implements CuratorFramework
);
}
- public ConnectionState getCurrentConnectionState()
- {
- return connectionState.get();
- }
-
@Override
public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException
{
//Check if we're already connected
- ConnectionState currentConnectionState = connectionState.get();
+ ConnectionState currentConnectionState = connectionStateManager.getCurrentConnectionState();
if(currentConnectionState != null && currentConnectionState.isConnected())
{
return true;
@@ -910,9 +902,9 @@ public class CuratorFrameworkImpl implements CuratorFramework
for(;;)
{
- synchronized(connectionState)
+ synchronized(connectionLock)
{
- currentConnectionState = connectionState.get();
+ currentConnectionState = connectionStateManager.getCurrentConnectionState();
if(currentConnectionState != null && currentConnectionState.isConnected())
{
return true;
@@ -930,7 +922,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
}
- connectionState.wait(waitTime);
+ connectionLock.wait(waitTime);
}
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/e8138ed9/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 42804b8..ba29994 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -188,6 +188,11 @@ public class ConnectionStateManager implements Closeable
return true;
}
+
+ public synchronized ConnectionState getCurrentConnectionState()
+ {
+ return currentConnectionState;
+ }
private void postState(ConnectionState state)
{
http://git-wip-us.apache.org/repos/asf/curator/blob/e8138ed9/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java
new file mode 100644
index 0000000..d213d37
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java
@@ -0,0 +1,53 @@
+package org.apache.curator.framework.recipes;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ThreadUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to allow execution of logic once a ZooKeeper connection becomes available.
+ *
+ */
+public class ExecuteAfterConnectionEstablished
+{
+ private final static Logger log = LoggerFactory.getLogger(ExecuteAfterConnectionEstablished.class);
+
+ /**
+ * Spawns a new background thread that will block until a connection is available and
+ * then execute the 'runAfterConnection' logic
+ * @param name The name of the spawned thread
+ * @param client The curator client
+ * @param runAfterConnection The logic to run
+ */
+ public static void executeAfterConnectionEstablishedInBackground(String name,
+ final CuratorFramework client,
+ final Runnable runAfterConnection)
+ {
+ //Wait for the connection on a dedicated background thread so the caller is not blocked
+ final ExecutorService executor = ThreadUtils.newSingleThreadExecutor(name);
+ executor.submit(new Runnable()
+ {
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ client.blockUntilConnected();
+ runAfterConnection.run();
+ }
+ catch(Exception e)
+ {
+ log.error("An error occurred blocking until a connection is available", e);
+ }
+ finally
+ {
+ executor.shutdown();
+ }
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/e8138ed9/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 21e8cca..13a9f21 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -27,6 +27,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.recipes.ExecuteAfterConnectionEstablished;
import org.apache.curator.framework.recipes.locks.LockInternals;
import org.apache.curator.framework.recipes.locks.LockInternalsSorter;
import org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver;
@@ -148,14 +149,6 @@ public class LeaderLatch implements Closeable
this.id = Preconditions.checkNotNull(id, "id cannot be null");
this.closeMode = Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
}
-
- private CountDownLatch startLatch;
-
- public LeaderLatch(CuratorFramework client, String latchPath,
- CountDownLatch startLatch) {
- this(client, latchPath);
- this.startLatch = startLatch;
- }
/**
* Add this instance to the leadership election and attempt to acquire leadership.
@@ -166,30 +159,23 @@ public class LeaderLatch implements Closeable
{
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
- //Block until connected
- final ExecutorService executor = ThreadUtils.newSingleThreadExecutor("");
- executor.submit(new Runnable()
- {
-
- @Override
+ ExecuteAfterConnectionEstablished.executeAfterConnectionEstablishedInBackground(LeaderLatch.class.getName(),
+ client, new Runnable()
+ {
+ @Override
public void run()
{
- try
- {
- client.blockUntilConnected();
-
- client.getConnectionStateListenable().addListener(listener);
- reset();
- }
- catch(Exception ex)
- {
- log.error("An error occurred checking resetting leadership.", ex);
- } finally {
- //Shutdown the executor
- executor.shutdown();
- }
+ try
+ {
+ client.getConnectionStateListenable().addListener(listener);
+ reset();
+ }
+ catch(Exception ex)
+ {
+ log.error("An error occurred checking resetting leadership.", ex);
+ }
}
- });
+ });
}
/**