You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/03/06 11:59:13 UTC
[1/2] git commit: Added CloseMode support to LeaderLatch in order to
be able to trigger the notLeader() callback when a Latch is manually closed.
Repository: curator
Updated Branches:
refs/heads/master 0d9eaed1c -> a40b81940
Added CloseMode support to LeaderLatch in order to be able to trigger the notLeader() callback when a Latch is manually closed.
This closes #1
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/2580ef3f
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/2580ef3f
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/2580ef3f
Branch: refs/heads/master
Commit: 2580ef3f5df148a67fe5d2769e4d04890e8b4fd6
Parents: 0d9eaed
Author: David Trott <gi...@davidtrott.com>
Authored: Wed Mar 5 16:57:04 2014 -0800
Committer: randgalt <ra...@apache.org>
Committed: Wed Mar 5 21:53:16 2014 -0500
----------------------------------------------------------------------
.../framework/recipes/leader/LeaderLatch.java | 50 +++++-
.../recipes/leader/TestLeaderLatch.java | 171 +++++++++++++++++++
2 files changed, 218 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/2580ef3f/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 8d9a11f..310919b 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -67,6 +67,7 @@ public class LeaderLatch implements Closeable
private final AtomicBoolean hasLeadership = new AtomicBoolean(false);
private final AtomicReference<String> ourPath = new AtomicReference<String>();
private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();
+ private final CloseMode closeMode;
private final ConnectionStateListener listener = new ConnectionStateListener()
{
@@ -95,13 +96,19 @@ public class LeaderLatch implements Closeable
CLOSED
}
+ public enum CloseMode
+ {
+ SILENT,
+ NOTIFY_LEADER
+ }
+
/**
* @param client the client
* @param latchPath the path for this leadership group
*/
public LeaderLatch(CuratorFramework client, String latchPath)
{
- this(client, latchPath, "");
+ this(client, latchPath, "", CloseMode.SILENT);
}
/**
@@ -111,9 +118,21 @@ public class LeaderLatch implements Closeable
*/
public LeaderLatch(CuratorFramework client, String latchPath, String id)
{
+ this(client, latchPath, id, CloseMode.SILENT);
+ }
+
+ /**
+ * @param client the client
+ * @param latchPath the path for this leadership group
+ * @param id participant ID
+ * @param closeMode behaviour of listener on explicit close.
+ */
+ public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode)
+ {
this.client = Preconditions.checkNotNull(client, "client cannot be null");
this.latchPath = Preconditions.checkNotNull(latchPath, "mutexPath cannot be null");
this.id = Preconditions.checkNotNull(id, "id cannot be null");
+ this.closeMode = Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
}
/**
@@ -139,7 +158,22 @@ public class LeaderLatch implements Closeable
@Override
public void close() throws IOException
{
+ close(this.closeMode);
+ }
+
+ /**
+ * Remove this instance from the leadership election. If this instance is the leader, leadership
+ * is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch
+ * instances must eventually be closed.
+ *
+ * @param closeMode allows the default close mode to be overridden at the time the latch is closed.
+ *
+ * @throws IOException errors
+ */
+ public void close(CloseMode closeMode) throws IOException
+ {
Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
+ Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
try
{
@@ -152,8 +186,18 @@ public class LeaderLatch implements Closeable
finally
{
client.getConnectionStateListenable().removeListener(listener);
- listeners.clear();
- setLeadership(false);
+
+ switch(closeMode)
+ {
+ case NOTIFY_LEADER:
+ setLeadership(false);
+ listeners.clear();
+ break;
+ default:
+ listeners.clear();
+ setLeadership(false);
+ break;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/2580ef3f/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index f4b5590..067c817 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -365,6 +365,177 @@ public class TestLeaderLatch extends BaseClassForTests
}
}
+ @Test
+ public void testCallbackNotifyLeader() throws Exception
+ {
+ final int PARTICIPANT_QTY = 10;
+ final int SILENT_QTY = 3;
+
+ final CountDownLatch timesSquare = new CountDownLatch(PARTICIPANT_QTY);
+ final AtomicLong masterCounter = new AtomicLong(0);
+ final AtomicLong dunceCounter = new AtomicLong(0);
+
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ ExecutorService exec = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("callbackNotifyLeader-%s").build());
+
+ List<LeaderLatch> latches = Lists.newArrayList();
+ for ( int i = 0; i < PARTICIPANT_QTY; ++i )
+ {
+ LeaderLatch.CloseMode closeMode = i < SILENT_QTY ?
+ LeaderLatch.CloseMode.SILENT :
+ LeaderLatch.CloseMode.NOTIFY_LEADER;
+
+ final LeaderLatch latch = new LeaderLatch(client, PATH_NAME, "", closeMode);
+ latch.addListener(
+ new LeaderLatchListener()
+ {
+ boolean beenLeader = false;
+
+ @Override
+ public void isLeader()
+ {
+ if ( !beenLeader )
+ {
+ masterCounter.incrementAndGet();
+ beenLeader = true;
+ try
+ {
+ latch.reset();
+ }
+ catch ( Exception e )
+ {
+ throw Throwables.propagate(e);
+ }
+ }
+ else
+ {
+ masterCounter.incrementAndGet();
+ CloseableUtils.closeQuietly(latch);
+ timesSquare.countDown();
+ }
+ }
+
+ @Override
+ public void notLeader()
+ {
+ dunceCounter.incrementAndGet();
+ }
+ },
+ exec
+ );
+ latches.add(latch);
+ }
+
+ try
+ {
+ client.start();
+
+ for ( LeaderLatch latch : latches )
+ {
+ latch.start();
+ }
+
+ timesSquare.await();
+
+ Assert.assertEquals(masterCounter.get(), PARTICIPANT_QTY * 2);
+ Assert.assertEquals(dunceCounter.get(), PARTICIPANT_QTY * 2 - SILENT_QTY);
+ for ( LeaderLatch latch : latches )
+ {
+ Assert.assertEquals(latch.getState(), LeaderLatch.State.CLOSED);
+ }
+ }
+ finally
+ {
+ for ( LeaderLatch latch : latches )
+ {
+ if ( latch.getState() != LeaderLatch.State.CLOSED )
+ {
+ CloseableUtils.closeQuietly(latch);
+ }
+ }
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testCallbackDontNotifyDunce() throws Exception {
+ final AtomicLong masterCounter = new AtomicLong(0);
+ final AtomicLong dunceCounter = new AtomicLong(0);
+
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+
+ final LeaderLatch leader = new LeaderLatch(client, PATH_NAME);
+ final LeaderLatch dunce = new LeaderLatch(client, PATH_NAME, "", LeaderLatch.CloseMode.NOTIFY_LEADER);
+
+ leader.addListener(new LeaderLatchListener()
+ {
+ @Override
+ public void isLeader()
+ {
+ }
+
+ @Override
+ public void notLeader()
+ {
+ masterCounter.incrementAndGet();
+ }
+ });
+
+ dunce.addListener(new LeaderLatchListener()
+ {
+ @Override
+ public void isLeader()
+ {
+ }
+
+ @Override
+ public void notLeader()
+ {
+ dunceCounter.incrementAndGet();
+ }
+ });
+
+ try
+ {
+ client.start();
+
+ leader.start();
+
+ timing.sleepABit();
+
+ dunce.start();
+
+ timing.sleepABit();
+
+ dunce.close();
+
+ timing.sleepABit();
+
+ // Test the close override
+ leader.close(LeaderLatch.CloseMode.NOTIFY_LEADER);
+
+ Assert.assertEquals(leader.getState(), LeaderLatch.State.CLOSED);
+ Assert.assertEquals(dunce.getState(), LeaderLatch.State.CLOSED);
+
+ Assert.assertEquals(masterCounter.get(), 1);
+ Assert.assertEquals(dunceCounter.get(), 0);
+ }
+ finally
+ {
+ if (leader.getState() != LeaderLatch.State.CLOSED)
+ {
+ CloseableUtils.closeQuietly(leader);
+ }
+ if (dunce.getState() != LeaderLatch.State.CLOSED)
+ {
+ CloseableUtils.closeQuietly(dunce);
+ }
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
private enum Mode
{
START_IMMEDIATELY,
[2/2] git commit: minor refactoring
Posted by ra...@apache.org.
minor refactoring
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/a40b8194
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/a40b8194
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/a40b8194
Branch: refs/heads/master
Commit: a40b81940849ff37de0dc8d9a412bde709620d59
Parents: 2580ef3
Author: randgalt <ra...@apache.org>
Authored: Thu Mar 6 05:58:52 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Thu Mar 6 05:58:52 2014 -0500
----------------------------------------------------------------------
.../recipes/leader/TestLeaderLatch.java | 226 +++++++++----------
1 file changed, 105 insertions(+), 121 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/a40b8194/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index 067c817..1dcd352 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -21,7 +21,6 @@ package org.apache.curator.framework.recipes.leader;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
-import org.apache.curator.utils.CloseableUtils;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -31,6 +30,7 @@ import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.TestingServer;
import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.Collection;
@@ -123,20 +123,17 @@ public class TestLeaderLatch extends BaseClassForTests
client.start();
final CountDownLatch countDownLatch = new CountDownLatch(1);
- client.getConnectionStateListenable().addListener
- (
- new ConnectionStateListener()
+ client.getConnectionStateListenable().addListener(new ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
{
- @Override
- public void stateChanged(CuratorFramework client, ConnectionState newState)
+ if ( newState == ConnectionState.LOST )
{
- if ( newState == ConnectionState.LOST )
- {
- countDownLatch.countDown();
- }
+ countDownLatch.countDown();
}
}
- );
+ });
for ( int i = 0; i < PARTICIPANT_QTY; ++i )
{
@@ -228,30 +225,27 @@ public class TestLeaderLatch extends BaseClassForTests
final AtomicBoolean thereIsALeader = new AtomicBoolean(false);
for ( int i = 0; i < PARTICIPANT_QTY; ++i )
{
- service.submit
- (
- new Callable<Void>()
+ service.submit(new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
+ try
{
- @Override
- public Void call() throws Exception
- {
- LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
- try
- {
- latch.start();
- Assert.assertTrue(latch.await(timing.forWaiting().seconds(), TimeUnit.SECONDS));
- Assert.assertTrue(thereIsALeader.compareAndSet(false, true));
- Thread.sleep((int)(10 * Math.random()));
- }
- finally
- {
- thereIsALeader.set(false);
- latch.close();
- }
- return null;
- }
+ latch.start();
+ Assert.assertTrue(latch.await(timing.forWaiting().seconds(), TimeUnit.SECONDS));
+ Assert.assertTrue(thereIsALeader.compareAndSet(false, true));
+ Thread.sleep((int)(10 * Math.random()));
}
- );
+ finally
+ {
+ thereIsALeader.set(false);
+ latch.close();
+ }
+ return null;
+ }
+ });
}
for ( int i = 0; i < PARTICIPANT_QTY; ++i )
@@ -284,7 +278,7 @@ public class TestLeaderLatch extends BaseClassForTests
final int PARTICIPANT_QTY = 10;
final CountDownLatch timesSquare = new CountDownLatch(PARTICIPANT_QTY);
final AtomicLong masterCounter = new AtomicLong(0);
- final AtomicLong dunceCounter = new AtomicLong(0);
+ final AtomicLong notLeaderCounter = new AtomicLong(0);
Timing timing = new Timing();
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
@@ -294,43 +288,40 @@ public class TestLeaderLatch extends BaseClassForTests
for ( int i = 0; i < PARTICIPANT_QTY; ++i )
{
final LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
- latch.addListener(
- new LeaderLatchListener()
- {
- boolean beenLeader = false;
+ latch.addListener(new LeaderLatchListener()
+ {
+ boolean beenLeader = false;
- @Override
- public void isLeader()
+ @Override
+ public void isLeader()
+ {
+ if ( !beenLeader )
{
- if ( !beenLeader )
+ masterCounter.incrementAndGet();
+ beenLeader = true;
+ try
{
- masterCounter.incrementAndGet();
- beenLeader = true;
- try
- {
- latch.reset();
- }
- catch ( Exception e )
- {
- throw Throwables.propagate(e);
- }
+ latch.reset();
}
- else
+ catch ( Exception e )
{
- masterCounter.incrementAndGet();
- CloseableUtils.closeQuietly(latch);
- timesSquare.countDown();
+ throw Throwables.propagate(e);
}
}
-
- @Override
- public void notLeader()
+ else
{
- dunceCounter.incrementAndGet();
+ masterCounter.incrementAndGet();
+ CloseableUtils.closeQuietly(latch);
+ timesSquare.countDown();
}
- },
- exec
- );
+ }
+
+ @Override
+ public void notLeader()
+ {
+ notLeaderCounter.incrementAndGet();
+ }
+ }, exec);
latches.add(latch);
}
@@ -346,7 +337,7 @@ public class TestLeaderLatch extends BaseClassForTests
timesSquare.await();
Assert.assertEquals(masterCounter.get(), PARTICIPANT_QTY * 2);
- Assert.assertEquals(dunceCounter.get(), PARTICIPANT_QTY);
+ Assert.assertEquals(notLeaderCounter.get(), PARTICIPANT_QTY);
for ( LeaderLatch latch : latches )
{
Assert.assertEquals(latch.getState(), LeaderLatch.State.CLOSED);
@@ -373,7 +364,7 @@ public class TestLeaderLatch extends BaseClassForTests
final CountDownLatch timesSquare = new CountDownLatch(PARTICIPANT_QTY);
final AtomicLong masterCounter = new AtomicLong(0);
- final AtomicLong dunceCounter = new AtomicLong(0);
+ final AtomicLong notLeaderCounter = new AtomicLong(0);
Timing timing = new Timing();
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
@@ -382,48 +373,43 @@ public class TestLeaderLatch extends BaseClassForTests
List<LeaderLatch> latches = Lists.newArrayList();
for ( int i = 0; i < PARTICIPANT_QTY; ++i )
{
- LeaderLatch.CloseMode closeMode = i < SILENT_QTY ?
- LeaderLatch.CloseMode.SILENT :
- LeaderLatch.CloseMode.NOTIFY_LEADER;
+ LeaderLatch.CloseMode closeMode = i < SILENT_QTY ? LeaderLatch.CloseMode.SILENT : LeaderLatch.CloseMode.NOTIFY_LEADER;
final LeaderLatch latch = new LeaderLatch(client, PATH_NAME, "", closeMode);
- latch.addListener(
- new LeaderLatchListener()
- {
- boolean beenLeader = false;
+ latch.addListener(new LeaderLatchListener()
+ {
+ boolean beenLeader = false;
- @Override
- public void isLeader()
+ @Override
+ public void isLeader()
+ {
+ if ( !beenLeader )
{
- if ( !beenLeader )
+ masterCounter.incrementAndGet();
+ beenLeader = true;
+ try
{
- masterCounter.incrementAndGet();
- beenLeader = true;
- try
- {
- latch.reset();
- }
- catch ( Exception e )
- {
- throw Throwables.propagate(e);
- }
+ latch.reset();
}
- else
+ catch ( Exception e )
{
- masterCounter.incrementAndGet();
- CloseableUtils.closeQuietly(latch);
- timesSquare.countDown();
+ throw Throwables.propagate(e);
}
}
-
- @Override
- public void notLeader()
+ else
{
- dunceCounter.incrementAndGet();
+ masterCounter.incrementAndGet();
+ CloseableUtils.closeQuietly(latch);
+ timesSquare.countDown();
}
- },
- exec
- );
+ }
+
+ @Override
+ public void notLeader()
+ {
+ notLeaderCounter.incrementAndGet();
+ }
+ }, exec);
latches.add(latch);
}
@@ -439,7 +425,7 @@ public class TestLeaderLatch extends BaseClassForTests
timesSquare.await();
Assert.assertEquals(masterCounter.get(), PARTICIPANT_QTY * 2);
- Assert.assertEquals(dunceCounter.get(), PARTICIPANT_QTY * 2 - SILENT_QTY);
+ Assert.assertEquals(notLeaderCounter.get(), PARTICIPANT_QTY * 2 - SILENT_QTY);
for ( LeaderLatch latch : latches )
{
Assert.assertEquals(latch.getState(), LeaderLatch.State.CLOSED);
@@ -459,15 +445,16 @@ public class TestLeaderLatch extends BaseClassForTests
}
@Test
- public void testCallbackDontNotifyDunce() throws Exception {
+ public void testCallbackDontNotify() throws Exception
+ {
final AtomicLong masterCounter = new AtomicLong(0);
- final AtomicLong dunceCounter = new AtomicLong(0);
+ final AtomicLong notLeaderCounter = new AtomicLong(0);
Timing timing = new Timing();
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
final LeaderLatch leader = new LeaderLatch(client, PATH_NAME);
- final LeaderLatch dunce = new LeaderLatch(client, PATH_NAME, "", LeaderLatch.CloseMode.NOTIFY_LEADER);
+ final LeaderLatch notifiedLeader = new LeaderLatch(client, PATH_NAME, "", LeaderLatch.CloseMode.NOTIFY_LEADER);
leader.addListener(new LeaderLatchListener()
{
@@ -483,7 +470,7 @@ public class TestLeaderLatch extends BaseClassForTests
}
});
- dunce.addListener(new LeaderLatchListener()
+ notifiedLeader.addListener(new LeaderLatchListener()
{
@Override
public void isLeader()
@@ -493,7 +480,7 @@ public class TestLeaderLatch extends BaseClassForTests
@Override
public void notLeader()
{
- dunceCounter.incrementAndGet();
+ notLeaderCounter.incrementAndGet();
}
});
@@ -505,11 +492,11 @@ public class TestLeaderLatch extends BaseClassForTests
timing.sleepABit();
- dunce.start();
+ notifiedLeader.start();
timing.sleepABit();
- dunce.close();
+ notifiedLeader.close();
timing.sleepABit();
@@ -517,20 +504,20 @@ public class TestLeaderLatch extends BaseClassForTests
leader.close(LeaderLatch.CloseMode.NOTIFY_LEADER);
Assert.assertEquals(leader.getState(), LeaderLatch.State.CLOSED);
- Assert.assertEquals(dunce.getState(), LeaderLatch.State.CLOSED);
+ Assert.assertEquals(notifiedLeader.getState(), LeaderLatch.State.CLOSED);
Assert.assertEquals(masterCounter.get(), 1);
- Assert.assertEquals(dunceCounter.get(), 0);
+ Assert.assertEquals(notLeaderCounter.get(), 0);
}
finally
{
- if (leader.getState() != LeaderLatch.State.CLOSED)
+ if ( leader.getState() != LeaderLatch.State.CLOSED )
{
CloseableUtils.closeQuietly(leader);
}
- if (dunce.getState() != LeaderLatch.State.CLOSED)
+ if ( notifiedLeader.getState() != LeaderLatch.State.CLOSED )
{
- CloseableUtils.closeQuietly(dunce);
+ CloseableUtils.closeQuietly(notifiedLeader);
}
CloseableUtils.closeQuietly(client);
}
@@ -568,19 +555,16 @@ public class TestLeaderLatch extends BaseClassForTests
ExecutorService service = Executors.newFixedThreadPool(latches.size());
for ( final LeaderLatch latch : latches )
{
- service.submit
- (
- new Callable<Object>()
- {
- @Override
- public Object call() throws Exception
- {
- Thread.sleep((int)(100 * Math.random()));
- latch.start();
- return null;
- }
- }
- );
+ service.submit(new Callable<Object>()
+ {
+ @Override
+ public Object call() throws Exception
+ {
+ Thread.sleep((int)(100 * Math.random()));
+ latch.start();
+ return null;
+ }
+ });
}
service.shutdown();
}