You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/06/18 01:03:19 UTC

[15/18] git commit: AfterConnectionEstablished needs to return the future from the service so that clients can cancel the action if needed. Added this to LeaderLatch

AfterConnectionEstablished needs to return the future from the service so that clients can cancel the action if needed. Added this to LeaderLatch


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/5954e66f
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/5954e66f
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/5954e66f

Branch: refs/heads/master
Commit: 5954e66fa3108c39b3f2915583def5e51915846f
Parents: fab79a4
Author: randgalt <ra...@apache.org>
Authored: Tue Jun 17 14:38:14 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Jun 17 14:38:14 2014 -0500

----------------------------------------------------------------------
 .../recipes/AfterConnectionEstablished.java     |  7 ++-
 .../framework/recipes/leader/LeaderLatch.java   | 51 ++++++++++++++------
 .../recipes/leader/TestLeaderLatch.java         |  2 +
 3 files changed, 44 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/5954e66f/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
index 41ba702..65c6ace 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.recipes;
 
 import org.apache.curator.framework.CuratorFramework;
@@ -23,6 +24,7 @@ import org.apache.curator.utils.ThreadUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 /**
  * Utility class to allow execution of logic once a ZooKeeper connection becomes available.
@@ -37,8 +39,9 @@ public class AfterConnectionEstablished
      *
      * @param client             The curator client
      * @param runAfterConnection The logic to run
+     * @return future of the task so it can be canceled, etc. if needed
      */
-    public static void execute(final CuratorFramework client, final Runnable runAfterConnection) throws Exception
+    public static Future<?> execute(final CuratorFramework client, final Runnable runAfterConnection) throws Exception
     {
         //Block until connected
         final ExecutorService executor = ThreadUtils.newSingleThreadExecutor(ThreadUtils.getProcessName(runAfterConnection.getClass()));
@@ -62,7 +65,7 @@ public class AfterConnectionEstablished
                 }
             }
         };
-        executor.submit(internalCall);
+        return executor.submit(internalCall);
     }
 
     private AfterConnectionEstablished()

http://git-wip-us.apache.org/repos/asf/curator/blob/5954e66f/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 9d70645..6f7636a 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -46,6 +46,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -69,6 +70,7 @@ public class LeaderLatch implements Closeable
     private final AtomicReference<String> ourPath = new AtomicReference<String>();
     private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();
     private final CloseMode closeMode;
+    private final AtomicReference<Future<?>> startTask = new AtomicReference<Future<?>>();
 
     private final ConnectionStateListener listener = new ConnectionStateListener()
     {
@@ -155,22 +157,21 @@ public class LeaderLatch implements Closeable
     {
         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
 
-        AfterConnectionEstablished.execute(client, new Runnable()
-            {
-                @Override
-                public void run()
+        startTask.set(AfterConnectionEstablished.execute(client, new Runnable()
                 {
-                    client.getConnectionStateListenable().addListener(listener);
-                    try
-                    {
-                        reset();
-                    }
-                    catch ( Exception e )
+                    @Override
+                    public void run()
                     {
-                        log.error("An error occurred checking resetting leadership.", e);
+                        try
+                        {
+                            internalStart();
+                        }
+                        finally
+                        {
+                            startTask.set(null);
+                        }
                     }
-                }
-            });
+                }));
     }
 
     /**
@@ -194,11 +195,17 @@ public class LeaderLatch implements Closeable
      * @param closeMode allows the default close mode to be overridden at the time the latch is closed.
      * @throws IOException errors
      */
-    public void close(CloseMode closeMode) throws IOException
+    public synchronized void close(CloseMode closeMode) throws IOException
     {
         Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
         Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
 
+        Future<?> localStartTask = startTask.getAndSet(null);
+        if ( localStartTask != null )
+        {
+            localStartTask.cancel(true);
+        }
+
         try
         {
             setNode(null);
@@ -485,6 +492,22 @@ public class LeaderLatch implements Closeable
         client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
     }
 
+    private synchronized void internalStart()
+    {
+        if ( state.get() == State.STARTED )
+        {
+            client.getConnectionStateListenable().addListener(listener);
+            try
+            {
+                reset();
+            }
+            catch ( Exception e )
+            {
+                log.error("An error occurred checking resetting leadership.", e);
+            }
+        }
+    }
+
     private void checkLeadership(List<String> children) throws Exception
     {
         final String localOurPath = ourPath.get();

http://git-wip-us.apache.org/repos/asf/curator/blob/5954e66f/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index b97e708..f4fb1c7 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -89,6 +89,8 @@ public class TestLeaderLatch extends BaseClassForTests
         try
         {
             client.start();
+            client.create().creatingParentsIfNeeded().forPath(PATH_NAME);
+
             LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
 
             latch.debugResetWaitLatch = new CountDownLatch(1);