You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/06/18 01:03:05 UTC
[01/18] git commit: CURATOR-110 - Modified state handling to treat
'CONNECTED' and 'RECONNECTED' events in the same way. Added a test case for
the leader latch being started before a connection to ZK has been
established.
Repository: curator
Updated Branches:
refs/heads/master 6e98562d8 -> 5d7d0c7f1
CURATOR-110 - Modified state handling to treat 'CONNECTED' and
'RECONNECTED' events in the same way. Added a test case for the leader
latch being started before a connection to ZK has been established.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/1a63a102
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/1a63a102
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/1a63a102
Branch: refs/heads/master
Commit: 1a63a102ebbaaead265babd71a3d52928f848bd0
Parents: de1d38c
Author: Cameron McKenzie <ca...@unico.com.au>
Authored: Mon Jun 2 16:30:26 2014 +1000
Committer: Cameron McKenzie <ca...@unico.com.au>
Committed: Mon Jun 2 16:30:26 2014 +1000
----------------------------------------------------------------------
.../framework/state/ConnectionState.java | 41 +-
.../framework/recipes/leader/LeaderLatch.java | 633 +++++++++----------
.../recipes/leader/TestLeaderLatch.java | 516 +++++++--------
3 files changed, 601 insertions(+), 589 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/1a63a102/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
index 566d355..b25ce38 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
@@ -23,18 +23,17 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
/**
* Represents state changes in the connection to ZK
*/
-public enum ConnectionState
-{
+public enum ConnectionState {
/**
- * Sent for the first successful connection to the server. NOTE: You will only
- * get one of these messages for any CuratorFramework instance.
+ * Sent for the first successful connection to the server. NOTE: You will
+ * only get one of these messages for any CuratorFramework instance.
*/
CONNECTED,
/**
* There has been a loss of connection. Leaders, locks, etc. should suspend
- * until the connection is re-established. If the connection times-out you will
- * receive a {@link #LOST} notice
+ * until the connection is re-established. If the connection times-out you
+ * will receive a {@link #LOST} notice
*/
SUSPENDED,
@@ -44,18 +43,30 @@ public enum ConnectionState
RECONNECTED,
/**
- * The connection is confirmed to be lost. Close any locks, leaders, etc. and
- * attempt to re-create them. NOTE: it is possible to get a {@link #RECONNECTED}
- * state after this but you should still consider any locks, etc. as dirty/unstable
+ * The connection is confirmed to be lost. Close any locks, leaders, etc.
+ * and attempt to re-create them. NOTE: it is possible to get a
+ * {@link #RECONNECTED} state after this but you should still consider any
+ * locks, etc. as dirty/unstable
*/
LOST,
/**
- * The connection has gone into read-only mode. This can only happen if you pass true
- * for {@link CuratorFrameworkFactory.Builder#canBeReadOnly()}. See the ZooKeeper doc
- * regarding read only connections:
- * <a href="http://wiki.apache.org/hadoop/ZooKeeper/GSoCReadOnlyMode">http://wiki.apache.org/hadoop/ZooKeeper/GSoCReadOnlyMode</a>.
- * The connection will remain in read only mode until another state change is sent.
+ * The connection has gone into read-only mode. This can only happen if you
+ * pass true for {@link CuratorFrameworkFactory.Builder#canBeReadOnly()}.
+ * See the ZooKeeper doc regarding read only connections: <a
+ * href="http://wiki.apache.org/hadoop/ZooKeeper/GSoCReadOnlyMode"
+ * >http://wiki.apache.org/hadoop/ZooKeeper/GSoCReadOnlyMode</a>. The
+ * connection will remain in read only mode until another state change is
+ * sent.
*/
- READ_ONLY
+ READ_ONLY;
+
+ /**
+ * Check if this state indicates that Curator has a connection to ZooKeeper
+ *
+ * @return True if connected, false otherwise
+ */
+ public boolean isConnected() {
+ return this == CONNECTED || this == RECONNECTED || this == READ_ONLY;
+ }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/1a63a102/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 88456af..d09ed1b 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -19,9 +19,17 @@
package org.apache.curator.framework.recipes.leader;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
@@ -38,71 +46,60 @@ import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Closeable;
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
/**
* <p>
- * Abstraction to select a "leader" amongst multiple contenders in a group of JMVs connected to
- * a Zookeeper cluster. If a group of N thread/processes contend for leadership one will
- * randomly be assigned leader until it releases leadership at which time another one from the
- * group will randomly be chosen
+ * Abstraction to select a "leader" amongst multiple contenders in a group of
+ * JMVs connected to a Zookeeper cluster. If a group of N thread/processes
+ * contend for leadership one will randomly be assigned leader until it releases
+ * leadership at which time another one from the group will randomly be chosen
* </p>
*/
-public class LeaderLatch implements Closeable
-{
+public class LeaderLatch implements Closeable {
private final Logger log = LoggerFactory.getLogger(getClass());
private final CuratorFramework client;
private final String latchPath;
private final String id;
- private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+ private final AtomicReference<State> state = new AtomicReference<State>(
+ State.LATENT);
private final AtomicBoolean hasLeadership = new AtomicBoolean(false);
private final AtomicReference<String> ourPath = new AtomicReference<String>();
private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();
private final CloseMode closeMode;
- private final ConnectionStateListener listener = new ConnectionStateListener()
- {
+ private final ConnectionStateListener listener = new ConnectionStateListener() {
@Override
- public void stateChanged(CuratorFramework client, ConnectionState newState)
- {
+ public void stateChanged(CuratorFramework client,
+ ConnectionState newState) {
handleStateChange(newState);
}
};
private static final String LOCK_NAME = "latch-";
- private static final LockInternalsSorter sorter = new LockInternalsSorter()
- {
+ private static final LockInternalsSorter sorter = new LockInternalsSorter() {
@Override
- public String fixForSorting(String str, String lockName)
- {
- return StandardLockInternalsDriver.standardFixForSorting(str, lockName);
+ public String fixForSorting(String str, String lockName) {
+ return StandardLockInternalsDriver.standardFixForSorting(str,
+ lockName);
}
};
- public enum State
- {
- LATENT,
- STARTED,
- CLOSED
+ public enum State {
+ LATENT, STARTED, CLOSED
}
/**
* How to handle listeners when the latch is closed
*/
- public enum CloseMode
- {
+ public enum CloseMode {
/**
- * When the latch is closed, listeners will *not* be notified (default behavior)
+ * When the latch is closed, listeners will *not* be notified (default
+ * behavior)
*/
SILENT,
@@ -113,105 +110,116 @@ public class LeaderLatch implements Closeable
}
/**
- * @param client the client
- * @param latchPath the path for this leadership group
+ * @param client
+ * the client
+ * @param latchPath
+ * the path for this leadership group
*/
- public LeaderLatch(CuratorFramework client, String latchPath)
- {
+ public LeaderLatch(CuratorFramework client, String latchPath) {
this(client, latchPath, "", CloseMode.SILENT);
}
/**
- * @param client the client
- * @param latchPath the path for this leadership group
- * @param id participant ID
+ * @param client
+ * the client
+ * @param latchPath
+ * the path for this leadership group
+ * @param id
+ * participant ID
*/
- public LeaderLatch(CuratorFramework client, String latchPath, String id)
- {
+ public LeaderLatch(CuratorFramework client, String latchPath, String id) {
this(client, latchPath, id, CloseMode.SILENT);
}
/**
- * @param client the client
- * @param latchPath the path for this leadership group
- * @param id participant ID
- * @param closeMode behaviour of listener on explicit close.
+ * @param client
+ * the client
+ * @param latchPath
+ * the path for this leadership group
+ * @param id
+ * participant ID
+ * @param closeMode
+ * behaviour of listener on explicit close.
*/
- public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode)
- {
- this.client = Preconditions.checkNotNull(client, "client cannot be null");
- this.latchPath = Preconditions.checkNotNull(latchPath, "mutexPath cannot be null");
+ public LeaderLatch(CuratorFramework client, String latchPath, String id,
+ CloseMode closeMode) {
+ this.client = Preconditions.checkNotNull(client,
+ "client cannot be null");
+ this.latchPath = Preconditions.checkNotNull(latchPath,
+ "mutexPath cannot be null");
this.id = Preconditions.checkNotNull(id, "id cannot be null");
- this.closeMode = Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
+ this.closeMode = Preconditions.checkNotNull(closeMode,
+ "closeMode cannot be null");
}
/**
- * Add this instance to the leadership election and attempt to acquire leadership.
- *
- * @throws Exception errors
+ * Add this instance to the leadership election and attempt to acquire
+ * leadership.
+ *
+ * @throws Exception
+ * errors
*/
- public void start() throws Exception
- {
- Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
+ public void start() throws Exception {
+ Preconditions.checkState(
+ state.compareAndSet(State.LATENT, State.STARTED),
+ "Cannot be started more than once");
client.getConnectionStateListenable().addListener(listener);
reset();
}
/**
- * Remove this instance from the leadership election. If this instance is the leader, leadership
- * is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch
- * instances must eventually be closed.
- *
- * @throws IOException errors
+ * Remove this instance from the leadership election. If this instance is
+ * the leader, leadership is released. IMPORTANT: the only way to release
+ * leadership is by calling close(). All LeaderLatch instances must
+ * eventually be closed.
+ *
+ * @throws IOException
+ * errors
*/
@Override
- public void close() throws IOException
- {
+ public void close() throws IOException {
close(closeMode);
}
/**
- * Remove this instance from the leadership election. If this instance is the leader, leadership
- * is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch
- * instances must eventually be closed.
- *
- * @param closeMode allows the default close mode to be overridden at the time the latch is closed.
- *
- * @throws IOException errors
+ * Remove this instance from the leadership election. If this instance is
+ * the leader, leadership is released. IMPORTANT: the only way to release
+ * leadership is by calling close(). All LeaderLatch instances must
+ * eventually be closed.
+ *
+ * @param closeMode
+ * allows the default close mode to be overridden at the time the
+ * latch is closed.
+ *
+ * @throws IOException
+ * errors
*/
- public void close(CloseMode closeMode) throws IOException
- {
- Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
+ public void close(CloseMode closeMode) throws IOException {
+ Preconditions.checkState(
+ state.compareAndSet(State.STARTED, State.CLOSED),
+ "Already closed or has not been started");
Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
- try
- {
+ try {
setNode(null);
- }
- catch ( Exception e )
- {
+ } catch (Exception e) {
throw new IOException(e);
- }
- finally
- {
+ } finally {
client.getConnectionStateListenable().removeListener(listener);
- switch ( closeMode )
- {
- case NOTIFY_LEADER:
- {
- setLeadership(false);
- listeners.clear();
- break;
- }
+ switch (closeMode) {
+ case NOTIFY_LEADER: {
+ setLeadership(false);
+ listeners.clear();
+ break;
+ }
- default:
- {
- listeners.clear();
- setLeadership(false);
- break;
- }
+ default: {
+ listeners.clear();
+ setLeadership(false);
+ break;
+ }
}
}
}
@@ -219,134 +227,160 @@ public class LeaderLatch implements Closeable
/**
* Attaches a listener to this LeaderLatch
* <p>
- * Attaching the same listener multiple times is a noop from the second time on.
+ * Attaching the same listener multiple times is a noop from the second time
+ * on.
* <p>
- * All methods for the listener are run using the provided Executor. It is common to pass in a single-threaded
- * executor so that you can be certain that listener methods are called in sequence, but if you are fine with
+ * All methods for the listener are run using the provided Executor. It is
+ * common to pass in a single-threaded executor so that you can be certain
+ * that listener methods are called in sequence, but if you are fine with
* them being called out of order you are welcome to use multiple threads.
- *
- * @param listener the listener to attach
+ *
+ * @param listener
+ * the listener to attach
*/
- public void addListener(LeaderLatchListener listener)
- {
+ public void addListener(LeaderLatchListener listener) {
listeners.addListener(listener);
}
/**
* Attaches a listener to this LeaderLatch
* <p>
- * Attaching the same listener multiple times is a noop from the second time on.
+ * Attaching the same listener multiple times is a noop from the second time
+ * on.
* <p>
- * All methods for the listener are run using the provided Executor. It is common to pass in a single-threaded
- * executor so that you can be certain that listener methods are called in sequence, but if you are fine with
+ * All methods for the listener are run using the provided Executor. It is
+ * common to pass in a single-threaded executor so that you can be certain
+ * that listener methods are called in sequence, but if you are fine with
* them being called out of order you are welcome to use multiple threads.
- *
- * @param listener the listener to attach
- * @param executor An executor to run the methods for the listener on.
+ *
+ * @param listener
+ * the listener to attach
+ * @param executor
+ * An executor to run the methods for the listener on.
*/
- public void addListener(LeaderLatchListener listener, Executor executor)
- {
+ public void addListener(LeaderLatchListener listener, Executor executor) {
listeners.addListener(listener, executor);
}
/**
* Removes a given listener from this LeaderLatch
- *
- * @param listener the listener to remove
+ *
+ * @param listener
+ * the listener to remove
*/
- public void removeListener(LeaderLatchListener listener)
- {
+ public void removeListener(LeaderLatchListener listener) {
listeners.removeListener(listener);
}
/**
- * <p>Causes the current thread to wait until this instance acquires leadership
- * unless the thread is {@linkplain Thread#interrupt interrupted} or {@linkplain #close() closed}.</p>
- * <p>If this instance already is the leader then this method returns immediately.</p>
- *
- * <p>Otherwise the current
- * thread becomes disabled for thread scheduling purposes and lies
- * dormant until one of three things happen:</p>
+ * <p>
+ * Causes the current thread to wait until this instance acquires leadership
+ * unless the thread is {@linkplain Thread#interrupt interrupted} or
+ * {@linkplain #close() closed}.
+ * </p>
+ * <p>
+ * If this instance already is the leader then this method returns
+ * immediately.
+ * </p>
+ *
+ * <p>
+ * Otherwise the current thread becomes disabled for thread scheduling
+ * purposes and lies dormant until one of three things happen:
+ * </p>
* <ul>
* <li>This instance becomes the leader</li>
- * <li>Some other thread {@linkplain Thread#interrupt interrupts}
- * the current thread</li>
+ * <li>Some other thread {@linkplain Thread#interrupt interrupts} the
+ * current thread</li>
* <li>The instance is {@linkplain #close() closed}</li>
* </ul>
- * <p>If the current thread:</p>
+ * <p>
+ * If the current thread:
+ * </p>
* <ul>
* <li>has its interrupted status set on entry to this method; or
* <li>is {@linkplain Thread#interrupt interrupted} while waiting,
* </ul>
- * <p>then {@link InterruptedException} is thrown and the current thread's
- * interrupted status is cleared.</p>
- *
- * @throws InterruptedException if the current thread is interrupted
- * while waiting
- * @throws EOFException if the instance is {@linkplain #close() closed}
- * while waiting
+ * <p>
+ * then {@link InterruptedException} is thrown and the current thread's
+ * interrupted status is cleared.
+ * </p>
+ *
+ * @throws InterruptedException
+ * if the current thread is interrupted while waiting
+ * @throws EOFException
+ * if the instance is {@linkplain #close() closed} while waiting
*/
- public void await() throws InterruptedException, EOFException
- {
- synchronized(this)
- {
- while ( (state.get() == State.STARTED) && !hasLeadership.get() )
- {
+ public void await() throws InterruptedException, EOFException {
+ synchronized (this) {
+ while ((state.get() == State.STARTED) && !hasLeadership.get()) {
wait();
}
}
- if ( state.get() != State.STARTED )
- {
+ if (state.get() != State.STARTED) {
throw new EOFException();
}
}
/**
- * <p>Causes the current thread to wait until this instance acquires leadership
- * unless the thread is {@linkplain Thread#interrupt interrupted},
- * the specified waiting time elapses or the instance is {@linkplain #close() closed}.</p>
- *
- * <p>If this instance already is the leader then this method returns immediately
- * with the value {@code true}.</p>
- *
- * <p>Otherwise the current
- * thread becomes disabled for thread scheduling purposes and lies
- * dormant until one of four things happen:</p>
+ * <p>
+ * Causes the current thread to wait until this instance acquires leadership
+ * unless the thread is {@linkplain Thread#interrupt interrupted}, the
+ * specified waiting time elapses or the instance is {@linkplain #close()
+ * closed}.
+ * </p>
+ *
+ * <p>
+ * If this instance already is the leader then this method returns
+ * immediately with the value {@code true}.
+ * </p>
+ *
+ * <p>
+ * Otherwise the current thread becomes disabled for thread scheduling
+ * purposes and lies dormant until one of four things happen:
+ * </p>
* <ul>
* <li>This instance becomes the leader</li>
- * <li>Some other thread {@linkplain Thread#interrupt interrupts}
- * the current thread</li>
+ * <li>Some other thread {@linkplain Thread#interrupt interrupts} the
+ * current thread</li>
* <li>The specified waiting time elapses.</li>
* <li>The instance is {@linkplain #close() closed}</li>
* </ul>
- *
- * <p>If the current thread:</p>
+ *
+ * <p>
+ * If the current thread:
+ * </p>
* <ul>
* <li>has its interrupted status set on entry to this method; or
* <li>is {@linkplain Thread#interrupt interrupted} while waiting,
* </ul>
- * <p>then {@link InterruptedException} is thrown and the current thread's
- * interrupted status is cleared.</p>
- *
- * <p>If the specified waiting time elapses or the instance is {@linkplain #close() closed}
- * then the value {@code false} is returned. If the time is less than or equal to zero, the method
- * will not wait at all.</p>
- *
- * @param timeout the maximum time to wait
- * @param unit the time unit of the {@code timeout} argument
- * @return {@code true} if the count reached zero and {@code false}
- * if the waiting time elapsed before the count reached zero or the instances was closed
- * @throws InterruptedException if the current thread is interrupted
- * while waiting
+ * <p>
+ * then {@link InterruptedException} is thrown and the current thread's
+ * interrupted status is cleared.
+ * </p>
+ *
+ * <p>
+ * If the specified waiting time elapses or the instance is
+ * {@linkplain #close() closed} then the value {@code false} is returned. If
+ * the time is less than or equal to zero, the method will not wait at all.
+ * </p>
+ *
+ * @param timeout
+ * the maximum time to wait
+ * @param unit
+ * the time unit of the {@code timeout} argument
+ * @return {@code true} if the count reached zero and {@code false} if the
+ * waiting time elapsed before the count reached zero or the
+ * instances was closed
+ * @throws InterruptedException
+ * if the current thread is interrupted while waiting
*/
- public boolean await(long timeout, TimeUnit unit) throws InterruptedException
- {
+ public boolean await(long timeout, TimeUnit unit)
+ throws InterruptedException {
long waitNanos = TimeUnit.NANOSECONDS.convert(timeout, unit);
- synchronized(this)
- {
- while ( (waitNanos > 0) && (state.get() == State.STARTED) && !hasLeadership.get() )
- {
+ synchronized (this) {
+ while ((waitNanos > 0) && (state.get() == State.STARTED)
+ && !hasLeadership.get()) {
long startNanos = System.nanoTime();
TimeUnit.NANOSECONDS.timedWait(this, waitNanos);
long elapsed = System.nanoTime() - startNanos;
@@ -358,24 +392,23 @@ public class LeaderLatch implements Closeable
/**
* Return this instance's participant Id
- *
+ *
* @return participant Id
*/
- public String getId()
- {
+ public String getId() {
return id;
}
/**
- * Returns this instances current state, this is the only way to verify that the object has been closed before
- * closing again. If you try to close a latch multiple times, the close() method will throw an
- * IllegalArgumentException which is often not caught and ignored (CloseableUtils.closeQuietly() only looks for
- * IOException).
- *
+ * Returns this instances current state, this is the only way to verify that
+ * the object has been closed before closing again. If you try to close a
+ * latch multiple times, the close() method will throw an
+ * IllegalArgumentException which is often not caught and ignored
+ * (CloseableUtils.closeQuietly() only looks for IOException).
+ *
* @return the state of the current instance
*/
- public State getState()
- {
+ public State getState() {
return state.get();
}
@@ -386,16 +419,17 @@ public class LeaderLatch implements Closeable
* <p>
* <p>
* <B>NOTE</B> - this method polls the ZK server. Therefore it can possibly
- * return a value that does not match {@link #hasLeadership()} as hasLeadership
- * uses a local field of the class.
+ * return a value that does not match {@link #hasLeadership()} as
+ * hasLeadership uses a local field of the class.
* </p>
- *
+ *
* @return participants
- * @throws Exception ZK errors, interruptions, etc.
+ * @throws Exception
+ * ZK errors, interruptions, etc.
*/
- public Collection<Participant> getParticipants() throws Exception
- {
- Collection<String> participantNodes = LockInternals.getParticipantNodes(client, latchPath, LOCK_NAME, sorter);
+ public Collection<Participant> getParticipants() throws Exception {
+ Collection<String> participantNodes = LockInternals
+ .getParticipantNodes(client, latchPath, LOCK_NAME, sorter);
return LeaderSelector.getParticipants(client, participantNodes);
}
@@ -407,26 +441,26 @@ public class LeaderLatch implements Closeable
* <p>
* <p>
* <B>NOTE</B> - this method polls the ZK server. Therefore it can possibly
- * return a value that does not match {@link #hasLeadership()} as hasLeadership
- * uses a local field of the class.
+ * return a value that does not match {@link #hasLeadership()} as
+ * hasLeadership uses a local field of the class.
* </p>
- *
+ *
* @return leader
- * @throws Exception ZK errors, interruptions, etc.
+ * @throws Exception
+ * ZK errors, interruptions, etc.
*/
- public Participant getLeader() throws Exception
- {
- Collection<String> participantNodes = LockInternals.getParticipantNodes(client, latchPath, LOCK_NAME, sorter);
+ public Participant getLeader() throws Exception {
+ Collection<String> participantNodes = LockInternals
+ .getParticipantNodes(client, latchPath, LOCK_NAME, sorter);
return LeaderSelector.getLeader(client, participantNodes);
}
/**
* Return true if leadership is currently held by this instance
- *
+ *
* @return true/false
*/
- public boolean hasLeadership()
- {
+ public boolean hasLeadership() {
return (state.get() == State.STARTED) && hasLeadership.get();
}
@@ -434,105 +468,95 @@ public class LeaderLatch implements Closeable
volatile CountDownLatch debugResetWaitLatch = null;
@VisibleForTesting
- void reset() throws Exception
- {
+ void reset() throws Exception {
setLeadership(false);
setNode(null);
- BackgroundCallback callback = new BackgroundCallback()
- {
+ BackgroundCallback callback = new BackgroundCallback() {
@Override
- public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
- {
- if ( debugResetWaitLatch != null )
- {
+ public void processResult(CuratorFramework client,
+ CuratorEvent event) throws Exception {
+ if (debugResetWaitLatch != null) {
debugResetWaitLatch.await();
debugResetWaitLatch = null;
}
- if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
- {
+ if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
setNode(event.getName());
- if ( state.get() == State.CLOSED )
- {
+ if (state.get() == State.CLOSED) {
setNode(null);
- }
- else
- {
+ } else {
getChildren();
}
- }
- else
- {
- log.error("getChildren() failed. rc = " + event.getResultCode());
+ } else {
+ log.error("getChildren() failed. rc = "
+ + event.getResultCode());
}
}
};
- client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
+ client.create()
+ .creatingParentsIfNeeded()
+ .withProtection()
+ .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
+ .inBackground(callback)
+ .forPath(ZKPaths.makePath(latchPath, LOCK_NAME),
+ LeaderSelector.getIdBytes(id));
}
- private void checkLeadership(List<String> children) throws Exception
- {
+ private void checkLeadership(List<String> children) throws Exception {
final String localOurPath = ourPath.get();
- List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
- int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
- if ( ourIndex < 0 )
- {
+ List<String> sortedChildren = LockInternals.getSortedChildren(
+ LOCK_NAME, sorter, children);
+ int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths
+ .getNodeFromPath(localOurPath)) : -1;
+ if (ourIndex < 0) {
log.error("Can't find our node. Resetting. Index: " + ourIndex);
reset();
- }
- else if ( ourIndex == 0 )
- {
+ } else if (ourIndex == 0) {
setLeadership(true);
- }
- else
- {
+ } else {
String watchPath = sortedChildren.get(ourIndex - 1);
- Watcher watcher = new Watcher()
- {
+ Watcher watcher = new Watcher() {
@Override
- public void process(WatchedEvent event)
- {
- if ( (state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null) )
- {
- try
- {
+ public void process(WatchedEvent event) {
+ if ((state.get() == State.STARTED)
+ && (event.getType() == Event.EventType.NodeDeleted)
+ && (localOurPath != null)) {
+ try {
getChildren();
- }
- catch ( Exception ex )
- {
- log.error("An error occurred checking the leadership.", ex);
+ } catch (Exception ex) {
+ log.error(
+ "An error occurred checking the leadership.",
+ ex);
}
}
}
};
- BackgroundCallback callback = new BackgroundCallback()
- {
+ BackgroundCallback callback = new BackgroundCallback() {
@Override
- public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
- {
- if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
- {
+ public void processResult(CuratorFramework client,
+ CuratorEvent event) throws Exception {
+ if (event.getResultCode() == KeeperException.Code.NONODE
+ .intValue()) {
// previous node is gone - reset
reset();
}
}
};
- // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
- client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));
+ // use getData() instead of exists() to avoid leaving unneeded
+ // watchers which is a type of resource leak
+ client.getData().usingWatcher(watcher).inBackground(callback)
+ .forPath(ZKPaths.makePath(latchPath, watchPath));
}
}
- private void getChildren() throws Exception
- {
- BackgroundCallback callback = new BackgroundCallback()
- {
+ private void getChildren() throws Exception {
+ BackgroundCallback callback = new BackgroundCallback() {
@Override
- public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
- {
- if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
- {
+ public void processResult(CuratorFramework client,
+ CuratorEvent event) throws Exception {
+ if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
checkLeadership(event.getChildren());
}
}
@@ -540,82 +564,47 @@ public class LeaderLatch implements Closeable
client.getChildren().inBackground(callback).forPath(latchPath);
}
- private void handleStateChange(ConnectionState newState)
- {
- switch ( newState )
- {
- default:
- {
- // NOP
- break;
- }
-
- case RECONNECTED:
- {
- try
- {
+ private void handleStateChange(ConnectionState newState) {
+ if (newState.isConnected()) {
+ try {
reset();
- }
- catch ( Exception e )
- {
+ } catch (Exception e) {
log.error("Could not reset leader latch", e);
setLeadership(false);
}
- break;
- }
-
- case SUSPENDED:
- case LOST:
- {
+ } else {
setLeadership(false);
- break;
- }
}
}
- private synchronized void setLeadership(boolean newValue)
- {
+ private synchronized void setLeadership(boolean newValue) {
boolean oldValue = hasLeadership.getAndSet(newValue);
- if ( oldValue && !newValue )
- { // Lost leadership, was true, now false
- listeners.forEach
- (
- new Function<LeaderLatchListener, Void>()
- {
- @Override
- public Void apply(LeaderLatchListener listener)
- {
- listener.notLeader();
- return null;
- }
+ if (oldValue && !newValue) { // Lost leadership, was true, now false
+ listeners.forEach(new Function<LeaderLatchListener, Void>() {
+ @Override
+ public Void apply(LeaderLatchListener listener) {
+ listener.notLeader();
+ return null;
}
- );
- }
- else if ( !oldValue && newValue )
- { // Gained leadership, was false, now true
- listeners.forEach
- (
- new Function<LeaderLatchListener, Void>()
- {
- @Override
- public Void apply(LeaderLatchListener input)
- {
- input.isLeader();
- return null;
- }
+ });
+ } else if (!oldValue && newValue) { // Gained leadership, was false, now
+ // true
+ listeners.forEach(new Function<LeaderLatchListener, Void>() {
+ @Override
+ public Void apply(LeaderLatchListener input) {
+ input.isLeader();
+ return null;
}
- );
+ });
}
notifyAll();
}
- private void setNode(String newValue) throws Exception
- {
+ private void setNode(String newValue) throws Exception {
String oldPath = ourPath.getAndSet(newValue);
- if ( oldPath != null )
- {
+ if (oldPath != null) {
client.delete().guaranteed().inBackground().forPath(oldPath);
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/1a63a102/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index b827e15..108f118 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -19,9 +19,17 @@
package org.apache.curator.framework.recipes.leader;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.state.ConnectionState;
@@ -33,36 +41,29 @@ import org.apache.curator.test.Timing;
import org.apache.curator.utils.CloseableUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-public class TestLeaderLatch extends BaseClassForTests
-{
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class TestLeaderLatch extends BaseClassForTests {
private static final String PATH_NAME = "/one/two/me";
private static final int MAX_LOOPS = 5;
@Test
- public void testResetRace() throws Exception
- {
+ public void testResetRace() throws Exception {
Timing timing = new Timing();
LeaderLatch latch = null;
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- try
- {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(
+ server.getConnectString(), timing.session(),
+ timing.connection(), new RetryOneTime(1));
+ try {
client.start();
latch = new LeaderLatch(client, PATH_NAME);
latch.debugResetWaitLatch = new CountDownLatch(1);
- latch.start(); // will call reset()
- latch.reset(); // should not result in two nodes
+ latch.start(); // will call reset()
+ latch.reset(); // should not result in two nodes
timing.sleepABit();
@@ -70,22 +71,21 @@ public class TestLeaderLatch extends BaseClassForTests
timing.sleepABit();
- Assert.assertEquals(client.getChildren().forPath(PATH_NAME).size(), 1);
- }
- finally
- {
+ Assert.assertEquals(client.getChildren().forPath(PATH_NAME).size(),
+ 1);
+ } finally {
CloseableUtils.closeQuietly(latch);
CloseableUtils.closeQuietly(client);
}
}
@Test
- public void testCreateDeleteRace() throws Exception
- {
+ public void testCreateDeleteRace() throws Exception {
Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- try
- {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(
+ server.getConnectString(), timing.session(),
+ timing.connection(), new RetryOneTime(1));
+ try {
client.start();
LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
@@ -100,43 +100,40 @@ public class TestLeaderLatch extends BaseClassForTests
timing.sleepABit();
- Assert.assertEquals(client.getChildren().forPath(PATH_NAME).size(), 0);
+ Assert.assertEquals(client.getChildren().forPath(PATH_NAME).size(),
+ 0);
- }
- finally
- {
+ } finally {
CloseableUtils.closeQuietly(client);
}
}
@Test
- public void testLostConnection() throws Exception
- {
+ public void testLostConnection() throws Exception {
final int PARTICIPANT_QTY = 10;
List<LeaderLatch> latches = Lists.newArrayList();
final Timing timing = new Timing();
- final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- try
- {
+ final CuratorFramework client = CuratorFrameworkFactory.newClient(
+ server.getConnectString(), timing.session(),
+ timing.connection(), new RetryOneTime(1));
+ try {
client.start();
final CountDownLatch countDownLatch = new CountDownLatch(1);
- client.getConnectionStateListenable().addListener(new ConnectionStateListener()
- {
- @Override
- public void stateChanged(CuratorFramework client, ConnectionState newState)
- {
- if ( newState == ConnectionState.LOST )
- {
- countDownLatch.countDown();
- }
- }
- });
+ client.getConnectionStateListenable().addListener(
+ new ConnectionStateListener() {
+ @Override
+ public void stateChanged(CuratorFramework client,
+ ConnectionState newState) {
+ if (newState == ConnectionState.LOST) {
+ countDownLatch.countDown();
+ }
+ }
+ });
- for ( int i = 0; i < PARTICIPANT_QTY; ++i )
- {
+ for (int i = 0; i < PARTICIPANT_QTY; ++i) {
LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
latch.start();
latches.add(latch);
@@ -151,13 +148,12 @@ public class TestLeaderLatch extends BaseClassForTests
Assert.assertEquals(getLeaders(latches).size(), 0);
- server = new TestingServer(server.getPort(), server.getTempDirectory());
- Assert.assertEquals(waitForALeader(latches, timing).size(), 1); // should reconnect
- }
- finally
- {
- for ( LeaderLatch latch : latches )
- {
+ server = new TestingServer(server.getPort(),
+ server.getTempDirectory());
+ Assert.assertEquals(waitForALeader(latches, timing).size(), 1); // should
+ // reconnect
+ } finally {
+ for (LeaderLatch latch : latches) {
CloseableUtils.closeQuietly(latch);
}
CloseableUtils.closeQuietly(client);
@@ -165,21 +161,20 @@ public class TestLeaderLatch extends BaseClassForTests
}
@Test
- public void testCorrectWatching() throws Exception
- {
+ public void testCorrectWatching() throws Exception {
final int PARTICIPANT_QTY = 10;
final int PARTICIPANT_ID = 2;
List<LeaderLatch> latches = Lists.newArrayList();
final Timing timing = new Timing();
- final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- try
- {
+ final CuratorFramework client = CuratorFrameworkFactory.newClient(
+ server.getConnectString(), timing.session(),
+ timing.connection(), new RetryOneTime(1));
+ try {
client.start();
- for ( int i = 0; i < PARTICIPANT_QTY; ++i )
- {
+ for (int i = 0; i < PARTICIPANT_QTY; ++i) {
LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
latch.start();
latches.add(latch);
@@ -187,20 +182,20 @@ public class TestLeaderLatch extends BaseClassForTests
waitForALeader(latches, timing);
- //we need to close a Participant that doesn't be actual leader (first Participant) nor the last
+ // we need to close a Participant that doesn't be actual leader
+ // (first Participant) nor the last
latches.get(PARTICIPANT_ID).close();
- //As the previous algorithm assumed that if the watched node is deleted gets the leadership
- //we need to ensure that the PARTICIPANT_ID-1 is not getting (wrongly) elected as leader.
+ // As the previous algorithm assumed that if the watched node is
+ // deleted gets the leadership
+ // we need to ensure that the PARTICIPANT_ID-1 is not getting
+ // (wrongly) elected as leader.
Assert.assertTrue(!latches.get(PARTICIPANT_ID - 1).hasLeadership());
- }
- finally
- {
- //removes the already closed participant
+ } finally {
+ // removes the already closed participant
latches.remove(PARTICIPANT_ID);
- for ( LeaderLatch latch : latches )
- {
+ for (LeaderLatch latch : latches) {
CloseableUtils.closeQuietly(latch);
}
CloseableUtils.closeQuietly(client);
@@ -209,37 +204,35 @@ public class TestLeaderLatch extends BaseClassForTests
}
@Test
- public void testWaiting() throws Exception
- {
+ public void testWaiting() throws Exception {
final int PARTICIPANT_QTY = 10;
- ExecutorService executorService = Executors.newFixedThreadPool(PARTICIPANT_QTY);
- ExecutorCompletionService<Void> service = new ExecutorCompletionService<Void>(executorService);
+ ExecutorService executorService = Executors
+ .newFixedThreadPool(PARTICIPANT_QTY);
+ ExecutorCompletionService<Void> service = new ExecutorCompletionService<Void>(
+ executorService);
final Timing timing = new Timing();
- final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- try
- {
+ final CuratorFramework client = CuratorFrameworkFactory.newClient(
+ server.getConnectString(), timing.session(),
+ timing.connection(), new RetryOneTime(1));
+ try {
client.start();
final AtomicBoolean thereIsALeader = new AtomicBoolean(false);
- for ( int i = 0; i < PARTICIPANT_QTY; ++i )
- {
- service.submit(new Callable<Void>()
- {
+ for (int i = 0; i < PARTICIPANT_QTY; ++i) {
+ service.submit(new Callable<Void>() {
@Override
- public Void call() throws Exception
- {
+ public Void call() throws Exception {
LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
- try
- {
+ try {
latch.start();
- Assert.assertTrue(latch.await(timing.forWaiting().seconds(), TimeUnit.SECONDS));
- Assert.assertTrue(thereIsALeader.compareAndSet(false, true));
- Thread.sleep((int)(10 * Math.random()));
- }
- finally
- {
+ Assert.assertTrue(latch.await(timing.forWaiting()
+ .seconds(), TimeUnit.SECONDS));
+ Assert.assertTrue(thereIsALeader.compareAndSet(
+ false, true));
+ Thread.sleep((int) (10 * Math.random()));
+ } finally {
thereIsALeader.set(false);
latch.close();
}
@@ -248,68 +241,58 @@ public class TestLeaderLatch extends BaseClassForTests
});
}
- for ( int i = 0; i < PARTICIPANT_QTY; ++i )
- {
+ for (int i = 0; i < PARTICIPANT_QTY; ++i) {
service.take().get();
}
- }
- finally
- {
+ } finally {
executorService.shutdown();
CloseableUtils.closeQuietly(client);
}
}
@Test
- public void testBasic() throws Exception
- {
+ public void testBasic() throws Exception {
basic(Mode.START_IMMEDIATELY);
}
@Test
- public void testBasicAlt() throws Exception
- {
+ public void testBasicAlt() throws Exception {
basic(Mode.START_IN_THREADS);
}
@Test
- public void testCallbackSanity() throws Exception
- {
+ public void testCallbackSanity() throws Exception {
final int PARTICIPANT_QTY = 10;
final CountDownLatch timesSquare = new CountDownLatch(PARTICIPANT_QTY);
final AtomicLong masterCounter = new AtomicLong(0);
final AtomicLong notLeaderCounter = new AtomicLong(0);
Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- ExecutorService exec = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("callbackSanity-%s").build());
+ CuratorFramework client = CuratorFrameworkFactory.newClient(
+ server.getConnectString(), timing.session(),
+ timing.connection(), new RetryOneTime(1));
+ ExecutorService exec = Executors
+ .newSingleThreadExecutor(new ThreadFactoryBuilder()
+ .setDaemon(true).setNameFormat("callbackSanity-%s")
+ .build());
List<LeaderLatch> latches = Lists.newArrayList();
- for ( int i = 0; i < PARTICIPANT_QTY; ++i )
- {
+ for (int i = 0; i < PARTICIPANT_QTY; ++i) {
final LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
- latch.addListener(new LeaderLatchListener()
- {
+ latch.addListener(new LeaderLatchListener() {
boolean beenLeader = false;
@Override
- public void isLeader()
- {
- if ( !beenLeader )
- {
+ public void isLeader() {
+ if (!beenLeader) {
masterCounter.incrementAndGet();
beenLeader = true;
- try
- {
+ try {
latch.reset();
- }
- catch ( Exception e )
- {
+ } catch (Exception e) {
throw Throwables.propagate(e);
}
- }
- else
- {
+ } else {
masterCounter.incrementAndGet();
CloseableUtils.closeQuietly(latch);
timesSquare.countDown();
@@ -317,20 +300,17 @@ public class TestLeaderLatch extends BaseClassForTests
}
@Override
- public void notLeader()
- {
+ public void notLeader() {
notLeaderCounter.incrementAndGet();
}
}, exec);
latches.add(latch);
}
- try
- {
+ try {
client.start();
- for ( LeaderLatch latch : latches )
- {
+ for (LeaderLatch latch : latches) {
latch.start();
}
@@ -338,17 +318,12 @@ public class TestLeaderLatch extends BaseClassForTests
Assert.assertEquals(masterCounter.get(), PARTICIPANT_QTY * 2);
Assert.assertEquals(notLeaderCounter.get(), PARTICIPANT_QTY);
- for ( LeaderLatch latch : latches )
- {
+ for (LeaderLatch latch : latches) {
Assert.assertEquals(latch.getState(), LeaderLatch.State.CLOSED);
}
- }
- finally
- {
- for ( LeaderLatch latch : latches )
- {
- if ( latch.getState() != LeaderLatch.State.CLOSED )
- {
+ } finally {
+ for (LeaderLatch latch : latches) {
+ if (latch.getState() != LeaderLatch.State.CLOSED) {
CloseableUtils.closeQuietly(latch);
}
}
@@ -357,8 +332,7 @@ public class TestLeaderLatch extends BaseClassForTests
}
@Test
- public void testCallbackNotifyLeader() throws Exception
- {
+ public void testCallbackNotifyLeader() throws Exception {
final int PARTICIPANT_QTY = 10;
final int SILENT_QTY = 3;
@@ -367,37 +341,35 @@ public class TestLeaderLatch extends BaseClassForTests
final AtomicLong notLeaderCounter = new AtomicLong(0);
Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- ExecutorService exec = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("callbackNotifyLeader-%s").build());
+ CuratorFramework client = CuratorFrameworkFactory.newClient(
+ server.getConnectString(), timing.session(),
+ timing.connection(), new RetryOneTime(1));
+ ExecutorService exec = Executors
+ .newSingleThreadExecutor(new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("callbackNotifyLeader-%s").build());
List<LeaderLatch> latches = Lists.newArrayList();
- for ( int i = 0; i < PARTICIPANT_QTY; ++i )
- {
- LeaderLatch.CloseMode closeMode = i < SILENT_QTY ? LeaderLatch.CloseMode.SILENT : LeaderLatch.CloseMode.NOTIFY_LEADER;
+ for (int i = 0; i < PARTICIPANT_QTY; ++i) {
+ LeaderLatch.CloseMode closeMode = i < SILENT_QTY ? LeaderLatch.CloseMode.SILENT
+ : LeaderLatch.CloseMode.NOTIFY_LEADER;
- final LeaderLatch latch = new LeaderLatch(client, PATH_NAME, "", closeMode);
- latch.addListener(new LeaderLatchListener()
- {
+ final LeaderLatch latch = new LeaderLatch(client, PATH_NAME, "",
+ closeMode);
+ latch.addListener(new LeaderLatchListener() {
boolean beenLeader = false;
@Override
- public void isLeader()
- {
- if ( !beenLeader )
- {
+ public void isLeader() {
+ if (!beenLeader) {
masterCounter.incrementAndGet();
beenLeader = true;
- try
- {
+ try {
latch.reset();
- }
- catch ( Exception e )
- {
+ } catch (Exception e) {
throw Throwables.propagate(e);
}
- }
- else
- {
+ } else {
masterCounter.incrementAndGet();
CloseableUtils.closeQuietly(latch);
timesSquare.countDown();
@@ -405,38 +377,31 @@ public class TestLeaderLatch extends BaseClassForTests
}
@Override
- public void notLeader()
- {
+ public void notLeader() {
notLeaderCounter.incrementAndGet();
}
}, exec);
latches.add(latch);
}
- try
- {
+ try {
client.start();
- for ( LeaderLatch latch : latches )
- {
+ for (LeaderLatch latch : latches) {
latch.start();
}
timesSquare.await();
Assert.assertEquals(masterCounter.get(), PARTICIPANT_QTY * 2);
- Assert.assertEquals(notLeaderCounter.get(), PARTICIPANT_QTY * 2 - SILENT_QTY);
- for ( LeaderLatch latch : latches )
- {
+ Assert.assertEquals(notLeaderCounter.get(), PARTICIPANT_QTY * 2
+ - SILENT_QTY);
+ for (LeaderLatch latch : latches) {
Assert.assertEquals(latch.getState(), LeaderLatch.State.CLOSED);
}
- }
- finally
- {
- for ( LeaderLatch latch : latches )
- {
- if ( latch.getState() != LeaderLatch.State.CLOSED )
- {
+ } finally {
+ for (LeaderLatch latch : latches) {
+ if (latch.getState() != LeaderLatch.State.CLOSED) {
CloseableUtils.closeQuietly(latch);
}
}
@@ -445,47 +410,42 @@ public class TestLeaderLatch extends BaseClassForTests
}
@Test
- public void testCallbackDontNotify() throws Exception
- {
+ public void testCallbackDontNotify() throws Exception {
final AtomicLong masterCounter = new AtomicLong(0);
final AtomicLong notLeaderCounter = new AtomicLong(0);
Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ CuratorFramework client = CuratorFrameworkFactory.newClient(
+ server.getConnectString(), timing.session(),
+ timing.connection(), new RetryOneTime(1));
final LeaderLatch leader = new LeaderLatch(client, PATH_NAME);
- final LeaderLatch notifiedLeader = new LeaderLatch(client, PATH_NAME, "", LeaderLatch.CloseMode.NOTIFY_LEADER);
+ final LeaderLatch notifiedLeader = new LeaderLatch(client, PATH_NAME,
+ "", LeaderLatch.CloseMode.NOTIFY_LEADER);
- leader.addListener(new LeaderLatchListener()
- {
+ leader.addListener(new LeaderLatchListener() {
@Override
- public void isLeader()
- {
+ public void isLeader() {
}
@Override
- public void notLeader()
- {
+ public void notLeader() {
masterCounter.incrementAndGet();
}
});
- notifiedLeader.addListener(new LeaderLatchListener()
- {
+ notifiedLeader.addListener(new LeaderLatchListener() {
@Override
- public void isLeader()
- {
+ public void isLeader() {
}
@Override
- public void notLeader()
- {
+ public void notLeader() {
notLeaderCounter.incrementAndGet();
}
});
- try
- {
+ try {
client.start();
leader.start();
@@ -504,63 +464,116 @@ public class TestLeaderLatch extends BaseClassForTests
leader.close(LeaderLatch.CloseMode.NOTIFY_LEADER);
Assert.assertEquals(leader.getState(), LeaderLatch.State.CLOSED);
- Assert.assertEquals(notifiedLeader.getState(), LeaderLatch.State.CLOSED);
+ Assert.assertEquals(notifiedLeader.getState(),
+ LeaderLatch.State.CLOSED);
Assert.assertEquals(masterCounter.get(), 1);
Assert.assertEquals(notLeaderCounter.get(), 0);
- }
- finally
- {
- if ( leader.getState() != LeaderLatch.State.CLOSED )
- {
+ } finally {
+ if (leader.getState() != LeaderLatch.State.CLOSED) {
CloseableUtils.closeQuietly(leader);
}
- if ( notifiedLeader.getState() != LeaderLatch.State.CLOSED )
- {
+ if (notifiedLeader.getState() != LeaderLatch.State.CLOSED) {
CloseableUtils.closeQuietly(notifiedLeader);
}
CloseableUtils.closeQuietly(client);
}
}
- private enum Mode
- {
- START_IMMEDIATELY,
- START_IN_THREADS
+ @Test
+ public void testNoServerAtStart() {
+ CloseableUtils.closeQuietly(server);
+
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.newClient(
+ server.getConnectString(), timing.session(),
+ timing.connection(), new RetryOneTime(1));
+
+ client.start();
+
+ final CountDownLatch sessionLostCount = new CountDownLatch(1);
+
+ // Need to ensure that we've actually lost the connection completely
+ // before starting. Otherwise it's possible that the server will start
+ // during retries of the inital commands to create the latch zNodes
+ client.getConnectionStateListenable().addListener(
+ new ConnectionStateListener() {
+
+ @Override
+ public void stateChanged(CuratorFramework client,
+ ConnectionState newState) {
+ if (newState == ConnectionState.LOST) {
+ sessionLostCount.countDown();
+ }
+ }
+ });
+
+ final LeaderLatch leader = new LeaderLatch(client, PATH_NAME);
+ final CountDownLatch leaderCounter = new CountDownLatch(1);
+ leader.addListener(new LeaderLatchListener() {
+
+ @Override
+ public void isLeader() {
+ leaderCounter.countDown();
+ }
+
+ @Override
+ public void notLeader() {
+ }
+
+ });
+
+ try {
+ leader.start();
+
+ timing.awaitLatch(sessionLostCount);
+
+ // Start the new server
+ server = new TestingServer(server.getPort());
+
+ timing.awaitLatch(leaderCounter);
+
+ } catch (Exception e) {
+ Assert.fail("Unexpected exception", e);
+ } finally {
+ CloseableUtils.closeQuietly(leader);
+ CloseableUtils.closeQuietly(client);
+ CloseableUtils.closeQuietly(server);
+ }
+
+ }
+
+ private enum Mode {
+ START_IMMEDIATELY, START_IN_THREADS
}
- private void basic(Mode mode) throws Exception
- {
- final int PARTICIPANT_QTY = 1;//0;
+ private void basic(Mode mode) throws Exception {
+ final int PARTICIPANT_QTY = 1;// 0;
List<LeaderLatch> latches = Lists.newArrayList();
Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- try
- {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(
+ server.getConnectString(), timing.session(),
+ timing.connection(), new RetryOneTime(1));
+ try {
client.start();
- for ( int i = 0; i < PARTICIPANT_QTY; ++i )
- {
+ for (int i = 0; i < PARTICIPANT_QTY; ++i) {
LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
- if ( mode == Mode.START_IMMEDIATELY )
- {
+ if (mode == Mode.START_IMMEDIATELY) {
latch.start();
}
latches.add(latch);
}
- if ( mode == Mode.START_IN_THREADS )
- {
- ExecutorService service = Executors.newFixedThreadPool(latches.size());
- for ( final LeaderLatch latch : latches )
- {
- service.submit(new Callable<Object>()
- {
+ if (mode == Mode.START_IN_THREADS) {
+ ExecutorService service = Executors.newFixedThreadPool(latches
+ .size());
+ for (final LeaderLatch latch : latches) {
+ service.submit(new Callable<Object>() {
@Override
- public Object call() throws Exception
- {
- Thread.sleep((int)(100 * Math.random()));
+ public Object call() throws Exception {
+ Thread.sleep((int) (100 * Math.random()));
latch.start();
return null;
}
@@ -569,36 +582,38 @@ public class TestLeaderLatch extends BaseClassForTests
service.shutdown();
}
- while ( latches.size() > 0 )
- {
+ while (latches.size() > 0) {
List<LeaderLatch> leaders = waitForALeader(latches, timing);
- Assert.assertEquals(leaders.size(), 1); // there can only be one leader
+ Assert.assertEquals(leaders.size(), 1); // there can only be one
+ // leader
LeaderLatch theLeader = leaders.get(0);
- if ( mode == Mode.START_IMMEDIATELY )
- {
- Assert.assertEquals(latches.indexOf(theLeader), 0); // assert ordering - leadership should advance in start order
+ if (mode == Mode.START_IMMEDIATELY) {
+ Assert.assertEquals(latches.indexOf(theLeader), 0); // assert
+ // ordering
+ // -
+ // leadership
+ // should
+ // advance
+ // in
+ // start
+ // order
}
theLeader.close();
latches.remove(theLeader);
}
- }
- finally
- {
- for ( LeaderLatch latch : latches )
- {
+ } finally {
+ for (LeaderLatch latch : latches) {
CloseableUtils.closeQuietly(latch);
}
CloseableUtils.closeQuietly(client);
}
}
- private List<LeaderLatch> waitForALeader(List<LeaderLatch> latches, Timing timing) throws InterruptedException
- {
- for ( int i = 0; i < MAX_LOOPS; ++i )
- {
+ private List<LeaderLatch> waitForALeader(List<LeaderLatch> latches,
+ Timing timing) throws InterruptedException {
+ for (int i = 0; i < MAX_LOOPS; ++i) {
List<LeaderLatch> leaders = getLeaders(latches);
- if ( leaders.size() != 0 )
- {
+ if (leaders.size() != 0) {
return leaders;
}
timing.sleepABit();
@@ -606,13 +621,10 @@ public class TestLeaderLatch extends BaseClassForTests
return Lists.newArrayList();
}
- private List<LeaderLatch> getLeaders(Collection<LeaderLatch> latches)
- {
+ private List<LeaderLatch> getLeaders(Collection<LeaderLatch> latches) {
List<LeaderLatch> leaders = Lists.newArrayList();
- for ( LeaderLatch latch : latches )
- {
- if ( latch.hasLeadership() )
- {
+ for (LeaderLatch latch : latches) {
+ if (latch.hasLeadership()) {
leaders.add(latch);
}
}
[16/18] git commit: minor refactor
Posted by ra...@apache.org.
minor refactor
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/03e736c6
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/03e736c6
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/03e736c6
Branch: refs/heads/master
Commit: 03e736c66c353eb05ae7257553e53d8c2983ce40
Parents: 5954e66
Author: randgalt <ra...@apache.org>
Authored: Tue Jun 17 17:04:00 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Jun 17 17:04:00 2014 -0500
----------------------------------------------------------------------
.../curator/framework/state/ConnectionStateManager.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/03e736c6/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index fb312dc..2a0cdd1 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -193,16 +193,16 @@ public class ConnectionStateManager implements Closeable
public synchronized boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException
{
- boolean hasMaxWait = (units != null);
long startTime = System.currentTimeMillis();
+ boolean hasMaxWait = (units != null);
+ long maxWaitTimeMs = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWaitTime, units) : 0;
+
while ( !isConnected() )
{
- long maxWaitTimeMS = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWaitTime, units) : 0;
-
if ( hasMaxWait )
{
- long waitTime = maxWaitTimeMS - (System.currentTimeMillis() - startTime);
+ long waitTime = maxWaitTimeMs - (System.currentTimeMillis() - startTime);
if ( waitTime <= 0 )
{
return isConnected();
[03/18] git commit: CURATOR-110 - Modified the enum to have an
abstract isConnected() method that can be overridden by each enum instance.
Posted by ra...@apache.org.
CURATOR-110 - Modified the enum to have an abstract isConnected() method
that can be overridden by each enum instance.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/2c376b9f
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/2c376b9f
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/2c376b9f
Branch: refs/heads/master
Commit: 2c376b9f51d6eab4340651abce3284f358bedbc7
Parents: 0b7ae7e
Author: Cameron McKenzie <ca...@unico.com.au>
Authored: Wed Jun 4 10:24:31 2014 +1000
Committer: Cameron McKenzie <ca...@unico.com.au>
Committed: Wed Jun 4 10:24:31 2014 +1000
----------------------------------------------------------------------
.../framework/state/ConnectionState.java | 45 ++++++++++++++++----
1 file changed, 36 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/2c376b9f/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
index 7663e10..2e08b34 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
@@ -29,26 +29,50 @@ public enum ConnectionState
* Sent for the first successful connection to the server. NOTE: You will only
* get one of these messages for any CuratorFramework instance.
*/
- CONNECTED,
+ CONNECTED
+ {
+ public boolean isConnected()
+ {
+ return true;
+ }
+ },
/**
* There has been a loss of connection. Leaders, locks, etc. should suspend
* until the connection is re-established. If the connection times-out you will
* receive a {@link #LOST} notice
*/
- SUSPENDED,
+ SUSPENDED
+ {
+ public boolean isConnected()
+ {
+ return false;
+ }
+ },
/**
* A suspended or read-only connection has been re-established
*/
- RECONNECTED,
+ RECONNECTED
+ {
+ public boolean isConnected()
+ {
+ return true;
+ }
+ },
/**
* The connection is confirmed to be lost. Close any locks, leaders, etc. and
* attempt to re-create them. NOTE: it is possible to get a {@link #RECONNECTED}
* state after this but you should still consider any locks, etc. as dirty/unstable
*/
- LOST,
+ LOST
+ {
+ public boolean isConnected()
+ {
+ return false;
+ }
+ },
/**
* The connection has gone into read-only mode. This can only happen if you pass true
@@ -57,15 +81,18 @@ public enum ConnectionState
* <a href="http://wiki.apache.org/hadoop/ZooKeeper/GSoCReadOnlyMode">http://wiki.apache.org/hadoop/ZooKeeper/GSoCReadOnlyMode</a>.
* The connection will remain in read only mode until another state change is sent.
*/
- READ_ONLY;
+ READ_ONLY
+ {
+ public boolean isConnected()
+ {
+ return true;
+ }
+ };
/**
* Check if this state indicates that Curator has a connection to ZooKeeper
*
* @return True if connected, false otherwise
*/
- public boolean isConnected()
- {
- return this == CONNECTED || this == RECONNECTED || this == READ_ONLY;
- }
+ public abstract boolean isConnected();
}
[09/18] git commit: CURATOR-110 - Moved the 'wait until connection
established' logic into the ExecuteAfterConnectionEstablished utility class.
Cleaned up the blockUntilConnected() logic in the CuratorFrameworkImpl
Posted by ra...@apache.org.
CURATOR-110 - Moved the 'wait until connection established' logic into
the ExecuteAfterConnectionEstablished utility class. Cleaned up the
blockUntilConnected() logic in the CuratorFrameworkImpl
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/e8138ed9
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/e8138ed9
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/e8138ed9
Branch: refs/heads/master
Commit: e8138ed9768e99e98e308845626e36aa55afadb0
Parents: 63d0401
Author: Cameron McKenzie <mc...@gmail.com>
Authored: Mon Jun 16 15:59:56 2014 +1000
Committer: Cameron McKenzie <mc...@gmail.com>
Committed: Mon Jun 16 15:59:56 2014 +1000
----------------------------------------------------------------------
.../curator/framework/CuratorFramework.java | 9 +---
.../framework/imps/CuratorFrameworkImpl.java | 22 +++-----
.../framework/state/ConnectionStateManager.java | 5 ++
.../ExecuteAfterConnectionEstablished.java | 53 ++++++++++++++++++++
.../framework/recipes/leader/LeaderLatch.java | 44 ++++++----------
5 files changed, 81 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/e8138ed9/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 1df3fa5..13cff30 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -213,14 +213,7 @@ public interface CuratorFramework extends Closeable
* @param watcher the watcher
*/
public void clearWatcherReferences(Watcher watcher);
-
- /**
- * Get the current connection state. The connection state will have a value of 0 until
- * the first connection related event is received.
- * @return The current connection state, or null if it is unknown
- */
- public ConnectionState getCurrentConnectionState();
-
+
/**
* Block until a connection to ZooKeeper is available or the maxWaitTime has been exceeded
* @param maxWaitTime The maximum wait time. Specify a value <= 0 to wait indefinitely
http://git-wip-us.apache.org/repos/asf/curator/blob/e8138ed9/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 33e260d..d1de29f 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -67,7 +67,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
private final BlockingQueue<OperationAndData<?>> backgroundOperations;
private final NamespaceImpl namespace;
private final ConnectionStateManager connectionStateManager;
- private final AtomicReference<ConnectionState> connectionState;
private final AtomicReference<AuthInfo> authInfo = new AtomicReference<AuthInfo>();
private final byte[] defaultData;
private final FailedDeleteManager failedDeleteManager;
@@ -75,6 +74,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
private final ACLProvider aclProvider;
private final NamespaceFacadeCache namespaceFacadeCache;
private final NamespaceWatcherMap namespaceWatcherMap = new NamespaceWatcherMap(this);
+ private final Object connectionLock = new Object();
private volatile ExecutorService executorService;
private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false);
@@ -151,7 +151,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
namespace = new NamespaceImpl(this, builder.getNamespace());
threadFactory = getThreadFactory(builder);
connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory());
- connectionState = new AtomicReference<ConnectionState>(null);
compressionProvider = builder.getCompressionProvider();
aclProvider = builder.getAclProvider();
state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
@@ -174,10 +173,9 @@ public class CuratorFrameworkImpl implements CuratorFramework
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
- connectionState.set(newState);
- synchronized(connectionState)
+ synchronized(connectionLock)
{
- connectionState.notifyAll();
+ connectionLock.notifyAll();
}
}
});
@@ -220,7 +218,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
threadFactory = parent.threadFactory;
backgroundOperations = parent.backgroundOperations;
connectionStateManager = parent.connectionStateManager;
- connectionState = parent.connectionState;
defaultData = parent.defaultData;
failedDeleteManager = parent.failedDeleteManager;
compressionProvider = parent.compressionProvider;
@@ -890,16 +887,11 @@ public class CuratorFrameworkImpl implements CuratorFramework
);
}
- public ConnectionState getCurrentConnectionState()
- {
- return connectionState.get();
- }
-
@Override
public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException
{
//Check if we're already connected
- ConnectionState currentConnectionState = connectionState.get();
+ ConnectionState currentConnectionState = connectionStateManager.getCurrentConnectionState();
if(currentConnectionState != null && currentConnectionState.isConnected())
{
return true;
@@ -910,9 +902,9 @@ public class CuratorFrameworkImpl implements CuratorFramework
for(;;)
{
- synchronized(connectionState)
+ synchronized(connectionLock)
{
- currentConnectionState = connectionState.get();
+ currentConnectionState = connectionStateManager.getCurrentConnectionState();
if(currentConnectionState != null && currentConnectionState.isConnected())
{
return true;
@@ -930,7 +922,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
}
- connectionState.wait(waitTime);
+ connectionLock.wait(waitTime);
}
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/e8138ed9/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 42804b8..ba29994 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -188,6 +188,11 @@ public class ConnectionStateManager implements Closeable
return true;
}
+
+ public synchronized ConnectionState getCurrentConnectionState()
+ {
+ return currentConnectionState;
+ }
private void postState(ConnectionState state)
{
http://git-wip-us.apache.org/repos/asf/curator/blob/e8138ed9/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java
new file mode 100644
index 0000000..d213d37
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java
@@ -0,0 +1,53 @@
+package org.apache.curator.framework.recipes;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ThreadUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to allow execution of logic once a ZooKeeper connection becomes available.
+ *
+ */
+public class ExecuteAfterConnectionEstablished
+{
+ private final static Logger log = LoggerFactory.getLogger(ExecuteAfterConnectionEstablished.class);
+
+ /**
+ * Spawns a new new background thread that will block until a connection is available and
+ * then execute the 'runAfterConnection' logic
+ * @param name The name of the spawned thread
+ * @param client The curator client
+ * @param runAfterConnection The logic to run
+ */
+ public static void executeAfterConnectionEstablishedInBackground(String name,
+ final CuratorFramework client,
+ final Runnable runAfterConnection)
+ {
+ //Block until connected
+ final ExecutorService executor = ThreadUtils.newSingleThreadExecutor(name);
+ executor.submit(new Runnable()
+ {
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ client.blockUntilConnected();
+ runAfterConnection.run();
+ }
+ catch(Exception e)
+ {
+ log.error("An error occurred blocking until a connection is available", e);
+ }
+ finally
+ {
+ executor.shutdown();
+ }
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/e8138ed9/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 21e8cca..13a9f21 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -27,6 +27,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.recipes.ExecuteAfterConnectionEstablished;
import org.apache.curator.framework.recipes.locks.LockInternals;
import org.apache.curator.framework.recipes.locks.LockInternalsSorter;
import org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver;
@@ -148,14 +149,6 @@ public class LeaderLatch implements Closeable
this.id = Preconditions.checkNotNull(id, "id cannot be null");
this.closeMode = Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
}
-
- private CountDownLatch startLatch;
-
- public LeaderLatch(CuratorFramework client, String latchPath,
- CountDownLatch startLatch) {
- this(client, latchPath);
- this.startLatch = startLatch;
- }
/**
* Add this instance to the leadership election and attempt to acquire leadership.
@@ -166,30 +159,23 @@ public class LeaderLatch implements Closeable
{
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
- //Block until connected
- final ExecutorService executor = ThreadUtils.newSingleThreadExecutor("");
- executor.submit(new Runnable()
- {
-
- @Override
+ ExecuteAfterConnectionEstablished.executeAfterConnectionEstablishedInBackground(LeaderLatch.class.getName(),
+ client, new Runnable()
+ {
+ @Override
public void run()
{
- try
- {
- client.blockUntilConnected();
-
- client.getConnectionStateListenable().addListener(listener);
- reset();
- }
- catch(Exception ex)
- {
- log.error("An error occurred checking resetting leadership.", ex);
- } finally {
- //Shutdown the executor
- executor.shutdown();
- }
+ try
+ {
+ client.getConnectionStateListenable().addListener(listener);
+ reset();
+ }
+ catch(Exception ex)
+ {
+ log.error("An error occurred checking resetting leadership.", ex);
+ }
}
- });
+ });
}
/**
[05/18] git commit: give credit to Facebook Swift
Posted by ra...@apache.org.
give credit to Facebook Swift
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/122476ac
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/122476ac
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/122476ac
Branch: refs/heads/master
Commit: 122476ac582f2db9dab6d4fc6f599481fc98b4c9
Parents: 70c5254
Author: randgalt <ra...@apache.org>
Authored: Sat Jun 7 15:05:18 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Jun 7 15:05:18 2014 -0500
----------------------------------------------------------------------
curator-x-rpc/src/site/confluence/index.confluence | 4 ++++
1 file changed, 4 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/122476ac/curator-x-rpc/src/site/confluence/index.confluence
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/site/confluence/index.confluence b/curator-x-rpc/src/site/confluence/index.confluence
index b3686c3..70077ac 100644
--- a/curator-x-rpc/src/site/confluence/index.confluence
+++ b/curator-x-rpc/src/site/confluence/index.confluence
@@ -43,3 +43,7 @@ See [[Events|events.html]] for details on the Curator RPC event loop and its str
h2. Reference
See [[API Reference Page|reference.html]] for the API reference.
+
+----
+
+Special thanks to the [[Facebook Swift|https://github.com/facebook/swift/]] project which makes writing a Java Thrift server much easier.
[08/18] git commit: CURATOR-110 - Modified the CuratorFramework to
expose a 'blockUntilConnected' method. This allows a race condition between
the start() method and the ConnectionStateListener in the LeaderLatch recipe
to be avoided. Updated the leader
Posted by ra...@apache.org.
CURATOR-110 - Modified the CuratorFramework to expose a 'blockUntilConnected' method. This allows a race condition between the start() method and the ConnectionStateListener in the LeaderLatch recipe to be avoided. Updated the leader latch start() method to block until a connection is available (in a background thread), before it attempts to setup its state. This means that it will work correctly if started before a connection to ZK is available.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/63d0401c
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/63d0401c
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/63d0401c
Branch: refs/heads/master
Commit: 63d0401c05f6cc122468f55c79c9e1bd4ddb3eb5
Parents: 2c376b9
Author: Cameron McKenzie <mc...@gmail.com>
Authored: Thu Jun 12 14:09:45 2014 +1000
Committer: Cameron McKenzie <mc...@gmail.com>
Committed: Thu Jun 12 14:09:45 2014 +1000
----------------------------------------------------------------------
.../curator/framework/CuratorFramework.java | 27 +++
.../framework/imps/CuratorFrameworkImpl.java | 69 ++++++
.../framework/imps/TestBlockUntilConnected.java | 217 +++++++++++++++++++
.../framework/recipes/leader/LeaderLatch.java | 38 +++-
.../recipes/leader/TestLeaderLatch.java | 43 ++--
5 files changed, 366 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/63d0401c/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 2d6e182..1df3fa5 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -24,10 +24,13 @@ import org.apache.curator.framework.api.*;
import org.apache.curator.framework.api.transaction.CuratorTransaction;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.EnsurePath;
import org.apache.zookeeper.Watcher;
+
import java.io.Closeable;
+import java.util.concurrent.TimeUnit;
/**
* Zookeeper framework-style client
@@ -210,4 +213,28 @@ public interface CuratorFramework extends Closeable
* @param watcher the watcher
*/
public void clearWatcherReferences(Watcher watcher);
+
+ /**
+ * Get the current connection state. The connection state will have a value of 0 until
+ * the first connection related event is received.
+ * @return The current connection state, or null if it is unknown
+ */
+ public ConnectionState getCurrentConnectionState();
+
+ /**
+ * Block until a connection to ZooKeeper is available or the maxWaitTime has been exceeded
+ * @param maxWaitTime The maximum wait time. Specify a value <= 0 to wait indefinitely
+ * @param units The time units for the maximum wait time.
+ * @return True if connection has been established, false otherwise.
+ * @throws InterruptedException If interrupted while waiting
+ */
+ public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException;
+
+ /**
+ * Block until a connection to ZooKeeper is available. This method will not return until a
+ * connection is available or it is interrupted, in which case an InterruptedException will
+ * be thrown
+ * @throws InterruptedException If interrupted while waiting
+ */
+ public void blockUntilConnected() throws InterruptedException;
}
http://git-wip-us.apache.org/repos/asf/curator/blob/63d0401c/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index c4b1349..33e260d 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -20,6 +20,7 @@ package org.apache.curator.framework.imps;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
+
import org.apache.curator.CuratorConnectionLossException;
import org.apache.curator.CuratorZookeeperClient;
import org.apache.curator.RetryLoop;
@@ -43,6 +44,7 @@ import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
@@ -65,6 +67,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
private final BlockingQueue<OperationAndData<?>> backgroundOperations;
private final NamespaceImpl namespace;
private final ConnectionStateManager connectionStateManager;
+ private final AtomicReference<ConnectionState> connectionState;
private final AtomicReference<AuthInfo> authInfo = new AtomicReference<AuthInfo>();
private final byte[] defaultData;
private final FailedDeleteManager failedDeleteManager;
@@ -148,6 +151,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
namespace = new NamespaceImpl(this, builder.getNamespace());
threadFactory = getThreadFactory(builder);
connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory());
+ connectionState = new AtomicReference<ConnectionState>(null);
compressionProvider = builder.getCompressionProvider();
aclProvider = builder.getAclProvider();
state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
@@ -162,6 +166,21 @@ public class CuratorFrameworkImpl implements CuratorFramework
failedDeleteManager = new FailedDeleteManager(this);
namespaceFacadeCache = new NamespaceFacadeCache(this);
+
+ //Add callback handler to determine connection state transitions
+ getConnectionStateListenable().addListener(new ConnectionStateListener()
+ {
+
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ connectionState.set(newState);
+ synchronized(connectionState)
+ {
+ connectionState.notifyAll();
+ }
+ }
+ });
}
private ZookeeperFactory makeZookeeperFactory(final ZookeeperFactory actualZookeeperFactory)
@@ -201,6 +220,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
threadFactory = parent.threadFactory;
backgroundOperations = parent.backgroundOperations;
connectionStateManager = parent.connectionStateManager;
+ connectionState = parent.connectionState;
defaultData = parent.defaultData;
failedDeleteManager = parent.failedDeleteManager;
compressionProvider = parent.compressionProvider;
@@ -869,4 +889,53 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
);
}
+
+ public ConnectionState getCurrentConnectionState()
+ {
+ return connectionState.get();
+ }
+
+ @Override
+ public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException
+ {
+ //Check if we're already connected
+ ConnectionState currentConnectionState = connectionState.get();
+ if(currentConnectionState != null && currentConnectionState.isConnected())
+ {
+ return true;
+ }
+
+ long startTime = System.currentTimeMillis();
+ long maxWaitTimeMS = TimeUnit.MILLISECONDS.convert(maxWaitTime, units);
+
+ for(;;)
+ {
+ synchronized(connectionState)
+ {
+ currentConnectionState = connectionState.get();
+ if(currentConnectionState != null && currentConnectionState.isConnected())
+ {
+ return true;
+ }
+
+ long waitTime = 0;
+ if(maxWaitTime > 0)
+ {
+ waitTime = maxWaitTimeMS - (System.currentTimeMillis() - startTime);
+
+ //Timeout
+ if(waitTime <= 0)
+ {
+ return false;
+ }
+ }
+
+ connectionState.wait(waitTime);
+ }
+ }
+ }
+
+ public void blockUntilConnected() throws InterruptedException {
+ blockUntilConnected(0, TimeUnit.SECONDS);
+ }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/63d0401c/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
new file mode 100644
index 0000000..996e5fc
--- /dev/null
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
@@ -0,0 +1,217 @@
+package org.apache.curator.framework.imps;
+
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestBlockUntilConnected extends BaseClassForTests
+{
+ /**
+ * Test the case where we're already connected
+ */
+ @Test
+ public void testBlockUntilConnectedCurrentlyConnected()
+ {
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.builder().
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+
+ try
+ {
+ final CountDownLatch connectedLatch = new CountDownLatch(1);
+ client.getConnectionStateListenable().addListener(new ConnectionStateListener()
+ {
+
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ if(newState.isConnected())
+ {
+ connectedLatch.countDown();
+ }
+ }
+ });
+
+ client.start();
+
+ Assert.assertTrue(timing.awaitLatch(connectedLatch), "Timed out awaiting latch");
+ Assert.assertTrue(client.blockUntilConnected(1, TimeUnit.SECONDS), "Not connected");
+ }
+ catch(InterruptedException e)
+ {
+ Assert.fail("Unexpected interruption");
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ /**
+ * Test the case where we are not currently connected and never have been
+ */
+ @Test
+ public void testBlockUntilConnectedCurrentlyNeverConnected()
+ {
+ CuratorFramework client = CuratorFrameworkFactory.builder().
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+
+ try
+ {
+ client.start();
+ Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Not connected");
+ }
+ catch(InterruptedException e)
+ {
+ Assert.fail("Unexpected interruption");
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ /**
+ * Test the case where we are not currently connected, but have been previously
+ */
+ @Test
+ public void testBlockUntilConnectedCurrentlyAwaitingReconnect()
+ {
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.builder().
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+
+ final CountDownLatch lostLatch = new CountDownLatch(1);
+ client.getConnectionStateListenable().addListener(new ConnectionStateListener()
+ {
+
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ if(newState == ConnectionState.LOST)
+ {
+ lostLatch.countDown();
+ }
+ }
+ });
+
+ try
+ {
+ client.start();
+
+ //Block until we're connected
+ Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Failed to connect");
+
+ //Kill the server
+ CloseableUtils.closeQuietly(server);
+
+ //Wait until we hit the lost state
+ Assert.assertTrue(timing.awaitLatch(lostLatch), "Failed to reach LOST state");
+
+ server = new TestingServer(server.getPort(), server.getTempDirectory());
+
+ Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Not connected");
+ }
+ catch(Exception e)
+ {
+ Assert.fail("Unexpected exception " + e);
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ /**
+ * Test the case where we are not currently connected and time out before a
+ * connection becomes available.
+ */
+ @Test
+ public void testBlockUntilConnectedConnectTimeout()
+ {
+ //Kill the server
+ CloseableUtils.closeQuietly(server);
+
+ CuratorFramework client = CuratorFrameworkFactory.builder().
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+
+ try
+ {
+ client.start();
+ Assert.assertFalse(client.blockUntilConnected(5, TimeUnit.SECONDS),
+ "Connected");
+ }
+ catch(InterruptedException e)
+ {
+ Assert.fail("Unexpected interruption");
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ /**
+ * Test the case where we are not currently connected and the thread gets interrupted
+ * prior to a connection becoming available
+ */
+ @Test
+ public void testBlockUntilConnectedInterrupt()
+ {
+ //Kill the server
+ CloseableUtils.closeQuietly(server);
+
+ final CuratorFramework client = CuratorFrameworkFactory.builder().
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+
+ try
+ {
+ client.start();
+
+ final Thread threadToInterrupt = Thread.currentThread();
+
+ Timer timer = new Timer();
+ timer.schedule(new TimerTask() {
+
+ @Override
+ public void run() {
+ threadToInterrupt.interrupt();
+ }
+ }, 3000);
+
+ client.blockUntilConnected(5, TimeUnit.SECONDS);
+ Assert.fail("Expected interruption did not occur");
+ }
+ catch(InterruptedException e)
+ {
+ //This is expected
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/63d0401c/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index c1157c9..21e8cca 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -22,6 +22,7 @@ package org.apache.curator.framework.recipes.leader;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
+
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
@@ -31,6 +32,7 @@ import org.apache.curator.framework.recipes.locks.LockInternalsSorter;
import org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -38,6 +40,7 @@ import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
@@ -45,6 +48,7 @@ import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -144,6 +148,14 @@ public class LeaderLatch implements Closeable
this.id = Preconditions.checkNotNull(id, "id cannot be null");
this.closeMode = Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
}
+
+ private CountDownLatch startLatch;
+
+ public LeaderLatch(CuratorFramework client, String latchPath,
+ CountDownLatch startLatch) {
+ this(client, latchPath);
+ this.startLatch = startLatch;
+ }
/**
* Add this instance to the leadership election and attempt to acquire leadership.
@@ -154,8 +166,30 @@ public class LeaderLatch implements Closeable
{
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
- client.getConnectionStateListenable().addListener(listener);
- reset();
+ //Block until connected
+ final ExecutorService executor = ThreadUtils.newSingleThreadExecutor("");
+ executor.submit(new Runnable()
+ {
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ client.blockUntilConnected();
+
+ client.getConnectionStateListenable().addListener(listener);
+ reset();
+ }
+ catch(Exception ex)
+ {
+ log.error("An error occurred checking resetting leadership.", ex);
+ } finally {
+ //Shutdown the executor
+ executor.shutdown();
+ }
+ }
+ });
}
/**
http://git-wip-us.apache.org/repos/asf/curator/blob/63d0401c/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index d3b00bc..35d8809 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -22,10 +22,12 @@ package org.apache.curator.framework.recipes.leader;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.TestingServer;
@@ -33,6 +35,7 @@ import org.apache.curator.test.Timing;
import org.apache.curator.utils.CloseableUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
+
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
@@ -42,6 +45,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public class TestLeaderLatch extends BaseClassForTests
@@ -531,57 +535,45 @@ public class TestLeaderLatch extends BaseClassForTests
Timing timing = new Timing();
CuratorFramework client = CuratorFrameworkFactory.newClient(
server.getConnectString(), timing.session(),
- timing.connection(), new RetryOneTime(1));
+ timing.connection(), new RetryNTimes(5, 1000));
client.start();
- final CountDownLatch sessionLostCount = new CountDownLatch(1);
-
- // Need to ensure that we've actually lost the connection completely
- // before starting. Otherwise it's possible that the server will start
- // during retries of the inital commands to create the latch zNodes
- client.getConnectionStateListenable().addListener(
- new ConnectionStateListener()
- {
- @Override
- public void stateChanged(CuratorFramework client,
- ConnectionState newState)
- {
- if (newState == ConnectionState.LOST)
- {
- sessionLostCount.countDown();
- }
- }
- });
-
final LeaderLatch leader = new LeaderLatch(client, PATH_NAME);
final CountDownLatch leaderCounter = new CountDownLatch(1);
+ final AtomicInteger leaderCount = new AtomicInteger(0);
+ final AtomicInteger notLeaderCount = new AtomicInteger(0);
leader.addListener(new LeaderLatchListener()
{
@Override
public void isLeader()
{
leaderCounter.countDown();
+ leaderCount.incrementAndGet();
}
@Override
public void notLeader()
{
+ notLeaderCount.incrementAndGet();
}
});
try
{
- leader.start();
-
- timing.awaitLatch(sessionLostCount);
+ leader.start();
+
+ //Wait for a while before starting the test server
+ Thread.sleep(5000);
// Start the new server
server = new TestingServer(server.getPort());
- timing.awaitLatch(leaderCounter);
-
+ Assert.assertTrue(timing.awaitLatch(leaderCounter), "Not elected leader");
+
+ Assert.assertEquals(leaderCount.get(), 1, "Elected too many times");
+ Assert.assertEquals(notLeaderCount.get(), 0, "Unelected too many times");
}
catch (Exception e)
{
@@ -593,7 +585,6 @@ public class TestLeaderLatch extends BaseClassForTests
CloseableUtils.closeQuietly(client);
CloseableUtils.closeQuietly(server);
}
-
}
private enum Mode
[14/18] git commit: Now that start() uses AfterConnectionEstablished,
no longer correct/needed to handle CONNECTED in state change
Posted by ra...@apache.org.
Now that start() uses AfterConnectionEstablished, no longer correct/needed to handle CONNECTED in state change
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/fab79a45
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/fab79a45
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/fab79a45
Branch: refs/heads/master
Commit: fab79a4577d3af80260d25128cc56f26c7011dbe
Parents: 04cefb4
Author: randgalt <ra...@apache.org>
Authored: Mon Jun 16 14:41:13 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jun 16 14:41:13 2014 -0500
----------------------------------------------------------------------
.../framework/recipes/leader/LeaderLatch.java | 100 ++++++++++---------
1 file changed, 51 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/fab79a45/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index dce3f5e..9d70645 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -44,7 +44,6 @@ import java.io.EOFException;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
-import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@@ -156,9 +155,7 @@ public class LeaderLatch implements Closeable
{
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
- AfterConnectionEstablished.execute
- (
- client, new Runnable()
+ AfterConnectionEstablished.execute(client, new Runnable()
{
@Override
public void run()
@@ -173,8 +170,7 @@ public class LeaderLatch implements Closeable
log.error("An error occurred checking resetting leadership.", e);
}
}
- }
- );
+ });
}
/**
@@ -196,7 +192,6 @@ public class LeaderLatch implements Closeable
* instances must eventually be closed.
*
* @param closeMode allows the default close mode to be overridden at the time the latch is closed.
- *
* @throws IOException errors
*/
public void close(CloseMode closeMode) throws IOException
@@ -218,28 +213,28 @@ public class LeaderLatch implements Closeable
switch ( closeMode )
{
- case NOTIFY_LEADER:
- {
- setLeadership(false);
- listeners.clear();
- break;
- }
+ case NOTIFY_LEADER:
+ {
+ setLeadership(false);
+ listeners.clear();
+ break;
+ }
- default:
- {
- listeners.clear();
- setLeadership(false);
- break;
- }
+ default:
+ {
+ listeners.clear();
+ setLeadership(false);
+ break;
+ }
}
}
}
/**
* Attaches a listener to this LeaderLatch
- * <p>
+ * <p/>
* Attaching the same listener multiple times is a noop from the second time on.
- * <p>
+ * <p/>
* All methods for the listener are run using the provided Executor. It is common to pass in a single-threaded
* executor so that you can be certain that listener methods are called in sequence, but if you are fine with
* them being called out of order you are welcome to use multiple threads.
@@ -253,15 +248,15 @@ public class LeaderLatch implements Closeable
/**
* Attaches a listener to this LeaderLatch
- * <p>
+ * <p/>
* Attaching the same listener multiple times is a noop from the second time on.
- * <p>
+ * <p/>
* All methods for the listener are run using the provided Executor. It is common to pass in a single-threaded
* executor so that you can be certain that listener methods are called in sequence, but if you are fine with
* them being called out of order you are welcome to use multiple threads.
*
* @param listener the listener to attach
- * @param executor An executor to run the methods for the listener on.
+ * @param executor An executor to run the methods for the listener on.
*/
public void addListener(LeaderLatchListener listener, Executor executor)
{
@@ -282,7 +277,7 @@ public class LeaderLatch implements Closeable
* <p>Causes the current thread to wait until this instance acquires leadership
* unless the thread is {@linkplain Thread#interrupt interrupted} or {@linkplain #close() closed}.</p>
* <p>If this instance already is the leader then this method returns immediately.</p>
- *
+ * <p/>
* <p>Otherwise the current
* thread becomes disabled for thread scheduling purposes and lies
* dormant until one of three things happen:</p>
@@ -324,10 +319,10 @@ public class LeaderLatch implements Closeable
* <p>Causes the current thread to wait until this instance acquires leadership
* unless the thread is {@linkplain Thread#interrupt interrupted},
* the specified waiting time elapses or the instance is {@linkplain #close() closed}.</p>
- *
+ * <p/>
* <p>If this instance already is the leader then this method returns immediately
* with the value {@code true}.</p>
- *
+ * <p/>
* <p>Otherwise the current
* thread becomes disabled for thread scheduling purposes and lies
* dormant until one of four things happen:</p>
@@ -338,7 +333,7 @@ public class LeaderLatch implements Closeable
* <li>The specified waiting time elapses.</li>
* <li>The instance is {@linkplain #close() closed}</li>
* </ul>
- *
+ * <p/>
* <p>If the current thread:</p>
* <ul>
* <li>has its interrupted status set on entry to this method; or
@@ -346,7 +341,7 @@ public class LeaderLatch implements Closeable
* </ul>
* <p>then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.</p>
- *
+ * <p/>
* <p>If the specified waiting time elapses or the instance is {@linkplain #close() closed}
* then the value {@code false} is returned. If the time is less than or equal to zero, the method
* will not wait at all.</p>
@@ -354,7 +349,7 @@ public class LeaderLatch implements Closeable
* @param timeout the maximum time to wait
* @param unit the time unit of the {@code timeout} argument
* @return {@code true} if the count reached zero and {@code false}
- * if the waiting time elapsed before the count reached zero or the instances was closed
+ * if the waiting time elapsed before the count reached zero or the instances was closed
* @throws InterruptedException if the current thread is interrupted
* while waiting
*/
@@ -561,22 +556,35 @@ public class LeaderLatch implements Closeable
private void handleStateChange(ConnectionState newState)
{
- if ( newState == ConnectionState.RECONNECTED )
+ switch ( newState )
{
- try
+ default:
{
- reset();
+ // NOP
+ break;
+ }
+
+ case RECONNECTED:
+ {
+ try
+ {
+ reset();
+ }
+ catch ( Exception e )
+ {
+ log.error("Could not reset leader latch", e);
+ setLeadership(false);
+ }
+ break;
}
- catch (Exception e)
+
+ case SUSPENDED:
+ case LOST:
{
- log.error("Could not reset leader latch", e);
setLeadership(false);
+ break;
}
}
- else
- {
- setLeadership(false);
- }
}
private synchronized void setLeadership(boolean newValue)
@@ -585,9 +593,7 @@ public class LeaderLatch implements Closeable
if ( oldValue && !newValue )
{ // Lost leadership, was true, now false
- listeners.forEach
- (
- new Function<LeaderLatchListener, Void>()
+ listeners.forEach(new Function<LeaderLatchListener, Void>()
{
@Override
public Void apply(LeaderLatchListener listener)
@@ -595,14 +601,11 @@ public class LeaderLatch implements Closeable
listener.notLeader();
return null;
}
- }
- );
+ });
}
else if ( !oldValue && newValue )
{ // Gained leadership, was false, now true
- listeners.forEach
- (
- new Function<LeaderLatchListener, Void>()
+ listeners.forEach(new Function<LeaderLatchListener, Void>()
{
@Override
public Void apply(LeaderLatchListener input)
@@ -610,8 +613,7 @@ public class LeaderLatch implements Closeable
input.isLeader();
return null;
}
- }
- );
+ });
}
notifyAll();
[13/18] git commit: Moved the connection blocking code into
ConnectionManager. It's cleaner and doesn't require a connection state
listener
Posted by ra...@apache.org.
Moved the connection blocking code into ConnectionManager. It's cleaner and doesn't require a connection state listener
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/04cefb47
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/04cefb47
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/04cefb47
Branch: refs/heads/master
Commit: 04cefb47f18c9d4bd3a0eb897563dd5abb7c89c8
Parents: 1c94c7e
Author: randgalt <ra...@apache.org>
Authored: Mon Jun 16 14:35:54 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jun 16 14:35:54 2014 -0500
----------------------------------------------------------------------
.../framework/imps/CuratorFrameworkImpl.java | 289 +++++++-----------
.../framework/state/ConnectionStateManager.java | 38 ++-
.../framework/imps/TestBlockUntilConnected.java | 304 +++++++++----------
.../recipes/AfterConnectionEstablished.java | 14 +-
.../framework/recipes/leader/LeaderLatch.java | 21 +-
.../recipes/leader/TestLeaderLatch.java | 46 +--
6 files changed, 336 insertions(+), 376 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/04cefb47/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 14473d8..23a3248 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -16,11 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.curator.framework.imps;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
-
import org.apache.curator.CuratorConnectionLossException;
import org.apache.curator.CuratorZookeeperClient;
import org.apache.curator.RetryLoop;
@@ -44,7 +44,6 @@ import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
@@ -59,40 +58,40 @@ import java.util.concurrent.atomic.AtomicReference;
public class CuratorFrameworkImpl implements CuratorFramework
{
- private final Logger log = LoggerFactory.getLogger(getClass());
- private final CuratorZookeeperClient client;
- private final ListenerContainer<CuratorListener> listeners;
- private final ListenerContainer<UnhandledErrorListener> unhandledErrorListeners;
- private final ThreadFactory threadFactory;
- private final BlockingQueue<OperationAndData<?>> backgroundOperations;
- private final NamespaceImpl namespace;
- private final ConnectionStateManager connectionStateManager;
- private final AtomicReference<AuthInfo> authInfo = new AtomicReference<AuthInfo>();
- private final byte[] defaultData;
- private final FailedDeleteManager failedDeleteManager;
- private final CompressionProvider compressionProvider;
- private final ACLProvider aclProvider;
- private final NamespaceFacadeCache namespaceFacadeCache;
- private final NamespaceWatcherMap namespaceWatcherMap = new NamespaceWatcherMap(this);
- private final Object connectionLock = new Object();
-
- private volatile ExecutorService executorService;
- private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false);
-
- private static final boolean LOG_ALL_CONNECTION_ISSUES_AS_ERROR_LEVEL = !Boolean.getBoolean(DebugUtils.PROPERTY_LOG_ONLY_FIRST_CONNECTION_ISSUE_AS_ERROR_LEVEL);
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final CuratorZookeeperClient client;
+ private final ListenerContainer<CuratorListener> listeners;
+ private final ListenerContainer<UnhandledErrorListener> unhandledErrorListeners;
+ private final ThreadFactory threadFactory;
+ private final BlockingQueue<OperationAndData<?>> backgroundOperations;
+ private final NamespaceImpl namespace;
+ private final ConnectionStateManager connectionStateManager;
+ private final AtomicReference<AuthInfo> authInfo = new AtomicReference<AuthInfo>();
+ private final byte[] defaultData;
+ private final FailedDeleteManager failedDeleteManager;
+ private final CompressionProvider compressionProvider;
+ private final ACLProvider aclProvider;
+ private final NamespaceFacadeCache namespaceFacadeCache;
+ private final NamespaceWatcherMap namespaceWatcherMap = new NamespaceWatcherMap(this);
+
+ private volatile ExecutorService executorService;
+ private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false);
+
+ private static final boolean LOG_ALL_CONNECTION_ISSUES_AS_ERROR_LEVEL = !Boolean.getBoolean(DebugUtils.PROPERTY_LOG_ONLY_FIRST_CONNECTION_ISSUE_AS_ERROR_LEVEL);
interface DebugBackgroundListener
{
- void listen(OperationAndData<?> data);
+ void listen(OperationAndData<?> data);
}
- volatile DebugBackgroundListener debugListener = null;
- private final AtomicReference<CuratorFrameworkState> state;
+ volatile DebugBackgroundListener debugListener = null;
+
+ private final AtomicReference<CuratorFrameworkState> state;
private static class AuthInfo
{
- final String scheme;
- final byte[] auth;
+ final String scheme;
+ final byte[] auth;
private AuthInfo(String scheme, byte[] auth)
{
@@ -113,37 +112,15 @@ public class CuratorFrameworkImpl implements CuratorFramework
public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
{
ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
- this.client = new CuratorZookeeperClient
- (
- localZookeeperFactory,
- builder.getEnsembleProvider(),
- builder.getSessionTimeoutMs(),
- builder.getConnectionTimeoutMs(),
- new Watcher()
+ this.client = new CuratorZookeeperClient(localZookeeperFactory, builder.getEnsembleProvider(), builder.getSessionTimeoutMs(), builder.getConnectionTimeoutMs(), new Watcher()
+ {
+ @Override
+ public void process(WatchedEvent watchedEvent)
{
- @Override
- public void process(WatchedEvent watchedEvent)
- {
- CuratorEvent event = new CuratorEventImpl
- (
- CuratorFrameworkImpl.this,
- CuratorEventType.WATCHED,
- watchedEvent.getState().getIntValue(),
- unfixForNamespace(watchedEvent.getPath()),
- null,
- null,
- null,
- null,
- null,
- watchedEvent,
- null
- );
- processEvent(event);
- }
- },
- builder.getRetryPolicy(),
- builder.canBeReadOnly()
- );
+ CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null);
+ processEvent(event);
+ }
+ }, builder.getRetryPolicy(), builder.canBeReadOnly());
listeners = new ListenerContainer<CuratorListener>();
unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
@@ -155,7 +132,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
aclProvider = builder.getAclProvider();
state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
- byte[] builderDefaultData = builder.getDefaultData();
+ byte[] builderDefaultData = builder.getDefaultData();
defaultData = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0];
if ( builder.getAuthScheme() != null )
@@ -165,23 +142,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
failedDeleteManager = new FailedDeleteManager(this);
namespaceFacadeCache = new NamespaceFacadeCache(this);
-
- //Add callback handler to determine connection state transitions
- getConnectionStateListenable().addListener(new ConnectionStateListener()
- {
-
- @Override
- public void stateChanged(CuratorFramework client, ConnectionState newState)
- {
- if(newState.isConnected())
- {
- synchronized(connectionLock)
- {
- connectionLock.notifyAll();
- }
- }
- }
- });
}
private ZookeeperFactory makeZookeeperFactory(final ZookeeperFactory actualZookeeperFactory)
@@ -253,7 +213,19 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
@Override
- public void start()
+ public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException
+ {
+ return connectionStateManager.blockUntilConnected(maxWaitTime, units);
+ }
+
+ @Override
+ public void blockUntilConnected() throws InterruptedException
+ {
+ blockUntilConnected(0, null);
+ }
+
+ @Override
+ public void start()
{
log.info("Starting");
if ( !state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED) )
@@ -266,7 +238,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
try
{
connectionStateManager.start(); // ordering dependency - must be called before client.start()
-
+
final ConnectionStateListener listener = new ConnectionStateListener()
{
@Override
@@ -278,16 +250,14 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
}
};
-
+
this.getConnectionStateListenable().addListener(listener);
client.start();
executorService = Executors.newFixedThreadPool(2, threadFactory); // 1 for listeners, 1 for background ops
- executorService.submit
- (
- new Callable<Object>()
+ executorService.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
@@ -295,8 +265,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
backgroundOperationsLoop();
return null;
}
- }
- );
+ });
}
catch ( Exception e )
{
@@ -305,31 +274,28 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
@Override
- public void close()
+ public void close()
{
log.debug("Closing");
if ( state.compareAndSet(CuratorFrameworkState.STARTED, CuratorFrameworkState.STOPPED) )
{
- listeners.forEach
- (
- new Function<CuratorListener, Void>()
+ listeners.forEach(new Function<CuratorListener, Void>()
+ {
+ @Override
+ public Void apply(CuratorListener listener)
{
- @Override
- public Void apply(CuratorListener listener)
+ CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.CLOSING, 0, null, null, null, null, null, null, null, null);
+ try
+ {
+ listener.eventReceived(CuratorFrameworkImpl.this, event);
+ }
+ catch ( Exception e )
{
- CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.CLOSING, 0, null, null, null, null, null, null, null, null);
- try
- {
- listener.eventReceived(CuratorFrameworkImpl.this, event);
- }
- catch ( Exception e )
- {
- log.error("Exception while sending Closing event", e);
- }
- return null;
+ log.error("Exception while sending Closing event", e);
}
+ return null;
}
- );
+ });
listeners.clear();
unhandledErrorListeners.clear();
@@ -515,14 +481,14 @@ public class CuratorFrameworkImpl implements CuratorFramework
<DATA_TYPE> void processBackgroundOperation(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent event)
{
- boolean isInitialExecution = (event == null);
+ boolean isInitialExecution = (event == null);
if ( isInitialExecution )
{
performBackgroundOperation(operationAndData);
return;
}
- boolean doQueueOperation = false;
+ boolean doQueueOperation = false;
do
{
if ( RetryLoop.shouldRetry(event.getResultCode()) )
@@ -538,7 +504,8 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
processEvent(event);
- } while ( false );
+ }
+ while ( false );
if ( doQueueOperation )
{
@@ -560,7 +527,8 @@ public class CuratorFrameworkImpl implements CuratorFramework
if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) || !(e instanceof KeeperException) )
{
- if ( e instanceof KeeperException.ConnectionLossException || e instanceof CuratorConnectionLossException ) {
+ if ( e instanceof KeeperException.ConnectionLossException )
+ {
if ( LOG_ALL_CONNECTION_ISSUES_AS_ERROR_LEVEL || logAsErrorConnectionErrors.compareAndSet(true, false) )
{
log.error(reason, e);
@@ -576,27 +544,24 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
}
- final String localReason = reason;
- unhandledErrorListeners.forEach
- (
- new Function<UnhandledErrorListener, Void>()
+ final String localReason = reason;
+ unhandledErrorListeners.forEach(new Function<UnhandledErrorListener, Void>()
+ {
+ @Override
+ public Void apply(UnhandledErrorListener listener)
{
- @Override
- public Void apply(UnhandledErrorListener listener)
- {
- listener.unhandledError(localReason, e);
- return null;
- }
+ listener.unhandledError(localReason, e);
+ return null;
}
- );
+ });
}
- String unfixForNamespace(String path)
+ String unfixForNamespace(String path)
{
return namespace.unfixForNamespace(path);
}
- String fixForNamespace(String path)
+ String fixForNamespace(String path)
{
return namespace.fixForNamespace(path);
}
@@ -723,8 +688,8 @@ public class CuratorFrameworkImpl implements CuratorFramework
sendToBackgroundCallback(operationAndData, event);
}
- KeeperException.Code code = KeeperException.Code.get(event.getResultCode());
- Exception e = null;
+ KeeperException.Code code = KeeperException.Code.get(event.getResultCode());
+ Exception e = null;
try
{
e = (code != null) ? KeeperException.create(code) : null;
@@ -755,7 +720,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
}
- private<DATA_TYPE> void handleBackgroundOperationException(OperationAndData<DATA_TYPE> operationAndData, Throwable e)
+ private <DATA_TYPE> void handleBackgroundOperationException(OperationAndData<DATA_TYPE> operationAndData, Throwable e)
{
do
{
@@ -788,14 +753,15 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
logError("Background exception was not retry-able or retry gave up", e);
- } while ( false );
+ }
+ while ( false );
}
private void backgroundOperationsLoop()
{
while ( !Thread.interrupted() )
{
- OperationAndData<?> operationAndData;
+ OperationAndData<?> operationAndData;
try
{
operationAndData = backgroundOperations.take();
@@ -850,7 +816,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
else
{
- logError("Background retry gave up", e);
+ logError("Background retry gave up", e);
}
}
else
@@ -867,70 +833,23 @@ public class CuratorFrameworkImpl implements CuratorFramework
validateConnection(curatorEvent.getWatchedEvent().getState());
}
- listeners.forEach
- (
- new Function<CuratorListener, Void>()
+ listeners.forEach(new Function<CuratorListener, Void>()
+ {
+ @Override
+ public Void apply(CuratorListener listener)
{
- @Override
- public Void apply(CuratorListener listener)
+ try
{
- try
- {
- TimeTrace trace = client.startTracer("EventListener");
- listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
- trace.commit();
- }
- catch ( Exception e )
- {
- logError("Event listener threw exception", e);
- }
- return null;
+ TimeTrace trace = client.startTracer("EventListener");
+ listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
+ trace.commit();
}
+ catch ( Exception e )
+ {
+ logError("Event listener threw exception", e);
+ }
+ return null;
}
- );
- }
-
- @Override
- public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException
- {
- //Check if we're already connected
- ConnectionState currentConnectionState = connectionStateManager.getCurrentConnectionState();
- if(currentConnectionState != null && currentConnectionState.isConnected())
- {
- return true;
- }
-
- long startTime = System.currentTimeMillis();
- long maxWaitTimeMS = TimeUnit.MILLISECONDS.convert(maxWaitTime, units);
-
- for(;;)
- {
- synchronized(connectionLock)
- {
- currentConnectionState = connectionStateManager.getCurrentConnectionState();
- if(currentConnectionState != null && currentConnectionState.isConnected())
- {
- return true;
- }
-
- long waitTime = 0;
- if(maxWaitTime > 0)
- {
- waitTime = maxWaitTimeMS - (System.currentTimeMillis() - startTime);
-
- //Timeout
- if(waitTime <= 0)
- {
- return false;
- }
- }
-
- connectionLock.wait(waitTime);
- }
- }
- }
-
- public void blockUntilConnected() throws InterruptedException {
- blockUntilConnected(0, TimeUnit.SECONDS);
+ });
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/04cefb47/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index ba29994..fb312dc 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -33,6 +33,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -153,6 +154,7 @@ public class ConnectionStateManager implements Closeable
currentConnectionState = ConnectionState.SUSPENDED;
postState(ConnectionState.SUSPENDED);
+
return true;
}
@@ -188,15 +190,45 @@ public class ConnectionStateManager implements Closeable
return true;
}
-
- public synchronized ConnectionState getCurrentConnectionState()
+
+ public synchronized boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException
+ {
+ boolean hasMaxWait = (units != null);
+ long startTime = System.currentTimeMillis();
+
+ while ( !isConnected() )
+ {
+ long maxWaitTimeMS = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWaitTime, units) : 0;
+
+ if ( hasMaxWait )
+ {
+ long waitTime = maxWaitTimeMS - (System.currentTimeMillis() - startTime);
+ if ( waitTime <= 0 )
+ {
+ return isConnected();
+ }
+
+ wait(waitTime);
+ }
+ else
+ {
+ wait();
+ }
+ }
+ return isConnected();
+ }
+
+ public synchronized boolean isConnected()
{
- return currentConnectionState;
+ return (currentConnectionState != null) && currentConnectionState.isConnected();
}
private void postState(ConnectionState state)
{
log.info("State change: " + state);
+
+ notifyAll();
+
while ( !eventQueue.offer(state) )
{
eventQueue.poll();
http://git-wip-us.apache.org/repos/asf/curator/blob/04cefb47/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
index 8dfb7d8..f649afb 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
@@ -16,12 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.curator.framework.imps;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+package org.apache.curator.framework.imps;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -34,202 +30,206 @@ import org.apache.curator.test.Timing;
import org.apache.curator.utils.CloseableUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
public class TestBlockUntilConnected extends BaseClassForTests
{
- /**
- * Test the case where we're already connected
- */
- @Test
- public void testBlockUntilConnectedCurrentlyConnected()
- {
- Timing timing = new Timing();
+ /**
+ * Test the case where we're already connected
+ */
+ @Test
+ public void testBlockUntilConnectedCurrentlyConnected() throws Exception
+ {
+ Timing timing = new Timing();
CuratorFramework client = CuratorFrameworkFactory.builder().
- connectString(server.getConnectString()).
- retryPolicy(new RetryOneTime(1)).
- build();
-
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+
try
{
- final CountDownLatch connectedLatch = new CountDownLatch(1);
- client.getConnectionStateListenable().addListener(new ConnectionStateListener()
- {
-
- @Override
- public void stateChanged(CuratorFramework client, ConnectionState newState)
- {
- if(newState.isConnected())
- {
- connectedLatch.countDown();
- }
- }
- });
-
- client.start();
-
- Assert.assertTrue(timing.awaitLatch(connectedLatch), "Timed out awaiting latch");
- Assert.assertTrue(client.blockUntilConnected(1, TimeUnit.SECONDS), "Not connected");
+ final CountDownLatch connectedLatch = new CountDownLatch(1);
+ client.getConnectionStateListenable().addListener(new ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ if ( newState.isConnected() )
+ {
+ connectedLatch.countDown();
+ }
+ }
+ });
+
+ client.start();
+
+ Assert.assertTrue(timing.awaitLatch(connectedLatch), "Timed out awaiting latch");
+ Assert.assertTrue(client.blockUntilConnected(1, TimeUnit.SECONDS), "Not connected");
}
- catch(InterruptedException e)
+ catch ( InterruptedException e )
{
- Assert.fail("Unexpected interruption");
+ Assert.fail("Unexpected interruption");
}
finally
{
- CloseableUtils.closeQuietly(client);
+ CloseableUtils.closeQuietly(client);
}
- }
-
- /**
- * Test the case where we are not currently connected and never have been
- */
- @Test
- public void testBlockUntilConnectedCurrentlyNeverConnected()
- {
+ }
+
+ /**
+ * Test the case where we are not currently connected and never have been
+ */
+ @Test
+ public void testBlockUntilConnectedCurrentlyNeverConnected()
+ {
CuratorFramework client = CuratorFrameworkFactory.builder().
- connectString(server.getConnectString()).
- retryPolicy(new RetryOneTime(1)).
- build();
-
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+
try
{
- client.start();
- Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Not connected");
+ client.start();
+ Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Not connected");
}
- catch(InterruptedException e)
+ catch ( InterruptedException e )
{
- Assert.fail("Unexpected interruption");
+ Assert.fail("Unexpected interruption");
}
finally
{
- CloseableUtils.closeQuietly(client);
+ CloseableUtils.closeQuietly(client);
}
- }
-
- /**
- * Test the case where we are not currently connected, but have been previously
- */
- @Test
- public void testBlockUntilConnectedCurrentlyAwaitingReconnect()
- {
- Timing timing = new Timing();
+ }
+
+ /**
+ * Test the case where we are not currently connected, but have been previously
+ */
+ @Test
+ public void testBlockUntilConnectedCurrentlyAwaitingReconnect()
+ {
+ Timing timing = new Timing();
CuratorFramework client = CuratorFrameworkFactory.builder().
- connectString(server.getConnectString()).
- retryPolicy(new RetryOneTime(1)).
- build();
-
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+
final CountDownLatch lostLatch = new CountDownLatch(1);
client.getConnectionStateListenable().addListener(new ConnectionStateListener()
{
-
- @Override
- public void stateChanged(CuratorFramework client, ConnectionState newState)
- {
- if(newState == ConnectionState.LOST)
- {
- lostLatch.countDown();
- }
- }
- });
-
+
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ if ( newState == ConnectionState.LOST )
+ {
+ lostLatch.countDown();
+ }
+ }
+ });
+
try
{
- client.start();
-
- //Block until we're connected
- Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Failed to connect");
-
- //Kill the server
- CloseableUtils.closeQuietly(server);
-
- //Wait until we hit the lost state
- Assert.assertTrue(timing.awaitLatch(lostLatch), "Failed to reach LOST state");
-
- server = new TestingServer(server.getPort(), server.getTempDirectory());
-
- Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Not connected");
+ client.start();
+
+ //Block until we're connected
+ Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Failed to connect");
+
+ //Kill the server
+ CloseableUtils.closeQuietly(server);
+
+ //Wait until we hit the lost state
+ Assert.assertTrue(timing.awaitLatch(lostLatch), "Failed to reach LOST state");
+
+ server = new TestingServer(server.getPort(), server.getTempDirectory());
+
+ Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Not connected");
}
- catch(Exception e)
+ catch ( Exception e )
{
- Assert.fail("Unexpected exception " + e);
+ Assert.fail("Unexpected exception " + e);
}
finally
{
- CloseableUtils.closeQuietly(client);
+ CloseableUtils.closeQuietly(client);
}
- }
-
- /**
- * Test the case where we are not currently connected and time out before a
- * connection becomes available.
- */
- @Test
- public void testBlockUntilConnectedConnectTimeout()
- {
- //Kill the server
- CloseableUtils.closeQuietly(server);
-
+ }
+
+ /**
+ * Test the case where we are not currently connected and time out before a
+ * connection becomes available.
+ */
+ @Test
+ public void testBlockUntilConnectedConnectTimeout()
+ {
+ //Kill the server
+ CloseableUtils.closeQuietly(server);
+
CuratorFramework client = CuratorFrameworkFactory.builder().
- connectString(server.getConnectString()).
- retryPolicy(new RetryOneTime(1)).
- build();
-
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+
try
{
- client.start();
- Assert.assertFalse(client.blockUntilConnected(5, TimeUnit.SECONDS),
- "Connected");
+ client.start();
+ Assert.assertFalse(client.blockUntilConnected(5, TimeUnit.SECONDS), "Connected");
}
- catch(InterruptedException e)
+ catch ( InterruptedException e )
{
- Assert.fail("Unexpected interruption");
+ Assert.fail("Unexpected interruption");
}
finally
{
- CloseableUtils.closeQuietly(client);
+ CloseableUtils.closeQuietly(client);
}
- }
-
- /**
- * Test the case where we are not currently connected and the thread gets interrupted
- * prior to a connection becoming available
- */
- @Test
- public void testBlockUntilConnectedInterrupt()
- {
- //Kill the server
- CloseableUtils.closeQuietly(server);
-
+ }
+
+ /**
+ * Test the case where we are not currently connected and the thread gets interrupted
+ * prior to a connection becoming available
+ */
+ @Test
+ public void testBlockUntilConnectedInterrupt()
+ {
+ //Kill the server
+ CloseableUtils.closeQuietly(server);
+
final CuratorFramework client = CuratorFrameworkFactory.builder().
- connectString(server.getConnectString()).
- retryPolicy(new RetryOneTime(1)).
- build();
-
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+
try
{
- client.start();
-
- final Thread threadToInterrupt = Thread.currentThread();
-
- Timer timer = new Timer();
- timer.schedule(new TimerTask() {
-
- @Override
- public void run() {
- threadToInterrupt.interrupt();
- }
- }, 3000);
-
- client.blockUntilConnected(5, TimeUnit.SECONDS);
- Assert.fail("Expected interruption did not occur");
+ client.start();
+
+ final Thread threadToInterrupt = Thread.currentThread();
+
+ Timer timer = new Timer();
+ timer.schedule(new TimerTask()
+ {
+
+ @Override
+ public void run()
+ {
+ threadToInterrupt.interrupt();
+ }
+ }, 3000);
+
+ client.blockUntilConnected(5, TimeUnit.SECONDS);
+ Assert.fail("Expected interruption did not occur");
}
- catch(InterruptedException e)
+ catch ( InterruptedException e )
{
- //This is expected
+ //This is expected
}
finally
{
- CloseableUtils.closeQuietly(client);
+ CloseableUtils.closeQuietly(client);
}
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/04cefb47/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
index f37f7c0..41ba702 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
@@ -22,7 +22,6 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
/**
@@ -39,24 +38,23 @@ public class AfterConnectionEstablished
* @param client The curator client
* @param runAfterConnection The logic to run
*/
- public static <T> T execute(final CuratorFramework client, final Callable<T> runAfterConnection) throws Exception
+ public static void execute(final CuratorFramework client, final Runnable runAfterConnection) throws Exception
{
//Block until connected
- final ExecutorService executor = ThreadUtils.newSingleThreadExecutor(runAfterConnection.getClass().getSimpleName());
- Callable<T> internalCall = new Callable<T>()
+ final ExecutorService executor = ThreadUtils.newSingleThreadExecutor(ThreadUtils.getProcessName(runAfterConnection.getClass()));
+ Runnable internalCall = new Runnable()
{
@Override
- public T call() throws Exception
+ public void run()
{
try
{
client.blockUntilConnected();
- return runAfterConnection.call();
+ runAfterConnection.run();
}
catch ( Exception e )
{
log.error("An error occurred blocking until a connection is available", e);
- throw e;
}
finally
{
@@ -64,7 +62,7 @@ public class AfterConnectionEstablished
}
}
};
- return executor.submit(internalCall).get();
+ executor.submit(internalCall);
}
private AfterConnectionEstablished()
http://git-wip-us.apache.org/repos/asf/curator/blob/04cefb47/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index f4c1cef..dce3f5e 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -26,7 +26,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.listen.ListenerContainer;
-import org.apache.curator.framework.recipes.ExecuteAfterConnectionEstablished;
+import org.apache.curator.framework.recipes.AfterConnectionEstablished;
import org.apache.curator.framework.recipes.locks.LockInternals;
import org.apache.curator.framework.recipes.locks.LockInternalsSorter;
import org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver;
@@ -156,17 +156,22 @@ public class LeaderLatch implements Closeable
{
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
- ExecuteAfterConnectionEstablished.executeAfterConnectionEstablishedInBackground
+ AfterConnectionEstablished.execute
(
- client,
- new Callable<Void>()
+ client, new Runnable()
{
@Override
- public Void call() throws Exception
+ public void run()
{
client.getConnectionStateListenable().addListener(listener);
- reset();
- return null;
+ try
+ {
+ reset();
+ }
+ catch ( Exception e )
+ {
+ log.error("An error occurred checking resetting leadership.", e);
+ }
}
}
);
@@ -556,7 +561,7 @@ public class LeaderLatch implements Closeable
private void handleStateChange(ConnectionState newState)
{
- if (newState.isConnected())
+ if ( newState == ConnectionState.RECONNECTED )
{
try
{
http://git-wip-us.apache.org/repos/asf/curator/blob/04cefb47/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index 35d8809..b97e708 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -22,7 +22,6 @@ package org.apache.curator.framework.recipes.leader;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.state.ConnectionState;
@@ -35,7 +34,6 @@ import org.apache.curator.test.Timing;
import org.apache.curator.utils.CloseableUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
-
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
@@ -215,6 +213,17 @@ public class TestLeaderLatch extends BaseClassForTests
@Test
public void testWaiting() throws Exception
{
+ final int LOOPS = 10;
+ for ( int i = 0; i < LOOPS; ++i )
+ {
+ System.out.println("TRY #" + i);
+ internalTestWaitingOnce();
+ Thread.sleep(10);
+ }
+ }
+
+ private void internalTestWaitingOnce() throws Exception
+ {
final int PARTICIPANT_QTY = 10;
ExecutorService executorService = Executors.newFixedThreadPool(PARTICIPANT_QTY);
@@ -241,10 +250,10 @@ public class TestLeaderLatch extends BaseClassForTests
Assert.assertTrue(latch.await(timing.forWaiting().seconds(), TimeUnit.SECONDS));
Assert.assertTrue(thereIsALeader.compareAndSet(false, true));
Thread.sleep((int)(10 * Math.random()));
+ thereIsALeader.set(false);
}
finally
{
- thereIsALeader.set(false);
latch.close();
}
return null;
@@ -259,7 +268,7 @@ public class TestLeaderLatch extends BaseClassForTests
}
finally
{
- executorService.shutdown();
+ executorService.shutdownNow();
CloseableUtils.closeQuietly(client);
}
}
@@ -526,23 +535,21 @@ public class TestLeaderLatch extends BaseClassForTests
CloseableUtils.closeQuietly(client);
}
}
-
+
@Test
public void testNoServerAtStart()
- {
+ {
CloseableUtils.closeQuietly(server);
Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.newClient(
- server.getConnectString(), timing.session(),
- timing.connection(), new RetryNTimes(5, 1000));
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryNTimes(5, 1000));
client.start();
final LeaderLatch leader = new LeaderLatch(client, PATH_NAME);
final CountDownLatch leaderCounter = new CountDownLatch(1);
final AtomicInteger leaderCount = new AtomicInteger(0);
- final AtomicInteger notLeaderCount = new AtomicInteger(0);
+ final AtomicInteger notLeaderCount = new AtomicInteger(0);
leader.addListener(new LeaderLatchListener()
{
@Override
@@ -555,27 +562,26 @@ public class TestLeaderLatch extends BaseClassForTests
@Override
public void notLeader()
{
- notLeaderCount.incrementAndGet();
+ notLeaderCount.incrementAndGet();
}
});
try
{
- leader.start();
-
- //Wait for a while before starting the test server
- Thread.sleep(5000);
+ leader.start();
+
+ timing.sleepABit();
// Start the new server
- server = new TestingServer(server.getPort());
+ server = new TestingServer(server.getPort(), server.getTempDirectory());
Assert.assertTrue(timing.awaitLatch(leaderCounter), "Not elected leader");
-
+
Assert.assertEquals(leaderCount.get(), 1, "Elected too many times");
- Assert.assertEquals(notLeaderCount.get(), 0, "Unelected too many times");
+ Assert.assertEquals(notLeaderCount.get(), 0, "Unelected too many times");
}
- catch (Exception e)
+ catch ( Exception e )
{
Assert.fail("Unexpected exception", e);
}
@@ -585,7 +591,7 @@ public class TestLeaderLatch extends BaseClassForTests
CloseableUtils.closeQuietly(client);
CloseableUtils.closeQuietly(server);
}
- }
+ }
private enum Mode
{
[04/18] git commit: some edits on the rpc web doc
Posted by ra...@apache.org.
some edits on the rpc web doc
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/70c52544
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/70c52544
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/70c52544
Branch: refs/heads/master
Commit: 70c5254460ab35c8ad4db46ace83425a1c1bb2db
Parents: 6e98562
Author: randgalt <ra...@apache.org>
Authored: Sat Jun 7 15:01:01 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Jun 7 15:01:01 2014 -0500
----------------------------------------------------------------------
curator-x-rpc/src/site/confluence/index.confluence | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/70c52544/curator-x-rpc/src/site/confluence/index.confluence
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/site/confluence/index.confluence b/curator-x-rpc/src/site/confluence/index.confluence
index ffba2de..b3686c3 100644
--- a/curator-x-rpc/src/site/confluence/index.confluence
+++ b/curator-x-rpc/src/site/confluence/index.confluence
@@ -1,6 +1,10 @@
h1. Curator RPC Proxy
-NOTE: Curator RPC Proxy is in its own package in Maven Central: curator\-x\-rpc
+h2. Packaging
+
+Curator RPC Proxy is in its own package in Maven Central: curator\-x\-rpc
+
+h2. What Is a Curator RPC?
The Curator RPC module implements a proxy that bridges non\-java environments with the Curator framework and recipes. It uses
[[Apache Thrift|http://thrift.apache.org]] which supports a large set of languages and environments.
@@ -16,6 +20,7 @@ The benefits of Curator RPC are:
* There are Thrift implementations for a large number of languages/environments
h2. Thrift File
+
The current Curator RPC Thrift File can be found here: [[https://raw.githubusercontent.com/apache/curator/master/curator-x-rpc/src/main/thrift/curator.thrift]]. Use
this to generate code for the language/environment you need.
[10/18] git commit: CURATOR-110 - Fixed up the connection
notifications so that they will only notify blocked clients when a connected
state is reached, rather than any state.
Posted by ra...@apache.org.
CURATOR-110 - Fixed up the connection notifications so that they will
only notify blocked clients when a connected state is reached, rather
than any state.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/59bab738
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/59bab738
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/59bab738
Branch: refs/heads/master
Commit: 59bab738f56e3e5017bcba8984c1d8ca24e1e71a
Parents: e8138ed
Author: Cameron McKenzie <mc...@gmail.com>
Authored: Mon Jun 16 16:45:17 2014 +1000
Committer: Cameron McKenzie <mc...@gmail.com>
Committed: Mon Jun 16 16:45:17 2014 +1000
----------------------------------------------------------------------
.../apache/curator/framework/imps/CuratorFrameworkImpl.java | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/59bab738/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index d1de29f..14473d8 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -173,9 +173,12 @@ public class CuratorFrameworkImpl implements CuratorFramework
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
- synchronized(connectionLock)
+ if(newState.isConnected())
{
- connectionLock.notifyAll();
+ synchronized(connectionLock)
+ {
+ connectionLock.notifyAll();
+ }
}
}
});
[11/18] git commit: 1. Added license. 2.
ExecuteAfterConnectionEstablished should use a Callable and allow the
exception to bubble up
Posted by ra...@apache.org.
1. Added license. 2. ExecuteAfterConnectionEstablished should use a Callable and allow the exception to bubble up
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/59076777
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/59076777
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/59076777
Branch: refs/heads/master
Commit: 59076777adfe42d96b0a92f775c8920033e1d975
Parents: 59bab73
Author: randgalt <ra...@apache.org>
Authored: Mon Jun 16 12:40:34 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jun 16 12:40:34 2014 -0500
----------------------------------------------------------------------
.../framework/imps/TestBlockUntilConnected.java | 18 ++++
.../ExecuteAfterConnectionEstablished.java | 96 ++++++++++++--------
.../framework/recipes/leader/LeaderLatch.java | 32 +++----
3 files changed, 89 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/59076777/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
index 996e5fc..8dfb7d8 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.curator.framework.imps;
import java.util.Timer;
http://git-wip-us.apache.org/repos/asf/curator/blob/59076777/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java
index d213d37..408ed03 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java
@@ -1,53 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.curator.framework.recipes;
-import java.util.concurrent.ExecutorService;
-
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
/**
* Utility class to allow execution of logic once a ZooKeeper connection becomes available.
- *
*/
public class ExecuteAfterConnectionEstablished
{
- private final static Logger log = LoggerFactory.getLogger(ExecuteAfterConnectionEstablished.class);
-
- /**
- * Spawns a new new background thread that will block until a connection is available and
- * then execute the 'runAfterConnection' logic
- * @param name The name of the spawned thread
- * @param client The curator client
- * @param runAfterConnection The logic to run
- */
- public static void executeAfterConnectionEstablishedInBackground(String name,
- final CuratorFramework client,
- final Runnable runAfterConnection)
- {
+ private final static Logger log = LoggerFactory.getLogger(ExecuteAfterConnectionEstablished.class);
+
+ /**
+ * Spawns a new new background thread that will block until a connection is available and
+ * then execute the 'runAfterConnection' logic
+ *
+ * @param client The curator client
+ * @param runAfterConnection The logic to run
+ */
+ public static <T> T executeAfterConnectionEstablishedInBackground(final CuratorFramework client, final Callable<T> runAfterConnection) throws Exception
+ {
//Block until connected
- final ExecutorService executor = ThreadUtils.newSingleThreadExecutor(name);
- executor.submit(new Runnable()
+ final ExecutorService executor = ThreadUtils.newSingleThreadExecutor(runAfterConnection.getClass().getSimpleName());
+ Callable<T> internalCall = new Callable<T>()
{
-
- @Override
- public void run()
- {
- try
- {
- client.blockUntilConnected();
- runAfterConnection.run();
- }
- catch(Exception e)
- {
- log.error("An error occurred blocking until a connection is available", e);
- }
- finally
- {
- executor.shutdown();
- }
- }
- });
- }
+ @Override
+ public T call() throws Exception
+ {
+ try
+ {
+ client.blockUntilConnected();
+ return runAfterConnection.call();
+ }
+ catch ( Exception e )
+ {
+ log.error("An error occurred blocking until a connection is available", e);
+ throw e;
+ }
+ finally
+ {
+ executor.shutdown();
+ }
+ }
+ };
+ return executor.submit(internalCall).get();
+ }
+
+ private ExecuteAfterConnectionEstablished()
+ {
+ }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/59076777/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 13a9f21..f4c1cef 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -22,7 +22,6 @@ package org.apache.curator.framework.recipes.leader;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
-
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
@@ -33,7 +32,6 @@ import org.apache.curator.framework.recipes.locks.LockInternalsSorter;
import org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -41,15 +39,14 @@ import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -159,23 +156,20 @@ public class LeaderLatch implements Closeable
{
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
- ExecuteAfterConnectionEstablished.executeAfterConnectionEstablishedInBackground(LeaderLatch.class.getName(),
- client, new Runnable()
- {
- @Override
- public void run()
- {
- try
- {
- client.getConnectionStateListenable().addListener(listener);
- reset();
- }
- catch(Exception ex)
+ ExecuteAfterConnectionEstablished.executeAfterConnectionEstablishedInBackground
+ (
+ client,
+ new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
{
- log.error("An error occurred checking resetting leadership.", ex);
+ client.getConnectionStateListenable().addListener(listener);
+ reset();
+ return null;
}
- }
- });
+ }
+ );
}
/**
[07/18] git commit: added desc for rpc
Posted by ra...@apache.org.
added desc for rpc
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/d4af5c5b
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/d4af5c5b
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/d4af5c5b
Branch: refs/heads/master
Commit: d4af5c5bb48af8fa84075218eb1e1de924417ff6
Parents: a57be39
Author: randgalt <ra...@apache.org>
Authored: Mon Jun 9 13:27:19 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jun 9 13:27:19 2014 -0500
----------------------------------------------------------------------
curator-x-rpc/pom.xml | 4 ++++
1 file changed, 4 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/d4af5c5b/curator-x-rpc/pom.xml
----------------------------------------------------------------------
diff --git a/curator-x-rpc/pom.xml b/curator-x-rpc/pom.xml
index 1bc3b90..f66cd76 100644
--- a/curator-x-rpc/pom.xml
+++ b/curator-x-rpc/pom.xml
@@ -10,6 +10,10 @@
<artifactId>curator-x-rpc</artifactId>
<version>2.5.1-SNAPSHOT</version>
+ <name>Curator RPC Proxy</name>
+ <description>A proxy that bridges non-java environments with the Curator framework and recipes</description>
+ <inceptionYear>2014</inceptionYear>
+
<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
[18/18] git commit: Merge branch 'CURATOR-110'
Posted by ra...@apache.org.
Merge branch 'CURATOR-110'
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/5d7d0c7f
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/5d7d0c7f
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/5d7d0c7f
Branch: refs/heads/master
Commit: 5d7d0c7f1430dde06cef8800bca3f035dd707c0d
Parents: d4af5c5 bac23d3
Author: randgalt <ra...@apache.org>
Authored: Tue Jun 17 18:02:55 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Jun 17 18:02:55 2014 -0500
----------------------------------------------------------------------
.../org/apache/curator/utils/ThreadUtils.java | 9 +
.../curator/framework/CuratorFramework.java | 20 ++
.../framework/imps/CuratorFrameworkImpl.java | 225 ++++++++----------
.../framework/state/ConnectionState.java | 45 +++-
.../framework/state/ConnectionStateManager.java | 37 +++
.../framework/imps/TestBlockUntilConnected.java | 235 +++++++++++++++++++
.../recipes/AfterConnectionEstablished.java | 74 ++++++
.../framework/recipes/leader/LeaderLatch.java | 153 +++++++-----
.../recipes/leader/TestLeaderLatch.java | 125 +++++++++-
9 files changed, 739 insertions(+), 184 deletions(-)
----------------------------------------------------------------------
[02/18] git commit: CURATOR-110 - Fixed up formatting to Curator
standards
Posted by ra...@apache.org.
CURATOR-110 - Fixed up formatting to Curator standards
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/0b7ae7e3
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/0b7ae7e3
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/0b7ae7e3
Branch: refs/heads/master
Commit: 0b7ae7e3672a9c817fe31144cd6b18d9a357f124
Parents: 1a63a10
Author: Cameron McKenzie <ca...@unico.com.au>
Authored: Tue Jun 3 09:01:31 2014 +1000
Committer: Cameron McKenzie <ca...@unico.com.au>
Committed: Tue Jun 3 09:01:31 2014 +1000
----------------------------------------------------------------------
.../framework/state/ConnectionState.java | 35 +-
.../framework/recipes/leader/LeaderLatch.java | 620 +++++++++----------
.../recipes/leader/TestLeaderLatch.java | 491 ++++++++-------
3 files changed, 602 insertions(+), 544 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/0b7ae7e3/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
index b25ce38..7663e10 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
@@ -23,17 +23,18 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
/**
* Represents state changes in the connection to ZK
*/
-public enum ConnectionState {
+public enum ConnectionState
+{
/**
- * Sent for the first successful connection to the server. NOTE: You will
- * only get one of these messages for any CuratorFramework instance.
+ * Sent for the first successful connection to the server. NOTE: You will only
+ * get one of these messages for any CuratorFramework instance.
*/
CONNECTED,
/**
* There has been a loss of connection. Leaders, locks, etc. should suspend
- * until the connection is re-established. If the connection times-out you
- * will receive a {@link #LOST} notice
+ * until the connection is re-established. If the connection times-out you will
+ * receive a {@link #LOST} notice
*/
SUSPENDED,
@@ -43,30 +44,28 @@ public enum ConnectionState {
RECONNECTED,
/**
- * The connection is confirmed to be lost. Close any locks, leaders, etc.
- * and attempt to re-create them. NOTE: it is possible to get a
- * {@link #RECONNECTED} state after this but you should still consider any
- * locks, etc. as dirty/unstable
+ * The connection is confirmed to be lost. Close any locks, leaders, etc. and
+ * attempt to re-create them. NOTE: it is possible to get a {@link #RECONNECTED}
+ * state after this but you should still consider any locks, etc. as dirty/unstable
*/
LOST,
/**
- * The connection has gone into read-only mode. This can only happen if you
- * pass true for {@link CuratorFrameworkFactory.Builder#canBeReadOnly()}.
- * See the ZooKeeper doc regarding read only connections: <a
- * href="http://wiki.apache.org/hadoop/ZooKeeper/GSoCReadOnlyMode"
- * >http://wiki.apache.org/hadoop/ZooKeeper/GSoCReadOnlyMode</a>. The
- * connection will remain in read only mode until another state change is
- * sent.
+ * The connection has gone into read-only mode. This can only happen if you pass true
+ * for {@link CuratorFrameworkFactory.Builder#canBeReadOnly()}. See the ZooKeeper doc
+ * regarding read only connections:
+ * <a href="http://wiki.apache.org/hadoop/ZooKeeper/GSoCReadOnlyMode">http://wiki.apache.org/hadoop/ZooKeeper/GSoCReadOnlyMode</a>.
+ * The connection will remain in read only mode until another state change is sent.
*/
READ_ONLY;
-
+
/**
* Check if this state indicates that Curator has a connection to ZooKeeper
*
* @return True if connected, false otherwise
*/
- public boolean isConnected() {
+ public boolean isConnected()
+ {
return this == CONNECTED || this == RECONNECTED || this == READ_ONLY;
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/0b7ae7e3/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index d09ed1b..c1157c9 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -19,17 +19,9 @@
package org.apache.curator.framework.recipes.leader;
-import java.io.Closeable;
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
@@ -46,60 +38,71 @@ import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
/**
* <p>
- * Abstraction to select a "leader" amongst multiple contenders in a group of
- * JMVs connected to a Zookeeper cluster. If a group of N thread/processes
- * contend for leadership one will randomly be assigned leader until it releases
- * leadership at which time another one from the group will randomly be chosen
+ * Abstraction to select a "leader" amongst multiple contenders in a group of JMVs connected to
+ * a Zookeeper cluster. If a group of N thread/processes contend for leadership one will
+ * randomly be assigned leader until it releases leadership at which time another one from the
+ * group will randomly be chosen
* </p>
*/
-public class LeaderLatch implements Closeable {
+public class LeaderLatch implements Closeable
+{
private final Logger log = LoggerFactory.getLogger(getClass());
private final CuratorFramework client;
private final String latchPath;
private final String id;
- private final AtomicReference<State> state = new AtomicReference<State>(
- State.LATENT);
+ private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
private final AtomicBoolean hasLeadership = new AtomicBoolean(false);
private final AtomicReference<String> ourPath = new AtomicReference<String>();
private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();
private final CloseMode closeMode;
- private final ConnectionStateListener listener = new ConnectionStateListener() {
+ private final ConnectionStateListener listener = new ConnectionStateListener()
+ {
@Override
- public void stateChanged(CuratorFramework client,
- ConnectionState newState) {
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
handleStateChange(newState);
}
};
private static final String LOCK_NAME = "latch-";
- private static final LockInternalsSorter sorter = new LockInternalsSorter() {
+ private static final LockInternalsSorter sorter = new LockInternalsSorter()
+ {
@Override
- public String fixForSorting(String str, String lockName) {
- return StandardLockInternalsDriver.standardFixForSorting(str,
- lockName);
+ public String fixForSorting(String str, String lockName)
+ {
+ return StandardLockInternalsDriver.standardFixForSorting(str, lockName);
}
};
- public enum State {
- LATENT, STARTED, CLOSED
+ public enum State
+ {
+ LATENT,
+ STARTED,
+ CLOSED
}
/**
* How to handle listeners when the latch is closed
*/
- public enum CloseMode {
+ public enum CloseMode
+ {
/**
- * When the latch is closed, listeners will *not* be notified (default
- * behavior)
+ * When the latch is closed, listeners will *not* be notified (default behavior)
*/
SILENT,
@@ -110,116 +113,105 @@ public class LeaderLatch implements Closeable {
}
/**
- * @param client
- * the client
- * @param latchPath
- * the path for this leadership group
+ * @param client the client
+ * @param latchPath the path for this leadership group
*/
- public LeaderLatch(CuratorFramework client, String latchPath) {
+ public LeaderLatch(CuratorFramework client, String latchPath)
+ {
this(client, latchPath, "", CloseMode.SILENT);
}
/**
- * @param client
- * the client
- * @param latchPath
- * the path for this leadership group
- * @param id
- * participant ID
+ * @param client the client
+ * @param latchPath the path for this leadership group
+ * @param id participant ID
*/
- public LeaderLatch(CuratorFramework client, String latchPath, String id) {
+ public LeaderLatch(CuratorFramework client, String latchPath, String id)
+ {
this(client, latchPath, id, CloseMode.SILENT);
}
/**
- * @param client
- * the client
- * @param latchPath
- * the path for this leadership group
- * @param id
- * participant ID
- * @param closeMode
- * behaviour of listener on explicit close.
+ * @param client the client
+ * @param latchPath the path for this leadership group
+ * @param id participant ID
+ * @param closeMode behaviour of listener on explicit close.
*/
- public LeaderLatch(CuratorFramework client, String latchPath, String id,
- CloseMode closeMode) {
- this.client = Preconditions.checkNotNull(client,
- "client cannot be null");
- this.latchPath = Preconditions.checkNotNull(latchPath,
- "mutexPath cannot be null");
+ public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode)
+ {
+ this.client = Preconditions.checkNotNull(client, "client cannot be null");
+ this.latchPath = Preconditions.checkNotNull(latchPath, "mutexPath cannot be null");
this.id = Preconditions.checkNotNull(id, "id cannot be null");
- this.closeMode = Preconditions.checkNotNull(closeMode,
- "closeMode cannot be null");
+ this.closeMode = Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
}
/**
- * Add this instance to the leadership election and attempt to acquire
- * leadership.
- *
- * @throws Exception
- * errors
+ * Add this instance to the leadership election and attempt to acquire leadership.
+ *
+ * @throws Exception errors
*/
- public void start() throws Exception {
- Preconditions.checkState(
- state.compareAndSet(State.LATENT, State.STARTED),
- "Cannot be started more than once");
+ public void start() throws Exception
+ {
+ Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
client.getConnectionStateListenable().addListener(listener);
reset();
}
/**
- * Remove this instance from the leadership election. If this instance is
- * the leader, leadership is released. IMPORTANT: the only way to release
- * leadership is by calling close(). All LeaderLatch instances must
- * eventually be closed.
- *
- * @throws IOException
- * errors
+ * Remove this instance from the leadership election. If this instance is the leader, leadership
+ * is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch
+ * instances must eventually be closed.
+ *
+ * @throws IOException errors
*/
@Override
- public void close() throws IOException {
+ public void close() throws IOException
+ {
close(closeMode);
}
/**
- * Remove this instance from the leadership election. If this instance is
- * the leader, leadership is released. IMPORTANT: the only way to release
- * leadership is by calling close(). All LeaderLatch instances must
- * eventually be closed.
- *
- * @param closeMode
- * allows the default close mode to be overridden at the time the
- * latch is closed.
- *
- * @throws IOException
- * errors
+ * Remove this instance from the leadership election. If this instance is the leader, leadership
+ * is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch
+ * instances must eventually be closed.
+ *
+ * @param closeMode allows the default close mode to be overridden at the time the latch is closed.
+ *
+ * @throws IOException errors
*/
- public void close(CloseMode closeMode) throws IOException {
- Preconditions.checkState(
- state.compareAndSet(State.STARTED, State.CLOSED),
- "Already closed or has not been started");
+ public void close(CloseMode closeMode) throws IOException
+ {
+ Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
- try {
+ try
+ {
setNode(null);
- } catch (Exception e) {
+ }
+ catch ( Exception e )
+ {
throw new IOException(e);
- } finally {
+ }
+ finally
+ {
client.getConnectionStateListenable().removeListener(listener);
- switch (closeMode) {
- case NOTIFY_LEADER: {
- setLeadership(false);
- listeners.clear();
- break;
- }
+ switch ( closeMode )
+ {
+ case NOTIFY_LEADER:
+ {
+ setLeadership(false);
+ listeners.clear();
+ break;
+ }
- default: {
- listeners.clear();
- setLeadership(false);
- break;
- }
+ default:
+ {
+ listeners.clear();
+ setLeadership(false);
+ break;
+ }
}
}
}
@@ -227,160 +219,134 @@ public class LeaderLatch implements Closeable {
/**
* Attaches a listener to this LeaderLatch
* <p>
- * Attaching the same listener multiple times is a noop from the second time
- * on.
+ * Attaching the same listener multiple times is a noop from the second time on.
* <p>
- * All methods for the listener are run using the provided Executor. It is
- * common to pass in a single-threaded executor so that you can be certain
- * that listener methods are called in sequence, but if you are fine with
+ * All methods for the listener are run using the provided Executor. It is common to pass in a single-threaded
+ * executor so that you can be certain that listener methods are called in sequence, but if you are fine with
* them being called out of order you are welcome to use multiple threads.
- *
- * @param listener
- * the listener to attach
+ *
+ * @param listener the listener to attach
*/
- public void addListener(LeaderLatchListener listener) {
+ public void addListener(LeaderLatchListener listener)
+ {
listeners.addListener(listener);
}
/**
* Attaches a listener to this LeaderLatch
* <p>
- * Attaching the same listener multiple times is a noop from the second time
- * on.
+ * Attaching the same listener multiple times is a noop from the second time on.
* <p>
- * All methods for the listener are run using the provided Executor. It is
- * common to pass in a single-threaded executor so that you can be certain
- * that listener methods are called in sequence, but if you are fine with
+ * All methods for the listener are run using the provided Executor. It is common to pass in a single-threaded
+ * executor so that you can be certain that listener methods are called in sequence, but if you are fine with
* them being called out of order you are welcome to use multiple threads.
- *
- * @param listener
- * the listener to attach
- * @param executor
- * An executor to run the methods for the listener on.
+ *
+ * @param listener the listener to attach
+ * @param executor An executor to run the methods for the listener on.
*/
- public void addListener(LeaderLatchListener listener, Executor executor) {
+ public void addListener(LeaderLatchListener listener, Executor executor)
+ {
listeners.addListener(listener, executor);
}
/**
* Removes a given listener from this LeaderLatch
- *
- * @param listener
- * the listener to remove
+ *
+ * @param listener the listener to remove
*/
- public void removeListener(LeaderLatchListener listener) {
+ public void removeListener(LeaderLatchListener listener)
+ {
listeners.removeListener(listener);
}
/**
- * <p>
- * Causes the current thread to wait until this instance acquires leadership
- * unless the thread is {@linkplain Thread#interrupt interrupted} or
- * {@linkplain #close() closed}.
- * </p>
- * <p>
- * If this instance already is the leader then this method returns
- * immediately.
- * </p>
- *
- * <p>
- * Otherwise the current thread becomes disabled for thread scheduling
- * purposes and lies dormant until one of three things happen:
- * </p>
+ * <p>Causes the current thread to wait until this instance acquires leadership
+ * unless the thread is {@linkplain Thread#interrupt interrupted} or {@linkplain #close() closed}.</p>
+ * <p>If this instance already is the leader then this method returns immediately.</p>
+ *
+ * <p>Otherwise the current
+ * thread becomes disabled for thread scheduling purposes and lies
+ * dormant until one of three things happen:</p>
* <ul>
* <li>This instance becomes the leader</li>
- * <li>Some other thread {@linkplain Thread#interrupt interrupts} the
- * current thread</li>
+ * <li>Some other thread {@linkplain Thread#interrupt interrupts}
+ * the current thread</li>
* <li>The instance is {@linkplain #close() closed}</li>
* </ul>
- * <p>
- * If the current thread:
- * </p>
+ * <p>If the current thread:</p>
* <ul>
* <li>has its interrupted status set on entry to this method; or
* <li>is {@linkplain Thread#interrupt interrupted} while waiting,
* </ul>
- * <p>
- * then {@link InterruptedException} is thrown and the current thread's
- * interrupted status is cleared.
- * </p>
- *
- * @throws InterruptedException
- * if the current thread is interrupted while waiting
- * @throws EOFException
- * if the instance is {@linkplain #close() closed} while waiting
+ * <p>then {@link InterruptedException} is thrown and the current thread's
+ * interrupted status is cleared.</p>
+ *
+ * @throws InterruptedException if the current thread is interrupted
+ * while waiting
+ * @throws EOFException if the instance is {@linkplain #close() closed}
+ * while waiting
*/
- public void await() throws InterruptedException, EOFException {
- synchronized (this) {
- while ((state.get() == State.STARTED) && !hasLeadership.get()) {
+ public void await() throws InterruptedException, EOFException
+ {
+ synchronized(this)
+ {
+ while ( (state.get() == State.STARTED) && !hasLeadership.get() )
+ {
wait();
}
}
- if (state.get() != State.STARTED) {
+ if ( state.get() != State.STARTED )
+ {
throw new EOFException();
}
}
/**
- * <p>
- * Causes the current thread to wait until this instance acquires leadership
- * unless the thread is {@linkplain Thread#interrupt interrupted}, the
- * specified waiting time elapses or the instance is {@linkplain #close()
- * closed}.
- * </p>
- *
- * <p>
- * If this instance already is the leader then this method returns
- * immediately with the value {@code true}.
- * </p>
- *
- * <p>
- * Otherwise the current thread becomes disabled for thread scheduling
- * purposes and lies dormant until one of four things happen:
- * </p>
+ * <p>Causes the current thread to wait until this instance acquires leadership
+ * unless the thread is {@linkplain Thread#interrupt interrupted},
+ * the specified waiting time elapses or the instance is {@linkplain #close() closed}.</p>
+ *
+ * <p>If this instance already is the leader then this method returns immediately
+ * with the value {@code true}.</p>
+ *
+ * <p>Otherwise the current
+ * thread becomes disabled for thread scheduling purposes and lies
+ * dormant until one of four things happen:</p>
* <ul>
* <li>This instance becomes the leader</li>
- * <li>Some other thread {@linkplain Thread#interrupt interrupts} the
- * current thread</li>
+ * <li>Some other thread {@linkplain Thread#interrupt interrupts}
+ * the current thread</li>
* <li>The specified waiting time elapses.</li>
* <li>The instance is {@linkplain #close() closed}</li>
* </ul>
- *
- * <p>
- * If the current thread:
- * </p>
+ *
+ * <p>If the current thread:</p>
* <ul>
* <li>has its interrupted status set on entry to this method; or
* <li>is {@linkplain Thread#interrupt interrupted} while waiting,
* </ul>
- * <p>
- * then {@link InterruptedException} is thrown and the current thread's
- * interrupted status is cleared.
- * </p>
- *
- * <p>
- * If the specified waiting time elapses or the instance is
- * {@linkplain #close() closed} then the value {@code false} is returned. If
- * the time is less than or equal to zero, the method will not wait at all.
- * </p>
- *
- * @param timeout
- * the maximum time to wait
- * @param unit
- * the time unit of the {@code timeout} argument
- * @return {@code true} if the count reached zero and {@code false} if the
- * waiting time elapsed before the count reached zero or the
- * instances was closed
- * @throws InterruptedException
- * if the current thread is interrupted while waiting
+ * <p>then {@link InterruptedException} is thrown and the current thread's
+ * interrupted status is cleared.</p>
+ *
+ * <p>If the specified waiting time elapses or the instance is {@linkplain #close() closed}
+ * then the value {@code false} is returned. If the time is less than or equal to zero, the method
+ * will not wait at all.</p>
+ *
+ * @param timeout the maximum time to wait
+ * @param unit the time unit of the {@code timeout} argument
+ * @return {@code true} if the count reached zero and {@code false}
+ * if the waiting time elapsed before the count reached zero or the instances was closed
+ * @throws InterruptedException if the current thread is interrupted
+ * while waiting
*/
- public boolean await(long timeout, TimeUnit unit)
- throws InterruptedException {
+ public boolean await(long timeout, TimeUnit unit) throws InterruptedException
+ {
long waitNanos = TimeUnit.NANOSECONDS.convert(timeout, unit);
- synchronized (this) {
- while ((waitNanos > 0) && (state.get() == State.STARTED)
- && !hasLeadership.get()) {
+ synchronized(this)
+ {
+ while ( (waitNanos > 0) && (state.get() == State.STARTED) && !hasLeadership.get() )
+ {
long startNanos = System.nanoTime();
TimeUnit.NANOSECONDS.timedWait(this, waitNanos);
long elapsed = System.nanoTime() - startNanos;
@@ -392,23 +358,24 @@ public class LeaderLatch implements Closeable {
/**
* Return this instance's participant Id
- *
+ *
* @return participant Id
*/
- public String getId() {
+ public String getId()
+ {
return id;
}
/**
- * Returns this instances current state, this is the only way to verify that
- * the object has been closed before closing again. If you try to close a
- * latch multiple times, the close() method will throw an
- * IllegalArgumentException which is often not caught and ignored
- * (CloseableUtils.closeQuietly() only looks for IOException).
- *
+ * Returns this instances current state, this is the only way to verify that the object has been closed before
+ * closing again. If you try to close a latch multiple times, the close() method will throw an
+ * IllegalArgumentException which is often not caught and ignored (CloseableUtils.closeQuietly() only looks for
+ * IOException).
+ *
* @return the state of the current instance
*/
- public State getState() {
+ public State getState()
+ {
return state.get();
}
@@ -419,17 +386,16 @@ public class LeaderLatch implements Closeable {
* <p>
* <p>
* <B>NOTE</B> - this method polls the ZK server. Therefore it can possibly
- * return a value that does not match {@link #hasLeadership()} as
- * hasLeadership uses a local field of the class.
+ * return a value that does not match {@link #hasLeadership()} as hasLeadership
+ * uses a local field of the class.
* </p>
- *
+ *
* @return participants
- * @throws Exception
- * ZK errors, interruptions, etc.
+ * @throws Exception ZK errors, interruptions, etc.
*/
- public Collection<Participant> getParticipants() throws Exception {
- Collection<String> participantNodes = LockInternals
- .getParticipantNodes(client, latchPath, LOCK_NAME, sorter);
+ public Collection<Participant> getParticipants() throws Exception
+ {
+ Collection<String> participantNodes = LockInternals.getParticipantNodes(client, latchPath, LOCK_NAME, sorter);
return LeaderSelector.getParticipants(client, participantNodes);
}
@@ -441,26 +407,26 @@ public class LeaderLatch implements Closeable {
* <p>
* <p>
* <B>NOTE</B> - this method polls the ZK server. Therefore it can possibly
- * return a value that does not match {@link #hasLeadership()} as
- * hasLeadership uses a local field of the class.
+ * return a value that does not match {@link #hasLeadership()} as hasLeadership
+ * uses a local field of the class.
* </p>
- *
+ *
* @return leader
- * @throws Exception
- * ZK errors, interruptions, etc.
+ * @throws Exception ZK errors, interruptions, etc.
*/
- public Participant getLeader() throws Exception {
- Collection<String> participantNodes = LockInternals
- .getParticipantNodes(client, latchPath, LOCK_NAME, sorter);
+ public Participant getLeader() throws Exception
+ {
+ Collection<String> participantNodes = LockInternals.getParticipantNodes(client, latchPath, LOCK_NAME, sorter);
return LeaderSelector.getLeader(client, participantNodes);
}
/**
* Return true if leadership is currently held by this instance
- *
+ *
* @return true/false
*/
- public boolean hasLeadership() {
+ public boolean hasLeadership()
+ {
return (state.get() == State.STARTED) && hasLeadership.get();
}
@@ -468,95 +434,105 @@ public class LeaderLatch implements Closeable {
volatile CountDownLatch debugResetWaitLatch = null;
@VisibleForTesting
- void reset() throws Exception {
+ void reset() throws Exception
+ {
setLeadership(false);
setNode(null);
- BackgroundCallback callback = new BackgroundCallback() {
+ BackgroundCallback callback = new BackgroundCallback()
+ {
@Override
- public void processResult(CuratorFramework client,
- CuratorEvent event) throws Exception {
- if (debugResetWaitLatch != null) {
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+ {
+ if ( debugResetWaitLatch != null )
+ {
debugResetWaitLatch.await();
debugResetWaitLatch = null;
}
- if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
+ if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+ {
setNode(event.getName());
- if (state.get() == State.CLOSED) {
+ if ( state.get() == State.CLOSED )
+ {
setNode(null);
- } else {
+ }
+ else
+ {
getChildren();
}
- } else {
- log.error("getChildren() failed. rc = "
- + event.getResultCode());
+ }
+ else
+ {
+ log.error("getChildren() failed. rc = " + event.getResultCode());
}
}
};
- client.create()
- .creatingParentsIfNeeded()
- .withProtection()
- .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
- .inBackground(callback)
- .forPath(ZKPaths.makePath(latchPath, LOCK_NAME),
- LeaderSelector.getIdBytes(id));
+ client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
}
- private void checkLeadership(List<String> children) throws Exception {
+ private void checkLeadership(List<String> children) throws Exception
+ {
final String localOurPath = ourPath.get();
- List<String> sortedChildren = LockInternals.getSortedChildren(
- LOCK_NAME, sorter, children);
- int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths
- .getNodeFromPath(localOurPath)) : -1;
- if (ourIndex < 0) {
+ List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
+ int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
+ if ( ourIndex < 0 )
+ {
log.error("Can't find our node. Resetting. Index: " + ourIndex);
reset();
- } else if (ourIndex == 0) {
+ }
+ else if ( ourIndex == 0 )
+ {
setLeadership(true);
- } else {
+ }
+ else
+ {
String watchPath = sortedChildren.get(ourIndex - 1);
- Watcher watcher = new Watcher() {
+ Watcher watcher = new Watcher()
+ {
@Override
- public void process(WatchedEvent event) {
- if ((state.get() == State.STARTED)
- && (event.getType() == Event.EventType.NodeDeleted)
- && (localOurPath != null)) {
- try {
+ public void process(WatchedEvent event)
+ {
+ if ( (state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null) )
+ {
+ try
+ {
getChildren();
- } catch (Exception ex) {
- log.error(
- "An error occurred checking the leadership.",
- ex);
+ }
+ catch ( Exception ex )
+ {
+ log.error("An error occurred checking the leadership.", ex);
}
}
}
};
- BackgroundCallback callback = new BackgroundCallback() {
+ BackgroundCallback callback = new BackgroundCallback()
+ {
@Override
- public void processResult(CuratorFramework client,
- CuratorEvent event) throws Exception {
- if (event.getResultCode() == KeeperException.Code.NONODE
- .intValue()) {
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+ {
+ if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
+ {
// previous node is gone - reset
reset();
}
}
};
- // use getData() instead of exists() to avoid leaving unneeded
- // watchers which is a type of resource leak
- client.getData().usingWatcher(watcher).inBackground(callback)
- .forPath(ZKPaths.makePath(latchPath, watchPath));
+ // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
+ client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));
}
}
- private void getChildren() throws Exception {
- BackgroundCallback callback = new BackgroundCallback() {
+ private void getChildren() throws Exception
+ {
+ BackgroundCallback callback = new BackgroundCallback()
+ {
@Override
- public void processResult(CuratorFramework client,
- CuratorEvent event) throws Exception {
- if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+ {
+ if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+ {
checkLeadership(event.getChildren());
}
}
@@ -564,47 +540,69 @@ public class LeaderLatch implements Closeable {
client.getChildren().inBackground(callback).forPath(latchPath);
}
- private void handleStateChange(ConnectionState newState) {
- if (newState.isConnected()) {
- try {
+ private void handleStateChange(ConnectionState newState)
+ {
+ if (newState.isConnected())
+ {
+ try
+ {
reset();
- } catch (Exception e) {
+ }
+ catch (Exception e)
+ {
log.error("Could not reset leader latch", e);
setLeadership(false);
}
- } else {
+ }
+ else
+ {
setLeadership(false);
}
}
- private synchronized void setLeadership(boolean newValue) {
+ private synchronized void setLeadership(boolean newValue)
+ {
boolean oldValue = hasLeadership.getAndSet(newValue);
- if (oldValue && !newValue) { // Lost leadership, was true, now false
- listeners.forEach(new Function<LeaderLatchListener, Void>() {
- @Override
- public Void apply(LeaderLatchListener listener) {
- listener.notLeader();
- return null;
+ if ( oldValue && !newValue )
+ { // Lost leadership, was true, now false
+ listeners.forEach
+ (
+ new Function<LeaderLatchListener, Void>()
+ {
+ @Override
+ public Void apply(LeaderLatchListener listener)
+ {
+ listener.notLeader();
+ return null;
+ }
}
- });
- } else if (!oldValue && newValue) { // Gained leadership, was false, now
- // true
- listeners.forEach(new Function<LeaderLatchListener, Void>() {
- @Override
- public Void apply(LeaderLatchListener input) {
- input.isLeader();
- return null;
+ );
+ }
+ else if ( !oldValue && newValue )
+ { // Gained leadership, was false, now true
+ listeners.forEach
+ (
+ new Function<LeaderLatchListener, Void>()
+ {
+ @Override
+ public Void apply(LeaderLatchListener input)
+ {
+ input.isLeader();
+ return null;
+ }
}
- });
+ );
}
notifyAll();
}
- private void setNode(String newValue) throws Exception {
+ private void setNode(String newValue) throws Exception
+ {
String oldPath = ourPath.getAndSet(newValue);
- if (oldPath != null) {
+ if ( oldPath != null )
+ {
client.delete().guaranteed().inBackground().forPath(oldPath);
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/0b7ae7e3/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index 108f118..d3b00bc 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -19,17 +19,9 @@
package org.apache.curator.framework.recipes.leader;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.state.ConnectionState;
@@ -41,29 +33,36 @@ import org.apache.curator.test.Timing;
import org.apache.curator.utils.CloseableUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-public class TestLeaderLatch extends BaseClassForTests {
+public class TestLeaderLatch extends BaseClassForTests
+{
private static final String PATH_NAME = "/one/two/me";
private static final int MAX_LOOPS = 5;
@Test
- public void testResetRace() throws Exception {
+ public void testResetRace() throws Exception
+ {
Timing timing = new Timing();
LeaderLatch latch = null;
- CuratorFramework client = CuratorFrameworkFactory.newClient(
- server.getConnectString(), timing.session(),
- timing.connection(), new RetryOneTime(1));
- try {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ try
+ {
client.start();
latch = new LeaderLatch(client, PATH_NAME);
latch.debugResetWaitLatch = new CountDownLatch(1);
- latch.start(); // will call reset()
- latch.reset(); // should not result in two nodes
+ latch.start(); // will call reset()
+ latch.reset(); // should not result in two nodes
timing.sleepABit();
@@ -71,21 +70,22 @@ public class TestLeaderLatch extends BaseClassForTests {
timing.sleepABit();
- Assert.assertEquals(client.getChildren().forPath(PATH_NAME).size(),
- 1);
- } finally {
+ Assert.assertEquals(client.getChildren().forPath(PATH_NAME).size(), 1);
+ }
+ finally
+ {
CloseableUtils.closeQuietly(latch);
CloseableUtils.closeQuietly(client);
}
}
@Test
- public void testCreateDeleteRace() throws Exception {
+ public void testCreateDeleteRace() throws Exception
+ {
Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.newClient(
- server.getConnectString(), timing.session(),
- timing.connection(), new RetryOneTime(1));
- try {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ try
+ {
client.start();
LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
@@ -100,40 +100,43 @@ public class TestLeaderLatch extends BaseClassForTests {
timing.sleepABit();
- Assert.assertEquals(client.getChildren().forPath(PATH_NAME).size(),
- 0);
+ Assert.assertEquals(client.getChildren().forPath(PATH_NAME).size(), 0);
- } finally {
+ }
+ finally
+ {
CloseableUtils.closeQuietly(client);
}
}
@Test
- public void testLostConnection() throws Exception {
+ public void testLostConnection() throws Exception
+ {
final int PARTICIPANT_QTY = 10;
List<LeaderLatch> latches = Lists.newArrayList();
final Timing timing = new Timing();
- final CuratorFramework client = CuratorFrameworkFactory.newClient(
- server.getConnectString(), timing.session(),
- timing.connection(), new RetryOneTime(1));
- try {
+ final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ try
+ {
client.start();
final CountDownLatch countDownLatch = new CountDownLatch(1);
- client.getConnectionStateListenable().addListener(
- new ConnectionStateListener() {
- @Override
- public void stateChanged(CuratorFramework client,
- ConnectionState newState) {
- if (newState == ConnectionState.LOST) {
- countDownLatch.countDown();
- }
- }
- });
+ client.getConnectionStateListenable().addListener(new ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ if ( newState == ConnectionState.LOST )
+ {
+ countDownLatch.countDown();
+ }
+ }
+ });
- for (int i = 0; i < PARTICIPANT_QTY; ++i) {
+ for ( int i = 0; i < PARTICIPANT_QTY; ++i )
+ {
LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
latch.start();
latches.add(latch);
@@ -148,12 +151,13 @@ public class TestLeaderLatch extends BaseClassForTests {
Assert.assertEquals(getLeaders(latches).size(), 0);
- server = new TestingServer(server.getPort(),
- server.getTempDirectory());
- Assert.assertEquals(waitForALeader(latches, timing).size(), 1); // should
- // reconnect
- } finally {
- for (LeaderLatch latch : latches) {
+ server = new TestingServer(server.getPort(), server.getTempDirectory());
+ Assert.assertEquals(waitForALeader(latches, timing).size(), 1); // should reconnect
+ }
+ finally
+ {
+ for ( LeaderLatch latch : latches )
+ {
CloseableUtils.closeQuietly(latch);
}
CloseableUtils.closeQuietly(client);
@@ -161,20 +165,21 @@ public class TestLeaderLatch extends BaseClassForTests {
}
@Test
- public void testCorrectWatching() throws Exception {
+ public void testCorrectWatching() throws Exception
+ {
final int PARTICIPANT_QTY = 10;
final int PARTICIPANT_ID = 2;
List<LeaderLatch> latches = Lists.newArrayList();
final Timing timing = new Timing();
- final CuratorFramework client = CuratorFrameworkFactory.newClient(
- server.getConnectString(), timing.session(),
- timing.connection(), new RetryOneTime(1));
- try {
+ final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ try
+ {
client.start();
- for (int i = 0; i < PARTICIPANT_QTY; ++i) {
+ for ( int i = 0; i < PARTICIPANT_QTY; ++i )
+ {
LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
latch.start();
latches.add(latch);
@@ -182,20 +187,20 @@ public class TestLeaderLatch extends BaseClassForTests {
waitForALeader(latches, timing);
- // we need to close a Participant that doesn't be actual leader
- // (first Participant) nor the last
+ //we need to close a Participant that doesn't be actual leader (first Participant) nor the last
latches.get(PARTICIPANT_ID).close();
- // As the previous algorithm assumed that if the watched node is
- // deleted gets the leadership
- // we need to ensure that the PARTICIPANT_ID-1 is not getting
- // (wrongly) elected as leader.
+ //As the previous algorithm assumed that if the watched node is deleted gets the leadership
+ //we need to ensure that the PARTICIPANT_ID-1 is not getting (wrongly) elected as leader.
Assert.assertTrue(!latches.get(PARTICIPANT_ID - 1).hasLeadership());
- } finally {
- // removes the already closed participant
+ }
+ finally
+ {
+ //removes the already closed participant
latches.remove(PARTICIPANT_ID);
- for (LeaderLatch latch : latches) {
+ for ( LeaderLatch latch : latches )
+ {
CloseableUtils.closeQuietly(latch);
}
CloseableUtils.closeQuietly(client);
@@ -204,35 +209,37 @@ public class TestLeaderLatch extends BaseClassForTests {
}
@Test
- public void testWaiting() throws Exception {
+ public void testWaiting() throws Exception
+ {
final int PARTICIPANT_QTY = 10;
- ExecutorService executorService = Executors
- .newFixedThreadPool(PARTICIPANT_QTY);
- ExecutorCompletionService<Void> service = new ExecutorCompletionService<Void>(
- executorService);
+ ExecutorService executorService = Executors.newFixedThreadPool(PARTICIPANT_QTY);
+ ExecutorCompletionService<Void> service = new ExecutorCompletionService<Void>(executorService);
final Timing timing = new Timing();
- final CuratorFramework client = CuratorFrameworkFactory.newClient(
- server.getConnectString(), timing.session(),
- timing.connection(), new RetryOneTime(1));
- try {
+ final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ try
+ {
client.start();
final AtomicBoolean thereIsALeader = new AtomicBoolean(false);
- for (int i = 0; i < PARTICIPANT_QTY; ++i) {
- service.submit(new Callable<Void>() {
+ for ( int i = 0; i < PARTICIPANT_QTY; ++i )
+ {
+ service.submit(new Callable<Void>()
+ {
@Override
- public Void call() throws Exception {
+ public Void call() throws Exception
+ {
LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
- try {
+ try
+ {
latch.start();
- Assert.assertTrue(latch.await(timing.forWaiting()
- .seconds(), TimeUnit.SECONDS));
- Assert.assertTrue(thereIsALeader.compareAndSet(
- false, true));
- Thread.sleep((int) (10 * Math.random()));
- } finally {
+ Assert.assertTrue(latch.await(timing.forWaiting().seconds(), TimeUnit.SECONDS));
+ Assert.assertTrue(thereIsALeader.compareAndSet(false, true));
+ Thread.sleep((int)(10 * Math.random()));
+ }
+ finally
+ {
thereIsALeader.set(false);
latch.close();
}
@@ -241,58 +248,68 @@ public class TestLeaderLatch extends BaseClassForTests {
});
}
- for (int i = 0; i < PARTICIPANT_QTY; ++i) {
+ for ( int i = 0; i < PARTICIPANT_QTY; ++i )
+ {
service.take().get();
}
- } finally {
+ }
+ finally
+ {
executorService.shutdown();
CloseableUtils.closeQuietly(client);
}
}
@Test
- public void testBasic() throws Exception {
+ public void testBasic() throws Exception
+ {
basic(Mode.START_IMMEDIATELY);
}
@Test
- public void testBasicAlt() throws Exception {
+ public void testBasicAlt() throws Exception
+ {
basic(Mode.START_IN_THREADS);
}
@Test
- public void testCallbackSanity() throws Exception {
+ public void testCallbackSanity() throws Exception
+ {
final int PARTICIPANT_QTY = 10;
final CountDownLatch timesSquare = new CountDownLatch(PARTICIPANT_QTY);
final AtomicLong masterCounter = new AtomicLong(0);
final AtomicLong notLeaderCounter = new AtomicLong(0);
Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.newClient(
- server.getConnectString(), timing.session(),
- timing.connection(), new RetryOneTime(1));
- ExecutorService exec = Executors
- .newSingleThreadExecutor(new ThreadFactoryBuilder()
- .setDaemon(true).setNameFormat("callbackSanity-%s")
- .build());
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ ExecutorService exec = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("callbackSanity-%s").build());
List<LeaderLatch> latches = Lists.newArrayList();
- for (int i = 0; i < PARTICIPANT_QTY; ++i) {
+ for ( int i = 0; i < PARTICIPANT_QTY; ++i )
+ {
final LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
- latch.addListener(new LeaderLatchListener() {
+ latch.addListener(new LeaderLatchListener()
+ {
boolean beenLeader = false;
@Override
- public void isLeader() {
- if (!beenLeader) {
+ public void isLeader()
+ {
+ if ( !beenLeader )
+ {
masterCounter.incrementAndGet();
beenLeader = true;
- try {
+ try
+ {
latch.reset();
- } catch (Exception e) {
+ }
+ catch ( Exception e )
+ {
throw Throwables.propagate(e);
}
- } else {
+ }
+ else
+ {
masterCounter.incrementAndGet();
CloseableUtils.closeQuietly(latch);
timesSquare.countDown();
@@ -300,17 +317,20 @@ public class TestLeaderLatch extends BaseClassForTests {
}
@Override
- public void notLeader() {
+ public void notLeader()
+ {
notLeaderCounter.incrementAndGet();
}
}, exec);
latches.add(latch);
}
- try {
+ try
+ {
client.start();
- for (LeaderLatch latch : latches) {
+ for ( LeaderLatch latch : latches )
+ {
latch.start();
}
@@ -318,12 +338,17 @@ public class TestLeaderLatch extends BaseClassForTests {
Assert.assertEquals(masterCounter.get(), PARTICIPANT_QTY * 2);
Assert.assertEquals(notLeaderCounter.get(), PARTICIPANT_QTY);
- for (LeaderLatch latch : latches) {
+ for ( LeaderLatch latch : latches )
+ {
Assert.assertEquals(latch.getState(), LeaderLatch.State.CLOSED);
}
- } finally {
- for (LeaderLatch latch : latches) {
- if (latch.getState() != LeaderLatch.State.CLOSED) {
+ }
+ finally
+ {
+ for ( LeaderLatch latch : latches )
+ {
+ if ( latch.getState() != LeaderLatch.State.CLOSED )
+ {
CloseableUtils.closeQuietly(latch);
}
}
@@ -332,7 +357,8 @@ public class TestLeaderLatch extends BaseClassForTests {
}
@Test
- public void testCallbackNotifyLeader() throws Exception {
+ public void testCallbackNotifyLeader() throws Exception
+ {
final int PARTICIPANT_QTY = 10;
final int SILENT_QTY = 3;
@@ -341,35 +367,37 @@ public class TestLeaderLatch extends BaseClassForTests {
final AtomicLong notLeaderCounter = new AtomicLong(0);
Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.newClient(
- server.getConnectString(), timing.session(),
- timing.connection(), new RetryOneTime(1));
- ExecutorService exec = Executors
- .newSingleThreadExecutor(new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat("callbackNotifyLeader-%s").build());
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ ExecutorService exec = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("callbackNotifyLeader-%s").build());
List<LeaderLatch> latches = Lists.newArrayList();
- for (int i = 0; i < PARTICIPANT_QTY; ++i) {
- LeaderLatch.CloseMode closeMode = i < SILENT_QTY ? LeaderLatch.CloseMode.SILENT
- : LeaderLatch.CloseMode.NOTIFY_LEADER;
+ for ( int i = 0; i < PARTICIPANT_QTY; ++i )
+ {
+ LeaderLatch.CloseMode closeMode = i < SILENT_QTY ? LeaderLatch.CloseMode.SILENT : LeaderLatch.CloseMode.NOTIFY_LEADER;
- final LeaderLatch latch = new LeaderLatch(client, PATH_NAME, "",
- closeMode);
- latch.addListener(new LeaderLatchListener() {
+ final LeaderLatch latch = new LeaderLatch(client, PATH_NAME, "", closeMode);
+ latch.addListener(new LeaderLatchListener()
+ {
boolean beenLeader = false;
@Override
- public void isLeader() {
- if (!beenLeader) {
+ public void isLeader()
+ {
+ if ( !beenLeader )
+ {
masterCounter.incrementAndGet();
beenLeader = true;
- try {
+ try
+ {
latch.reset();
- } catch (Exception e) {
+ }
+ catch ( Exception e )
+ {
throw Throwables.propagate(e);
}
- } else {
+ }
+ else
+ {
masterCounter.incrementAndGet();
CloseableUtils.closeQuietly(latch);
timesSquare.countDown();
@@ -377,31 +405,38 @@ public class TestLeaderLatch extends BaseClassForTests {
}
@Override
- public void notLeader() {
+ public void notLeader()
+ {
notLeaderCounter.incrementAndGet();
}
}, exec);
latches.add(latch);
}
- try {
+ try
+ {
client.start();
- for (LeaderLatch latch : latches) {
+ for ( LeaderLatch latch : latches )
+ {
latch.start();
}
timesSquare.await();
Assert.assertEquals(masterCounter.get(), PARTICIPANT_QTY * 2);
- Assert.assertEquals(notLeaderCounter.get(), PARTICIPANT_QTY * 2
- - SILENT_QTY);
- for (LeaderLatch latch : latches) {
+ Assert.assertEquals(notLeaderCounter.get(), PARTICIPANT_QTY * 2 - SILENT_QTY);
+ for ( LeaderLatch latch : latches )
+ {
Assert.assertEquals(latch.getState(), LeaderLatch.State.CLOSED);
}
- } finally {
- for (LeaderLatch latch : latches) {
- if (latch.getState() != LeaderLatch.State.CLOSED) {
+ }
+ finally
+ {
+ for ( LeaderLatch latch : latches )
+ {
+ if ( latch.getState() != LeaderLatch.State.CLOSED )
+ {
CloseableUtils.closeQuietly(latch);
}
}
@@ -410,42 +445,47 @@ public class TestLeaderLatch extends BaseClassForTests {
}
@Test
- public void testCallbackDontNotify() throws Exception {
+ public void testCallbackDontNotify() throws Exception
+ {
final AtomicLong masterCounter = new AtomicLong(0);
final AtomicLong notLeaderCounter = new AtomicLong(0);
Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.newClient(
- server.getConnectString(), timing.session(),
- timing.connection(), new RetryOneTime(1));
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
final LeaderLatch leader = new LeaderLatch(client, PATH_NAME);
- final LeaderLatch notifiedLeader = new LeaderLatch(client, PATH_NAME,
- "", LeaderLatch.CloseMode.NOTIFY_LEADER);
+ final LeaderLatch notifiedLeader = new LeaderLatch(client, PATH_NAME, "", LeaderLatch.CloseMode.NOTIFY_LEADER);
- leader.addListener(new LeaderLatchListener() {
+ leader.addListener(new LeaderLatchListener()
+ {
@Override
- public void isLeader() {
+ public void isLeader()
+ {
}
@Override
- public void notLeader() {
+ public void notLeader()
+ {
masterCounter.incrementAndGet();
}
});
- notifiedLeader.addListener(new LeaderLatchListener() {
+ notifiedLeader.addListener(new LeaderLatchListener()
+ {
@Override
- public void isLeader() {
+ public void isLeader()
+ {
}
@Override
- public void notLeader() {
+ public void notLeader()
+ {
notLeaderCounter.incrementAndGet();
}
});
- try {
+ try
+ {
client.start();
leader.start();
@@ -464,24 +504,28 @@ public class TestLeaderLatch extends BaseClassForTests {
leader.close(LeaderLatch.CloseMode.NOTIFY_LEADER);
Assert.assertEquals(leader.getState(), LeaderLatch.State.CLOSED);
- Assert.assertEquals(notifiedLeader.getState(),
- LeaderLatch.State.CLOSED);
+ Assert.assertEquals(notifiedLeader.getState(), LeaderLatch.State.CLOSED);
Assert.assertEquals(masterCounter.get(), 1);
Assert.assertEquals(notLeaderCounter.get(), 0);
- } finally {
- if (leader.getState() != LeaderLatch.State.CLOSED) {
+ }
+ finally
+ {
+ if ( leader.getState() != LeaderLatch.State.CLOSED )
+ {
CloseableUtils.closeQuietly(leader);
}
- if (notifiedLeader.getState() != LeaderLatch.State.CLOSED) {
+ if ( notifiedLeader.getState() != LeaderLatch.State.CLOSED )
+ {
CloseableUtils.closeQuietly(notifiedLeader);
}
CloseableUtils.closeQuietly(client);
}
}
-
+
@Test
- public void testNoServerAtStart() {
+ public void testNoServerAtStart()
+ {
CloseableUtils.closeQuietly(server);
Timing timing = new Timing();
@@ -497,12 +541,14 @@ public class TestLeaderLatch extends BaseClassForTests {
// before starting. Otherwise it's possible that the server will start
// during retries of the inital commands to create the latch zNodes
client.getConnectionStateListenable().addListener(
- new ConnectionStateListener() {
-
+ new ConnectionStateListener()
+ {
@Override
public void stateChanged(CuratorFramework client,
- ConnectionState newState) {
- if (newState == ConnectionState.LOST) {
+ ConnectionState newState)
+ {
+ if (newState == ConnectionState.LOST)
+ {
sessionLostCount.countDown();
}
}
@@ -510,20 +556,23 @@ public class TestLeaderLatch extends BaseClassForTests {
final LeaderLatch leader = new LeaderLatch(client, PATH_NAME);
final CountDownLatch leaderCounter = new CountDownLatch(1);
- leader.addListener(new LeaderLatchListener() {
-
+ leader.addListener(new LeaderLatchListener()
+ {
@Override
- public void isLeader() {
+ public void isLeader()
+ {
leaderCounter.countDown();
}
@Override
- public void notLeader() {
+ public void notLeader()
+ {
}
});
- try {
+ try
+ {
leader.start();
timing.awaitLatch(sessionLostCount);
@@ -533,47 +582,58 @@ public class TestLeaderLatch extends BaseClassForTests {
timing.awaitLatch(leaderCounter);
- } catch (Exception e) {
+ }
+ catch (Exception e)
+ {
Assert.fail("Unexpected exception", e);
- } finally {
+ }
+ finally
+ {
CloseableUtils.closeQuietly(leader);
CloseableUtils.closeQuietly(client);
CloseableUtils.closeQuietly(server);
}
- }
+ }
- private enum Mode {
- START_IMMEDIATELY, START_IN_THREADS
+ private enum Mode
+ {
+ START_IMMEDIATELY,
+ START_IN_THREADS
}
- private void basic(Mode mode) throws Exception {
- final int PARTICIPANT_QTY = 1;// 0;
+ private void basic(Mode mode) throws Exception
+ {
+ final int PARTICIPANT_QTY = 1;//0;
List<LeaderLatch> latches = Lists.newArrayList();
Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.newClient(
- server.getConnectString(), timing.session(),
- timing.connection(), new RetryOneTime(1));
- try {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ try
+ {
client.start();
- for (int i = 0; i < PARTICIPANT_QTY; ++i) {
+ for ( int i = 0; i < PARTICIPANT_QTY; ++i )
+ {
LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
- if (mode == Mode.START_IMMEDIATELY) {
+ if ( mode == Mode.START_IMMEDIATELY )
+ {
latch.start();
}
latches.add(latch);
}
- if (mode == Mode.START_IN_THREADS) {
- ExecutorService service = Executors.newFixedThreadPool(latches
- .size());
- for (final LeaderLatch latch : latches) {
- service.submit(new Callable<Object>() {
+ if ( mode == Mode.START_IN_THREADS )
+ {
+ ExecutorService service = Executors.newFixedThreadPool(latches.size());
+ for ( final LeaderLatch latch : latches )
+ {
+ service.submit(new Callable<Object>()
+ {
@Override
- public Object call() throws Exception {
- Thread.sleep((int) (100 * Math.random()));
+ public Object call() throws Exception
+ {
+ Thread.sleep((int)(100 * Math.random()));
latch.start();
return null;
}
@@ -582,38 +642,36 @@ public class TestLeaderLatch extends BaseClassForTests {
service.shutdown();
}
- while (latches.size() > 0) {
+ while ( latches.size() > 0 )
+ {
List<LeaderLatch> leaders = waitForALeader(latches, timing);
- Assert.assertEquals(leaders.size(), 1); // there can only be one
- // leader
+ Assert.assertEquals(leaders.size(), 1); // there can only be one leader
LeaderLatch theLeader = leaders.get(0);
- if (mode == Mode.START_IMMEDIATELY) {
- Assert.assertEquals(latches.indexOf(theLeader), 0); // assert
- // ordering
- // -
- // leadership
- // should
- // advance
- // in
- // start
- // order
+ if ( mode == Mode.START_IMMEDIATELY )
+ {
+ Assert.assertEquals(latches.indexOf(theLeader), 0); // assert ordering - leadership should advance in start order
}
theLeader.close();
latches.remove(theLeader);
}
- } finally {
- for (LeaderLatch latch : latches) {
+ }
+ finally
+ {
+ for ( LeaderLatch latch : latches )
+ {
CloseableUtils.closeQuietly(latch);
}
CloseableUtils.closeQuietly(client);
}
}
- private List<LeaderLatch> waitForALeader(List<LeaderLatch> latches,
- Timing timing) throws InterruptedException {
- for (int i = 0; i < MAX_LOOPS; ++i) {
+ private List<LeaderLatch> waitForALeader(List<LeaderLatch> latches, Timing timing) throws InterruptedException
+ {
+ for ( int i = 0; i < MAX_LOOPS; ++i )
+ {
List<LeaderLatch> leaders = getLeaders(latches);
- if (leaders.size() != 0) {
+ if ( leaders.size() != 0 )
+ {
return leaders;
}
timing.sleepABit();
@@ -621,10 +679,13 @@ public class TestLeaderLatch extends BaseClassForTests {
return Lists.newArrayList();
}
- private List<LeaderLatch> getLeaders(Collection<LeaderLatch> latches) {
+ private List<LeaderLatch> getLeaders(Collection<LeaderLatch> latches)
+ {
List<LeaderLatch> leaders = Lists.newArrayList();
- for (LeaderLatch latch : latches) {
- if (latch.hasLeadership()) {
+ for ( LeaderLatch latch : latches )
+ {
+ if ( latch.hasLeadership() )
+ {
leaders.add(latch);
}
}
[12/18] git commit: Utility to get a name for the thread
Posted by ra...@apache.org.
Utility to get a name for the thread
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/1c94c7ef
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/1c94c7ef
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/1c94c7ef
Branch: refs/heads/master
Commit: 1c94c7efa3b5256e14862495055805b4612ca4f4
Parents: 5907677
Author: randgalt <ra...@apache.org>
Authored: Mon Jun 16 14:07:24 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jun 16 14:07:24 2014 -0500
----------------------------------------------------------------------
.../org/apache/curator/utils/ThreadUtils.java | 9 +++
.../recipes/AfterConnectionEstablished.java | 73 ++++++++++++++++++++
.../ExecuteAfterConnectionEstablished.java | 73 --------------------
3 files changed, 82 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/1c94c7ef/curator-client/src/main/java/org/apache/curator/utils/ThreadUtils.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/ThreadUtils.java b/curator-client/src/main/java/org/apache/curator/utils/ThreadUtils.java
index f238124..9665dfe 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/ThreadUtils.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/ThreadUtils.java
@@ -53,4 +53,13 @@ public class ThreadUtils
.setDaemon(true)
.build();
}
+
+ public static String getProcessName(Class<?> clazz)
+ {
+ if ( clazz.isAnonymousClass() )
+ {
+ return getProcessName(clazz.getEnclosingClass());
+ }
+ return clazz.getSimpleName();
+ }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/1c94c7ef/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
new file mode 100644
index 0000000..f37f7c0
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ThreadUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Utility class to allow execution of logic once a ZooKeeper connection becomes available.
+ */
+public class AfterConnectionEstablished
+{
+ private final static Logger log = LoggerFactory.getLogger(AfterConnectionEstablished.class);
+
+ /**
+ * Spawns a new new background thread that will block until a connection is available and
+ * then execute the 'runAfterConnection' logic
+ *
+ * @param client The curator client
+ * @param runAfterConnection The logic to run
+ */
+ public static <T> T execute(final CuratorFramework client, final Callable<T> runAfterConnection) throws Exception
+ {
+ //Block until connected
+ final ExecutorService executor = ThreadUtils.newSingleThreadExecutor(runAfterConnection.getClass().getSimpleName());
+ Callable<T> internalCall = new Callable<T>()
+ {
+ @Override
+ public T call() throws Exception
+ {
+ try
+ {
+ client.blockUntilConnected();
+ return runAfterConnection.call();
+ }
+ catch ( Exception e )
+ {
+ log.error("An error occurred blocking until a connection is available", e);
+ throw e;
+ }
+ finally
+ {
+ executor.shutdown();
+ }
+ }
+ };
+ return executor.submit(internalCall).get();
+ }
+
+ private AfterConnectionEstablished()
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/1c94c7ef/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java
deleted file mode 100644
index 408ed03..0000000
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.recipes;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.utils.ThreadUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-
-/**
- * Utility class to allow execution of logic once a ZooKeeper connection becomes available.
- */
-public class ExecuteAfterConnectionEstablished
-{
- private final static Logger log = LoggerFactory.getLogger(ExecuteAfterConnectionEstablished.class);
-
- /**
- * Spawns a new new background thread that will block until a connection is available and
- * then execute the 'runAfterConnection' logic
- *
- * @param client The curator client
- * @param runAfterConnection The logic to run
- */
- public static <T> T executeAfterConnectionEstablishedInBackground(final CuratorFramework client, final Callable<T> runAfterConnection) throws Exception
- {
- //Block until connected
- final ExecutorService executor = ThreadUtils.newSingleThreadExecutor(runAfterConnection.getClass().getSimpleName());
- Callable<T> internalCall = new Callable<T>()
- {
- @Override
- public T call() throws Exception
- {
- try
- {
- client.blockUntilConnected();
- return runAfterConnection.call();
- }
- catch ( Exception e )
- {
- log.error("An error occurred blocking until a connection is available", e);
- throw e;
- }
- finally
- {
- executor.shutdown();
- }
- }
- };
- return executor.submit(internalCall).get();
- }
-
- private ExecuteAfterConnectionEstablished()
- {
- }
-}
[17/18] git commit: Added testProperCloseWithoutConnectionEstablished
Posted by ra...@apache.org.
Added testProperCloseWithoutConnectionEstablished
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/bac23d3a
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/bac23d3a
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/bac23d3a
Branch: refs/heads/master
Commit: bac23d3af7abf4bf25d5acfca7c5f2cb53b739d0
Parents: 03e736c
Author: randgalt <ra...@apache.org>
Authored: Tue Jun 17 17:11:53 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Jun 17 17:11:53 2014 -0500
----------------------------------------------------------------------
.../framework/recipes/leader/LeaderLatch.java | 18 +++++--
.../recipes/leader/TestLeaderLatch.java | 49 ++++++++++++++++++++
2 files changed, 62 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/bac23d3a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 6f7636a..af3f895 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -200,11 +200,7 @@ public class LeaderLatch implements Closeable
Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
- Future<?> localStartTask = startTask.getAndSet(null);
- if ( localStartTask != null )
- {
- localStartTask.cancel(true);
- }
+ cancelStartTask();
try
{
@@ -237,6 +233,18 @@ public class LeaderLatch implements Closeable
}
}
+ @VisibleForTesting
+ protected boolean cancelStartTask()
+ {
+ Future<?> localStartTask = startTask.getAndSet(null);
+ if ( localStartTask != null )
+ {
+ localStartTask.cancel(true);
+ return true;
+ }
+ return false;
+ }
+
/**
* Attaches a listener to this LeaderLatch
* <p/>
http://git-wip-us.apache.org/repos/asf/curator/blob/bac23d3a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index f4fb1c7..fecb9e3 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -52,6 +52,55 @@ public class TestLeaderLatch extends BaseClassForTests
private static final int MAX_LOOPS = 5;
@Test
+ public void testProperCloseWithoutConnectionEstablished() throws Exception
+ {
+ server.stop();
+
+ Timing timing = new Timing();
+ LeaderLatch latch = null;
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+
+ final AtomicBoolean resetCalled = new AtomicBoolean(false);
+ final CountDownLatch cancelStartTaskLatch = new CountDownLatch(1);
+ latch = new LeaderLatch(client, PATH_NAME)
+ {
+ @Override
+ void reset() throws Exception
+ {
+ resetCalled.set(true);
+ super.reset();
+ }
+
+ @Override
+ protected boolean cancelStartTask()
+ {
+ if ( super.cancelStartTask() )
+ {
+ cancelStartTaskLatch.countDown();
+ return true;
+ }
+ return false;
+ }
+ };
+
+ latch.start();
+ latch.close();
+ latch = null;
+
+ Assert.assertTrue(timing.awaitLatch(cancelStartTaskLatch));
+ Assert.assertFalse(resetCalled.get());
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(latch);
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
public void testResetRace() throws Exception
{
Timing timing = new Timing();
[06/18] git commit: some doc edits
Posted by ra...@apache.org.
some doc edits
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/a57be393
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/a57be393
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/a57be393
Branch: refs/heads/master
Commit: a57be393bab39fcc20b09331cc2a736127885823
Parents: 122476a
Author: randgalt <ra...@apache.org>
Authored: Mon Jun 9 11:10:16 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jun 9 11:10:16 2014 -0500
----------------------------------------------------------------------
.../src/site/confluence/configuration.confluence | 2 +-
curator-x-rpc/src/site/confluence/reference.confluence | 6 +++---
curator-x-rpc/src/site/confluence/usage.confluence | 13 +++++++------
3 files changed, 11 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/a57be393/curator-x-rpc/src/site/confluence/configuration.confluence
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/site/confluence/configuration.confluence b/curator-x-rpc/src/site/confluence/configuration.confluence
index 939ee47..56f6cfe 100644
--- a/curator-x-rpc/src/site/confluence/configuration.confluence
+++ b/curator-x-rpc/src/site/confluence/configuration.confluence
@@ -120,7 +120,7 @@ h2. Connection
h2. Retry
-The retry policy configuration depends on what type is used. There 3 types supported:
+The retry policy configuration depends on what type is used. There are three types supported:
||Name||Type||Default Value||Description||
|type|string|n/a|*exponential\-backoff*|
http://git-wip-us.apache.org/repos/asf/curator/blob/a57be393/curator-x-rpc/src/site/confluence/reference.confluence
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/site/confluence/reference.confluence b/curator-x-rpc/src/site/confluence/reference.confluence
index a7b1ce5..68c3692 100644
--- a/curator-x-rpc/src/site/confluence/reference.confluence
+++ b/curator-x-rpc/src/site/confluence/reference.confluence
@@ -6,16 +6,16 @@ h2. CuratorService
||API||Arguments||Return Value||Description||
|newCuratorProjection|connectionName|CuratorProjection|Allocates a projection to a configured CuratorFramework instance in the RPC server. "connectionName" is the name of a [[configured|configuration.html]] connection.|
-|closeCuratorProjection|CuratorProjection|void|Close a CuratorProjection. Also closes any recipes, etc. create for the projection.|
+|closeCuratorProjection|CuratorProjection|void|Close a CuratorProjection. Also closes any recipes, etc. created for the projection.|
|pingCuratorProjection|CuratorProjection|void|Keeps the CuratorProjection from timing out. NOTE: your [[EventService|events.html]] event loop will do this for you.|
|createNode|CreateSpec|Created path name|Create a ZNode|
|deleteNode|DeleteSpec|void|Delete a ZNode|
|getData|GetDataSpec|bytes|Return a ZNode's data|
|setData|SetDataSpec|Stat|Set a ZNode's data|
-|exists|ExistsSpec|Stat|Check is a ZNode exists|
+|exists|ExistsSpec|Stat|Check if a ZNode exists|
|getChildren|GetChildrenSpec|List of nodes|Get the child nodes for a ZNode|
|sync|path and async context|void|Do a ZooKeeper sync|
-|closeGenericProjection|id|void|Closes any projection|
+|closeGenericProjection|id|void|Closes any projection. All projections have an "id" field. This is the value to pass.|
|acquireLock|path, maxWaitMs|optional lock projection|Acquire a lock for the given path. Will wait at most maxWaitMs to acquire the lock. If the acquisition fails, result will be null.|
|startLeaderSelector|path, participantId, waitForLeadershipMs|LeaderResult|Start a leader selector on the given path. The instance will be assigned the specified participantId. If waitForLeadershipMs is non\-zero, the method will block for that amount of time waiting for leadership.|
|getLeaderParticipants|leaderProjection|List of Participant|Return the participants in a leader selector|
http://git-wip-us.apache.org/repos/asf/curator/blob/a57be393/curator-x-rpc/src/site/confluence/usage.confluence
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/site/confluence/usage.confluence b/curator-x-rpc/src/site/confluence/usage.confluence
index 36fb04e..047ae5e 100644
--- a/curator-x-rpc/src/site/confluence/usage.confluence
+++ b/curator-x-rpc/src/site/confluence/usage.confluence
@@ -72,15 +72,16 @@ h2. Usage
Once initialized, use recipes/APIs as needed. Here is an example of using the lock recipe:
{code}
-optionalLockId = client.acquireLock(curatorProjection, "/mylock", 10000)
-if optionalLockId.lockProjection == null {
+optionalLock = client.acquireLock(curatorProjection, "/mylock", 10000)
+if optionalLock.lockProjection == null {
// lock attempt failed. Throw exception, etc.
}
-lockId = optionalLockId.lockProjection
+lockProjection = optionalLock.lockProjection
-// you now own the lock
-
-client.closeGenericProjection(curatorProjection, lockId.id)
+try
+ // you now own the lock
+finally
+ client.closeGenericProjection(curatorProjection, lockProjection.id)
{code}
Here is an example of using the path cache:
[15/18] git commit: AfterConnectionEstablished needs to return the
future from the service so that clients can cancel the action if needed.
Added this to LeaderLatch
Posted by ra...@apache.org.
AfterConnectionEstablished needs to return the future from the service so that clients can cancel the action if needed. Added this to LeaderLatch
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/5954e66f
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/5954e66f
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/5954e66f
Branch: refs/heads/master
Commit: 5954e66fa3108c39b3f2915583def5e51915846f
Parents: fab79a4
Author: randgalt <ra...@apache.org>
Authored: Tue Jun 17 14:38:14 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Jun 17 14:38:14 2014 -0500
----------------------------------------------------------------------
.../recipes/AfterConnectionEstablished.java | 7 ++-
.../framework/recipes/leader/LeaderLatch.java | 51 ++++++++++++++------
.../recipes/leader/TestLeaderLatch.java | 2 +
3 files changed, 44 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/5954e66f/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
index 41ba702..65c6ace 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.curator.framework.recipes;
import org.apache.curator.framework.CuratorFramework;
@@ -23,6 +24,7 @@ import org.apache.curator.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
/**
* Utility class to allow execution of logic once a ZooKeeper connection becomes available.
@@ -37,8 +39,9 @@ public class AfterConnectionEstablished
*
* @param client The curator client
* @param runAfterConnection The logic to run
+ * @return future of the task so it can be canceled, etc. if needed
*/
- public static void execute(final CuratorFramework client, final Runnable runAfterConnection) throws Exception
+ public static Future<?> execute(final CuratorFramework client, final Runnable runAfterConnection) throws Exception
{
//Block until connected
final ExecutorService executor = ThreadUtils.newSingleThreadExecutor(ThreadUtils.getProcessName(runAfterConnection.getClass()));
@@ -62,7 +65,7 @@ public class AfterConnectionEstablished
}
}
};
- executor.submit(internalCall);
+ return executor.submit(internalCall);
}
private AfterConnectionEstablished()
http://git-wip-us.apache.org/repos/asf/curator/blob/5954e66f/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 9d70645..6f7636a 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -46,6 +46,7 @@ import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -69,6 +70,7 @@ public class LeaderLatch implements Closeable
private final AtomicReference<String> ourPath = new AtomicReference<String>();
private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();
private final CloseMode closeMode;
+ private final AtomicReference<Future<?>> startTask = new AtomicReference<Future<?>>();
private final ConnectionStateListener listener = new ConnectionStateListener()
{
@@ -155,22 +157,21 @@ public class LeaderLatch implements Closeable
{
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
- AfterConnectionEstablished.execute(client, new Runnable()
- {
- @Override
- public void run()
+ startTask.set(AfterConnectionEstablished.execute(client, new Runnable()
{
- client.getConnectionStateListenable().addListener(listener);
- try
- {
- reset();
- }
- catch ( Exception e )
+ @Override
+ public void run()
{
- log.error("An error occurred checking resetting leadership.", e);
+ try
+ {
+ internalStart();
+ }
+ finally
+ {
+ startTask.set(null);
+ }
}
- }
- });
+ }));
}
/**
@@ -194,11 +195,17 @@ public class LeaderLatch implements Closeable
* @param closeMode allows the default close mode to be overridden at the time the latch is closed.
* @throws IOException errors
*/
- public void close(CloseMode closeMode) throws IOException
+ public synchronized void close(CloseMode closeMode) throws IOException
{
Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
+ Future<?> localStartTask = startTask.getAndSet(null);
+ if ( localStartTask != null )
+ {
+ localStartTask.cancel(true);
+ }
+
try
{
setNode(null);
@@ -485,6 +492,22 @@ public class LeaderLatch implements Closeable
client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
}
+ private synchronized void internalStart()
+ {
+ if ( state.get() == State.STARTED )
+ {
+ client.getConnectionStateListenable().addListener(listener);
+ try
+ {
+ reset();
+ }
+ catch ( Exception e )
+ {
+ log.error("An error occurred checking resetting leadership.", e);
+ }
+ }
+ }
+
private void checkLeadership(List<String> children) throws Exception
{
final String localOurPath = ourPath.get();
http://git-wip-us.apache.org/repos/asf/curator/blob/5954e66f/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index b97e708..f4fb1c7 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -89,6 +89,8 @@ public class TestLeaderLatch extends BaseClassForTests
try
{
client.start();
+ client.create().creatingParentsIfNeeded().forPath(PATH_NAME);
+
LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
latch.debugResetWaitLatch = new CountDownLatch(1);