You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2013/05/10 01:47:16 UTC
[3/3] git commit: CURATOR-22 Add a listener to LeaderLatch
CURATOR-22
Add a listener to LeaderLatch
Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/3d6181ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/3d6181ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/3d6181ca
Branch: refs/heads/CURATOR-22
Commit: 3d6181cae984807bd2f89cecc2e5b55d0574a5b3
Parents: 9acf592
Author: randgalt <ra...@apache.org>
Authored: Thu May 9 16:46:50 2013 -0700
Committer: randgalt <ra...@apache.org>
Committed: Thu May 9 16:46:50 2013 -0700
----------------------------------------------------------------------
.../framework/recipes/leader/LeaderLatch.java | 244 ++++++++++-----
.../recipes/leader/LeaderLatchListener.java | 46 +++
.../framework/recipes/leader/TestLeaderLatch.java | 181 ++++++++---
3 files changed, 351 insertions(+), 120 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/3d6181ca/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 36a0636..508ca7c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -16,13 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.curator.framework.recipes.leader;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.listen.ListenerContainer;
import org.apache.curator.framework.recipes.locks.LockInternals;
import org.apache.curator.framework.recipes.locks.LockInternalsSorter;
import org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver;
@@ -41,29 +44,31 @@ import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
* <p>
- * Abstraction to select a "leader" amongst multiple contenders in a group of JMVs connected to
- * a Zookeeper cluster. If a group of N thread/processes contend for leadership one will
- * randomly be assigned leader until it releases leadership at which time another one from the
- * group will randomly be chosen
+ * Abstraction to select a "leader" amongst multiple contenders in a group of JMVs connected to
+ * a Zookeeper cluster. If a group of N thread/processes contend for leadership one will
+ * randomly be assigned leader until it releases leadership at which time another one from the
+ * group will randomly be chosen
* </p>
*/
public class LeaderLatch implements Closeable
{
- private final Logger log = LoggerFactory.getLogger(getClass());
- private final CuratorFramework client;
- private final String latchPath;
- private final String id;
- private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
- private final AtomicBoolean hasLeadership = new AtomicBoolean(false);
- private final AtomicReference<String> ourPath = new AtomicReference<String>();
-
- private final ConnectionStateListener listener = new ConnectionStateListener()
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final CuratorFramework client;
+ private final String latchPath;
+ private final String id;
+ private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+ private final AtomicBoolean hasLeadership = new AtomicBoolean(false);
+ private final AtomicReference<String> ourPath = new AtomicReference<String>();
+ private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();
+
+ private final ConnectionStateListener listener = new ConnectionStateListener()
{
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
@@ -74,7 +79,7 @@ public class LeaderLatch implements Closeable
private static final String LOCK_NAME = "latch-";
- private static final LockInternalsSorter sorter = new LockInternalsSorter()
+ private static final LockInternalsSorter sorter = new LockInternalsSorter()
{
@Override
public String fixForSorting(String str, String lockName)
@@ -83,7 +88,7 @@ public class LeaderLatch implements Closeable
}
};
- private enum State
+ public enum State
{
LATENT,
STARTED,
@@ -91,7 +96,7 @@ public class LeaderLatch implements Closeable
}
/**
- * @param client the client
+ * @param client the client
* @param latchPath the path for this leadership group
*/
public LeaderLatch(CuratorFramework client, String latchPath)
@@ -100,9 +105,9 @@ public class LeaderLatch implements Closeable
}
/**
- * @param client the client
+ * @param client the client
* @param latchPath the path for this leadership group
- * @param id participant ID
+ * @param id participant ID
*/
public LeaderLatch(CuratorFramework client, String latchPath, String id)
{
@@ -147,16 +152,60 @@ public class LeaderLatch implements Closeable
finally
{
client.getConnectionStateListenable().removeListener(listener);
+ listeners.clear();
setLeadership(false);
}
}
/**
+ * Attaches a listener to this LeaderLatch
+ * <p/>
+ * Attaching the same listener multiple times is a noop from the second time on.
+ * <p/>
+ * All methods for the listener are run using the provided Executor. It is common to pass in a single-threaded
+ * executor so that you can be certain that listener methods are called in sequence, but if you are fine with
+ * them being called out of order you are welcome to use multiple threads.
+ *
+ * @param listener the listener to attach
+ */
+ public void addListener(LeaderLatchListener listener)
+ {
+ listeners.addListener(listener);
+ }
+
+ /**
+ * Attaches a listener to this LeaderLatch
+ * <p/>
+ * Attaching the same listener multiple times is a noop from the second time on.
+ * <p/>
+ * All methods for the listener are run using the provided Executor. It is common to pass in a single-threaded
+ * executor so that you can be certain that listener methods are called in sequence, but if you are fine with
+ * them being called out of order you are welcome to use multiple threads.
+ *
+ * @param listener the listener to attach
+ * @param executor An executor to run the methods for the listener on.
+ */
+ public void addListener(LeaderLatchListener listener, Executor executor)
+ {
+ listeners.addListener(listener, executor);
+ }
+
+ /**
+ * Removes a given listener from this LeaderLatch
+ *
+ * @param listener the listener to remove
+ */
+ public void removeListener(LeaderLatchListener listener)
+ {
+ listeners.removeListener(listener);
+ }
+
+ /**
* <p>Causes the current thread to wait until this instance acquires leadership
* unless the thread is {@linkplain Thread#interrupt interrupted} or {@linkplain #close() closed}.</p>
- *
+ * <p/>
* <p>If this instance already is the leader then this method returns immediately.</p>
- *
+ * <p/>
* <p>Otherwise the current
* thread becomes disabled for thread scheduling purposes and lies
* dormant until one of three things happen:
@@ -166,7 +215,7 @@ public class LeaderLatch implements Closeable
* the current thread</li>
* <li>The instance is {@linkplain #close() closed}</li>
* </ul></p>
- *
+ * <p/>
* <p>If the current thread:
* <ul>
* <li>has its interrupted status set on entry to this method; or
@@ -176,9 +225,9 @@ public class LeaderLatch implements Closeable
* interrupted status is cleared.</p>
*
* @throws InterruptedException if the current thread is interrupted
- * while waiting
- * @throws EOFException if the instance is {@linkplain #close() closed}
- * while waiting
+ * while waiting
+ * @throws EOFException if the instance is {@linkplain #close() closed}
+ * while waiting
*/
public void await() throws InterruptedException, EOFException
{
@@ -199,10 +248,10 @@ public class LeaderLatch implements Closeable
* <p>Causes the current thread to wait until this instance acquires leadership
* unless the thread is {@linkplain Thread#interrupt interrupted},
* the specified waiting time elapses or the instance is {@linkplain #close() closed}.</p>
- *
+ * <p/>
* <p>If this instance already is the leader then this method returns immediately
* with the value {@code true}.</p>
- *
+ * <p/>
* <p>Otherwise the current
* thread becomes disabled for thread scheduling purposes and lies
* dormant until one of four things happen:
@@ -213,7 +262,7 @@ public class LeaderLatch implements Closeable
* <li>The specified waiting time elapses.</li>
* <li>The instance is {@linkplain #close() closed}</li>
* </ul>
- *
+ * <p/>
* <p>If the current thread:
* <ul>
* <li>has its interrupted status set on entry to this method; or
@@ -221,29 +270,29 @@ public class LeaderLatch implements Closeable
* </ul>
* then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.</p>
- *
+ * <p/>
* <p>If the specified waiting time elapses or the instance is {@linkplain #close() closed}
* then the value {@code false} is returned. If the time is less than or equal to zero, the method
* will not wait at all.</p>
*
* @param timeout the maximum time to wait
- * @param unit the time unit of the {@code timeout} argument
+ * @param unit the time unit of the {@code timeout} argument
* @return {@code true} if the count reached zero and {@code false}
* if the waiting time elapsed before the count reached zero or the instances was closed
* @throws InterruptedException if the current thread is interrupted
- * while waiting
+ * while waiting
*/
public boolean await(long timeout, TimeUnit unit) throws InterruptedException
{
- long waitNanos = TimeUnit.NANOSECONDS.convert(timeout, unit);
+ long waitNanos = TimeUnit.NANOSECONDS.convert(timeout, unit);
synchronized(this)
{
while ( (waitNanos > 0) && (state.get() == State.STARTED) && !hasLeadership.get() )
{
- long startNanos = System.nanoTime();
+ long startNanos = System.nanoTime();
TimeUnit.NANOSECONDS.timedWait(this, waitNanos);
- long elapsed = System.nanoTime() - startNanos;
+ long elapsed = System.nanoTime() - startNanos;
waitNanos -= elapsed;
}
}
@@ -261,14 +310,27 @@ public class LeaderLatch implements Closeable
}
/**
+ * Returns this instances current state, this is the only way to verify that the object has been closed before
+ * closing again. If you try to close a latch multiple times, the close() method will throw an
+ * IllegalArgumentException which is often not caught and ignored (Closeables.closeQuietly() only looks for
+ * IOException).
+ *
+ * @return the state of the current instance
+ */
+ public State getState()
+ {
+ return state.get();
+ }
+
+ /**
* <p>
- * Returns the set of current participants in the leader selection
+ * Returns the set of current participants in the leader selection
* </p>
- *
+ * <p/>
* <p>
- * <B>NOTE</B> - this method polls the ZK server. Therefore it can possibly
- * return a value that does not match {@link #hasLeadership()} as hasLeadership
- * uses a local field of the class.
+ * <B>NOTE</B> - this method polls the ZK server. Therefore it can possibly
+ * return a value that does not match {@link #hasLeadership()} as hasLeadership
+ * uses a local field of the class.
* </p>
*
* @return participants
@@ -282,20 +344,20 @@ public class LeaderLatch implements Closeable
/**
* <p>
- * Return the id for the current leader. If for some reason there is no
- * current leader, a dummy participant is returned.
+ * Return the id for the current leader. If for some reason there is no
+ * current leader, a dummy participant is returned.
* </p>
- *
+ * <p/>
* <p>
- * <B>NOTE</B> - this method polls the ZK server. Therefore it can possibly
- * return a value that does not match {@link #hasLeadership()} as hasLeadership
- * uses a local field of the class.
+ * <B>NOTE</B> - this method polls the ZK server. Therefore it can possibly
+ * return a value that does not match {@link #hasLeadership()} as hasLeadership
+ * uses a local field of the class.
* </p>
*
* @return leader
* @throws Exception ZK errors, interruptions, etc.
*/
- public Participant getLeader() throws Exception
+ public Participant getLeader() throws Exception
{
Collection<String> participantNodes = LockInternals.getParticipantNodes(client, latchPath, LOCK_NAME, sorter);
return LeaderSelector.getLeader(client, participantNodes);
@@ -320,7 +382,7 @@ public class LeaderLatch implements Closeable
setLeadership(false);
setNode(null);
- BackgroundCallback callback = new BackgroundCallback()
+ BackgroundCallback callback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
@@ -347,9 +409,9 @@ public class LeaderLatch implements Closeable
private void checkLeadership(List<String> children) throws Exception
{
- final String localOurPath = ourPath.get();
- List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
- int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
+ final String localOurPath = ourPath.get();
+ List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
+ int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
if ( ourIndex < 0 )
{
log.error("Can't find our node. Resetting. Index: " + ourIndex);
@@ -361,7 +423,7 @@ public class LeaderLatch implements Closeable
}
else
{
- String watchPath = sortedChildren.get(ourIndex - 1);
+ String watchPath = sortedChildren.get(ourIndex - 1);
Watcher watcher = new Watcher()
{
@Override
@@ -373,7 +435,7 @@ public class LeaderLatch implements Closeable
{
getChildren();
}
- catch(Exception ex)
+ catch ( Exception ex )
{
log.error("An error occurred checking the leadership.", ex);
}
@@ -381,7 +443,7 @@ public class LeaderLatch implements Closeable
}
};
- BackgroundCallback callback = new BackgroundCallback()
+ BackgroundCallback callback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
@@ -399,7 +461,7 @@ public class LeaderLatch implements Closeable
private void getChildren() throws Exception
{
- BackgroundCallback callback = new BackgroundCallback()
+ BackgroundCallback callback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
@@ -417,44 +479,76 @@ public class LeaderLatch implements Closeable
{
switch ( newState )
{
- default:
- {
- // NOP
- break;
- }
+ default:
+ {
+ // NOP
+ break;
+ }
- case RECONNECTED:
+ case RECONNECTED:
+ {
+ try
{
- try
- {
- reset();
- }
- catch ( Exception e )
- {
- log.error("Could not reset leader latch", e);
- setLeadership(false);
- }
- break;
+ reset();
}
-
- case SUSPENDED:
- case LOST:
+ catch ( Exception e )
{
+ log.error("Could not reset leader latch", e);
setLeadership(false);
- break;
}
+ break;
+ }
+
+ case SUSPENDED:
+ case LOST:
+ {
+ setLeadership(false);
+ break;
+ }
}
}
private synchronized void setLeadership(boolean newValue)
{
- hasLeadership.set(newValue);
+ boolean oldValue = hasLeadership.getAndSet(newValue);
+
+ if ( oldValue && !newValue )
+ { // Lost leadership, was true, now false
+ listeners.forEach
+ (
+ new Function<LeaderLatchListener, Void>()
+ {
+ @Override
+ public Void apply(LeaderLatchListener listener)
+ {
+ listener.notLeader();
+ return null;
+ }
+ }
+ );
+ }
+ else if ( !oldValue && newValue )
+ { // Gained leadership, was false, now true
+ listeners.forEach
+ (
+ new Function<LeaderLatchListener, Void>()
+ {
+ @Override
+ public Void apply(LeaderLatchListener input)
+ {
+ input.isLeader();
+ return null;
+ }
+ }
+ );
+ }
+
notifyAll();
}
private void setNode(String newValue) throws Exception
{
- String oldPath = ourPath.getAndSet(newValue);
+ String oldPath = ourPath.getAndSet(newValue);
if ( oldPath != null )
{
client.delete().guaranteed().inBackground().forPath(oldPath);
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/3d6181ca/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatchListener.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatchListener.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatchListener.java
new file mode 100644
index 0000000..68dd355
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatchListener.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.leader;
+
+/**
+ * A LeaderLatchListener can be used to be notified asynchronously about when the state of the LeaderLatch has changed.
+ *
+ * Note that just because you are in the middle of one of these method calls, it does not necessarily mean that
+ * hasLeadership() is the corresponding true/false value. It is possible for the state to change behind the scenes
+ * before these methods get called. The contract is that if that happens, you should see another call to the other
+ * method pretty quickly.
+ */
+public interface LeaderLatchListener
+{
+ /**
+ * This is called when the LeaderLatch's state goes from hasLeadership = false to hasLeadership = true.
+ *
+ * Note that it is possible that by the time this method call happens, hasLeadership has fallen back to false. If
+ * this occurs, you can expect {@link #notLeader()} to also be called.
+ */
+ public void isLeader();
+
+ /**
+ * This is called when the LeaderLatch's state goes from hasLeadership = true to hasLeadership = false.
+ *
+ * Note that it is possible that by the time this method call happens, hasLeadership has become true. If
+ * this occurs, you can expect {@link #isLeader()} to also be called.
+ */
+ public void notLeader();
+}
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/3d6181ca/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index 307c383..58a61ee 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -16,10 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.curator.framework.recipes.leader;
+import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.BaseClassForTests;
@@ -39,6 +42,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
public class TestLeaderLatch extends BaseClassForTests
{
@@ -88,7 +92,7 @@ public class TestLeaderLatch extends BaseClassForTests
{
client.start();
- final CountDownLatch countDownLatch = new CountDownLatch(1);
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
client.getConnectionStateListenable().addListener
(
new ConnectionStateListener()
@@ -136,47 +140,47 @@ public class TestLeaderLatch extends BaseClassForTests
@Test
public void testCorrectWatching() throws Exception
{
- final int PARTICIPANT_QTY = 10;
- final int PARTICIPANT_ID = 2;
-
- List<LeaderLatch> latches = Lists.newArrayList();
+ final int PARTICIPANT_QTY = 10;
+ final int PARTICIPANT_ID = 2;
+
+ List<LeaderLatch> latches = Lists.newArrayList();
final Timing timing = new Timing();
final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
try
{
- client.start();
-
- for ( int i = 0; i < PARTICIPANT_QTY; ++i )
- {
- LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
- latch.start();
- latches.add(latch);
- }
-
- waitForALeader(latches, timing);
-
- //we need to close a Participant that doesn't be actual leader (first Participant) nor the last
- latches.get(PARTICIPANT_ID).close();
-
- //As the previous algorithm assumed that if the watched node is deleted gets the leadership
- //we need to ensure that the PARTICIPANT_ID-1 is not getting (wrongly) elected as leader.
- Assert.assertTrue(!latches.get(PARTICIPANT_ID-1).hasLeadership());
- }
- finally
- {
- //removes the already closed participant
- latches.remove(PARTICIPANT_ID);
-
- for ( LeaderLatch latch : latches )
- {
- Closeables.closeQuietly(latch);
- }
- Closeables.closeQuietly(client);
- }
+ client.start();
+
+ for ( int i = 0; i < PARTICIPANT_QTY; ++i )
+ {
+ LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
+ latch.start();
+ latches.add(latch);
+ }
+
+ waitForALeader(latches, timing);
+
+ //we need to close a Participant that doesn't be actual leader (first Participant) nor the last
+ latches.get(PARTICIPANT_ID).close();
+
+ //As the previous algorithm assumed that if the watched node is deleted gets the leadership
+ //we need to ensure that the PARTICIPANT_ID-1 is not getting (wrongly) elected as leader.
+ Assert.assertTrue(!latches.get(PARTICIPANT_ID - 1).hasLeadership());
+ }
+ finally
+ {
+ //removes the already closed participant
+ latches.remove(PARTICIPANT_ID);
+
+ for ( LeaderLatch latch : latches )
+ {
+ Closeables.closeQuietly(latch);
+ }
+ Closeables.closeQuietly(client);
+ }
}
-
+
@Test
public void testWaiting() throws Exception
{
@@ -244,6 +248,93 @@ public class TestLeaderLatch extends BaseClassForTests
basic(Mode.START_IN_THREADS);
}
+ @Test
+ public void testCallbackSanity() throws Exception
+ {
+ final int PARTICIPANT_QTY = 10;
+ final CountDownLatch timesSquare = new CountDownLatch(PARTICIPANT_QTY);
+ final AtomicLong masterCounter = new AtomicLong(0);
+ final AtomicLong dunceCounter = new AtomicLong(0);
+
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ ExecutorService exec = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("callbackSanity-%s").build());
+
+ List<LeaderLatch> latches = Lists.newArrayList();
+ for ( int i = 0; i < PARTICIPANT_QTY; ++i )
+ {
+ final LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
+ latch.addListener(
+ new LeaderLatchListener()
+ {
+ boolean beenLeader = false;
+
+ @Override
+ public void isLeader()
+ {
+ if ( !beenLeader )
+ {
+ masterCounter.incrementAndGet();
+ beenLeader = true;
+ try
+ {
+ latch.reset();
+ }
+ catch ( Exception e )
+ {
+ throw Throwables.propagate(e);
+ }
+ }
+ else
+ {
+ masterCounter.incrementAndGet();
+ Closeables.closeQuietly(latch);
+ timesSquare.countDown();
+ }
+ }
+
+ @Override
+ public void notLeader()
+ {
+ dunceCounter.incrementAndGet();
+ }
+ },
+ exec
+ );
+ latches.add(latch);
+ }
+
+ try
+ {
+ client.start();
+
+ for ( LeaderLatch latch : latches )
+ {
+ latch.start();
+ }
+
+ timesSquare.await();
+
+ Assert.assertEquals(masterCounter.get(), PARTICIPANT_QTY * 2);
+ Assert.assertEquals(dunceCounter.get(), PARTICIPANT_QTY);
+ for ( LeaderLatch latch : latches )
+ {
+ Assert.assertEquals(latch.getState(), LeaderLatch.State.CLOSED);
+ }
+ }
+ finally
+ {
+ for ( LeaderLatch latch : latches )
+ {
+ if ( latch.getState() != LeaderLatch.State.CLOSED )
+ {
+ Closeables.closeQuietly(latch);
+ }
+ }
+ Closeables.closeQuietly(client);
+ }
+ }
+
private enum Mode
{
START_IMMEDIATELY,
@@ -277,18 +368,18 @@ public class TestLeaderLatch extends BaseClassForTests
for ( final LeaderLatch latch : latches )
{
service.submit
- (
- new Callable<Object>()
- {
- @Override
- public Object call() throws Exception
+ (
+ new Callable<Object>()
{
- Thread.sleep((int)(100 * Math.random()));
- latch.start();
- return null;
+ @Override
+ public Object call() throws Exception
+ {
+ Thread.sleep((int)(100 * Math.random()));
+ latch.start();
+ return null;
+ }
}
- }
- );
+ );
}
service.shutdown();
}