You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/06/17 21:38:19 UTC

[1/2] git commit: Now that start() uses AfterConnectionEstablished, no longer correct/needed to handle CONNECTED in state change

Repository: curator
Updated Branches:
  refs/heads/CURATOR-110 04cefb47f -> 5954e66fa


Now that start() uses AfterConnectionEstablished, no longer correct/needed to handle CONNECTED in state change


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/fab79a45
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/fab79a45
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/fab79a45

Branch: refs/heads/CURATOR-110
Commit: fab79a4577d3af80260d25128cc56f26c7011dbe
Parents: 04cefb4
Author: randgalt <ra...@apache.org>
Authored: Mon Jun 16 14:41:13 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jun 16 14:41:13 2014 -0500

----------------------------------------------------------------------
 .../framework/recipes/leader/LeaderLatch.java   | 100 ++++++++++---------
 1 file changed, 51 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/fab79a45/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index dce3f5e..9d70645 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -44,7 +44,6 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
@@ -156,9 +155,7 @@ public class LeaderLatch implements Closeable
     {
         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
 
-        AfterConnectionEstablished.execute
-        (
-            client, new Runnable()
+        AfterConnectionEstablished.execute(client, new Runnable()
             {
                 @Override
                 public void run()
@@ -173,8 +170,7 @@ public class LeaderLatch implements Closeable
                         log.error("An error occurred checking resetting leadership.", e);
                     }
                 }
-            }
-        );
+            });
     }
 
     /**
@@ -196,7 +192,6 @@ public class LeaderLatch implements Closeable
      * instances must eventually be closed.
      *
      * @param closeMode allows the default close mode to be overridden at the time the latch is closed.
-     *
      * @throws IOException errors
      */
     public void close(CloseMode closeMode) throws IOException
