You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/06/18 01:03:13 UTC

[09/18] git commit: CURATOR-110 - Moved the 'wait until connection established' logic into the ExecuteAfterConnectionEstablished utility class. Cleaned up the blockUntilConnected() logic in the CuratorFrameworkImpl

CURATOR-110 - Moved the 'wait until connection established' logic into
the ExecuteAfterConnectionEstablished utility class. Cleaned up the
blockUntilConnected() logic in the CuratorFrameworkImpl

Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/e8138ed9
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/e8138ed9
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/e8138ed9

Branch: refs/heads/master
Commit: e8138ed9768e99e98e308845626e36aa55afadb0
Parents: 63d0401
Author: Cameron McKenzie <mc...@gmail.com>
Authored: Mon Jun 16 15:59:56 2014 +1000
Committer: Cameron McKenzie <mc...@gmail.com>
Committed: Mon Jun 16 15:59:56 2014 +1000

----------------------------------------------------------------------
 .../curator/framework/CuratorFramework.java     |  9 +---
 .../framework/imps/CuratorFrameworkImpl.java    | 22 +++-----
 .../framework/state/ConnectionStateManager.java |  5 ++
 .../ExecuteAfterConnectionEstablished.java      | 53 ++++++++++++++++++++
 .../framework/recipes/leader/LeaderLatch.java   | 44 ++++++----------
 5 files changed, 81 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/e8138ed9/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 1df3fa5..13cff30 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -213,14 +213,7 @@ public interface CuratorFramework extends Closeable
      * @param watcher the watcher
      */
     public void clearWatcherReferences(Watcher watcher);
