You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/06/17 21:38:19 UTC
[1/2] git commit: Now that start() uses AfterConnectionEstablished,
no longer correct/needed to handle CONNECTED in state change
Repository: curator
Updated Branches:
refs/heads/CURATOR-110 04cefb47f -> 5954e66fa
Now that start() uses AfterConnectionEstablished, no longer correct/needed to handle CONNECTED in state change
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/fab79a45
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/fab79a45
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/fab79a45
Branch: refs/heads/CURATOR-110
Commit: fab79a4577d3af80260d25128cc56f26c7011dbe
Parents: 04cefb4
Author: randgalt <ra...@apache.org>
Authored: Mon Jun 16 14:41:13 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jun 16 14:41:13 2014 -0500
----------------------------------------------------------------------
.../framework/recipes/leader/LeaderLatch.java | 100 ++++++++++---------
1 file changed, 51 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/fab79a45/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index dce3f5e..9d70645 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -44,7 +44,6 @@ import java.io.EOFException;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
-import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@@ -156,9 +155,7 @@ public class LeaderLatch implements Closeable
{
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
- AfterConnectionEstablished.execute
- (
- client, new Runnable()
+ AfterConnectionEstablished.execute(client, new Runnable()
{
@Override
public void run()
@@ -173,8 +170,7 @@ public class LeaderLatch implements Closeable
log.error("An error occurred checking resetting leadership.", e);
}
}
- }
- );
+ });
}
/**
@@ -196,7 +192,6 @@ public class LeaderLatch implements Closeable
* instances must eventually be closed.
*
* @param closeMode allows the default close mode to be overridden at the time the latch is closed.
- *
* @throws IOException errors
*/
public void close(CloseMode closeMode) throws IOException
@@ -218,28 +213,28 @@ public class LeaderLatch implements Closeable
switch ( closeMode )
{
- case NOTIFY_LEADER:
- {
- setLeadership(false);
- listeners.clear();
- break;
- }
+ case NOTIFY_LEADER:
+ {
+ setLeadership(false);
+ listeners.clear();
+ break;
+ }
- default:
- {
- listeners.clear();
- setLeadership(false);
- break;
- }
+ default:
+ {
+ listeners.clear();
+ setLeadership(false);
+ break;
+ }
}
}
}
/**
* Attaches a listener to this LeaderLatch
- * <p>
+ * <p/>
* Attaching the same listener multiple times is a noop from the second time on.
- * <p>
+ * <p/>
* All methods for the listener are run using the provided Executor. It is common to pass in a single-threaded
* executor so that you can be certain that listener methods are called in sequence, but if you are fine with
* them being called out of order you are welcome to use multiple threads.
@@ -253,15 +248,15 @@ public class LeaderLatch implements Closeable
/**
* Attaches a listener to this LeaderLatch
- * <p>
+ * <p/>
* Attaching the same listener multiple times is a noop from the second time on.
- * <p>
+ * <p/>
* All methods for the listener are run using the provided Executor. It is common to pass in a single-threaded
* executor so that you can be certain that listener methods are called in sequence, but if you are fine with
* them being called out of order you are welcome to use multiple threads.
*
* @param listener the listener to attach
- * @param executor An executor to run the methods for the listener on.
+ * @param executor An executor to run the methods for the listener on.
*/
public void addListener(LeaderLatchListener listener, Executor executor)
{
@@ -282,7 +277,7 @@ public class LeaderLatch implements Closeable
* <p>Causes the current thread to wait until this instance acquires leadership
* unless the thread is {@linkplain Thread#interrupt interrupted} or {@linkplain #close() closed}.</p>
* <p>If this instance already is the leader then this method returns immediately.</p>
- *
+ * <p/>
* <p>Otherwise the current
* thread becomes disabled for thread scheduling purposes and lies
* dormant until one of three things happen:</p>
@@ -324,10 +319,10 @@ public class LeaderLatch implements Closeable
* <p>Causes the current thread to wait until this instance acquires leadership
* unless the thread is {@linkplain Thread#interrupt interrupted},
* the specified waiting time elapses or the instance is {@linkplain #close() closed}.</p>
- *
+ * <p/>
* <p>If this instance already is the leader then this method returns immediately
* with the value {@code true}.</p>
- *
+ * <p/>
* <p>Otherwise the current
* thread becomes disabled for thread scheduling purposes and lies
* dormant until one of four things happen:</p>
@@ -338,7 +333,7 @@ public class LeaderLatch implements Closeable
* <li>The specified waiting time elapses.</li>
* <li>The instance is {@linkplain #close() closed}</li>
* </ul>
- *
+ * <p/>
* <p>If the current thread:</p>
* <ul>
* <li>has its interrupted status set on entry to this method; or
@@ -346,7 +341,7 @@ public class LeaderLatch implements Closeable
* </ul>
* <p>then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.</p>
- *
+ * <p/>
* <p>If the specified waiting time elapses or the instance is {@linkplain #close() closed}
* then the value {@code false} is returned. If the time is less than or equal to zero, the method
* will not wait at all.</p>
@@ -354,7 +349,7 @@ public class LeaderLatch implements Closeable
* @param timeout the maximum time to wait
* @param unit the time unit of the {@code timeout} argument
* @return {@code true} if the count reached zero and {@code false}
- * if the waiting time elapsed before the count reached zero or the instances was closed
+ * if the waiting time elapsed before the count reached zero or the instances was closed
* @throws InterruptedException if the current thread is interrupted
* while waiting
*/
@@ -561,22 +556,35 @@ public class LeaderLatch implements Closeable
private void handleStateChange(ConnectionState newState)
{
- if ( newState == ConnectionState.RECONNECTED )
+ switch ( newState )
{
- try
+ default:
{
- reset();
+ // NOP
+ break;
+ }
+
+ case RECONNECTED:
+ {
+ try
+ {
+ reset();
+ }
+ catch ( Exception e )
+ {
+ log.error("Could not reset leader latch", e);
+ setLeadership(false);
+ }
+ break;
}
- catch (Exception e)
+
+ case SUSPENDED:
+ case LOST:
{
- log.error("Could not reset leader latch", e);
setLeadership(false);
+ break;
}
}
- else
- {
- setLeadership(false);
- }
}
private synchronized void setLeadership(boolean newValue)
@@ -585,9 +593,7 @@ public class LeaderLatch implements Closeable
if ( oldValue && !newValue )
{ // Lost leadership, was true, now false
- listeners.forEach
- (
- new Function<LeaderLatchListener, Void>()
+ listeners.forEach(new Function<LeaderLatchListener, Void>()
{
@Override
public Void apply(LeaderLatchListener listener)
@@ -595,14 +601,11 @@ public class LeaderLatch implements Closeable
listener.notLeader();
return null;
}
- }
- );
+ });
}
else if ( !oldValue && newValue )
{ // Gained leadership, was false, now true
- listeners.forEach
- (
- new Function<LeaderLatchListener, Void>()
+ listeners.forEach(new Function<LeaderLatchListener, Void>()
{
@Override
public Void apply(LeaderLatchListener input)
@@ -610,8 +613,7 @@ public class LeaderLatch implements Closeable
input.isLeader();
return null;
}
- }
- );
+ });
}
notifyAll();
[2/2] git commit: AfterConnectionEstablished needs to return the
future from the service so that clients can cancel the action if needed.
Added this to LeaderLatch
Posted by ra...@apache.org.
AfterConnectionEstablished needs to return the future from the service so that clients can cancel the action if needed. Added this to LeaderLatch
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/5954e66f
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/5954e66f
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/5954e66f
Branch: refs/heads/CURATOR-110
Commit: 5954e66fa3108c39b3f2915583def5e51915846f
Parents: fab79a4
Author: randgalt <ra...@apache.org>
Authored: Tue Jun 17 14:38:14 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Jun 17 14:38:14 2014 -0500
----------------------------------------------------------------------
.../recipes/AfterConnectionEstablished.java | 7 ++-
.../framework/recipes/leader/LeaderLatch.java | 51 ++++++++++++++------
.../recipes/leader/TestLeaderLatch.java | 2 +
3 files changed, 44 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/5954e66f/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
index 41ba702..65c6ace 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.curator.framework.recipes;
import org.apache.curator.framework.CuratorFramework;
@@ -23,6 +24,7 @@ import org.apache.curator.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
/**
* Utility class to allow execution of logic once a ZooKeeper connection becomes available.
@@ -37,8 +39,9 @@ public class AfterConnectionEstablished
*
* @param client The curator client
* @param runAfterConnection The logic to run
+ * @return future of the task so it can be canceled, etc. if needed
*/
- public static void execute(final CuratorFramework client, final Runnable runAfterConnection) throws Exception
+ public static Future<?> execute(final CuratorFramework client, final Runnable runAfterConnection) throws Exception
{
//Block until connected
final ExecutorService executor = ThreadUtils.newSingleThreadExecutor(ThreadUtils.getProcessName(runAfterConnection.getClass()));
@@ -62,7 +65,7 @@ public class AfterConnectionEstablished
}
}
};
- executor.submit(internalCall);
+ return executor.submit(internalCall);
}
private AfterConnectionEstablished()
http://git-wip-us.apache.org/repos/asf/curator/blob/5954e66f/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 9d70645..6f7636a 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -46,6 +46,7 @@ import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -69,6 +70,7 @@ public class LeaderLatch implements Closeable
private final AtomicReference<String> ourPath = new AtomicReference<String>();
private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();
private final CloseMode closeMode;
+ private final AtomicReference<Future<?>> startTask = new AtomicReference<Future<?>>();
private final ConnectionStateListener listener = new ConnectionStateListener()
{
@@ -155,22 +157,21 @@ public class LeaderLatch implements Closeable
{
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
- AfterConnectionEstablished.execute(client, new Runnable()
- {
- @Override
- public void run()
+ startTask.set(AfterConnectionEstablished.execute(client, new Runnable()
{
- client.getConnectionStateListenable().addListener(listener);
- try
- {
- reset();
- }
- catch ( Exception e )
+ @Override
+ public void run()
{
- log.error("An error occurred checking resetting leadership.", e);
+ try
+ {
+ internalStart();
+ }
+ finally
+ {
+ startTask.set(null);
+ }
}
- }
- });
+ }));
}
/**
@@ -194,11 +195,17 @@ public class LeaderLatch implements Closeable
* @param closeMode allows the default close mode to be overridden at the time the latch is closed.
* @throws IOException errors
*/
- public void close(CloseMode closeMode) throws IOException
+ public synchronized void close(CloseMode closeMode) throws IOException
{
Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
+ Future<?> localStartTask = startTask.getAndSet(null);
+ if ( localStartTask != null )
+ {
+ localStartTask.cancel(true);
+ }
+
try
{
setNode(null);
@@ -485,6 +492,22 @@ public class LeaderLatch implements Closeable
client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
}
+ private synchronized void internalStart()
+ {
+ if ( state.get() == State.STARTED )
+ {
+ client.getConnectionStateListenable().addListener(listener);
+ try
+ {
+ reset();
+ }
+ catch ( Exception e )
+ {
+ log.error("An error occurred checking resetting leadership.", e);
+ }
+ }
+ }
+
private void checkLeadership(List<String> children) throws Exception
{
final String localOurPath = ourPath.get();
http://git-wip-us.apache.org/repos/asf/curator/blob/5954e66f/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index b97e708..f4fb1c7 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -89,6 +89,8 @@ public class TestLeaderLatch extends BaseClassForTests
try
{
client.start();
+ client.create().creatingParentsIfNeeded().forPath(PATH_NAME);
+
LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
latch.debugResetWaitLatch = new CountDownLatch(1);