@@ -218,28 +213,28 @@ public class LeaderLatch implements Closeable
 
             switch ( closeMode )
             {
-                case NOTIFY_LEADER:
-                {
-                    setLeadership(false);
-                    listeners.clear();
-                    break;
-                }
+            case NOTIFY_LEADER:
+            {
+                setLeadership(false);
+                listeners.clear();
+                break;
+            }
 
-                default:
-                {
-                    listeners.clear();
-                    setLeadership(false);
-                    break;
-                }
+            default:
+            {
+                listeners.clear();
+                setLeadership(false);
+                break;
+            }
             }
         }
     }
 
     /**
      * Attaches a listener to this LeaderLatch
-     * <p>
+     * <p/>
      * Attaching the same listener multiple times is a noop from the second time on.
-     * <p>
+     * <p/>
      * All methods for the listener are run using the provided Executor.  It is common to pass in a single-threaded
      * executor so that you can be certain that listener methods are called in sequence, but if you are fine with
      * them being called out of order you are welcome to use multiple threads.
@@ -253,15 +248,15 @@ public class LeaderLatch implements Closeable
 
     /**
      * Attaches a listener to this LeaderLatch
-     * <p>
+     * <p/>
      * Attaching the same listener multiple times is a noop from the second time on.
-     * <p>
+     * <p/>
      * All methods for the listener are run using the provided Executor.  It is common to pass in a single-threaded
      * executor so that you can be certain that listener methods are called in sequence, but if you are fine with
      * them being called out of order you are welcome to use multiple threads.
      *
      * @param listener the listener to attach
-     * @param executor     An executor to run the methods for the listener on.
+     * @param executor An executor to run the methods for the listener on.
      */
     public void addListener(LeaderLatchListener listener, Executor executor)
     {
@@ -282,7 +277,7 @@ public class LeaderLatch implements Closeable
      * <p>Causes the current thread to wait until this instance acquires leadership
      * unless the thread is {@linkplain Thread#interrupt interrupted} or {@linkplain #close() closed}.</p>
      * <p>If this instance already is the leader then this method returns immediately.</p>
-     *
+     * <p/>
      * <p>Otherwise the current
      * thread becomes disabled for thread scheduling purposes and lies
      * dormant until one of three things happen:</p>
@@ -324,10 +319,10 @@ public class LeaderLatch implements Closeable
      * <p>Causes the current thread to wait until this instance acquires leadership
      * unless the thread is {@linkplain Thread#interrupt interrupted},
      * the specified waiting time elapses or the instance is {@linkplain #close() closed}.</p>
-     *
+     * <p/>
      * <p>If this instance already is the leader then this method returns immediately
      * with the value {@code true}.</p>
-     *
+     * <p/>
      * <p>Otherwise the current
      * thread becomes disabled for thread scheduling purposes and lies
      * dormant until one of four things happen:</p>
@@ -338,7 +333,7 @@ public class LeaderLatch implements Closeable
      * <li>The specified waiting time elapses.</li>
      * <li>The instance is {@linkplain #close() closed}</li>
      * </ul>
-     *
+     * <p/>
      * <p>If the current thread:</p>
      * <ul>
      * <li>has its interrupted status set on entry to this method; or
@@ -346,7 +341,7 @@ public class LeaderLatch implements Closeable
      * </ul>
      * <p>then {@link InterruptedException} is thrown and the current thread's
      * interrupted status is cleared.</p>
-     *
+     * <p/>
      * <p>If the specified waiting time elapses or the instance is {@linkplain #close() closed}
      * then the value {@code false} is returned.  If the time is less than or equal to zero, the method
      * will not wait at all.</p>
@@ -354,7 +349,7 @@ public class LeaderLatch implements Closeable
      * @param timeout the maximum time to wait
      * @param unit    the time unit of the {@code timeout} argument
      * @return {@code true} if the count reached zero and {@code false}
-     *         if the waiting time elapsed before the count reached zero or the instances was closed
+     * if the waiting time elapsed before the count reached zero or the instances was closed
      * @throws InterruptedException if the current thread is interrupted
      *                              while waiting
      */
@@ -561,22 +556,35 @@ public class LeaderLatch implements Closeable
 
     private void handleStateChange(ConnectionState newState)
     {
-        if ( newState == ConnectionState.RECONNECTED )
+        switch ( newState )
         {
-            try
+            default:
             {
-                reset();
+                // NOP
+                break;
+            }
+
+            case RECONNECTED:
+            {
+                try
+                {
+                    reset();
+                }
+                catch ( Exception e )
+                {
+                    log.error("Could not reset leader latch", e);
+                    setLeadership(false);
+                }
+                break;
             }
-            catch (Exception e)
+
+            case SUSPENDED:
+            case LOST:
             {
-                log.error("Could not reset leader latch", e);
                 setLeadership(false);
+                break;
             }
         }
-        else
-        {
-            setLeadership(false);
-        }
     }
 
     private synchronized void setLeadership(boolean newValue)
@@ -585,9 +593,7 @@ public class LeaderLatch implements Closeable
 
         if ( oldValue && !newValue )
         { // Lost leadership, was true, now false
-            listeners.forEach
-            (
-                new Function<LeaderLatchListener, Void>()
+            listeners.forEach(new Function<LeaderLatchListener, Void>()
                 {
                     @Override
                     public Void apply(LeaderLatchListener listener)
@@ -595,14 +601,11 @@ public class LeaderLatch implements Closeable
                         listener.notLeader();
                         return null;
                     }
-                }
-            );
+                });
         }
         else if ( !oldValue && newValue )
         { // Gained leadership, was false, now true
-            listeners.forEach
-            (
-                new Function<LeaderLatchListener, Void>()
+            listeners.forEach(new Function<LeaderLatchListener, Void>()
                 {
                     @Override
                     public Void apply(LeaderLatchListener input)
@@ -610,8 +613,7 @@ public class LeaderLatch implements Closeable
                         input.isLeader();
                         return null;
                     }
-                }
-            );
+                });
         }
 
         notifyAll();


[2/2] git commit: AfterConnectionEstablished needs to return the future from the service so that clients can cancel the action if needed. Added this to LeaderLatch

Posted by ra...@apache.org.
AfterConnectionEstablished needs to return the future from the service so that clients can cancel the action if needed. Added this to LeaderLatch


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/5954e66f
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/5954e66f
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/5954e66f

Branch: refs/heads/CURATOR-110
Commit: 5954e66fa3108c39b3f2915583def5e51915846f
Parents: fab79a4
Author: randgalt <ra...@apache.org>
Authored: Tue Jun 17 14:38:14 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Jun 17 14:38:14 2014 -0500

----------------------------------------------------------------------
 .../recipes/AfterConnectionEstablished.java     |  7 ++-
 .../framework/recipes/leader/LeaderLatch.java   | 51 ++++++++++++++------
 .../recipes/leader/TestLeaderLatch.java         |  2 +
 3 files changed, 44 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/5954e66f/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
index 41ba702..65c6ace 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.recipes;
 
 import org.apache.curator.framework.CuratorFramework;
@@ -23,6 +24,7 @@ import org.apache.curator.utils.ThreadUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 /**
  * Utility class to allow execution of logic once a ZooKeeper connection becomes available.
@@ -37,8 +39,9 @@ public class AfterConnectionEstablished
      *
      * @param client             The curator client
      * @param runAfterConnection The logic to run
+     * @return future of the task so it can be canceled, etc. if needed
      */
-    public static void execute(final CuratorFramework client, final Runnable runAfterConnection) throws Exception
+    public static Future<?> execute(final CuratorFramework client, final Runnable runAfterConnection) throws Exception
     {
         //Block until connected
         final ExecutorService executor = ThreadUtils.newSingleThreadExecutor(ThreadUtils.getProcessName(runAfterConnection.getClass()));
@@ -62,7 +65,7 @@ public class AfterConnectionEstablished
                 }
             }
         };
