You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/06/18 01:03:06 UTC
[02/18] git commit: CURATOR-110 - Fixed up formatting to Curator
standards
CURATOR-110 - Fixed up formatting to Curator standards
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/0b7ae7e3
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/0b7ae7e3
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/0b7ae7e3
Branch: refs/heads/master
Commit: 0b7ae7e3672a9c817fe31144cd6b18d9a357f124
Parents: 1a63a10
Author: Cameron McKenzie <ca...@unico.com.au>
Authored: Tue Jun 3 09:01:31 2014 +1000
Committer: Cameron McKenzie <ca...@unico.com.au>
Committed: Tue Jun 3 09:01:31 2014 +1000
----------------------------------------------------------------------
.../framework/state/ConnectionState.java | 35 +-
.../framework/recipes/leader/LeaderLatch.java | 620 +++++++++----------
.../recipes/leader/TestLeaderLatch.java | 491 ++++++++-------
3 files changed, 602 insertions(+), 544 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/0b7ae7e3/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
index b25ce38..7663e10 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
@@ -23,17 +23,18 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
/**
* Represents state changes in the connection to ZK
*/
-public enum ConnectionState {
+public enum ConnectionState
+{
/**
- * Sent for the first successful connection to the server. NOTE: You will
- * only get one of these messages for any CuratorFramework instance.
+ * Sent for the first successful connection to the server. NOTE: You will only
+ * get one of these messages for any CuratorFramework instance.
*/
CONNECTED,
/**
* There has been a loss of connection. Leaders, locks, etc. should suspend
- * until the connection is re-established. If the connection times-out you
- * will receive a {@link #LOST} notice
+ * until the connection is re-established. If the connection times-out you will
+ * receive a {@link #LOST} notice
*/
SUSPENDED,
@@ -43,30 +44,28 @@ public enum ConnectionState {
RECONNECTED,
/**
- * The connection is confirmed to be lost. Close any locks, leaders, etc.
- * and attempt to re-create them. NOTE: it is possible to get a
- * {@link #RECONNECTED} state after this but you should still consider any
- * locks, etc. as dirty/unstable
+ * The connection is confirmed to be lost. Close any locks, leaders, etc. and
+ * attempt to re-create them. NOTE: it is possible to get a {@link #RECONNECTED}
+ * state after this but you should still consider any locks, etc. as dirty/unstable
*/
LOST,
/**
- * The connection has gone into read-only mode. This can only happen if you
- * pass true for {@link CuratorFrameworkFactory.Builder#canBeReadOnly()}.
- * See the ZooKeeper doc regarding read only connections: <a
- * href="http://wiki.apache.org/hadoop/ZooKeeper/GSoCReadOnlyMode"
- * >http://wiki.apache.org/hadoop/ZooKeeper/GSoCReadOnlyMode</a>. The
- * connection will remain in read only mode until another state change is
- * sent.
+ * The connection has gone into read-only mode. This can only happen if you pass true
+ * for {@link CuratorFrameworkFactory.Builder#canBeReadOnly()}. See the ZooKeeper doc
+ * regarding read only connections:
+ * <a href="http://wiki.apache.org/hadoop/ZooKeeper/GSoCReadOnlyMode">http://wiki.apache.org/hadoop/ZooKeeper/GSoCReadOnlyMode</a>.
+ * The connection will remain in read only mode until another state change is sent.
*/
READ_ONLY;
-
+
/**
* Check if this state indicates that Curator has a connection to ZooKeeper
*
* @return True if connected, false otherwise
*/
- public boolean isConnected() {
+ public boolean isConnected()
+ {
return this == CONNECTED || this == RECONNECTED || this == READ_ONLY;
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/0b7ae7e3/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index d09ed1b..c1157c9 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -19,17 +19,9 @@
package org.apache.curator.framework.recipes.leader;
-import java.io.Closeable;
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
@@ -46,60 +38,71 @@ import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
/**
* <p>
- * Abstraction to select a "leader" amongst multiple contenders in a group of
- * JMVs connected to a Zookeeper cluster. If a group of N thread/processes
- * contend for leadership one will randomly be assigned leader until it releases
- * leadership at which time another one from the group will randomly be chosen
+ * Abstraction to select a "leader" amongst multiple contenders in a group of JVMs connected to
+ * a ZooKeeper cluster. If a group of N threads/processes contend for leadership one will
+ * randomly be assigned leader until it releases leadership at which time another one from the
+ * group will randomly be chosen.
* </p>
*/
-public class LeaderLatch implements Closeable {
+public class LeaderLatch implements Closeable
+{
private final Logger log = LoggerFactory.getLogger(getClass());
private final CuratorFramework client;
private final String latchPath;
private final String id;
- private final AtomicReference<State> state = new AtomicReference<State>(
- State.LATENT);
+ private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
private final AtomicBoolean hasLeadership = new AtomicBoolean(false);
private final AtomicReference<String> ourPath = new AtomicReference<String>();
private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();
private final CloseMode closeMode;
- private final ConnectionStateListener listener = new ConnectionStateListener() {
+ private final ConnectionStateListener listener = new ConnectionStateListener()
+ {
@Override
- public void stateChanged(CuratorFramework client,
- ConnectionState newState) {
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
handleStateChange(newState);
}
};
private static final String LOCK_NAME = "latch-";
- private static final LockInternalsSorter sorter = new LockInternalsSorter() {
+ private static final LockInternalsSorter sorter = new LockInternalsSorter()
+ {
@Override
- public String fixForSorting(String str, String lockName) {
- return StandardLockInternalsDriver.standardFixForSorting(str,
- lockName);
+ public String fixForSorting(String str, String lockName)
+ {
+ return StandardLockInternalsDriver.standardFixForSorting(str, lockName);
}
};
- public enum State {
- LATENT, STARTED, CLOSED
+ public enum State
+ {
+ LATENT,
+ STARTED,
+ CLOSED
}
/**
* How to handle listeners when the latch is closed
*/
- public enum CloseMode {
+ public enum CloseMode
+ {
/**
- * When the latch is closed, listeners will *not* be notified (default
- * behavior)
+ * When the latch is closed, listeners will *not* be notified (default behavior)
*/
SILENT,
@@ -110,116 +113,105 @@ public class LeaderLatch implements Closeable {
}
/**
- * @param client
- * the client
- * @param latchPath
- * the path for this leadership group
+ * @param client the client
+ * @param latchPath the path for this leadership group
*/
- public LeaderLatch(CuratorFramework client, String latchPath) {
+ public LeaderLatch(CuratorFramework client, String latchPath)
+ {
this(client, latchPath, "", CloseMode.SILENT);
}
/**
- * @param client
- * the client
- * @param latchPath
- * the path for this leadership group
- * @param id
- * participant ID
+ * @param client the client
+ * @param latchPath the path for this leadership group
+ * @param id participant ID
*/
- public LeaderLatch(CuratorFramework client, String latchPath, String id) {
+ public LeaderLatch(CuratorFramework client, String latchPath, String id)
+ {
this(client, latchPath, id, CloseMode.SILENT);
}
/**
- * @param client
- * the client
- * @param latchPath
- * the path for this leadership group
- * @param id
- * participant ID
- * @param closeMode
- * behaviour of listener on explicit close.
+ * @param client the client
+ * @param latchPath the path for this leadership group
+ * @param id participant ID
+ * @param closeMode behaviour of listener on explicit close.
*/
- public LeaderLatch(CuratorFramework client, String latchPath, String id,
- CloseMode closeMode) {
- this.client = Preconditions.checkNotNull(client,
- "client cannot be null");
- this.latchPath = Preconditions.checkNotNull(latchPath,
- "mutexPath cannot be null");
+ public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode)
+ {
+ this.client = Preconditions.checkNotNull(client, "client cannot be null");
+ this.latchPath = Preconditions.checkNotNull(latchPath, "mutexPath cannot be null");
this.id = Preconditions.checkNotNull(id, "id cannot be null");
- this.closeMode = Preconditions.checkNotNull(closeMode,
- "closeMode cannot be null");
+ this.closeMode = Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
}
/**
- * Add this instance to the leadership election and attempt to acquire
- * leadership.
- *
- * @throws Exception
- * errors
+ * Add this instance to the leadership election and attempt to acquire leadership.
+ *
+ * @throws Exception errors
*/
- public void start() throws Exception {
- Preconditions.checkState(
- state.compareAndSet(State.LATENT, State.STARTED),
- "Cannot be started more than once");
+ public void start() throws Exception
+ {
+ Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
client.getConnectionStateListenable().addListener(listener);
reset();
}
/**
- * Remove this instance from the leadership election. If this instance is
- * the leader, leadership is released. IMPORTANT: the only way to release
- * leadership is by calling close(). All LeaderLatch instances must
- * eventually be closed.
- *
- * @throws IOException
- * errors
+ * Remove this instance from the leadership election. If this instance is the leader, leadership
+ * is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch
+ * instances must eventually be closed.
+ *
+ * @throws IOException errors
*/
@Override
- public void close() throws IOException {
+ public void close() throws IOException
+ {
close(closeMode);
}
/**
- * Remove this instance from the leadership election. If this instance is
- * the leader, leadership is released. IMPORTANT: the only way to release
- * leadership is by calling close(). All LeaderLatch instances must
- * eventually be closed.
- *
- * @param closeMode
- * allows the default close mode to be overridden at the time the
- * latch is closed.
- *
- * @throws IOException
- * errors
+ * Remove this instance from the leadership election. If this instance is the leader, leadership
+ * is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch
+ * instances must eventually be closed.
+ *
+ * @param closeMode allows the default close mode to be overridden at the time the latch is closed.
+ *
+ * @throws IOException errors
*/
- public void close(CloseMode closeMode) throws IOException {
- Preconditions.checkState(
- state.compareAndSet(State.STARTED, State.CLOSED),
- "Already closed or has not been started");
+ public void close(CloseMode closeMode) throws IOException
+ {
+ Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
- try {
+ try
+ {
setNode(null);
- } catch (Exception e) {
+ }
+ catch ( Exception e )
+ {
throw new IOException(e);
- } finally {
+ }
+ finally
+ {
client.getConnectionStateListenable().removeListener(listener);
- switch (closeMode) {
- case NOTIFY_LEADER: {
- setLeadership(false);
- listeners.clear();
- break;
- }
+ switch ( closeMode )
+ {
+ case NOTIFY_LEADER:
+ {
+ setLeadership(false);
+ listeners.clear();
+ break;
+ }
- default: {
- listeners.clear();
- setLeadership(false);
- break;
- }
+ default:
+ {
+ listeners.clear();
+ setLeadership(false);
+ break;
+ }
}
}
}
@@ -227,160 +219,134 @@ public class LeaderLatch implements Closeable {
/**
* Attaches a listener to this LeaderLatch
* <p>
- * Attaching the same listener multiple times is a noop from the second time
- * on.
+ * Attaching the same listener multiple times is a noop from the second time on.
* <p>
- * All methods for the listener are run using the provided Executor. It is
- * common to pass in a single-threaded executor so that you can be certain
- * that listener methods are called in sequence, but if you are fine with
+ * All methods for the listener are run using the provided Executor. It is common to pass in a single-threaded
+ * executor so that you can be certain that listener methods are called in sequence, but if you are fine with
* them being called out of order you are welcome to use multiple threads.
- *
- * @param listener
- * the listener to attach
+ *
+ * @param listener the listener to attach
*/
- public void addListener(LeaderLatchListener listener) {
+ public void addListener(LeaderLatchListener listener)
+ {
listeners.addListener(listener);
}
/**
* Attaches a listener to this LeaderLatch
* <p>
- * Attaching the same listener multiple times is a noop from the second time
- * on.
+ * Attaching the same listener multiple times is a noop from the second time on.
* <p>
- * All methods for the listener are run using the provided Executor. It is
- * common to pass in a single-threaded executor so that you can be certain
- * that listener methods are called in sequence, but if you are fine with
+ * All methods for the listener are run using the provided Executor. It is common to pass in a single-threaded
+ * executor so that you can be certain that listener methods are called in sequence, but if you are fine with
* them being called out of order you are welcome to use multiple threads.
- *
- * @param listener
- * the listener to attach
- * @param executor
- * An executor to run the methods for the listener on.
+ *
+ * @param listener the listener to attach
+ * @param executor An executor to run the methods for the listener on.
*/
- public void addListener(LeaderLatchListener listener, Executor executor) {
+ public void addListener(LeaderLatchListener listener, Executor executor)
+ {
listeners.addListener(listener, executor);
}
/**
* Removes a given listener from this LeaderLatch
- *
- * @param listener
- * the listener to remove
+ *
+ * @param listener the listener to remove
*/
- public void removeListener(LeaderLatchListener listener) {
+ public void removeListener(LeaderLatchListener listener)
+ {
listeners.removeListener(listener);
}
/**
- * <p>
- * Causes the current thread to wait until this instance acquires leadership
- * unless the thread is {@linkplain Thread#interrupt interrupted} or
- * {@linkplain #close() closed}.
- * </p>
- * <p>
- * If this instance already is the leader then this method returns
- * immediately.
- * </p>
- *
- * <p>
- * Otherwise the current thread becomes disabled for thread scheduling
- * purposes and lies dormant until one of three things happen:
- * </p>
+ * <p>Causes the current thread to wait until this instance acquires leadership
+ * unless the thread is {@linkplain Thread#interrupt interrupted} or {@linkplain #close() closed}.</p>
+ * <p>If this instance already is the leader then this method returns immediately.</p>
+ *
+ * <p>Otherwise the current
+ * thread becomes disabled for thread scheduling purposes and lies
+ * dormant until one of three things happen:</p>
* <ul>
* <li>This instance becomes the leader</li>
- * <li>Some other thread {@linkplain Thread#interrupt interrupts} the
- * current thread</li>
+ * <li>Some other thread {@linkplain Thread#interrupt interrupts}
+ * the current thread</li>
* <li>The instance is {@linkplain #close() closed}</li>
* </ul>
- * <p>
- * If the current thread:
- * </p>
+ * <p>If the current thread:</p>
* <ul>
* <li>has its interrupted status set on entry to this method; or
* <li>is {@linkplain Thread#interrupt interrupted} while waiting,
* </ul>
- * <p>
- * then {@link InterruptedException} is thrown and the current thread's
- * interrupted status is cleared.
- * </p>
- *
- * @throws InterruptedException
- * if the current thread is interrupted while waiting
- * @throws EOFException
- * if the instance is {@linkplain #close() closed} while waiting
+ * <p>then {@link InterruptedException} is thrown and the current thread's
+ * interrupted status is cleared.</p>
+ *
+ * @throws InterruptedException if the current thread is interrupted
+ * while waiting
+ * @throws EOFException if the instance is {@linkplain #close() closed}
+ * while waiting
*/
- public void await() throws InterruptedException, EOFException {
- synchronized (this) {
- while ((state.get() == State.STARTED) && !hasLeadership.get()) {
+ public void await() throws InterruptedException, EOFException
+ {
+ synchronized(this)
+ {
+ while ( (state.get() == State.STARTED) && !hasLeadership.get() )
+ {
wait();
}
}
- if (state.get() != State.STARTED) {
+ if ( state.get() != State.STARTED )
+ {
throw new EOFException();
}
}
/**
- * <p>
- * Causes the current thread to wait until this instance acquires leadership
- * unless the thread is {@linkplain Thread#interrupt interrupted}, the
- * specified waiting time elapses or the instance is {@linkplain #close()
- * closed}.
- * </p>
- *
- * <p>
- * If this instance already is the leader then this method returns
- * immediately with the value {@code true}.
- * </p>
- *
- * <p>
- * Otherwise the current thread becomes disabled for thread scheduling
- * purposes and lies dormant until one of four things happen:
- * </p>
+ * <p>Causes the current thread to wait until this instance acquires leadership
+ * unless the thread is {@linkplain Thread#interrupt interrupted},
+ * the specified waiting time elapses or the instance is {@linkplain #close() closed}.</p>
+ *
+ * <p>If this instance already is the leader then this method returns immediately
+ * with the value {@code true}.</p>
+ *
+ * <p>Otherwise the current
+ * thread becomes disabled for thread scheduling purposes and lies
+ * dormant until one of four things happen:</p>
* <ul>
* <li>This instance becomes the leader</li>
- * <li>Some other thread {@linkplain Thread#interrupt interrupts} the
- * current thread</li>
+ * <li>Some other thread {@linkplain Thread#interrupt interrupts}
+ * the current thread</li>
* <li>The specified waiting time elapses.</li>
* <li>The instance is {@linkplain #close() closed}</li>
* </ul>
- *
- * <p>
- * If the current thread:
- * </p>
+ *
+ * <p>If the current thread:</p>
* <ul>
* <li>has its interrupted status set on entry to this method; or
* <li>is {@linkplain Thread#interrupt interrupted} while waiting,
* </ul>
- * <p>
- * then {@link InterruptedException} is thrown and the current thread's
- * interrupted status is cleared.
- * </p>
- *
- * <p>
- * If the specified waiting time elapses or the instance is
- * {@linkplain #close() closed} then the value {@code false} is returned. If
- * the time is less than or equal to zero, the method will not wait at all.
- * </p>
- *
- * @param timeout
- * the maximum time to wait
- * @param unit
- * the time unit of the {@code timeout} argument
- * @return {@code true} if the count reached zero and {@code false} if the
- * waiting time elapsed before the count reached zero or the
- * instances was closed
- * @throws InterruptedException
- * if the current thread is interrupted while waiting
+ * <p>then {@link InterruptedException} is thrown and the current thread's
+ * interrupted status is cleared.</p>
+ *
+ * <p>If the specified waiting time elapses or the instance is {@linkplain #close() closed}
+ * then the value {@code false} is returned. If the time is less than or equal to zero, the method
+ * will not wait at all.</p>
+ *
+ * @param timeout the maximum time to wait
+ * @param unit the time unit of the {@code timeout} argument
+ * @return {@code true} if this instance acquired leadership and {@code false}
+ * if the waiting time elapsed before leadership was acquired or the instance was closed
+ * @throws InterruptedException if the current thread is interrupted
+ * while waiting
*/
- public boolean await(long timeout, TimeUnit unit)
- throws InterruptedException {
+ public boolean await(long timeout, TimeUnit unit) throws InterruptedException
+ {
long waitNanos = TimeUnit.NANOSECONDS.convert(timeout, unit);
- synchronized (this) {
- while ((waitNanos > 0) && (state.get() == State.STARTED)
- && !hasLeadership.get()) {
+ synchronized(this)
+ {
+ while ( (waitNanos > 0) && (state.get() == State.STARTED) && !hasLeadership.get() )
+ {
long startNanos = System.nanoTime();
TimeUnit.NANOSECONDS.timedWait(this, waitNanos);
long elapsed = System.nanoTime() - startNanos;
@@ -392,23 +358,24 @@ public class LeaderLatch implements Closeable {
/**
* Return this instance's participant Id
- *
+ *
* @return participant Id
*/
- public String getId() {
+ public String getId()
+ {
return id;
}
/**
- * Returns this instances current state, this is the only way to verify that
- * the object has been closed before closing again. If you try to close a
- * latch multiple times, the close() method will throw an
- * IllegalArgumentException which is often not caught and ignored
- * (CloseableUtils.closeQuietly() only looks for IOException).
- *
+ * Returns this instance's current state; this is the only way to verify that the object has been closed before
+ * closing again. If you try to close a latch multiple times, the close() method will throw an
+ * IllegalStateException which is often not caught and ignored (CloseableUtils.closeQuietly() only looks for
+ * IOException).
+ *
* @return the state of the current instance
*/
- public State getState() {
+ public State getState()
+ {
return state.get();
}
@@ -419,17 +386,16 @@ public class LeaderLatch implements Closeable {
* <p>
* <p>
* <B>NOTE</B> - this method polls the ZK server. Therefore it can possibly
- * return a value that does not match {@link #hasLeadership()} as
- * hasLeadership uses a local field of the class.
+ * return a value that does not match {@link #hasLeadership()} as hasLeadership
+ * uses a local field of the class.
* </p>
- *
+ *
* @return participants
- * @throws Exception
- * ZK errors, interruptions, etc.
+ * @throws Exception ZK errors, interruptions, etc.
*/
- public Collection<Participant> getParticipants() throws Exception {
- Collection<String> participantNodes = LockInternals
- .getParticipantNodes(client, latchPath, LOCK_NAME, sorter);
+ public Collection<Participant> getParticipants() throws Exception
+ {
+ Collection<String> participantNodes = LockInternals.getParticipantNodes(client, latchPath, LOCK_NAME, sorter);
return LeaderSelector.getParticipants(client, participantNodes);
}
@@ -441,26 +407,26 @@ public class LeaderLatch implements Closeable {
* <p>
* <p>
* <B>NOTE</B> - this method polls the ZK server. Therefore it can possibly
- * return a value that does not match {@link #hasLeadership()} as
- * hasLeadership uses a local field of the class.
+ * return a value that does not match {@link #hasLeadership()} as hasLeadership
+ * uses a local field of the class.
* </p>
- *
+ *
* @return leader
- * @throws Exception
- * ZK errors, interruptions, etc.
+ * @throws Exception ZK errors, interruptions, etc.
*/
- public Participant getLeader() throws Exception {
- Collection<String> participantNodes = LockInternals
- .getParticipantNodes(client, latchPath, LOCK_NAME, sorter);
+ public Participant getLeader() throws Exception
+ {
+ Collection<String> participantNodes = LockInternals.getParticipantNodes(client, latchPath, LOCK_NAME, sorter);
return LeaderSelector.getLeader(client, participantNodes);
}
/**
* Return true if leadership is currently held by this instance
- *
+ *
* @return true/false
*/
- public boolean hasLeadership() {
+ public boolean hasLeadership()
+ {
return (state.get() == State.STARTED) && hasLeadership.get();
}
@@ -468,95 +434,105 @@ public class LeaderLatch implements Closeable {
volatile CountDownLatch debugResetWaitLatch = null;
@VisibleForTesting
- void reset() throws Exception {
+ void reset() throws Exception
+ {
setLeadership(false);
setNode(null);
- BackgroundCallback callback = new BackgroundCallback() {
+ BackgroundCallback callback = new BackgroundCallback()
+ {
@Override
- public void processResult(CuratorFramework client,
- CuratorEvent event) throws Exception {
- if (debugResetWaitLatch != null) {
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+ {
+ if ( debugResetWaitLatch != null )
+ {
debugResetWaitLatch.await();
debugResetWaitLatch = null;
}
- if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
+ if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+ {
setNode(event.getName());
- if (state.get() == State.CLOSED) {
+ if ( state.get() == State.CLOSED )
+ {
setNode(null);
- } else {
+ }
+ else
+ {
getChildren();
}
- } else {
- log.error("getChildren() failed. rc = "
- + event.getResultCode());
+ }
+ else
+ {
+ log.error("getChildren() failed. rc = " + event.getResultCode());
}
}
};
- client.create()
- .creatingParentsIfNeeded()
- .withProtection()
- .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
- .inBackground(callback)
- .forPath(ZKPaths.makePath(latchPath, LOCK_NAME),
- LeaderSelector.getIdBytes(id));
+ client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
}
- private void checkLeadership(List<String> children) throws Exception {
+ private void checkLeadership(List<String> children) throws Exception
+ {
final String localOurPath = ourPath.get();
- List<String> sortedChildren = LockInternals.getSortedChildren(
- LOCK_NAME, sorter, children);
- int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths
- .getNodeFromPath(localOurPath)) : -1;
- if (ourIndex < 0) {
+ List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
+ int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
+ if ( ourIndex < 0 )
+ {
log.error("Can't find our node. Resetting. Index: " + ourIndex);
reset();
- } else if (ourIndex == 0) {
+ }
+ else if ( ourIndex == 0 )
+ {
setLeadership(true);
- } else {
+ }
+ else
+ {
String watchPath = sortedChildren.get(ourIndex - 1);
- Watcher watcher = new Watcher() {
+ Watcher watcher = new Watcher()
+ {
@Override
- public void process(WatchedEvent event) {
- if ((state.get() == State.STARTED)
- && (event.getType() == Event.EventType.NodeDeleted)
- && (localOurPath != null)) {
- try {
+ public void process(WatchedEvent event)
+ {
+ if ( (state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null) )
+ {
+ try
+ {
getChildren();
- } catch (Exception ex) {
- log.error(
- "An error occurred checking the leadership.",
- ex);
+ }
+ catch ( Exception ex )
+ {
+ log.error("An error occurred checking the leadership.", ex);
}
}
}
};
- BackgroundCallback callback = new BackgroundCallback() {
+ BackgroundCallback callback = new BackgroundCallback()
+ {
@Override
- public void processResult(CuratorFramework client,
- CuratorEvent event) throws Exception {
- if (event.getResultCode() == KeeperException.Code.NONODE
- .intValue()) {
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+ {
+ if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
+ {
// previous node is gone - reset
reset();
}
}
};
- // use getData() instead of exists() to avoid leaving unneeded
- // watchers which is a type of resource leak
- client.getData().usingWatcher(watcher).inBackground(callback)
- .forPath(ZKPaths.makePath(latchPath, watchPath));
+ // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
+ client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));
}
}
- private void getChildren() throws Exception {
- BackgroundCallback callback = new BackgroundCallback() {
+ private void getChildren() throws Exception
+ {
+ BackgroundCallback callback = new BackgroundCallback()
+ {
@Override
- public void processResult(CuratorFramework client,
- CuratorEvent event) throws Exception {
- if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+ {
+ if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+ {
checkLeadership(event.getChildren());
}
}
@@ -564,47 +540,69 @@ public class LeaderLatch implements Closeable {
client.getChildren().inBackground(callback).forPath(latchPath);
}
- private void handleStateChange(ConnectionState newState) {
- if (newState.isConnected()) {
- try {
+ private void handleStateChange(ConnectionState newState)
+ {
+ if (newState.isConnected())
+ {
+ try
+ {
reset();
- } catch (Exception e) {
+ }
+ catch (Exception e)
+ {
log.error("Could not reset leader latch", e);
setLeadership(false);
}
- } else {
+ }
+ else
+ {
setLeadership(false);
}
}
- private synchronized void setLeadership(boolean newValue) {
+ private synchronized void setLeadership(boolean newValue)
+ {
boolean oldValue = hasLeadership.getAndSet(newValue);
- if (oldValue && !newValue) { // Lost leadership, was true, now false
- listeners.forEach(new Function<LeaderLatchListener, Void>() {
- @Override
- public Void apply(LeaderLatchListener listener) {
- listener.notLeader();
- return null;
+ if ( oldValue && !newValue )
+ { // Lost leadership, was true, now false
+ listeners.forEach
+ (
+ new Function<LeaderLatchListener, Void>()
+ {
+ @Override
+ public Void apply(LeaderLatchListener listener)
+ {
+ listener.notLeader();
+ return null;
+ }
}
- });
- } else if (!oldValue && newValue) { // Gained leadership, was false, now
- // true
- listeners.forEach(new Function<LeaderLatchListener, Void>() {
- @Override
- public Void apply(LeaderLatchListener input) {
- input.isLeader();
- return null;
+ );
+ }
+ else if ( !oldValue && newValue )
+ { // Gained leadership, was false, now true
+ listeners.forEach
+ (
+ new Function<LeaderLatchListener, Void>()
+ {
+ @Override
+ public Void apply(LeaderLatchListener input)
+ {
+ input.isLeader();
+ return null;
+ }
}
- });
+ );
}
notifyAll();
}
- private void setNode(String newValue) throws Exception {
+ private void setNode(String newValue) throws Exception
+ {
String oldPath = ourPath.getAndSet(newValue);
- if (oldPath != null) {
+ if ( oldPath != null )
+ {
client.delete().guaranteed().inBackground().forPath(oldPath);
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/0b7ae7e3/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index 108f118..d3b00bc 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -19,17 +19,9 @@
package org.apache.curator.framework.recipes.leader;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.state.ConnectionState;
@@ -41,29 +33,36 @@ import org.apache.curator.test.Timing;
import org.apache.curator.utils.CloseableUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-public class TestLeaderLatch extends BaseClassForTests {
+public class TestLeaderLatch extends BaseClassForTests
+{
private static final String PATH_NAME = "/one/two/me";
private static final int MAX_LOOPS = 5;
@Test
- public void testResetRace() throws Exception {
+ public void testResetRace() throws Exception
+ {
Timing timing = new Timing();
LeaderLatch latch = null;
- CuratorFramework client = CuratorFrameworkFactory.newClient(
- server.getConnectString(), timing.session(),
- timing.connection(), new RetryOneTime(1));
- try {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ try
+ {
client.start();
latch = new LeaderLatch(client, PATH_NAME);
latch.debugResetWaitLatch = new CountDownLatch(1);
- latch.start(); // will call reset()
- latch.reset(); // should not result in two nodes
+ latch.start(); // will call reset()
+ latch.reset(); // should not result in two nodes
timing.sleepABit();
@@ -71,21 +70,22 @@ public class TestLeaderLatch extends BaseClassForTests {
timing.sleepABit();
- Assert.assertEquals(client.getChildren().forPath(PATH_NAME).size(),
- 1);
- } finally {
+ Assert.assertEquals(client.getChildren().forPath(PATH_NAME).size(), 1);
+ }
+ finally
+ {
CloseableUtils.closeQuietly(latch);
CloseableUtils.closeQuietly(client);
}
}
@Test
- public void testCreateDeleteRace() throws Exception {
+ public void testCreateDeleteRace() throws Exception
+ {
Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.newClient(
- server.getConnectString(), timing.session(),
- timing.connection(), new RetryOneTime(1));
- try {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ try
+ {
client.start();
LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
@@ -100,40 +100,43 @@ public class TestLeaderLatch extends BaseClassForTests {
timing.sleepABit();
- Assert.assertEquals(client.getChildren().forPath(PATH_NAME).size(),
- 0);
+ Assert.assertEquals(client.getChildren().forPath(PATH_NAME).size(), 0);
- } finally {
+ }
+ finally
+ {
CloseableUtils.closeQuietly(client);
}
}
@Test
- public void testLostConnection() throws Exception {
+ public void testLostConnection() throws Exception
+ {
final int PARTICIPANT_QTY = 10;
List<LeaderLatch> latches = Lists.newArrayList();
final Timing timing = new Timing();
- final CuratorFramework client = CuratorFrameworkFactory.newClient(
- server.getConnectString(), timing.session(),
- timing.connection(), new RetryOneTime(1));
- try {
+ final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ try
+ {
client.start();
final CountDownLatch countDownLatch = new CountDownLatch(1);
- client.getConnectionStateListenable().addListener(
- new ConnectionStateListener() {
- @Override
- public void stateChanged(CuratorFramework client,
- ConnectionState newState) {
- if (newState == ConnectionState.LOST) {
- countDownLatch.countDown();
- }
- }
- });
+ client.getConnectionStateListenable().addListener(new ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ if ( newState == ConnectionState.LOST )
+ {
+ countDownLatch.countDown();
+ }
+ }
+ });
- for (int i = 0; i < PARTICIPANT_QTY; ++i) {
+ for ( int i = 0; i < PARTICIPANT_QTY; ++i )
+ {
LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
latch.start();
latches.add(latch);
@@ -148,12 +151,13 @@ public class TestLeaderLatch extends BaseClassForTests {
Assert.assertEquals(getLeaders(latches).size(), 0);
- server = new TestingServer(server.getPort(),
- server.getTempDirectory());
- Assert.assertEquals(waitForALeader(latches, timing).size(), 1); // should
- // reconnect
- } finally {
- for (LeaderLatch latch : latches) {
+ server = new TestingServer(server.getPort(), server.getTempDirectory());
+ Assert.assertEquals(waitForALeader(latches, timing).size(), 1); // should reconnect
+ }
+ finally
+ {
+ for ( LeaderLatch latch : latches )
+ {
CloseableUtils.closeQuietly(latch);
}
CloseableUtils.closeQuietly(client);
@@ -161,20 +165,21 @@ public class TestLeaderLatch extends BaseClassForTests {
}
@Test
- public void testCorrectWatching() throws Exception {
+ public void testCorrectWatching() throws Exception
+ {
final int PARTICIPANT_QTY = 10;
final int PARTICIPANT_ID = 2;
List<LeaderLatch> latches = Lists.newArrayList();
final Timing timing = new Timing();
- final CuratorFramework client = CuratorFrameworkFactory.newClient(
- server.getConnectString(), timing.session(),
- timing.connection(), new RetryOneTime(1));
- try {
+ final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ try
+ {
client.start();
- for (int i = 0; i < PARTICIPANT_QTY; ++i) {
+ for ( int i = 0; i < PARTICIPANT_QTY; ++i )
+ {
LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
latch.start();
latches.add(latch);
@@ -182,20 +187,20 @@ public class TestLeaderLatch extends BaseClassForTests {
waitForALeader(latches, timing);
- // we need to close a Participant that is neither the actual leader
- // (first Participant) nor the last
+ //we need to close a Participant that is neither the actual leader (first Participant) nor the last
latches.get(PARTICIPANT_ID).close();
- // As the previous algorithm assumed that if the watched node is
- // deleted gets the leadership
- // we need to ensure that the PARTICIPANT_ID-1 is not getting
- // (wrongly) elected as leader.
+ //As the previous algorithm assumed that if the watched node is deleted gets the leadership
+ //we need to ensure that the PARTICIPANT_ID-1 is not getting (wrongly) elected as leader.
Assert.assertTrue(!latches.get(PARTICIPANT_ID - 1).hasLeadership());
- } finally {
- // removes the already closed participant
+ }
+ finally
+ {
+ //removes the already closed participant
latches.remove(PARTICIPANT_ID);
- for (LeaderLatch latch : latches) {
+ for ( LeaderLatch latch : latches )
+ {
CloseableUtils.closeQuietly(latch);
}
CloseableUtils.closeQuietly(client);
@@ -204,35 +209,37 @@ public class TestLeaderLatch extends BaseClassForTests {
}
@Test
- public void testWaiting() throws Exception {
+ public void testWaiting() throws Exception
+ {
final int PARTICIPANT_QTY = 10;
- ExecutorService executorService = Executors
- .newFixedThreadPool(PARTICIPANT_QTY);
- ExecutorCompletionService<Void> service = new ExecutorCompletionService<Void>(
- executorService);
+ ExecutorService executorService = Executors.newFixedThreadPool(PARTICIPANT_QTY);
+ ExecutorCompletionService<Void> service = new ExecutorCompletionService<Void>(executorService);
final Timing timing = new Timing();
- final CuratorFramework client = CuratorFrameworkFactory.newClient(
- server.getConnectString(), timing.session(),
- timing.connection(), new RetryOneTime(1));
- try {
+ final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ try
+ {
client.start();
final AtomicBoolean thereIsALeader = new AtomicBoolean(false);
- for (int i = 0; i < PARTICIPANT_QTY; ++i) {
- service.submit(new Callable<Void>() {
+ for ( int i = 0; i < PARTICIPANT_QTY; ++i )
+ {
+ service.submit(new Callable<Void>()
+ {
@Override
- public Void call() throws Exception {
+ public Void call() throws Exception
+ {
LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
- try {
+ try
+ {
latch.start();
- Assert.assertTrue(latch.await(timing.forWaiting()
- .seconds(), TimeUnit.SECONDS));
- Assert.assertTrue(thereIsALeader.compareAndSet(
- false, true));
- Thread.sleep((int) (10 * Math.random()));
- } finally {
+ Assert.assertTrue(latch.await(timing.forWaiting().seconds(), TimeUnit.SECONDS));
+ Assert.assertTrue(thereIsALeader.compareAndSet(false, true));
+ Thread.sleep((int)(10 * Math.random()));
+ }
+ finally
+ {
thereIsALeader.set(false);
latch.close();
}
@@ -241,58 +248,68 @@ public class TestLeaderLatch extends BaseClassForTests {
});
}
- for (int i = 0; i < PARTICIPANT_QTY; ++i) {
+ for ( int i = 0; i < PARTICIPANT_QTY; ++i )
+ {
service.take().get();
}
- } finally {
+ }
+ finally
+ {
executorService.shutdown();
CloseableUtils.closeQuietly(client);
}
}
@Test
- public void testBasic() throws Exception {
+ public void testBasic() throws Exception
+ {
basic(Mode.START_IMMEDIATELY);
}
@Test
- public void testBasicAlt() throws Exception {
+ public void testBasicAlt() throws Exception
+ {
basic(Mode.START_IN_THREADS);
}
@Test
- public void testCallbackSanity() throws Exception {
+ public void testCallbackSanity() throws Exception
+ {
final int PARTICIPANT_QTY = 10;
final CountDownLatch timesSquare = new CountDownLatch(PARTICIPANT_QTY);
final AtomicLong masterCounter = new AtomicLong(0);
final AtomicLong notLeaderCounter = new AtomicLong(0);
Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.newClient(
- server.getConnectString(), timing.session(),
- timing.connection(), new RetryOneTime(1));
- ExecutorService exec = Executors
- .newSingleThreadExecutor(new ThreadFactoryBuilder()
- .setDaemon(true).setNameFormat("callbackSanity-%s")
- .build());
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ ExecutorService exec = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("callbackSanity-%s").build());
List<LeaderLatch> latches = Lists.newArrayList();
- for (int i = 0; i < PARTICIPANT_QTY; ++i) {
+ for ( int i = 0; i < PARTICIPANT_QTY; ++i )
+ {
final LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
- latch.addListener(new LeaderLatchListener() {
+ latch.addListener(new LeaderLatchListener()
+ {
boolean beenLeader = false;
@Override
- public void isLeader() {
- if (!beenLeader) {
+ public void isLeader()
+ {
+ if ( !beenLeader )
+ {
masterCounter.incrementAndGet();
beenLeader = true;
- try {
+ try
+ {
latch.reset();
- } catch (Exception e) {
+ }
+ catch ( Exception e )
+ {
throw Throwables.propagate(e);
}
- } else {
+ }
+ else
+ {
masterCounter.incrementAndGet();
CloseableUtils.closeQuietly(latch);
timesSquare.countDown();
@@ -300,17 +317,20 @@ public class TestLeaderLatch extends BaseClassForTests {
}
@Override
- public void notLeader() {
+ public void notLeader()
+ {
notLeaderCounter.incrementAndGet();
}
}, exec);
latches.add(latch);
}
- try {
+ try
+ {
client.start();
- for (LeaderLatch latch : latches) {
+ for ( LeaderLatch latch : latches )
+ {
latch.start();
}
@@ -318,12 +338,17 @@ public class TestLeaderLatch extends BaseClassForTests {
Assert.assertEquals(masterCounter.get(), PARTICIPANT_QTY * 2);
Assert.assertEquals(notLeaderCounter.get(), PARTICIPANT_QTY);
- for (LeaderLatch latch : latches) {
+ for ( LeaderLatch latch : latches )
+ {
Assert.assertEquals(latch.getState(), LeaderLatch.State.CLOSED);
}
- } finally {
- for (LeaderLatch latch : latches) {
- if (latch.getState() != LeaderLatch.State.CLOSED) {
+ }
+ finally
+ {
+ for ( LeaderLatch latch : latches )
+ {
+ if ( latch.getState() != LeaderLatch.State.CLOSED )
+ {
CloseableUtils.closeQuietly(latch);
}
}
@@ -332,7 +357,8 @@ public class TestLeaderLatch extends BaseClassForTests {
}
@Test
- public void testCallbackNotifyLeader() throws Exception {
+ public void testCallbackNotifyLeader() throws Exception
+ {
final int PARTICIPANT_QTY = 10;
final int SILENT_QTY = 3;
@@ -341,35 +367,37 @@ public class TestLeaderLatch extends BaseClassForTests {
final AtomicLong notLeaderCounter = new AtomicLong(0);
Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.newClient(
- server.getConnectString(), timing.session(),
- timing.connection(), new RetryOneTime(1));
- ExecutorService exec = Executors
- .newSingleThreadExecutor(new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat("callbackNotifyLeader-%s").build());
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ ExecutorService exec = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("callbackNotifyLeader-%s").build());
List<LeaderLatch> latches = Lists.newArrayList();
- for (int i = 0; i < PARTICIPANT_QTY; ++i) {
- LeaderLatch.CloseMode closeMode = i < SILENT_QTY ? LeaderLatch.CloseMode.SILENT
- : LeaderLatch.CloseMode.NOTIFY_LEADER;
+ for ( int i = 0; i < PARTICIPANT_QTY; ++i )
+ {
+ LeaderLatch.CloseMode closeMode = i < SILENT_QTY ? LeaderLatch.CloseMode.SILENT : LeaderLatch.CloseMode.NOTIFY_LEADER;
- final LeaderLatch latch = new LeaderLatch(client, PATH_NAME, "",
- closeMode);
- latch.addListener(new LeaderLatchListener() {
+ final LeaderLatch latch = new LeaderLatch(client, PATH_NAME, "", closeMode);
+ latch.addListener(new LeaderLatchListener()
+ {
boolean beenLeader = false;
@Override
- public void isLeader() {
- if (!beenLeader) {
+ public void isLeader()
+ {
+ if ( !beenLeader )
+ {
masterCounter.incrementAndGet();
beenLeader = true;
- try {
+ try
+ {
latch.reset();
- } catch (Exception e) {
+ }
+ catch ( Exception e )
+ {
throw Throwables.propagate(e);
}
- } else {
+ }
+ else
+ {
masterCounter.incrementAndGet();
CloseableUtils.closeQuietly(latch);
timesSquare.countDown();
@@ -377,31 +405,38 @@ public class TestLeaderLatch extends BaseClassForTests {
}
@Override
- public void notLeader() {
+ public void notLeader()
+ {
notLeaderCounter.incrementAndGet();
}
}, exec);
latches.add(latch);
}
- try {
+ try
+ {
client.start();
- for (LeaderLatch latch : latches) {
+ for ( LeaderLatch latch : latches )
+ {
latch.start();
}
timesSquare.await();
Assert.assertEquals(masterCounter.get(), PARTICIPANT_QTY * 2);
- Assert.assertEquals(notLeaderCounter.get(), PARTICIPANT_QTY * 2
- - SILENT_QTY);
- for (LeaderLatch latch : latches) {
+ Assert.assertEquals(notLeaderCounter.get(), PARTICIPANT_QTY * 2 - SILENT_QTY);
+ for ( LeaderLatch latch : latches )
+ {
Assert.assertEquals(latch.getState(), LeaderLatch.State.CLOSED);
}
- } finally {
- for (LeaderLatch latch : latches) {
- if (latch.getState() != LeaderLatch.State.CLOSED) {
+ }
+ finally
+ {
+ for ( LeaderLatch latch : latches )
+ {
+ if ( latch.getState() != LeaderLatch.State.CLOSED )
+ {
CloseableUtils.closeQuietly(latch);
}
}
@@ -410,42 +445,47 @@ public class TestLeaderLatch extends BaseClassForTests {
}
@Test
- public void testCallbackDontNotify() throws Exception {
+ public void testCallbackDontNotify() throws Exception
+ {
final AtomicLong masterCounter = new AtomicLong(0);
final AtomicLong notLeaderCounter = new AtomicLong(0);
Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.newClient(
- server.getConnectString(), timing.session(),
- timing.connection(), new RetryOneTime(1));
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
final LeaderLatch leader = new LeaderLatch(client, PATH_NAME);
- final LeaderLatch notifiedLeader = new LeaderLatch(client, PATH_NAME,
- "", LeaderLatch.CloseMode.NOTIFY_LEADER);
+ final LeaderLatch notifiedLeader = new LeaderLatch(client, PATH_NAME, "", LeaderLatch.CloseMode.NOTIFY_LEADER);
- leader.addListener(new LeaderLatchListener() {
+ leader.addListener(new LeaderLatchListener()
+ {
@Override
- public void isLeader() {
+ public void isLeader()
+ {
}
@Override
- public void notLeader() {
+ public void notLeader()
+ {
masterCounter.incrementAndGet();
}
});
- notifiedLeader.addListener(new LeaderLatchListener() {
+ notifiedLeader.addListener(new LeaderLatchListener()
+ {
@Override
- public void isLeader() {
+ public void isLeader()
+ {
}
@Override
- public void notLeader() {
+ public void notLeader()
+ {
notLeaderCounter.incrementAndGet();
}
});
- try {
+ try
+ {
client.start();
leader.start();
@@ -464,24 +504,28 @@ public class TestLeaderLatch extends BaseClassForTests {
leader.close(LeaderLatch.CloseMode.NOTIFY_LEADER);
Assert.assertEquals(leader.getState(), LeaderLatch.State.CLOSED);
- Assert.assertEquals(notifiedLeader.getState(),
- LeaderLatch.State.CLOSED);
+ Assert.assertEquals(notifiedLeader.getState(), LeaderLatch.State.CLOSED);
Assert.assertEquals(masterCounter.get(), 1);
Assert.assertEquals(notLeaderCounter.get(), 0);
- } finally {
- if (leader.getState() != LeaderLatch.State.CLOSED) {
+ }
+ finally
+ {
+ if ( leader.getState() != LeaderLatch.State.CLOSED )
+ {
CloseableUtils.closeQuietly(leader);
}
- if (notifiedLeader.getState() != LeaderLatch.State.CLOSED) {
+ if ( notifiedLeader.getState() != LeaderLatch.State.CLOSED )
+ {
CloseableUtils.closeQuietly(notifiedLeader);
}
CloseableUtils.closeQuietly(client);
}
}
-
+
@Test
- public void testNoServerAtStart() {
+ public void testNoServerAtStart()
+ {
CloseableUtils.closeQuietly(server);
Timing timing = new Timing();
@@ -497,12 +541,14 @@ public class TestLeaderLatch extends BaseClassForTests {
// before starting. Otherwise it's possible that the server will start
// during retries of the initial commands to create the latch zNodes
client.getConnectionStateListenable().addListener(
- new ConnectionStateListener() {
-
+ new ConnectionStateListener()
+ {
@Override
public void stateChanged(CuratorFramework client,
- ConnectionState newState) {
- if (newState == ConnectionState.LOST) {
+ ConnectionState newState)
+ {
+ if (newState == ConnectionState.LOST)
+ {
sessionLostCount.countDown();
}
}
@@ -510,20 +556,23 @@ public class TestLeaderLatch extends BaseClassForTests {
final LeaderLatch leader = new LeaderLatch(client, PATH_NAME);
final CountDownLatch leaderCounter = new CountDownLatch(1);
- leader.addListener(new LeaderLatchListener() {
-
+ leader.addListener(new LeaderLatchListener()
+ {
@Override
- public void isLeader() {
+ public void isLeader()
+ {
leaderCounter.countDown();
}
@Override
- public void notLeader() {
+ public void notLeader()
+ {
}
});
- try {
+ try
+ {
leader.start();
timing.awaitLatch(sessionLostCount);
@@ -533,47 +582,58 @@ public class TestLeaderLatch extends BaseClassForTests {
timing.awaitLatch(leaderCounter);
- } catch (Exception e) {
+ }
+ catch (Exception e)
+ {
Assert.fail("Unexpected exception", e);
- } finally {
+ }
+ finally
+ {
CloseableUtils.closeQuietly(leader);
CloseableUtils.closeQuietly(client);
CloseableUtils.closeQuietly(server);
}
- }
+ }
- private enum Mode {
- START_IMMEDIATELY, START_IN_THREADS
+ private enum Mode
+ {
+ START_IMMEDIATELY,
+ START_IN_THREADS
}
- private void basic(Mode mode) throws Exception {
- final int PARTICIPANT_QTY = 1;// 0;
+ private void basic(Mode mode) throws Exception
+ {
+ final int PARTICIPANT_QTY = 1;//0;
List<LeaderLatch> latches = Lists.newArrayList();
Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.newClient(
- server.getConnectString(), timing.session(),
- timing.connection(), new RetryOneTime(1));
- try {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ try
+ {
client.start();
- for (int i = 0; i < PARTICIPANT_QTY; ++i) {
+ for ( int i = 0; i < PARTICIPANT_QTY; ++i )
+ {
LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
- if (mode == Mode.START_IMMEDIATELY) {
+ if ( mode == Mode.START_IMMEDIATELY )
+ {
latch.start();
}
latches.add(latch);
}
- if (mode == Mode.START_IN_THREADS) {
- ExecutorService service = Executors.newFixedThreadPool(latches
- .size());
- for (final LeaderLatch latch : latches) {
- service.submit(new Callable<Object>() {
+ if ( mode == Mode.START_IN_THREADS )
+ {
+ ExecutorService service = Executors.newFixedThreadPool(latches.size());
+ for ( final LeaderLatch latch : latches )
+ {
+ service.submit(new Callable<Object>()
+ {
@Override
- public Object call() throws Exception {
- Thread.sleep((int) (100 * Math.random()));
+ public Object call() throws Exception
+ {
+ Thread.sleep((int)(100 * Math.random()));
latch.start();
return null;
}
@@ -582,38 +642,36 @@ public class TestLeaderLatch extends BaseClassForTests {
service.shutdown();
}
- while (latches.size() > 0) {
+ while ( latches.size() > 0 )
+ {
List<LeaderLatch> leaders = waitForALeader(latches, timing);
- Assert.assertEquals(leaders.size(), 1); // there can only be one
- // leader
+ Assert.assertEquals(leaders.size(), 1); // there can only be one leader
LeaderLatch theLeader = leaders.get(0);
- if (mode == Mode.START_IMMEDIATELY) {
- Assert.assertEquals(latches.indexOf(theLeader), 0); // assert
- // ordering
- // -
- // leadership
- // should
- // advance
- // in
- // start
- // order
+ if ( mode == Mode.START_IMMEDIATELY )
+ {
+ Assert.assertEquals(latches.indexOf(theLeader), 0); // assert ordering - leadership should advance in start order
}
theLeader.close();
latches.remove(theLeader);
}
- } finally {
- for (LeaderLatch latch : latches) {
+ }
+ finally
+ {
+ for ( LeaderLatch latch : latches )
+ {
CloseableUtils.closeQuietly(latch);
}
CloseableUtils.closeQuietly(client);
}
}
- private List<LeaderLatch> waitForALeader(List<LeaderLatch> latches,
- Timing timing) throws InterruptedException {
- for (int i = 0; i < MAX_LOOPS; ++i) {
+ private List<LeaderLatch> waitForALeader(List<LeaderLatch> latches, Timing timing) throws InterruptedException
+ {
+ for ( int i = 0; i < MAX_LOOPS; ++i )
+ {
List<LeaderLatch> leaders = getLeaders(latches);
- if (leaders.size() != 0) {
+ if ( leaders.size() != 0 )
+ {
return leaders;
}
timing.sleepABit();
@@ -621,10 +679,13 @@ public class TestLeaderLatch extends BaseClassForTests {
return Lists.newArrayList();
}
- private List<LeaderLatch> getLeaders(Collection<LeaderLatch> latches) {
+ private List<LeaderLatch> getLeaders(Collection<LeaderLatch> latches)
+ {
List<LeaderLatch> leaders = Lists.newArrayList();
- for (LeaderLatch latch : latches) {
- if (latch.hasLeadership()) {
+ for ( LeaderLatch latch : latches )
+ {
+ if ( latch.hasLeadership() )
+ {
leaders.add(latch);
}
}