-    
-    /**
-     * Get the current connection state. The connection state will have a value of 0 until
-     * the first connection related event is received.
-     * @return The current connection state, or null if it is unknown 
-     */
-    public ConnectionState getCurrentConnectionState();
-    
+        
     /**
      * Block until a connection to ZooKeeper is available or the maxWaitTime has been exceeded
      * @param maxWaitTime The maximum wait time. Specify a value <= 0 to wait indefinitely

http://git-wip-us.apache.org/repos/asf/curator/blob/e8138ed9/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 33e260d..d1de29f 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -67,7 +67,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
     private final BlockingQueue<OperationAndData<?>>                    backgroundOperations;
     private final NamespaceImpl                                         namespace;
     private final ConnectionStateManager                                connectionStateManager;
-    private final AtomicReference<ConnectionState>						connectionState;
     private final AtomicReference<AuthInfo>                             authInfo = new AtomicReference<AuthInfo>();
     private final byte[]                                                defaultData;
     private final FailedDeleteManager                                   failedDeleteManager;
@@ -75,6 +74,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
     private final ACLProvider                                           aclProvider;
     private final NamespaceFacadeCache                                  namespaceFacadeCache;
     private final NamespaceWatcherMap                                   namespaceWatcherMap = new NamespaceWatcherMap(this);
+    private final Object												connectionLock = new Object();
 
     private volatile ExecutorService                                    executorService;
     private final AtomicBoolean                                         logAsErrorConnectionErrors = new AtomicBoolean(false);
@@ -151,7 +151,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
         namespace = new NamespaceImpl(this, builder.getNamespace());
         threadFactory = getThreadFactory(builder);
         connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory());
-        connectionState = new AtomicReference<ConnectionState>(null);
         compressionProvider = builder.getCompressionProvider();
         aclProvider = builder.getAclProvider();
         state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
@@ -174,10 +173,9 @@ public class CuratorFrameworkImpl implements CuratorFramework
 			@Override
 			public void stateChanged(CuratorFramework client, ConnectionState newState)
 			{
-				connectionState.set(newState);
-				synchronized(connectionState)
+				synchronized(connectionLock)
 				{
-					connectionState.notifyAll();
+					connectionLock.notifyAll();
 				}
 			}
 		});
@@ -220,7 +218,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
         threadFactory = parent.threadFactory;
         backgroundOperations = parent.backgroundOperations;
         connectionStateManager = parent.connectionStateManager;
-        connectionState = parent.connectionState;
         defaultData = parent.defaultData;
         failedDeleteManager = parent.failedDeleteManager;
         compressionProvider = parent.compressionProvider;
@@ -890,16 +887,11 @@ public class CuratorFrameworkImpl implements CuratorFramework
         );
     }
     
-    public ConnectionState getCurrentConnectionState()
-    {
-    	return connectionState.get();
-    }
-    
     @Override
     public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException
     {
     	//Check if we're already connected
-    	ConnectionState currentConnectionState = connectionState.get();
+    	ConnectionState currentConnectionState = connectionStateManager.getCurrentConnectionState();
     	if(currentConnectionState != null && currentConnectionState.isConnected())
     	{
     		return true;
@@ -910,9 +902,9 @@ public class CuratorFrameworkImpl implements CuratorFramework
     	
     	for(;;)
     	{
-    		synchronized(connectionState)
+    		synchronized(connectionLock)
     		{
-    			currentConnectionState = connectionState.get();
+    			currentConnectionState = connectionStateManager.getCurrentConnectionState();
     	    	if(currentConnectionState != null && currentConnectionState.isConnected())
     			{
     				return true;
@@ -930,7 +922,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         			}    					
     			}
     		
-    			connectionState.wait(waitTime);
+    			connectionLock.wait(waitTime);
     		}
     	}
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/e8138ed9/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 42804b8..ba29994 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -188,6 +188,11 @@ public class ConnectionStateManager implements Closeable
 
         return true;
     }
+    
+    public synchronized ConnectionState getCurrentConnectionState()
+    {
+    	return currentConnectionState;
+    }
 
     private void postState(ConnectionState state)
     {

http://git-wip-us.apache.org/repos/asf/curator/blob/e8138ed9/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java
new file mode 100644
index 0000000..d213d37
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java
@@ -0,0 +1,53 @@
+package org.apache.curator.framework.recipes;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ThreadUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to allow execution of logic once a ZooKeeper connection becomes available.
+ *
+ */
+public class ExecuteAfterConnectionEstablished
+{
+	private final static Logger log = LoggerFactory.getLogger(ExecuteAfterConnectionEstablished.class);
+	
+	/**
+	 * Spawns a new new background thread that will block until a connection is available and
+	 * then execute the 'runAfterConnection' logic
+	 * @param name The name of the spawned thread
+	 * @param client The curator client
+	 * @param runAfterConnection The logic to run
+	 */
+	public static void executeAfterConnectionEstablishedInBackground(String name,
+																     final CuratorFramework client,
+																     final Runnable runAfterConnection)
+	{
+        //Block until connected
+        final ExecutorService executor = ThreadUtils.newSingleThreadExecutor(name);
+        executor.submit(new Runnable()
+        {
+			
+			@Override
+			public void run()
+			{
+				try
+				{
+		        	client.blockUntilConnected();			        
+			        runAfterConnection.run();
+				}
+				catch(Exception e)
+				{
+					log.error("An error occurred blocking until a connection is available", e);
+				}
+				finally
+				{
+					executor.shutdown();
+				}
+			}
+        });
+	}
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/e8138ed9/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 21e8cca..13a9f21 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -27,6 +27,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.recipes.ExecuteAfterConnectionEstablished;
 import org.apache.curator.framework.recipes.locks.LockInternals;
 import org.apache.curator.framework.recipes.locks.LockInternalsSorter;
 import org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver;
@@ -148,14 +149,6 @@ public class LeaderLatch implements Closeable
         this.id = Preconditions.checkNotNull(id, "id cannot be null");
         this.closeMode = Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
     }
-    
-    private CountDownLatch startLatch;
-    
-    public LeaderLatch(CuratorFramework client, String latchPath,
-    		CountDownLatch startLatch) {
-    	this(client, latchPath);
-        this.startLatch = startLatch;
-    }
 
     /**
      * Add this instance to the leadership election and attempt to acquire leadership.
@@ -166,30 +159,23 @@ public class LeaderLatch implements Closeable
     {
         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
 
-        //Block until connected
-        final ExecutorService executor = ThreadUtils.newSingleThreadExecutor("");
-        executor.submit(new Runnable()
-        {
-			
-			@Override
+        ExecuteAfterConnectionEstablished.executeAfterConnectionEstablishedInBackground(LeaderLatch.class.getName(),
+        		client, new Runnable()
+        {					
+        	@Override
 			public void run()
 			{
-		        try
-		        {
-		        	client.blockUntilConnected();
-			        
-			        client.getConnectionStateListenable().addListener(listener);		        	
-		        	reset();
-		        }
-		        catch(Exception ex)
-		        {
-		        	log.error("An error occurred checking resetting leadership.", ex);
-		        } finally {
-		        	//Shutdown the executor
-		        	executor.shutdown();
-		        }
+        		try
+        		{
+        			client.getConnectionStateListenable().addListener(listener);		        	
+        			reset();
+        		}
+        		catch(Exception ex)
+                {
+                	log.error("An error occurred checking resetting leadership.", ex);
+                }
 			}
-        });
+		});     
     }
 
     /**