-        executor.submit(internalCall);
+        return executor.submit(internalCall);
     }
 
     private AfterConnectionEstablished()

http://git-wip-us.apache.org/repos/asf/curator/blob/5954e66f/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 9d70645..6f7636a 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -46,6 +46,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -69,6 +70,7 @@ public class LeaderLatch implements Closeable
     private final AtomicReference<String> ourPath = new AtomicReference<String>();
     private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();
     private final CloseMode closeMode;
+    private final AtomicReference<Future<?>> startTask = new AtomicReference<Future<?>>();
 
     private final ConnectionStateListener listener = new ConnectionStateListener()
     {
@@ -155,22 +157,21 @@ public class LeaderLatch implements Closeable
     {
         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
 
-        AfterConnectionEstablished.execute(client, new Runnable()
-            {
-                @Override
-                public void run()
+        startTask.set(AfterConnectionEstablished.execute(client, new Runnable()
                 {
-                    client.getConnectionStateListenable().addListener(listener);
-                    try
-                    {
-                        reset();
-                    }
-                    catch ( Exception e )
+                    @Override
+                    public void run()
                     {
-                        log.error("An error occurred checking resetting leadership.", e);
+                        try
+                        {
+                            internalStart();
+                        }
+                        finally
+                        {
+                            startTask.set(null);
+                        }
                     }
-                }
-            });
+                }));
     }
 
     /**
@@ -194,11 +195,17 @@ public class LeaderLatch implements Closeable
      * @param closeMode allows the default close mode to be overridden at the time the latch is closed.
      * @throws IOException errors
      */
-    public void close(CloseMode closeMode) throws IOException
+    public synchronized void close(CloseMode closeMode) throws IOException
     {
         Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
         Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
 
+        Future<?> localStartTask = startTask.getAndSet(null);
+        if ( localStartTask != null )
+        {
+            localStartTask.cancel(true);
+        }
+
         try
         {
             setNode(null);
@@ -485,6 +492,22 @@ public class LeaderLatch implements Closeable
         client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
     }
 
+    private synchronized void internalStart()
+    {
+        if ( state.get() == State.STARTED )
+        {
+            client.getConnectionStateListenable().addListener(listener);
+            try
+            {
+                reset();
+            }
+            catch ( Exception e )
+            {
+                log.error("An error occurred checking resetting leadership.", e);
+            }
+        }
+    }
+
     private void checkLeadership(List<String> children) throws Exception
     {
         final String localOurPath = ourPath.get();

http://git-wip-us.apache.org/repos/asf/curator/blob/5954e66f/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index b97e708..f4fb1c7 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -89,6 +89,8 @@ public class TestLeaderLatch extends BaseClassForTests
         try
         {
             client.start();
+            client.create().creatingParentsIfNeeded().forPath(PATH_NAME);
+
             LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
 
             latch.debugResetWaitLatch = new CountDownLatch(1);