You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/03/06 03:54:20 UTC
git commit: Added CloseMode support to LeaderLatch in order to be
able to trigger the notLeader() callback when a Latch is manually closed.
Repository: curator
Updated Branches:
refs/heads/CURATOR-92 [created] 2580ef3f5
Added CloseMode support to LeaderLatch in order to be able to trigger the notLeader() callback when a Latch is manually closed.
This closes #1
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/2580ef3f
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/2580ef3f
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/2580ef3f
Branch: refs/heads/CURATOR-92
Commit: 2580ef3f5df148a67fe5d2769e4d04890e8b4fd6
Parents: 0d9eaed
Author: David Trott <gi...@davidtrott.com>
Authored: Wed Mar 5 16:57:04 2014 -0800
Committer: randgalt <ra...@apache.org>
Committed: Wed Mar 5 21:53:16 2014 -0500
----------------------------------------------------------------------
.../framework/recipes/leader/LeaderLatch.java | 50 +++++-
.../recipes/leader/TestLeaderLatch.java | 171 +++++++++++++++++++
2 files changed, 218 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/2580ef3f/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 8d9a11f..310919b 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -67,6 +67,7 @@ public class LeaderLatch implements Closeable
private final AtomicBoolean hasLeadership = new AtomicBoolean(false);
private final AtomicReference<String> ourPath = new AtomicReference<String>();
private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();
+ private final CloseMode closeMode;
private final ConnectionStateListener listener = new ConnectionStateListener()
{
@@ -95,13 +96,19 @@ public class LeaderLatch implements Closeable
CLOSED
}
+ public enum CloseMode // selects the order in which close() clears listeners and resets leadership
+ {
+ SILENT, // close() clears the listeners first, then calls setLeadership(false) (the pre-existing order)
+ NOTIFY_LEADER // close() calls setLeadership(false) first, while the listeners are still registered, then clears them
+ }
+
/**
* @param client the client
* @param latchPath the path for this leadership group
*/
public LeaderLatch(CuratorFramework client, String latchPath)
{
- this(client, latchPath, "");
+ this(client, latchPath, "", CloseMode.SILENT); // empty participant id; SILENT keeps the earlier close() ordering as the default
}
/**
@@ -111,9 +118,21 @@ public class LeaderLatch implements Closeable
*/
public LeaderLatch(CuratorFramework client, String latchPath, String id)
{
+ this(client, latchPath, id, CloseMode.SILENT); // delegates to the four-argument constructor with SILENT as the default close mode
+ }
+
+ /** Creates a latch whose default close mode is {@code closeMode}; every argument is null-checked.
+ * @param client the client
+ * @param latchPath the path for this leadership group
+ * @param id participant ID
+ * @param closeMode mode used by {@link #close()}; decides whether listeners are cleared before or after leadership is reset
+ */
+ public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode)
+ {
this.client = Preconditions.checkNotNull(client, "client cannot be null");
this.latchPath = Preconditions.checkNotNull(latchPath, "mutexPath cannot be null"); // NOTE(review): message says "mutexPath" but the parameter is latchPath
this.id = Preconditions.checkNotNull(id, "id cannot be null");
+ this.closeMode = Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
}
/**
@@ -139,7 +158,22 @@ public class LeaderLatch implements Closeable
@Override
public void close() throws IOException
{
+ close(this.closeMode); // uses the close mode supplied at construction time
+ }
+
+ /**
+ * Remove this instance from the leadership election. If this instance is the leader, leadership
+ * is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch
+ * instances must eventually be closed.
+ *
+ * @param closeMode allows the default close mode to be overridden at the time the latch is closed.
+ *
+ * @throws IOException errors
+ */
+ public void close(CloseMode closeMode) throws IOException
+ {
Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
+ Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
try
{
@@ -152,8 +186,18 @@ public class LeaderLatch implements Closeable
finally
{
client.getConnectionStateListenable().removeListener(listener);
- listeners.clear();
- setLeadership(false);
+
+ switch(closeMode)
+ {
+ case NOTIFY_LEADER:
+ setLeadership(false);
+ listeners.clear();
+ break;
+ default:
+ listeners.clear();
+ setLeadership(false);
+ break;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/2580ef3f/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index f4b5590..067c817 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -365,6 +365,177 @@ public class TestLeaderLatch extends BaseClassForTests
}
}
+ @Test
+ public void testCallbackNotifyLeader() throws Exception // mixes SILENT and NOTIFY_LEADER latches and counts the listener callbacks they receive
+ {
+ final int PARTICIPANT_QTY = 10;
+ final int SILENT_QTY = 3; // the first SILENT_QTY latches are created with CloseMode.SILENT, the rest with NOTIFY_LEADER
+
+ final CountDownLatch timesSquare = new CountDownLatch(PARTICIPANT_QTY); // counted down once per latch, right after that latch closes itself
+ final AtomicLong masterCounter = new AtomicLong(0); // total isLeader() callbacks
+ final AtomicLong dunceCounter = new AtomicLong(0); // total notLeader() callbacks
+
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ ExecutorService exec = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("callbackNotifyLeader-%s").build()); // NOTE(review): this executor is never shut down in the test
+
+ List<LeaderLatch> latches = Lists.newArrayList();
+ for ( int i = 0; i < PARTICIPANT_QTY; ++i )
+ {
+ LeaderLatch.CloseMode closeMode = i < SILENT_QTY ?
+ LeaderLatch.CloseMode.SILENT :
+ LeaderLatch.CloseMode.NOTIFY_LEADER;
+
+ final LeaderLatch latch = new LeaderLatch(client, PATH_NAME, "", closeMode);
+ latch.addListener(
+ new LeaderLatchListener()
+ {
+ boolean beenLeader = false; // flips to true after this latch's first isLeader() callback
+
+ @Override
+ public void isLeader()
+ {
+ if ( !beenLeader ) // first time leading: count it, then reset() the latch
+ {
+ masterCounter.incrementAndGet();
+ beenLeader = true;
+ try
+ {
+ latch.reset(); // presumably gives up leadership and re-enters the election; reset() is not shown in this diff
+ }
+ catch ( Exception e )
+ {
+ throw Throwables.propagate(e);
+ }
+ }
+ else // second time leading: count it and close the latch with its configured close mode
+ {
+ masterCounter.incrementAndGet();
+ CloseableUtils.closeQuietly(latch);
+ timesSquare.countDown();
+ }
+ }
+
+ @Override
+ public void notLeader()
+ {
+ dunceCounter.incrementAndGet();
+ }
+ },
+ exec
+ );
+ latches.add(latch);
+ }
+
+ try
+ {
+ client.start();
+
+ for ( LeaderLatch latch : latches )
+ {
+ latch.start();
+ }
+
+ timesSquare.await(); // NOTE(review): no timeout, so a missed callback would block the test indefinitely
+
+ Assert.assertEquals(masterCounter.get(), PARTICIPANT_QTY * 2); // every latch is expected to see isLeader() twice
+ Assert.assertEquals(dunceCounter.get(), PARTICIPANT_QTY * 2 - SILENT_QTY); // expects notLeader() on every reset and every close, except the SILENT closes (assumes reset() notifies; confirm)
+ for ( LeaderLatch latch : latches )
+ {
+ Assert.assertEquals(latch.getState(), LeaderLatch.State.CLOSED);
+ }
+ }
+ finally
+ {
+ for ( LeaderLatch latch : latches )
+ {
+ if ( latch.getState() != LeaderLatch.State.CLOSED ) // skip latches that already closed themselves inside the listener
+ {
+ CloseableUtils.closeQuietly(latch);
+ }
+ }
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testCallbackDontNotifyDunce() throws Exception { // expects notLeader() once for the first-started latch and never for the second
+ final AtomicLong masterCounter = new AtomicLong(0); // notLeader() calls seen by `leader`
+ final AtomicLong dunceCounter = new AtomicLong(0); // notLeader() calls seen by `dunce`
+
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+
+ final LeaderLatch leader = new LeaderLatch(client, PATH_NAME); // two-argument constructor, so the default close mode is SILENT
+ final LeaderLatch dunce = new LeaderLatch(client, PATH_NAME, "", LeaderLatch.CloseMode.NOTIFY_LEADER);
+
+ leader.addListener(new LeaderLatchListener()
+ {
+ @Override
+ public void isLeader()
+ {
+ }
+
+ @Override
+ public void notLeader()
+ {
+ masterCounter.incrementAndGet();
+ }
+ });
+
+ dunce.addListener(new LeaderLatchListener()
+ {
+ @Override
+ public void isLeader()
+ {
+ }
+
+ @Override
+ public void notLeader()
+ {
+ dunceCounter.incrementAndGet();
+ }
+ });
+
+ try
+ {
+ client.start();
+
+ leader.start();
+
+ timing.sleepABit(); // presumably lets `leader` win the election before `dunce` joins; confirm
+
+ dunce.start();
+
+ timing.sleepABit();
+
+ dunce.close(); // closes with dunce's own NOTIFY_LEADER mode; the assertions below still expect no notLeader() for it
+
+ timing.sleepABit();
+
+ // Test the close override
+ leader.close(LeaderLatch.CloseMode.NOTIFY_LEADER); // overrides leader's SILENT default for this one call
+
+ Assert.assertEquals(leader.getState(), LeaderLatch.State.CLOSED);
+ Assert.assertEquals(dunce.getState(), LeaderLatch.State.CLOSED);
+
+ Assert.assertEquals(masterCounter.get(), 1); // exactly one notLeader() expected from the overridden close
+ Assert.assertEquals(dunceCounter.get(), 0); // the second latch is expected never to receive notLeader()
+ }
+ finally
+ {
+ if (leader.getState() != LeaderLatch.State.CLOSED) // only close here if the test body did not get that far
+ {
+ CloseableUtils.closeQuietly(leader);
+ }
+ if (dunce.getState() != LeaderLatch.State.CLOSED)
+ {
+ CloseableUtils.closeQuietly(dunce);
+ }
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
private enum Mode
{
START_IMMEDIATELY,