You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/06/18 01:03:05 UTC

[01/18] git commit: CURATOR-110 - Modified state handling to treat 'CONNECTED' and 'RECONNECTED' events in the same way. Added a test case for the leader latch being started before a connection to ZK has been established.

Repository: curator
Updated Branches:
  refs/heads/master 6e98562d8 -> 5d7d0c7f1


CURATOR-110 - Modified state handling to treat 'CONNECTED' and
'RECONNECTED' events in the same way. Added a test case for the leader
latch being started before a connection to ZK has been established.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/1a63a102
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/1a63a102
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/1a63a102

Branch: refs/heads/master
Commit: 1a63a102ebbaaead265babd71a3d52928f848bd0
Parents: de1d38c
Author: Cameron McKenzie <ca...@unico.com.au>
Authored: Mon Jun 2 16:30:26 2014 +1000
Committer: Cameron McKenzie <ca...@unico.com.au>
Committed: Mon Jun 2 16:30:26 2014 +1000

----------------------------------------------------------------------
 .../framework/state/ConnectionState.java        |  41 +-
 .../framework/recipes/leader/LeaderLatch.java   | 633 +++++++++----------
 .../recipes/leader/TestLeaderLatch.java         | 516 +++++++--------
 3 files changed, 601 insertions(+), 589 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/1a63a102/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
index 566d355..b25ce38 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
@@ -23,18 +23,17 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 /**
  * Represents state changes in the connection to ZK
  */
-public enum ConnectionState
-{
+public enum ConnectionState {
     /**
-     * Sent for the first successful connection to the server. NOTE: You will only
-     * get one of these messages for any CuratorFramework instance.
+     * Sent for the first successful connection to the server. NOTE: You will
+     * only get one of these messages for any CuratorFramework instance.
      */
     CONNECTED,
 
     /**
      * There has been a loss of connection. Leaders, locks, etc. should suspend
-     * until the connection is re-established. If the connection times-out you will
-     * receive a {@link #LOST} notice
+     * until the connection is re-established. If the connection times-out you
+     * will receive a {@link #LOST} notice
      */
     SUSPENDED,
 
@@ -44,18 +43,30 @@ public enum ConnectionState
     RECONNECTED,
 
     /**
-     * The connection is confirmed to be lost. Close any locks, leaders, etc. and
-     * attempt to re-create them. NOTE: it is possible to get a {@link #RECONNECTED}
-     * state after this but you should still consider any locks, etc. as dirty/unstable
+     * The connection is confirmed to be lost. Close any locks, leaders, etc.
+     * and attempt to re-create them. NOTE: it is possible to get a
+     * {@link #RECONNECTED} state after this but you should still consider any
+     * locks, etc. as dirty/unstable
      */
     LOST,
 
     /**
-     * The connection has gone into read-only mode. This can only happen if you pass true
-     * for {@link CuratorFrameworkFactory.Builder#canBeReadOnly()}. See the ZooKeeper doc
-     * regarding read only connections:
-     * <a href="http://wiki.apache.org/hadoop/ZooKeeper/GSoCReadOnlyMode">http://wiki.apache.org/hadoop/ZooKeeper/GSoCReadOnlyMode</a>.
-     * The connection will remain in read only mode until another state change is sent.
+     * The connection has gone into read-only mode. This can only happen if you
+     * pass true for {@link CuratorFrameworkFactory.Builder#canBeReadOnly()}.
+     * See the ZooKeeper doc regarding read only connections: <a
+     * href="http://wiki.apache.org/hadoop/ZooKeeper/GSoCReadOnlyMode"
+     * >http://wiki.apache.org/hadoop/ZooKeeper/GSoCReadOnlyMode</a>. The
+     * connection will remain in read only mode until another state change is
+     * sent.
      */
-    READ_ONLY
+    READ_ONLY;
+
+    /**
+     * Check if this state indicates that Curator has a connection to ZooKeeper
+     * 
+     * @return True if connected, false otherwise
+     */
+    public boolean isConnected() {
+        return this == CONNECTED || this == RECONNECTED || this == READ_ONLY;
+    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/1a63a102/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 88456af..d09ed1b 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -19,9 +19,17 @@
 
 package org.apache.curator.framework.recipes.leader;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
@@ -38,71 +46,60 @@ import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.io.Closeable;
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
 
 /**
  * <p>
- * Abstraction to select a "leader" amongst multiple contenders in a group of JMVs connected to
- * a Zookeeper cluster. If a group of N thread/processes contend for leadership one will
- * randomly be assigned leader until it releases leadership at which time another one from the
- * group will randomly be chosen
+ * Abstraction to select a "leader" amongst multiple contenders in a group of
+ * JVMs connected to a Zookeeper cluster. If a group of N threads/processes
+ * contend for leadership one will randomly be assigned leader until it releases
+ * leadership at which time another one from the group will randomly be chosen
  * </p>
  */
-public class LeaderLatch implements Closeable
-{
+public class LeaderLatch implements Closeable {
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final CuratorFramework client;
     private final String latchPath;
     private final String id;
-    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+    private final AtomicReference<State> state = new AtomicReference<State>(
+            State.LATENT);
     private final AtomicBoolean hasLeadership = new AtomicBoolean(false);
     private final AtomicReference<String> ourPath = new AtomicReference<String>();
     private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();
     private final CloseMode closeMode;
 
-    private final ConnectionStateListener listener = new ConnectionStateListener()
-    {
+    private final ConnectionStateListener listener = new ConnectionStateListener() {
         @Override
-        public void stateChanged(CuratorFramework client, ConnectionState newState)
-        {
+        public void stateChanged(CuratorFramework client,
+                ConnectionState newState) {
             handleStateChange(newState);
         }
     };
 
     private static final String LOCK_NAME = "latch-";
 
-    private static final LockInternalsSorter sorter = new LockInternalsSorter()
-    {
+    private static final LockInternalsSorter sorter = new LockInternalsSorter() {
         @Override
-        public String fixForSorting(String str, String lockName)
-        {
-            return StandardLockInternalsDriver.standardFixForSorting(str, lockName);
+        public String fixForSorting(String str, String lockName) {
+            return StandardLockInternalsDriver.standardFixForSorting(str,
+                    lockName);
         }
     };
 
-    public enum State
-    {
-        LATENT,
-        STARTED,
-        CLOSED
+    public enum State {
+        LATENT, STARTED, CLOSED
     }
 
     /**
      * How to handle listeners when the latch is closed
      */
-    public enum CloseMode
-    {
+    public enum CloseMode {
         /**
-         * When the latch is closed, listeners will *not* be notified (default behavior)
+         * When the latch is closed, listeners will *not* be notified (default
+         * behavior)
          */
         SILENT,
 
@@ -113,105 +110,116 @@ public class LeaderLatch implements Closeable
     }
 
     /**
-     * @param client    the client
-     * @param latchPath the path for this leadership group
+     * @param client
+     *            the client
+     * @param latchPath
+     *            the path for this leadership group
      */
-    public LeaderLatch(CuratorFramework client, String latchPath)
-    {
+    public LeaderLatch(CuratorFramework client, String latchPath) {
         this(client, latchPath, "", CloseMode.SILENT);
     }
 
     /**
-     * @param client    the client
-     * @param latchPath the path for this leadership group
-     * @param id        participant ID
+     * @param client
+     *            the client
+     * @param latchPath
+     *            the path for this leadership group
+     * @param id
+     *            participant ID
      */
-    public LeaderLatch(CuratorFramework client, String latchPath, String id)
-    {
+    public LeaderLatch(CuratorFramework client, String latchPath, String id) {
         this(client, latchPath, id, CloseMode.SILENT);
     }
 
     /**
-     * @param client    the client
-     * @param latchPath the path for this leadership group
-     * @param id        participant ID
-     * @param closeMode behaviour of listener on explicit close.
+     * @param client
+     *            the client
+     * @param latchPath
+     *            the path for this leadership group
+     * @param id
+     *            participant ID
+     * @param closeMode
+     *            behaviour of listener on explicit close.
      */
-    public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode)
-    {
-        this.client = Preconditions.checkNotNull(client, "client cannot be null");
-        this.latchPath = Preconditions.checkNotNull(latchPath, "mutexPath cannot be null");
+    public LeaderLatch(CuratorFramework client, String latchPath, String id,
+            CloseMode closeMode) {
+        this.client = Preconditions.checkNotNull(client,
+                "client cannot be null");
+        this.latchPath = Preconditions.checkNotNull(latchPath,
+                "mutexPath cannot be null");
         this.id = Preconditions.checkNotNull(id, "id cannot be null");
-        this.closeMode = Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
+        this.closeMode = Preconditions.checkNotNull(closeMode,
+                "closeMode cannot be null");
     }
 
     /**
-     * Add this instance to the leadership election and attempt to acquire leadership.
-     *
-     * @throws Exception errors
+     * Add this instance to the leadership election and attempt to acquire
+     * leadership.
+     * 
+     * @throws Exception
+     *             errors
      */
-    public void start() throws Exception
-    {
-        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
+    public void start() throws Exception {
+        Preconditions.checkState(
+                state.compareAndSet(State.LATENT, State.STARTED),
+                "Cannot be started more than once");
 
         client.getConnectionStateListenable().addListener(listener);
         reset();
     }
 
     /**
-     * Remove this instance from the leadership election. If this instance is the leader, leadership
-     * is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch
-     * instances must eventually be closed.
-     *
-     * @throws IOException errors
+     * Remove this instance from the leadership election. If this instance is
+     * the leader, leadership is released. IMPORTANT: the only way to release
+     * leadership is by calling close(). All LeaderLatch instances must
+     * eventually be closed.
+     * 
+     * @throws IOException
+     *             errors
      */
     @Override
-    public void close() throws IOException
-    {
+    public void close() throws IOException {
         close(closeMode);
     }
 
     /**
-     * Remove this instance from the leadership election. If this instance is the leader, leadership
-     * is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch
-     * instances must eventually be closed.
-     *
-     * @param closeMode allows the default close mode to be overridden at the time the latch is closed.
-     *
-     * @throws IOException errors
+     * Remove this instance from the leadership election. If this instance is
+     * the leader, leadership is released. IMPORTANT: the only way to release
+     * leadership is by calling close(). All LeaderLatch instances must
+     * eventually be closed.
+     * 
+     * @param closeMode
+     *            allows the default close mode to be overridden at the time the
+     *            latch is closed.
+     * 
+     * @throws IOException
+     *             errors
      */
-    public void close(CloseMode closeMode) throws IOException
-    {
-        Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
+    public void close(CloseMode closeMode) throws IOException {
+        Preconditions.checkState(
+                state.compareAndSet(State.STARTED, State.CLOSED),
+                "Already closed or has not been started");
         Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
 
-        try
-        {
+        try {
             setNode(null);
-        }
-        catch ( Exception e )
-        {
+        } catch (Exception e) {
             throw new IOException(e);
-        }
-        finally
-        {
+        } finally {
             client.getConnectionStateListenable().removeListener(listener);
 
-            switch ( closeMode )
-            {
-                case NOTIFY_LEADER:
-                {
-                    setLeadership(false);
-                    listeners.clear();
-                    break;
-                }
+            switch (closeMode) {
+            case NOTIFY_LEADER: {
+                setLeadership(false);
+                listeners.clear();
+                break;
+            }
 
-                default:
-                {
-                    listeners.clear();
-                    setLeadership(false);
-                    break;
-                }
+            default: {
+                listeners.clear();
+                setLeadership(false);
+                break;
+            }
             }
         }
     }
@@ -219,134 +227,160 @@ public class LeaderLatch implements Closeable
     /**
      * Attaches a listener to this LeaderLatch
      * <p>
-     * Attaching the same listener multiple times is a noop from the second time on.
+     * Attaching the same listener multiple times is a noop from the second time
+     * on.
      * <p>
-     * All methods for the listener are run using the provided Executor.  It is common to pass in a single-threaded
-     * executor so that you can be certain that listener methods are called in sequence, but if you are fine with
+     * All methods for the listener are run using the provided Executor. It is
+     * common to pass in a single-threaded executor so that you can be certain
+     * that listener methods are called in sequence, but if you are fine with
      * them being called out of order you are welcome to use multiple threads.
-     *
-     * @param listener the listener to attach
+     * 
+     * @param listener
+     *            the listener to attach
      */
-    public void addListener(LeaderLatchListener listener)
-    {
+    public void addListener(LeaderLatchListener listener) {
         listeners.addListener(listener);
     }
 
     /**
      * Attaches a listener to this LeaderLatch
      * <p>
-     * Attaching the same listener multiple times is a noop from the second time on.
+     * Attaching the same listener multiple times is a noop from the second time
+     * on.
      * <p>
-     * All methods for the listener are run using the provided Executor.  It is common to pass in a single-threaded
-     * executor so that you can be certain that listener methods are called in sequence, but if you are fine with
+     * All methods for the listener are run using the provided Executor. It is
+     * common to pass in a single-threaded executor so that you can be certain
+     * that listener methods are called in sequence, but if you are fine with
      * them being called out of order you are welcome to use multiple threads.
-     *
-     * @param listener the listener to attach
-     * @param executor     An executor to run the methods for the listener on.
+     * 
+     * @param listener
+     *            the listener to attach
+     * @param executor
+     *            An executor to run the methods for the listener on.
      */
-    public void addListener(LeaderLatchListener listener, Executor executor)
-    {
+    public void addListener(LeaderLatchListener listener, Executor executor) {
         listeners.addListener(listener, executor);
     }
 
     /**
      * Removes a given listener from this LeaderLatch
-     *
-     * @param listener the listener to remove
+     * 
+     * @param listener
+     *            the listener to remove
      */
-    public void removeListener(LeaderLatchListener listener)
-    {
+    public void removeListener(LeaderLatchListener listener) {
         listeners.removeListener(listener);
     }
 
     /**
-     * <p>Causes the current thread to wait until this instance acquires leadership
-     * unless the thread is {@linkplain Thread#interrupt interrupted} or {@linkplain #close() closed}.</p>
-     * <p>If this instance already is the leader then this method returns immediately.</p>
-     *
-     * <p>Otherwise the current
-     * thread becomes disabled for thread scheduling purposes and lies
-     * dormant until one of three things happen:</p>
+     * <p>
+     * Causes the current thread to wait until this instance acquires leadership
+     * unless the thread is {@linkplain Thread#interrupt interrupted} or
+     * {@linkplain #close() closed}.
+     * </p>
+     * <p>
+     * If this instance already is the leader then this method returns
+     * immediately.
+     * </p>
+     * 
+     * <p>
+     * Otherwise the current thread becomes disabled for thread scheduling
+     * purposes and lies dormant until one of three things happen:
+     * </p>
      * <ul>
      * <li>This instance becomes the leader</li>
-     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
-     * the current thread</li>
+     * <li>Some other thread {@linkplain Thread#interrupt interrupts} the
+     * current thread</li>
      * <li>The instance is {@linkplain #close() closed}</li>
      * </ul>
-     * <p>If the current thread:</p>
+     * <p>
+     * If the current thread:
+     * </p>
      * <ul>
      * <li>has its interrupted status set on entry to this method; or
      * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
      * </ul>
-     * <p>then {@link InterruptedException} is thrown and the current thread's
-     * interrupted status is cleared.</p>
-     *
-     * @throws InterruptedException if the current thread is interrupted
-     *                              while waiting
-     * @throws EOFException         if the instance is {@linkplain #close() closed}
-     *                              while waiting
+     * <p>
+     * then {@link InterruptedException} is thrown and the current thread's
+     * interrupted status is cleared.
+     * </p>
+     * 
+     * @throws InterruptedException
+     *             if the current thread is interrupted while waiting
+     * @throws EOFException
+     *             if the instance is {@linkplain #close() closed} while waiting
      */
-    public void await() throws InterruptedException, EOFException
-    {
-        synchronized(this)
-        {
-            while ( (state.get() == State.STARTED) && !hasLeadership.get() )
-            {
+    public void await() throws InterruptedException, EOFException {
+        synchronized (this) {
+            while ((state.get() == State.STARTED) && !hasLeadership.get()) {
                 wait();
             }
         }
-        if ( state.get() != State.STARTED )
-        {
+        if (state.get() != State.STARTED) {
             throw new EOFException();
         }
     }
 
     /**
-     * <p>Causes the current thread to wait until this instance acquires leadership
-     * unless the thread is {@linkplain Thread#interrupt interrupted},
-     * the specified waiting time elapses or the instance is {@linkplain #close() closed}.</p>
-     *
-     * <p>If this instance already is the leader then this method returns immediately
-     * with the value {@code true}.</p>
-     *
-     * <p>Otherwise the current
-     * thread becomes disabled for thread scheduling purposes and lies
-     * dormant until one of four things happen:</p>
+     * <p>
+     * Causes the current thread to wait until this instance acquires leadership
+     * unless the thread is {@linkplain Thread#interrupt interrupted}, the
+     * specified waiting time elapses or the instance is {@linkplain #close()
+     * closed}.
+     * </p>
+     * 
+     * <p>
+     * If this instance already is the leader then this method returns
+     * immediately with the value {@code true}.
+     * </p>
+     * 
+     * <p>
+     * Otherwise the current thread becomes disabled for thread scheduling
+     * purposes and lies dormant until one of four things happen:
+     * </p>
      * <ul>
      * <li>This instance becomes the leader</li>
-     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
-     * the current thread</li>
+     * <li>Some other thread {@linkplain Thread#interrupt interrupts} the
+     * current thread</li>
      * <li>The specified waiting time elapses.</li>
      * <li>The instance is {@linkplain #close() closed}</li>
      * </ul>
-     *
-     * <p>If the current thread:</p>
+     * 
+     * <p>
+     * If the current thread:
+     * </p>
      * <ul>
      * <li>has its interrupted status set on entry to this method; or
      * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
      * </ul>
-     * <p>then {@link InterruptedException} is thrown and the current thread's
-     * interrupted status is cleared.</p>
-     *
-     * <p>If the specified waiting time elapses or the instance is {@linkplain #close() closed}
-     * then the value {@code false} is returned.  If the time is less than or equal to zero, the method
-     * will not wait at all.</p>
-     *
-     * @param timeout the maximum time to wait
-     * @param unit    the time unit of the {@code timeout} argument
-     * @return {@code true} if the count reached zero and {@code false}
-     *         if the waiting time elapsed before the count reached zero or the instances was closed
-     * @throws InterruptedException if the current thread is interrupted
-     *                              while waiting
+     * <p>
+     * then {@link InterruptedException} is thrown and the current thread's
+     * interrupted status is cleared.
+     * </p>
+     * 
+     * <p>
+     * If the specified waiting time elapses or the instance is
+     * {@linkplain #close() closed} then the value {@code false} is returned. If
+     * the time is less than or equal to zero, the method will not wait at all.
+     * </p>
+     * 
+     * @param timeout
+     *            the maximum time to wait
+     * @param unit
+     *            the time unit of the {@code timeout} argument
+     * @return {@code true} if this instance acquired leadership and
+     *         {@code false} if the waiting time elapsed before leadership was
+     *         acquired or the instance was closed
+     * @throws InterruptedException
+     *             if the current thread is interrupted while waiting
      */
-    public boolean await(long timeout, TimeUnit unit) throws InterruptedException
-    {
+    public boolean await(long timeout, TimeUnit unit)
+            throws InterruptedException {
         long waitNanos = TimeUnit.NANOSECONDS.convert(timeout, unit);
 
-        synchronized(this)
-        {
-            while ( (waitNanos > 0) && (state.get() == State.STARTED) && !hasLeadership.get() )
-            {
+        synchronized (this) {
+            while ((waitNanos > 0) && (state.get() == State.STARTED)
+                    && !hasLeadership.get()) {
                 long startNanos = System.nanoTime();
                 TimeUnit.NANOSECONDS.timedWait(this, waitNanos);
                 long elapsed = System.nanoTime() - startNanos;
@@ -358,24 +392,23 @@ public class LeaderLatch implements Closeable
 
     /**
      * Return this instance's participant Id
-     *
+     * 
      * @return participant Id
      */
-    public String getId()
-    {
+    public String getId() {
         return id;
     }
 
     /**
-     * Returns this instances current state, this is the only way to verify that the object has been closed before
-     * closing again.  If you try to close a latch multiple times, the close() method will throw an
-     * IllegalArgumentException which is often not caught and ignored (CloseableUtils.closeQuietly() only looks for
-     * IOException).
-     *
+     * Returns this instance's current state. This is the only way to verify
+     * that the object has been closed before closing again. If you try to
+     * close a latch multiple times, the close() method will throw an
+     * IllegalStateException, which is often not caught
+     * (CloseableUtils.closeQuietly() only looks for IOException).
+     * 
      * @return the state of the current instance
      */
-    public State getState()
-    {
+    public State getState() {
         return state.get();
     }
 
@@ -386,16 +419,17 @@ public class LeaderLatch implements Closeable
      * <p>
      * <p>
      * <B>NOTE</B> - this method polls the ZK server. Therefore it can possibly
-     * return a value that does not match {@link #hasLeadership()} as hasLeadership
-     * uses a local field of the class.
+     * return a value that does not match {@link #hasLeadership()} as
+     * hasLeadership uses a local field of the class.
      * </p>
-     *
+     * 
      * @return participants
-     * @throws Exception ZK errors, interruptions, etc.
+     * @throws Exception
+     *             ZK errors, interruptions, etc.
      */
-    public Collection<Participant> getParticipants() throws Exception
-    {
-        Collection<String> participantNodes = LockInternals.getParticipantNodes(client, latchPath, LOCK_NAME, sorter);
+    public Collection<Participant> getParticipants() throws Exception {
+        Collection<String> participantNodes = LockInternals
+                .getParticipantNodes(client, latchPath, LOCK_NAME, sorter);
         return LeaderSelector.getParticipants(client, participantNodes);
     }
 
@@ -407,26 +441,26 @@ public class LeaderLatch implements Closeable
      * <p>
      * <p>
      * <B>NOTE</B> - this method polls the ZK server. Therefore it can possibly
-     * return a value that does not match {@link #hasLeadership()} as hasLeadership
-     * uses a local field of the class.
+     * return a value that does not match {@link #hasLeadership()} as
+     * hasLeadership uses a local field of the class.
      * </p>
-     *
+     * 
      * @return leader
-     * @throws Exception ZK errors, interruptions, etc.
+     * @throws Exception
+     *             ZK errors, interruptions, etc.
      */
-    public Participant getLeader() throws Exception
-    {
-        Collection<String> participantNodes = LockInternals.getParticipantNodes(client, latchPath, LOCK_NAME, sorter);
+    public Participant getLeader() throws Exception {
+        Collection<String> participantNodes = LockInternals
+                .getParticipantNodes(client, latchPath, LOCK_NAME, sorter);
         return LeaderSelector.getLeader(client, participantNodes);
     }
 
     /**
      * Return true if leadership is currently held by this instance
-     *
+     * 
      * @return true/false
      */
-    public boolean hasLeadership()
-    {
+    public boolean hasLeadership() {
         return (state.get() == State.STARTED) && hasLeadership.get();
     }
 
@@ -434,105 +468,95 @@ public class LeaderLatch implements Closeable
     volatile CountDownLatch debugResetWaitLatch = null;
 
     @VisibleForTesting
-    void reset() throws Exception
-    {
+    void reset() throws Exception {
         setLeadership(false);
         setNode(null);
 
-        BackgroundCallback callback = new BackgroundCallback()
-        {
+        BackgroundCallback callback = new BackgroundCallback() {
             @Override
-            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
-            {
-                if ( debugResetWaitLatch != null )
-                {
+            public void processResult(CuratorFramework client,
+                    CuratorEvent event) throws Exception {
+                if (debugResetWaitLatch != null) {
                     debugResetWaitLatch.await();
                     debugResetWaitLatch = null;
                 }
 
-                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
-                {
+                if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
                     setNode(event.getName());
-                    if ( state.get() == State.CLOSED )
-                    {
+                    if (state.get() == State.CLOSED) {
                         setNode(null);
-                    }
-                    else
-                    {
+                    } else {
                         getChildren();
                     }
-                }
-                else
-                {
-                    log.error("getChildren() failed. rc = " + event.getResultCode());
+                } else {
+                    log.error("getChildren() failed. rc = "
+                            + event.getResultCode());
                 }
             }
         };
-        client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
+        client.create()
+                .creatingParentsIfNeeded()
+                .withProtection()
+                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
+                .inBackground(callback)
+                .forPath(ZKPaths.makePath(latchPath, LOCK_NAME),
+                        LeaderSelector.getIdBytes(id));
     }
 
-    private void checkLeadership(List<String> children) throws Exception
-    {
+    private void checkLeadership(List<String> children) throws Exception {
         final String localOurPath = ourPath.get();
-        List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
-        int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
-        if ( ourIndex < 0 )
-        {
+        List<String> sortedChildren = LockInternals.getSortedChildren(
+                LOCK_NAME, sorter, children);
+        int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths
+                .getNodeFromPath(localOurPath)) : -1;
+        if (ourIndex < 0) {
             log.error("Can't find our node. Resetting. Index: " + ourIndex);
             reset();
-        }
-        else if ( ourIndex == 0 )
-        {
+        } else if (ourIndex == 0) {
             setLeadership(true);
-        }
-        else
-        {
+        } else {
             String watchPath = sortedChildren.get(ourIndex - 1);
-            Watcher watcher = new Watcher()
-            {
+            Watcher watcher = new Watcher() {
                 @Override
-                public void process(WatchedEvent event)
-                {
-                    if ( (state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null) )
-                    {
-                        try
-                        {
+                public void process(WatchedEvent event) {
+                    if ((state.get() == State.STARTED)
+                            && (event.getType() == Event.EventType.NodeDeleted)
+                            && (localOurPath != null)) {
+                        try {
                             getChildren();
-                        }
-                        catch ( Exception ex )
-                        {
-                            log.error("An error occurred checking the leadership.", ex);
+                        } catch (Exception ex) {
+                            log.error(
+                                    "An error occurred checking the leadership.",
+                                    ex);
                         }
                     }
                 }
             };
 
-            BackgroundCallback callback = new BackgroundCallback()
-            {
+            BackgroundCallback callback = new BackgroundCallback() {
                 @Override
-                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
-                {
-                    if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
-                    {
+                public void processResult(CuratorFramework client,
+                        CuratorEvent event) throws Exception {
+                    if (event.getResultCode() == KeeperException.Code.NONODE
+                            .intValue()) {
                         // previous node is gone - reset
                         reset();
                     }
                 }
             };
-            // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
-            client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));
+            // use getData() instead of exists() to avoid leaving unneeded
+            // watchers which is a type of resource leak
+            client.getData().usingWatcher(watcher).inBackground(callback)
+                    .forPath(ZKPaths.makePath(latchPath, watchPath));
         }
     }
 
-    private void getChildren() throws Exception
-    {
-        BackgroundCallback callback = new BackgroundCallback()
-        {
+    private void getChildren() throws Exception {
+        BackgroundCallback callback = new BackgroundCallback() {
             @Override
-            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
-            {
-                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
-                {
+            public void processResult(CuratorFramework client,
+                    CuratorEvent event) throws Exception {
+                if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
                     checkLeadership(event.getChildren());
                 }
             }
@@ -540,82 +564,47 @@ public class LeaderLatch implements Closeable
         client.getChildren().inBackground(callback).forPath(latchPath);
     }
 
-    private void handleStateChange(ConnectionState newState)
-    {
-        switch ( newState )
-        {
-        default:
-        {
-            // NOP
-            break;
-        }
-
-        case RECONNECTED:
-        {
-            try
-            {
+    private void handleStateChange(ConnectionState newState) {
+        if (newState.isConnected()) {
+            try {
                 reset();
-            }
-            catch ( Exception e )
-            {
+            } catch (Exception e) {
                 log.error("Could not reset leader latch", e);
                 setLeadership(false);
             }
-            break;
-        }
-
-        case SUSPENDED:
-        case LOST:
-        {
+        } else {
             setLeadership(false);
-            break;
-        }
         }
     }
 
-    private synchronized void setLeadership(boolean newValue)
-    {
+    private synchronized void setLeadership(boolean newValue) {
         boolean oldValue = hasLeadership.getAndSet(newValue);
 
-        if ( oldValue && !newValue )
-        { // Lost leadership, was true, now false
-            listeners.forEach
-            (
-                new Function<LeaderLatchListener, Void>()
-                {
-                    @Override
-                    public Void apply(LeaderLatchListener listener)
-                    {
-                        listener.notLeader();
-                        return null;
-                    }
+        if (oldValue && !newValue) { // Lost leadership, was true, now false
+            listeners.forEach(new Function<LeaderLatchListener, Void>() {
+                @Override
+                public Void apply(LeaderLatchListener listener) {
+                    listener.notLeader();
+                    return null;
                 }
-            );
-        }
-        else if ( !oldValue && newValue )
-        { // Gained leadership, was false, now true
-            listeners.forEach
-            (
-                new Function<LeaderLatchListener, Void>()
-                {
-                    @Override
-                    public Void apply(LeaderLatchListener input)
-                    {
-                        input.isLeader();
-                        return null;
-                    }
+            });
+        } else if (!oldValue && newValue) { // Gained leadership, was false, now
+                                            // true
+            listeners.forEach(new Function<LeaderLatchListener, Void>() {
+                @Override
+                public Void apply(LeaderLatchListener input) {
+                    input.isLeader();
+                    return null;
                 }
-            );
+            });
         }
 
         notifyAll();
     }
 
-    private void setNode(String newValue) throws Exception
-    {
+    private void setNode(String newValue) throws Exception {
         String oldPath = ourPath.getAndSet(newValue);
-        if ( oldPath != null )
-        {
+        if (oldPath != null) {
             client.delete().guaranteed().inBackground().forPath(oldPath);
         }
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/1a63a102/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index b827e15..108f118 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -19,9 +19,17 @@
 
 package org.apache.curator.framework.recipes.leader;
 
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.state.ConnectionState;
@@ -33,36 +41,29 @@ import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
-public class TestLeaderLatch extends BaseClassForTests
-{
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class TestLeaderLatch extends BaseClassForTests {
     private static final String PATH_NAME = "/one/two/me";
     private static final int MAX_LOOPS = 5;
 
     @Test
-    public void testResetRace() throws Exception
-    {
+    public void testResetRace() throws Exception {
         Timing timing = new Timing();
         LeaderLatch latch = null;
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        try
-        {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(
+                server.getConnectString(), timing.session(),
+                timing.connection(), new RetryOneTime(1));
+        try {
             client.start();
             latch = new LeaderLatch(client, PATH_NAME);
 
             latch.debugResetWaitLatch = new CountDownLatch(1);
-            latch.start();  // will call reset()
-            latch.reset();  // should not result in two nodes
+            latch.start(); // will call reset()
+            latch.reset(); // should not result in two nodes
 
             timing.sleepABit();
 
@@ -70,22 +71,21 @@ public class TestLeaderLatch extends BaseClassForTests
 
             timing.sleepABit();
 
-            Assert.assertEquals(client.getChildren().forPath(PATH_NAME).size(), 1);
-        }
-        finally
-        {
+            Assert.assertEquals(client.getChildren().forPath(PATH_NAME).size(),
+                    1);
+        } finally {
             CloseableUtils.closeQuietly(latch);
             CloseableUtils.closeQuietly(client);
         }
     }
 
     @Test
-    public void testCreateDeleteRace() throws Exception
-    {
+    public void testCreateDeleteRace() throws Exception {
         Timing timing = new Timing();
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        try
-        {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(
+                server.getConnectString(), timing.session(),
+                timing.connection(), new RetryOneTime(1));
+        try {
             client.start();
             LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
 
@@ -100,43 +100,40 @@ public class TestLeaderLatch extends BaseClassForTests
 
             timing.sleepABit();
 
-            Assert.assertEquals(client.getChildren().forPath(PATH_NAME).size(), 0);
+            Assert.assertEquals(client.getChildren().forPath(PATH_NAME).size(),
+                    0);
 
-        }
-        finally
-        {
+        } finally {
             CloseableUtils.closeQuietly(client);
         }
     }
 
     @Test
-    public void testLostConnection() throws Exception
-    {
+    public void testLostConnection() throws Exception {
         final int PARTICIPANT_QTY = 10;
 
         List<LeaderLatch> latches = Lists.newArrayList();
 
         final Timing timing = new Timing();
-        final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        try
-        {
+        final CuratorFramework client = CuratorFrameworkFactory.newClient(
+                server.getConnectString(), timing.session(),
+                timing.connection(), new RetryOneTime(1));
+        try {
             client.start();
 
             final CountDownLatch countDownLatch = new CountDownLatch(1);
-            client.getConnectionStateListenable().addListener(new ConnectionStateListener()
-            {
-                @Override
-                public void stateChanged(CuratorFramework client, ConnectionState newState)
-                {
-                    if ( newState == ConnectionState.LOST )
-                    {
-                        countDownLatch.countDown();
-                    }
-                }
-            });
+            client.getConnectionStateListenable().addListener(
+                    new ConnectionStateListener() {
+                        @Override
+                        public void stateChanged(CuratorFramework client,
+                                ConnectionState newState) {
+                            if (newState == ConnectionState.LOST) {
+                                countDownLatch.countDown();
+                            }
+                        }
+                    });
 
-            for ( int i = 0; i < PARTICIPANT_QTY; ++i )
-            {
+            for (int i = 0; i < PARTICIPANT_QTY; ++i) {
                 LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
                 latch.start();
                 latches.add(latch);
@@ -151,13 +148,12 @@ public class TestLeaderLatch extends BaseClassForTests
 
             Assert.assertEquals(getLeaders(latches).size(), 0);
 
-            server = new TestingServer(server.getPort(), server.getTempDirectory());
-            Assert.assertEquals(waitForALeader(latches, timing).size(), 1); // should reconnect
-        }
-        finally
-        {
-            for ( LeaderLatch latch : latches )
-            {
+            server = new TestingServer(server.getPort(),
+                    server.getTempDirectory());
+            Assert.assertEquals(waitForALeader(latches, timing).size(), 1); // should
+                                                                            // reconnect
+        } finally {
+            for (LeaderLatch latch : latches) {
                 CloseableUtils.closeQuietly(latch);
             }
             CloseableUtils.closeQuietly(client);
@@ -165,21 +161,20 @@ public class TestLeaderLatch extends BaseClassForTests
     }
 
     @Test
-    public void testCorrectWatching() throws Exception
-    {
+    public void testCorrectWatching() throws Exception {
         final int PARTICIPANT_QTY = 10;
         final int PARTICIPANT_ID = 2;
 
         List<LeaderLatch> latches = Lists.newArrayList();
 
         final Timing timing = new Timing();
-        final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        try
-        {
+        final CuratorFramework client = CuratorFrameworkFactory.newClient(
+                server.getConnectString(), timing.session(),
+                timing.connection(), new RetryOneTime(1));
+        try {
             client.start();
 
-            for ( int i = 0; i < PARTICIPANT_QTY; ++i )
-            {
+            for (int i = 0; i < PARTICIPANT_QTY; ++i) {
                 LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
                 latch.start();
                 latches.add(latch);
@@ -187,20 +182,20 @@ public class TestLeaderLatch extends BaseClassForTests
 
             waitForALeader(latches, timing);
 
-            //we need to close a Participant that doesn't be actual leader (first Participant) nor the last
+            // we need to close a Participant that doesn't be actual leader
+            // (first Participant) nor the last
             latches.get(PARTICIPANT_ID).close();
 
-            //As the previous algorithm assumed that if the watched node is deleted gets the leadership
-            //we need to ensure that the PARTICIPANT_ID-1 is not getting (wrongly) elected as leader.
+            // As the previous algorithm assumed that if the watched node is
+            // deleted gets the leadership
+            // we need to ensure that the PARTICIPANT_ID-1 is not getting
+            // (wrongly) elected as leader.
             Assert.assertTrue(!latches.get(PARTICIPANT_ID - 1).hasLeadership());
-        }
-        finally
-        {
-            //removes the already closed participant
+        } finally {
+            // removes the already closed participant
             latches.remove(PARTICIPANT_ID);
 
-            for ( LeaderLatch latch : latches )
-            {
+            for (LeaderLatch latch : latches) {
                 CloseableUtils.closeQuietly(latch);
             }
             CloseableUtils.closeQuietly(client);
@@ -209,37 +204,35 @@ public class TestLeaderLatch extends BaseClassForTests
     }
 
     @Test
-    public void testWaiting() throws Exception
-    {
+    public void testWaiting() throws Exception {
         final int PARTICIPANT_QTY = 10;
 
-        ExecutorService executorService = Executors.newFixedThreadPool(PARTICIPANT_QTY);
-        ExecutorCompletionService<Void> service = new ExecutorCompletionService<Void>(executorService);
+        ExecutorService executorService = Executors
+                .newFixedThreadPool(PARTICIPANT_QTY);
+        ExecutorCompletionService<Void> service = new ExecutorCompletionService<Void>(
+                executorService);
 
         final Timing timing = new Timing();
-        final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        try
-        {
+        final CuratorFramework client = CuratorFrameworkFactory.newClient(
+                server.getConnectString(), timing.session(),
+                timing.connection(), new RetryOneTime(1));
+        try {
             client.start();
 
             final AtomicBoolean thereIsALeader = new AtomicBoolean(false);
-            for ( int i = 0; i < PARTICIPANT_QTY; ++i )
-            {
-                service.submit(new Callable<Void>()
-                {
+            for (int i = 0; i < PARTICIPANT_QTY; ++i) {
+                service.submit(new Callable<Void>() {
                     @Override
-                    public Void call() throws Exception
-                    {
+                    public Void call() throws Exception {
                         LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
-                        try
-                        {
+                        try {
                             latch.start();
-                            Assert.assertTrue(latch.await(timing.forWaiting().seconds(), TimeUnit.SECONDS));
-                            Assert.assertTrue(thereIsALeader.compareAndSet(false, true));
-                            Thread.sleep((int)(10 * Math.random()));
-                        }
-                        finally
-                        {
+                            Assert.assertTrue(latch.await(timing.forWaiting()
+                                    .seconds(), TimeUnit.SECONDS));
+                            Assert.assertTrue(thereIsALeader.compareAndSet(
+                                    false, true));
+                            Thread.sleep((int) (10 * Math.random()));
+                        } finally {
                             thereIsALeader.set(false);
                             latch.close();
                         }
@@ -248,68 +241,58 @@ public class TestLeaderLatch extends BaseClassForTests
                 });
             }
 
-            for ( int i = 0; i < PARTICIPANT_QTY; ++i )
-            {
+            for (int i = 0; i < PARTICIPANT_QTY; ++i) {
                 service.take().get();
             }
-        }
-        finally
-        {
+        } finally {
             executorService.shutdown();
             CloseableUtils.closeQuietly(client);
         }
     }
 
     @Test
-    public void testBasic() throws Exception
-    {
+    public void testBasic() throws Exception {
         basic(Mode.START_IMMEDIATELY);
     }
 
     @Test
-    public void testBasicAlt() throws Exception
-    {
+    public void testBasicAlt() throws Exception {
         basic(Mode.START_IN_THREADS);
     }
 
     @Test
-    public void testCallbackSanity() throws Exception
-    {
+    public void testCallbackSanity() throws Exception {
         final int PARTICIPANT_QTY = 10;
         final CountDownLatch timesSquare = new CountDownLatch(PARTICIPANT_QTY);
         final AtomicLong masterCounter = new AtomicLong(0);
         final AtomicLong notLeaderCounter = new AtomicLong(0);
 
         Timing timing = new Timing();
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        ExecutorService exec = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("callbackSanity-%s").build());
+        CuratorFramework client = CuratorFrameworkFactory.newClient(
+                server.getConnectString(), timing.session(),
+                timing.connection(), new RetryOneTime(1));
+        ExecutorService exec = Executors
+                .newSingleThreadExecutor(new ThreadFactoryBuilder()
+                        .setDaemon(true).setNameFormat("callbackSanity-%s")
+                        .build());
 
         List<LeaderLatch> latches = Lists.newArrayList();
-        for ( int i = 0; i < PARTICIPANT_QTY; ++i )
-        {
+        for (int i = 0; i < PARTICIPANT_QTY; ++i) {
             final LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
-            latch.addListener(new LeaderLatchListener()
-            {
+            latch.addListener(new LeaderLatchListener() {
                 boolean beenLeader = false;
 
                 @Override
-                public void isLeader()
-                {
-                    if ( !beenLeader )
-                    {
+                public void isLeader() {
+                    if (!beenLeader) {
                         masterCounter.incrementAndGet();
                         beenLeader = true;
-                        try
-                        {
+                        try {
                             latch.reset();
-                        }
-                        catch ( Exception e )
-                        {
+                        } catch (Exception e) {
                             throw Throwables.propagate(e);
                         }
-                    }
-                    else
-                    {
+                    } else {
                         masterCounter.incrementAndGet();
                         CloseableUtils.closeQuietly(latch);
                         timesSquare.countDown();
@@ -317,20 +300,17 @@ public class TestLeaderLatch extends BaseClassForTests
                 }
 
                 @Override
-                public void notLeader()
-                {
+                public void notLeader() {
                     notLeaderCounter.incrementAndGet();
                 }
             }, exec);
             latches.add(latch);
         }
 
-        try
-        {
+        try {
             client.start();
 
-            for ( LeaderLatch latch : latches )
-            {
+            for (LeaderLatch latch : latches) {
                 latch.start();
             }
 
@@ -338,17 +318,12 @@ public class TestLeaderLatch extends BaseClassForTests
 
             Assert.assertEquals(masterCounter.get(), PARTICIPANT_QTY * 2);
             Assert.assertEquals(notLeaderCounter.get(), PARTICIPANT_QTY);
-            for ( LeaderLatch latch : latches )
-            {
+            for (LeaderLatch latch : latches) {
                 Assert.assertEquals(latch.getState(), LeaderLatch.State.CLOSED);
             }
-        }
-        finally
-        {
-            for ( LeaderLatch latch : latches )
-            {
-                if ( latch.getState() != LeaderLatch.State.CLOSED )
-                {
+        } finally {
+            for (LeaderLatch latch : latches) {
+                if (latch.getState() != LeaderLatch.State.CLOSED) {
                     CloseableUtils.closeQuietly(latch);
                 }
             }
@@ -357,8 +332,7 @@ public class TestLeaderLatch extends BaseClassForTests
     }
 
     @Test
-    public void testCallbackNotifyLeader() throws Exception
-    {
+    public void testCallbackNotifyLeader() throws Exception {
         final int PARTICIPANT_QTY = 10;
         final int SILENT_QTY = 3;
 
@@ -367,37 +341,35 @@ public class TestLeaderLatch extends BaseClassForTests
         final AtomicLong notLeaderCounter = new AtomicLong(0);
 
         Timing timing = new Timing();
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        ExecutorService exec = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("callbackNotifyLeader-%s").build());
+        CuratorFramework client = CuratorFrameworkFactory.newClient(
+                server.getConnectString(), timing.session(),
+                timing.connection(), new RetryOneTime(1));
+        ExecutorService exec = Executors
+                .newSingleThreadExecutor(new ThreadFactoryBuilder()
+                        .setDaemon(true)
+                        .setNameFormat("callbackNotifyLeader-%s").build());
 
         List<LeaderLatch> latches = Lists.newArrayList();
-        for ( int i = 0; i < PARTICIPANT_QTY; ++i )
-        {
-            LeaderLatch.CloseMode closeMode = i < SILENT_QTY ? LeaderLatch.CloseMode.SILENT : LeaderLatch.CloseMode.NOTIFY_LEADER;
+        for (int i = 0; i < PARTICIPANT_QTY; ++i) {
+            LeaderLatch.CloseMode closeMode = i < SILENT_QTY ? LeaderLatch.CloseMode.SILENT
+                    : LeaderLatch.CloseMode.NOTIFY_LEADER;
 
-            final LeaderLatch latch = new LeaderLatch(client, PATH_NAME, "", closeMode);
-            latch.addListener(new LeaderLatchListener()
-            {
+            final LeaderLatch latch = new LeaderLatch(client, PATH_NAME, "",
+                    closeMode);
+            latch.addListener(new LeaderLatchListener() {
                 boolean beenLeader = false;
 
                 @Override
-                public void isLeader()
-                {
-                    if ( !beenLeader )
-                    {
+                public void isLeader() {
+                    if (!beenLeader) {
                         masterCounter.incrementAndGet();
                         beenLeader = true;
-                        try
-                        {
+                        try {
                             latch.reset();
-                        }
-                        catch ( Exception e )
-                        {
+                        } catch (Exception e) {
                             throw Throwables.propagate(e);
                         }
-                    }
-                    else
-                    {
+                    } else {
                         masterCounter.incrementAndGet();
                         CloseableUtils.closeQuietly(latch);
                         timesSquare.countDown();
@@ -405,38 +377,31 @@ public class TestLeaderLatch extends BaseClassForTests
                 }
 
                 @Override
-                public void notLeader()
-                {
+                public void notLeader() {
                     notLeaderCounter.incrementAndGet();
                 }
             }, exec);
             latches.add(latch);
         }
 
-        try
-        {
+        try {
             client.start();
 
-            for ( LeaderLatch latch : latches )
-            {
+            for (LeaderLatch latch : latches) {
                 latch.start();
             }
 
             timesSquare.await();
 
             Assert.assertEquals(masterCounter.get(), PARTICIPANT_QTY * 2);
-            Assert.assertEquals(notLeaderCounter.get(), PARTICIPANT_QTY * 2 - SILENT_QTY);
-            for ( LeaderLatch latch : latches )
-            {
+            Assert.assertEquals(notLeaderCounter.get(), PARTICIPANT_QTY * 2
+                    - SILENT_QTY);
+            for (LeaderLatch latch : latches) {
                 Assert.assertEquals(latch.getState(), LeaderLatch.State.CLOSED);
             }
-        }
-        finally
-        {
-            for ( LeaderLatch latch : latches )
-            {
-                if ( latch.getState() != LeaderLatch.State.CLOSED )
-                {
+        } finally {
+            for (LeaderLatch latch : latches) {
+                if (latch.getState() != LeaderLatch.State.CLOSED) {
                     CloseableUtils.closeQuietly(latch);
                 }
             }
@@ -445,47 +410,42 @@ public class TestLeaderLatch extends BaseClassForTests
     }
 
     @Test
-    public void testCallbackDontNotify() throws Exception
-    {
+    public void testCallbackDontNotify() throws Exception {
         final AtomicLong masterCounter = new AtomicLong(0);
         final AtomicLong notLeaderCounter = new AtomicLong(0);
 
         Timing timing = new Timing();
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        CuratorFramework client = CuratorFrameworkFactory.newClient(
+                server.getConnectString(), timing.session(),
+                timing.connection(), new RetryOneTime(1));
 
         final LeaderLatch leader = new LeaderLatch(client, PATH_NAME);
-        final LeaderLatch notifiedLeader = new LeaderLatch(client, PATH_NAME, "", LeaderLatch.CloseMode.NOTIFY_LEADER);
+        final LeaderLatch notifiedLeader = new LeaderLatch(client, PATH_NAME,
+                "", LeaderLatch.CloseMode.NOTIFY_LEADER);
 
-        leader.addListener(new LeaderLatchListener()
-        {
+        leader.addListener(new LeaderLatchListener() {
             @Override
-            public void isLeader()
-            {
+            public void isLeader() {
             }
 
             @Override
-            public void notLeader()
-            {
+            public void notLeader() {
                 masterCounter.incrementAndGet();
             }
         });
 
-        notifiedLeader.addListener(new LeaderLatchListener()
-        {
+        notifiedLeader.addListener(new LeaderLatchListener() {
             @Override
-            public void isLeader()
-            {
+            public void isLeader() {
             }
 
             @Override
-            public void notLeader()
-            {
+            public void notLeader() {
                 notLeaderCounter.incrementAndGet();
             }
         });
 
-        try
-        {
+        try {
             client.start();
 
             leader.start();
@@ -504,63 +464,116 @@ public class TestLeaderLatch extends BaseClassForTests
             leader.close(LeaderLatch.CloseMode.NOTIFY_LEADER);
 
             Assert.assertEquals(leader.getState(), LeaderLatch.State.CLOSED);
-            Assert.assertEquals(notifiedLeader.getState(), LeaderLatch.State.CLOSED);
+            Assert.assertEquals(notifiedLeader.getState(),
+                    LeaderLatch.State.CLOSED);
 
             Assert.assertEquals(masterCounter.get(), 1);
             Assert.assertEquals(notLeaderCounter.get(), 0);
-        }
-        finally
-        {
-            if ( leader.getState() != LeaderLatch.State.CLOSED )
-            {
+        } finally {
+            if (leader.getState() != LeaderLatch.State.CLOSED) {
                 CloseableUtils.closeQuietly(leader);
             }
-            if ( notifiedLeader.getState() != LeaderLatch.State.CLOSED )
-            {
+            if (notifiedLeader.getState() != LeaderLatch.State.CLOSED) {
                 CloseableUtils.closeQuietly(notifiedLeader);
             }
             CloseableUtils.closeQuietly(client);
         }
     }
 
-    private enum Mode
-    {
-        START_IMMEDIATELY,
-        START_IN_THREADS
+    @Test
+    public void testNoServerAtStart() {
+        CloseableUtils.closeQuietly(server);
+
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(
+                server.getConnectString(), timing.session(),
+                timing.connection(), new RetryOneTime(1));
+
+        client.start();
+
+        final CountDownLatch sessionLostCount = new CountDownLatch(1);
+
+        // Need to ensure that we've actually lost the connection completely
+        // before starting. Otherwise it's possible that the server will start
+        // during retries of the initial commands to create the latch zNodes
+        client.getConnectionStateListenable().addListener(
+                new ConnectionStateListener() {
+
+                    @Override
+                    public void stateChanged(CuratorFramework client,
+                            ConnectionState newState) {
+                        if (newState == ConnectionState.LOST) {
+                            sessionLostCount.countDown();
+                        }
+                    }
+                });
+
+        final LeaderLatch leader = new LeaderLatch(client, PATH_NAME);
+        final CountDownLatch leaderCounter = new CountDownLatch(1);
+        leader.addListener(new LeaderLatchListener() {
+
+            @Override
+            public void isLeader() {
+                leaderCounter.countDown();
+            }
+
+            @Override
+            public void notLeader() {
+            }
+
+        });
+
+        try {
+            leader.start();
+
+            timing.awaitLatch(sessionLostCount);
+
+            // Start the new server
+            server = new TestingServer(server.getPort());
+
+            timing.awaitLatch(leaderCounter);
+
+        } catch (Exception e) {
+            Assert.fail("Unexpected exception", e);
+        } finally {
+            CloseableUtils.closeQuietly(leader);
+            CloseableUtils.closeQuietly(client);
+            CloseableUtils.closeQuietly(server);
+        }
+
+    }
+
+    private enum Mode {
+        START_IMMEDIATELY, START_IN_THREADS
     }
 
-    private void basic(Mode mode) throws Exception
-    {
-        final int PARTICIPANT_QTY = 1;//0;
+    private void basic(Mode mode) throws Exception {
+        final int PARTICIPANT_QTY = 1;// 0;
 
         List<LeaderLatch> latches = Lists.newArrayList();
 
         Timing timing = new Timing();
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        try
-        {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(
+                server.getConnectString(), timing.session(),
+                timing.connection(), new RetryOneTime(1));
+        try {
             client.start();
 
-            for ( int i = 0; i < PARTICIPANT_QTY; ++i )
-            {
+            for (int i = 0; i < PARTICIPANT_QTY; ++i) {
                 LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
-                if ( mode == Mode.START_IMMEDIATELY )
-                {
+                if (mode == Mode.START_IMMEDIATELY) {
                     latch.start();
                 }
                 latches.add(latch);
             }
-            if ( mode == Mode.START_IN_THREADS )
-            {
-                ExecutorService service = Executors.newFixedThreadPool(latches.size());
-                for ( final LeaderLatch latch : latches )
-                {
-                    service.submit(new Callable<Object>()
-                    {
+            if (mode == Mode.START_IN_THREADS) {
+                ExecutorService service = Executors.newFixedThreadPool(latches
+                        .size());
+                for (final LeaderLatch latch : latches) {
+                    service.submit(new Callable<Object>() {
                         @Override
-                        public Object call() throws Exception
-                        {
-                            Thread.sleep((int)(100 * Math.random()));
+                        public Object call() throws Exception {
+                            Thread.sleep((int) (100 * Math.random()));
                             latch.start();
                             return null;
                         }
@@ -569,36 +582,38 @@ public class TestLeaderLatch extends BaseClassForTests
                 service.shutdown();
             }
 
-            while ( latches.size() > 0 )
-            {
+            while (latches.size() > 0) {
                 List<LeaderLatch> leaders = waitForALeader(latches, timing);
-                Assert.assertEquals(leaders.size(), 1); // there can only be one leader
+                Assert.assertEquals(leaders.size(), 1); // there can only be one
+                                                        // leader
                 LeaderLatch theLeader = leaders.get(0);
-                if ( mode == Mode.START_IMMEDIATELY )
-                {
-                    Assert.assertEquals(latches.indexOf(theLeader), 0); // assert ordering - leadership should advance in start order
+                if (mode == Mode.START_IMMEDIATELY) {
+                    Assert.assertEquals(latches.indexOf(theLeader), 0); // assert
+                                                                        // ordering
+                                                                        // -
+                                                                        // leadership
+                                                                        // should
+                                                                        // advance
+                                                                        // in
+                                                                        // start
+                                                                        // order
                 }
                 theLeader.close();
                 latches.remove(theLeader);
             }
-        }
-        finally
-        {
-            for ( LeaderLatch latch : latches )
-            {
+        } finally {
+            for (LeaderLatch latch : latches) {
                 CloseableUtils.closeQuietly(latch);
             }
             CloseableUtils.closeQuietly(client);
         }
     }
 
-    private List<LeaderLatch> waitForALeader(List<LeaderLatch> latches, Timing timing) throws InterruptedException
-    {
-        for ( int i = 0; i < MAX_LOOPS; ++i )
-        {
+    private List<LeaderLatch> waitForALeader(List<LeaderLatch> latches,
+            Timing timing) throws InterruptedException {
+        for (int i = 0; i < MAX_LOOPS; ++i) {
             List<LeaderLatch> leaders = getLeaders(latches);
-            if ( leaders.size() != 0 )
-            {
+            if (leaders.size() != 0) {
                 return leaders;
             }
             timing.sleepABit();
@@ -606,13 +621,10 @@ public class TestLeaderLatch extends BaseClassForTests
         return Lists.newArrayList();
     }
 
-    private List<LeaderLatch> getLeaders(Collection<LeaderLatch> latches)
-    {
+    private List<LeaderLatch> getLeaders(Collection<LeaderLatch> latches) {
         List<LeaderLatch> leaders = Lists.newArrayList();
-        for ( LeaderLatch latch : latches )
-        {
-            if ( latch.hasLeadership() )
-            {
+        for (LeaderLatch latch : latches) {
+            if (latch.hasLeadership()) {
                 leaders.add(latch);
             }
         }


[16/18] git commit: minor refactor

Posted by ra...@apache.org.
minor refactor


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/03e736c6
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/03e736c6
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/03e736c6

Branch: refs/heads/master
Commit: 03e736c66c353eb05ae7257553e53d8c2983ce40
Parents: 5954e66
Author: randgalt <ra...@apache.org>
Authored: Tue Jun 17 17:04:00 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Jun 17 17:04:00 2014 -0500

----------------------------------------------------------------------
 .../curator/framework/state/ConnectionStateManager.java      | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/03e736c6/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index fb312dc..2a0cdd1 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -193,16 +193,16 @@ public class ConnectionStateManager implements Closeable
 
     public synchronized boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException
     {
-        boolean hasMaxWait = (units != null);
         long startTime = System.currentTimeMillis();
 
+        boolean hasMaxWait = (units != null);
+        long maxWaitTimeMs = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWaitTime, units) : 0;
+
         while ( !isConnected() )
         {
-            long maxWaitTimeMS = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWaitTime, units) : 0;
-
             if ( hasMaxWait )
             {
-                long waitTime = maxWaitTimeMS - (System.currentTimeMillis() - startTime);
+                long waitTime = maxWaitTimeMs - (System.currentTimeMillis() - startTime);
                 if ( waitTime <= 0 )
                 {
                     return isConnected();


[03/18] git commit: CURATOR-110 - Modified the enum to have an abstract isConnected() method that can be overridden by each enum instance.

Posted by ra...@apache.org.
CURATOR-110 - Modified the enum to have an abstract isConnected() method
that can be overridden by each enum instance.

Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/2c376b9f
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/2c376b9f
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/2c376b9f

Branch: refs/heads/master
Commit: 2c376b9f51d6eab4340651abce3284f358bedbc7
Parents: 0b7ae7e
Author: Cameron McKenzie <ca...@unico.com.au>
Authored: Wed Jun 4 10:24:31 2014 +1000
Committer: Cameron McKenzie <ca...@unico.com.au>
Committed: Wed Jun 4 10:24:31 2014 +1000

----------------------------------------------------------------------
 .../framework/state/ConnectionState.java        | 45 ++++++++++++++++----
 1 file changed, 36 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/2c376b9f/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
index 7663e10..2e08b34 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
@@ -29,26 +29,50 @@ public enum ConnectionState
      * Sent for the first successful connection to the server. NOTE: You will only
      * get one of these messages for any CuratorFramework instance.
      */
-    CONNECTED,
+    CONNECTED
+    {
+        public boolean isConnected()
+        {
+            return true;
+        }
+    },
 
     /**
      * There has been a loss of connection. Leaders, locks, etc. should suspend
      * until the connection is re-established. If the connection times-out you will
      * receive a {@link #LOST} notice
      */
-    SUSPENDED,
+    SUSPENDED
+    {
+        public boolean isConnected()
+        {
+            return false;
+        }
+    },    
 
     /**
      * A suspended or read-only connection has been re-established
      */
-    RECONNECTED,
+    RECONNECTED
+    {
+        public boolean isConnected()
+        {
+            return true;
+        }
+    },
 
     /**
      * The connection is confirmed to be lost. Close any locks, leaders, etc. and
      * attempt to re-create them. NOTE: it is possible to get a {@link #RECONNECTED}
      * state after this but you should still consider any locks, etc. as dirty/unstable
      */
-    LOST,
+    LOST
+    {
+        public boolean isConnected()
+        {
+            return false;
+        }
+    },
 
     /**
      * The connection has gone into read-only mode. This can only happen if you pass true
@@ -57,15 +81,18 @@ public enum ConnectionState
      * <a href="http://wiki.apache.org/hadoop/ZooKeeper/GSoCReadOnlyMode">http://wiki.apache.org/hadoop/ZooKeeper/GSoCReadOnlyMode</a>.
      * The connection will remain in read only mode until another state change is sent.
      */
-    READ_ONLY;
+    READ_ONLY
+    {
+        public boolean isConnected()
+        {
+            return true;
+        }
+    };
     
     /**
      * Check if this state indicates that Curator has a connection to ZooKeeper
      * 
      * @return True if connected, false otherwise
      */
-    public boolean isConnected()
-    {
-        return this == CONNECTED || this == RECONNECTED || this == READ_ONLY;
-    }
+    public abstract boolean isConnected();
 }


[09/18] git commit: CURATOR-110 - Moved the 'wait until connection established' logic into the ExecuteAfterConnectionEstablished utility class. Cleaned up the blockUntilConnected() logic in the CuratorFrameworkImpl

Posted by ra...@apache.org.
CURATOR-110 - Moved the 'wait until connection established' logic into
the ExecuteAfterConnectionEstablished utility class. Cleaned up the
blockUntilConnected() logic in the CuratorFrameworkImpl

Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/e8138ed9
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/e8138ed9
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/e8138ed9

Branch: refs/heads/master
Commit: e8138ed9768e99e98e308845626e36aa55afadb0
Parents: 63d0401
Author: Cameron McKenzie <mc...@gmail.com>
Authored: Mon Jun 16 15:59:56 2014 +1000
Committer: Cameron McKenzie <mc...@gmail.com>
Committed: Mon Jun 16 15:59:56 2014 +1000

----------------------------------------------------------------------
 .../curator/framework/CuratorFramework.java     |  9 +---
 .../framework/imps/CuratorFrameworkImpl.java    | 22 +++-----
 .../framework/state/ConnectionStateManager.java |  5 ++
 .../ExecuteAfterConnectionEstablished.java      | 53 ++++++++++++++++++++
 .../framework/recipes/leader/LeaderLatch.java   | 44 ++++++----------
 5 files changed, 81 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/e8138ed9/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 1df3fa5..13cff30 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -213,14 +213,7 @@ public interface CuratorFramework extends Closeable
      * @param watcher the watcher
      */
     public void clearWatcherReferences(Watcher watcher);
-    
-    /**
-     * Get the current connection state. The connection state will have a value of 0 until
-     * the first connection related event is received.
-     * @return The current connection state, or null if it is unknown 
-     */
-    public ConnectionState getCurrentConnectionState();
-    
+        
     /**
      * Block until a connection to ZooKeeper is available or the maxWaitTime has been exceeded
      * @param maxWaitTime The maximum wait time. Specify a value <= 0 to wait indefinitely

http://git-wip-us.apache.org/repos/asf/curator/blob/e8138ed9/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 33e260d..d1de29f 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -67,7 +67,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
     private final BlockingQueue<OperationAndData<?>>                    backgroundOperations;
     private final NamespaceImpl                                         namespace;
     private final ConnectionStateManager                                connectionStateManager;
-    private final AtomicReference<ConnectionState>						connectionState;
     private final AtomicReference<AuthInfo>                             authInfo = new AtomicReference<AuthInfo>();
     private final byte[]                                                defaultData;
     private final FailedDeleteManager                                   failedDeleteManager;
@@ -75,6 +74,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
     private final ACLProvider                                           aclProvider;
     private final NamespaceFacadeCache                                  namespaceFacadeCache;
     private final NamespaceWatcherMap                                   namespaceWatcherMap = new NamespaceWatcherMap(this);
+    private final Object												connectionLock = new Object();
 
     private volatile ExecutorService                                    executorService;
     private final AtomicBoolean                                         logAsErrorConnectionErrors = new AtomicBoolean(false);
@@ -151,7 +151,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
         namespace = new NamespaceImpl(this, builder.getNamespace());
         threadFactory = getThreadFactory(builder);
         connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory());
-        connectionState = new AtomicReference<ConnectionState>(null);
         compressionProvider = builder.getCompressionProvider();
         aclProvider = builder.getAclProvider();
         state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
@@ -174,10 +173,9 @@ public class CuratorFrameworkImpl implements CuratorFramework
 			@Override
 			public void stateChanged(CuratorFramework client, ConnectionState newState)
 			{
-				connectionState.set(newState);
-				synchronized(connectionState)
+				synchronized(connectionLock)
 				{
-					connectionState.notifyAll();
+					connectionLock.notifyAll();
 				}
 			}
 		});
@@ -220,7 +218,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
         threadFactory = parent.threadFactory;
         backgroundOperations = parent.backgroundOperations;
         connectionStateManager = parent.connectionStateManager;
-        connectionState = parent.connectionState;
         defaultData = parent.defaultData;
         failedDeleteManager = parent.failedDeleteManager;
         compressionProvider = parent.compressionProvider;
@@ -890,16 +887,11 @@ public class CuratorFrameworkImpl implements CuratorFramework
         );
     }
     
-    public ConnectionState getCurrentConnectionState()
-    {
-    	return connectionState.get();
-    }
-    
     @Override
     public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException
     {
     	//Check if we're already connected
-    	ConnectionState currentConnectionState = connectionState.get();
+    	ConnectionState currentConnectionState = connectionStateManager.getCurrentConnectionState();
     	if(currentConnectionState != null && currentConnectionState.isConnected())
     	{
     		return true;
@@ -910,9 +902,9 @@ public class CuratorFrameworkImpl implements CuratorFramework
     	
     	for(;;)
     	{
-    		synchronized(connectionState)
+    		synchronized(connectionLock)
     		{
-    			currentConnectionState = connectionState.get();
+    			currentConnectionState = connectionStateManager.getCurrentConnectionState();
     	    	if(currentConnectionState != null && currentConnectionState.isConnected())
     			{
     				return true;
@@ -930,7 +922,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         			}    					
     			}
     		
-    			connectionState.wait(waitTime);
+    			connectionLock.wait(waitTime);
     		}
     	}
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/e8138ed9/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 42804b8..ba29994 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -188,6 +188,11 @@ public class ConnectionStateManager implements Closeable
 
         return true;
     }
+    
+    public synchronized ConnectionState getCurrentConnectionState()
+    {
+    	return currentConnectionState;
+    }
 
     private void postState(ConnectionState state)
     {

http://git-wip-us.apache.org/repos/asf/curator/blob/e8138ed9/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java
new file mode 100644
index 0000000..d213d37
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java
@@ -0,0 +1,53 @@
+package org.apache.curator.framework.recipes;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ThreadUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to allow execution of logic once a ZooKeeper connection becomes available.
+ *
+ */
+public class ExecuteAfterConnectionEstablished
+{
+	private final static Logger log = LoggerFactory.getLogger(ExecuteAfterConnectionEstablished.class);
+	
+	/**
+	 * Spawns a new background thread that will block until a connection is available and
+	 * then execute the 'runAfterConnection' logic
+	 * @param name The name of the spawned thread
+	 * @param client The curator client
+	 * @param runAfterConnection The logic to run
+	 */
+	public static void executeAfterConnectionEstablishedInBackground(String name,
+																     final CuratorFramework client,
+																     final Runnable runAfterConnection)
+	{
+        //Block until connected
+        final ExecutorService executor = ThreadUtils.newSingleThreadExecutor(name);
+        executor.submit(new Runnable()
+        {
+			
+			@Override
+			public void run()
+			{
+				try
+				{
+		        	client.blockUntilConnected();			        
+			        runAfterConnection.run();
+				}
+				catch(Exception e)
+				{
+					log.error("An error occurred blocking until a connection is available", e);
+				}
+				finally
+				{
+					executor.shutdown();
+				}
+			}
+        });
+	}
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/e8138ed9/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 21e8cca..13a9f21 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -27,6 +27,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.recipes.ExecuteAfterConnectionEstablished;
 import org.apache.curator.framework.recipes.locks.LockInternals;
 import org.apache.curator.framework.recipes.locks.LockInternalsSorter;
 import org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver;
@@ -148,14 +149,6 @@ public class LeaderLatch implements Closeable
         this.id = Preconditions.checkNotNull(id, "id cannot be null");
         this.closeMode = Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
     }
-    
-    private CountDownLatch startLatch;
-    
-    public LeaderLatch(CuratorFramework client, String latchPath,
-    		CountDownLatch startLatch) {
-    	this(client, latchPath);
-        this.startLatch = startLatch;
-    }
 
     /**
      * Add this instance to the leadership election and attempt to acquire leadership.
@@ -166,30 +159,23 @@ public class LeaderLatch implements Closeable
     {
         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
 
-        //Block until connected
-        final ExecutorService executor = ThreadUtils.newSingleThreadExecutor("");
-        executor.submit(new Runnable()
-        {
-			
-			@Override
+        ExecuteAfterConnectionEstablished.executeAfterConnectionEstablishedInBackground(LeaderLatch.class.getName(),
+        		client, new Runnable()
+        {					
+        	@Override
 			public void run()
 			{
-		        try
-		        {
-		        	client.blockUntilConnected();
-			        
-			        client.getConnectionStateListenable().addListener(listener);		        	
-		        	reset();
-		        }
-		        catch(Exception ex)
-		        {
-		        	log.error("An error occurred checking resetting leadership.", ex);
-		        } finally {
-		        	//Shutdown the executor
-		        	executor.shutdown();
-		        }
+        		try
+        		{
+        			client.getConnectionStateListenable().addListener(listener);		        	
+        			reset();
+        		}
+        		catch(Exception ex)
+                {
+                	log.error("An error occurred checking resetting leadership.", ex);
+                }
 			}
-        });
+		});     
     }
 
     /**


[05/18] git commit: give credit to Facebook Swift

Posted by ra...@apache.org.
give credit to Facebook Swift


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/122476ac
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/122476ac
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/122476ac

Branch: refs/heads/master
Commit: 122476ac582f2db9dab6d4fc6f599481fc98b4c9
Parents: 70c5254
Author: randgalt <ra...@apache.org>
Authored: Sat Jun 7 15:05:18 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Jun 7 15:05:18 2014 -0500

----------------------------------------------------------------------
 curator-x-rpc/src/site/confluence/index.confluence | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/122476ac/curator-x-rpc/src/site/confluence/index.confluence
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/site/confluence/index.confluence b/curator-x-rpc/src/site/confluence/index.confluence
index b3686c3..70077ac 100644
--- a/curator-x-rpc/src/site/confluence/index.confluence
+++ b/curator-x-rpc/src/site/confluence/index.confluence
@@ -43,3 +43,7 @@ See [[Events|events.html]] for details on the Curator RPC event loop and its str
 h2. Reference
 
 See [[API Reference Page|reference.html]] for the API reference.
+
+----
+
+Special thanks to the [[Facebook Swift|https://github.com/facebook/swift/]] project which makes writing a Java Thrift server much easier.


[08/18] git commit: CURATOR-110 - Modified the CuratorFramework to expose a 'blockUntilConnected' method. This allows a race condition between the start() method and the ConnectionStateListener in the LeaderLatch recipe to be avoided. Updated the leader

Posted by ra...@apache.org.
CURATOR-110 - Modified the CuratorFramework to expose a 'blockUntilConnected' method. This allows a race condition between the start() method and the ConnectionStateListener in the LeaderLatch recipe to be avoided. Updated the leader latch start() method to block until a connection is available (in a background thread), before it attempts to set up its state. This means that it will work correctly if started before a connection to ZK is available.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/63d0401c
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/63d0401c
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/63d0401c

Branch: refs/heads/master
Commit: 63d0401c05f6cc122468f55c79c9e1bd4ddb3eb5
Parents: 2c376b9
Author: Cameron McKenzie <mc...@gmail.com>
Authored: Thu Jun 12 14:09:45 2014 +1000
Committer: Cameron McKenzie <mc...@gmail.com>
Committed: Thu Jun 12 14:09:45 2014 +1000

----------------------------------------------------------------------
 .../curator/framework/CuratorFramework.java     |  27 +++
 .../framework/imps/CuratorFrameworkImpl.java    |  69 ++++++
 .../framework/imps/TestBlockUntilConnected.java | 217 +++++++++++++++++++
 .../framework/recipes/leader/LeaderLatch.java   |  38 +++-
 .../recipes/leader/TestLeaderLatch.java         |  43 ++--
 5 files changed, 366 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/63d0401c/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 2d6e182..1df3fa5 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -24,10 +24,13 @@ import org.apache.curator.framework.api.*;
 import org.apache.curator.framework.api.transaction.CuratorTransaction;
 import org.apache.curator.framework.imps.CuratorFrameworkState;
 import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.EnsurePath;
 import org.apache.zookeeper.Watcher;
+
 import java.io.Closeable;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Zookeeper framework-style client
@@ -210,4 +213,28 @@ public interface CuratorFramework extends Closeable
      * @param watcher the watcher
      */
     public void clearWatcherReferences(Watcher watcher);
+    
+    /**
+     * Get the current connection state. The connection state will be null until
+     * the first connection-related event is received.
+     * @return The current connection state, or null if it is unknown 
+     */
+    public ConnectionState getCurrentConnectionState();
+    
+    /**
+     * Block until a connection to ZooKeeper is available or the maxWaitTime has been exceeded
+     * @param maxWaitTime The maximum wait time. Specify a value <= 0 to wait indefinitely
+     * @param units The time units for the maximum wait time.
+     * @return True if connection has been established, false otherwise.
+     * @throws InterruptedException If interrupted while waiting
+     */
+    public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException;
+    
+    /**
+     * Block until a connection to ZooKeeper is available. This method will not return until a
+     * connection is available or it is interrupted, in which case an InterruptedException will
+     * be thrown
+     * @throws InterruptedException If interrupted while waiting
+     */
+    public void blockUntilConnected() throws InterruptedException;
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/63d0401c/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index c4b1349..33e260d 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -20,6 +20,7 @@ package org.apache.curator.framework.imps;
 
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
+
 import org.apache.curator.CuratorConnectionLossException;
 import org.apache.curator.CuratorZookeeperClient;
 import org.apache.curator.RetryLoop;
@@ -43,6 +44,7 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import java.util.Arrays;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
@@ -65,6 +67,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
     private final BlockingQueue<OperationAndData<?>>                    backgroundOperations;
     private final NamespaceImpl                                         namespace;
     private final ConnectionStateManager                                connectionStateManager;
+    private final AtomicReference<ConnectionState>						connectionState;
     private final AtomicReference<AuthInfo>                             authInfo = new AtomicReference<AuthInfo>();
     private final byte[]                                                defaultData;
     private final FailedDeleteManager                                   failedDeleteManager;
@@ -148,6 +151,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         namespace = new NamespaceImpl(this, builder.getNamespace());
         threadFactory = getThreadFactory(builder);
         connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory());
+        connectionState = new AtomicReference<ConnectionState>(null);
         compressionProvider = builder.getCompressionProvider();
         aclProvider = builder.getAclProvider();
         state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
@@ -162,6 +166,21 @@ public class CuratorFrameworkImpl implements CuratorFramework
 
         failedDeleteManager = new FailedDeleteManager(this);
         namespaceFacadeCache = new NamespaceFacadeCache(this);
+        
+        //Add callback handler to determine connection state transitions
+    	getConnectionStateListenable().addListener(new ConnectionStateListener()
+    	{
+			
+			@Override
+			public void stateChanged(CuratorFramework client, ConnectionState newState)
+			{
+				connectionState.set(newState);
+				synchronized(connectionState)
+				{
+					connectionState.notifyAll();
+				}
+			}
+		});
     }
 
     private ZookeeperFactory makeZookeeperFactory(final ZookeeperFactory actualZookeeperFactory)
@@ -201,6 +220,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         threadFactory = parent.threadFactory;
         backgroundOperations = parent.backgroundOperations;
         connectionStateManager = parent.connectionStateManager;
+        connectionState = parent.connectionState;
         defaultData = parent.defaultData;
         failedDeleteManager = parent.failedDeleteManager;
         compressionProvider = parent.compressionProvider;
@@ -869,4 +889,53 @@ public class CuratorFrameworkImpl implements CuratorFramework
             }
         );
     }
+    
+    public ConnectionState getCurrentConnectionState()
+    {
+    	return connectionState.get();
+    }
+    
+    @Override
+    public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException
+    {
+    	//Check if we're already connected
+    	ConnectionState currentConnectionState = connectionState.get();
+    	if(currentConnectionState != null && currentConnectionState.isConnected())
+    	{
+    		return true;
+    	}
+    	
+    	long startTime = System.currentTimeMillis();
+    	long maxWaitTimeMS = TimeUnit.MILLISECONDS.convert(maxWaitTime, units);
+    	
+    	for(;;)
+    	{
+    		synchronized(connectionState)
+    		{
+    			currentConnectionState = connectionState.get();
+    	    	if(currentConnectionState != null && currentConnectionState.isConnected())
+    			{
+    				return true;
+    			}
+    			
+    			long waitTime = 0;
+    			if(maxWaitTime > 0)
+    			{
+    				waitTime = maxWaitTimeMS  - (System.currentTimeMillis() - startTime);
+    	   			
+    				//Timeout
+        			if(waitTime <= 0)
+        			{
+        				return false;
+        			}    					
+    			}
+    		
+    			connectionState.wait(waitTime);
+    		}
+    	}
+    }
+    
+    public void blockUntilConnected() throws InterruptedException {
+    	blockUntilConnected(0, TimeUnit.SECONDS);
+    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/63d0401c/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
new file mode 100644
index 0000000..996e5fc
--- /dev/null
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
@@ -0,0 +1,217 @@
+package org.apache.curator.framework.imps;
+
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestBlockUntilConnected extends BaseClassForTests
+{
+	/**
+	 * Test the case where we're already connected
+	 */
+	@Test
+	public void testBlockUntilConnectedCurrentlyConnected()
+	{		
+		Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.builder().
+                connectString(server.getConnectString()).
+                retryPolicy(new RetryOneTime(1)).
+                build();
+        
+        try
+        {
+        	final CountDownLatch connectedLatch = new CountDownLatch(1);
+        	client.getConnectionStateListenable().addListener(new ConnectionStateListener()
+        	{
+				
+				@Override
+				public void stateChanged(CuratorFramework client, ConnectionState newState)
+				{
+					if(newState.isConnected())
+					{
+						connectedLatch.countDown();
+					}
+				}
+			});
+        	
+        	client.start();
+        	
+        	Assert.assertTrue(timing.awaitLatch(connectedLatch), "Timed out awaiting latch");
+        	Assert.assertTrue(client.blockUntilConnected(1, TimeUnit.SECONDS), "Not connected");
+        }
+        catch(InterruptedException e)
+        {
+        	Assert.fail("Unexpected interruption");
+        }
+        finally
+        {
+        	CloseableUtils.closeQuietly(client);
+        }
+	}
+	
+	/**
+	 * Test the case where we are not currently connected and never have been
+	 */
+	@Test
+	public void testBlockUntilConnectedCurrentlyNeverConnected()
+	{		
+        CuratorFramework client = CuratorFrameworkFactory.builder().
+                connectString(server.getConnectString()).
+                retryPolicy(new RetryOneTime(1)).
+                build();
+        
+        try
+        {
+        	client.start();
+        	Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Not connected");
+        }
+        catch(InterruptedException e)
+        {
+        	Assert.fail("Unexpected interruption");
+        }
+        finally
+        {
+        	CloseableUtils.closeQuietly(client);
+        }
+	}
+	
+	/**
+	 * Test the case where we are not currently connected, but have been previously
+	 */
+	@Test
+	public void testBlockUntilConnectedCurrentlyAwaitingReconnect()
+	{		
+		Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.builder().
+                connectString(server.getConnectString()).
+                retryPolicy(new RetryOneTime(1)).
+                build();
+        
+        final CountDownLatch lostLatch = new CountDownLatch(1);
+        client.getConnectionStateListenable().addListener(new ConnectionStateListener()
+        {
+			
+			@Override
+			public void stateChanged(CuratorFramework client, ConnectionState newState)
+			{
+				if(newState == ConnectionState.LOST)
+				{
+					lostLatch.countDown();
+				}
+			}
+		});
+        
+        try
+        {
+        	client.start();
+        	
+        	//Block until we're connected
+        	Assert.assertTrue(client.blockUntilConnected(5,  TimeUnit.SECONDS), "Failed to connect");
+        	
+        	//Kill the server
+        	CloseableUtils.closeQuietly(server);
+        	
+        	//Wait until we hit the lost state
+        	Assert.assertTrue(timing.awaitLatch(lostLatch), "Failed to reach LOST state");
+        	
+        	server = new TestingServer(server.getPort(), server.getTempDirectory());
+        	
+        	Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Not connected");
+        }
+        catch(Exception e)
+        {
+        	Assert.fail("Unexpected exception " + e);
+        }
+        finally
+        {
+        	CloseableUtils.closeQuietly(client);
+        }
+	}	
+	
+	/**
+	 * Test the case where we are not currently connected and time out before a
+	 * connection becomes available.
+	 */
+	@Test
+	public void testBlockUntilConnectedConnectTimeout()
+	{	
+		//Kill the server
+		CloseableUtils.closeQuietly(server);
+		
+        CuratorFramework client = CuratorFrameworkFactory.builder().
+                connectString(server.getConnectString()).
+                retryPolicy(new RetryOneTime(1)).
+                build();
+        
+        try
+        {
+        	client.start();
+        	Assert.assertFalse(client.blockUntilConnected(5, TimeUnit.SECONDS),
+        					   "Connected");
+        }
+        catch(InterruptedException e)
+        {
+        	Assert.fail("Unexpected interruption");
+        }
+        finally
+        {
+        	CloseableUtils.closeQuietly(client);
+        }
+	}
+	
+	/**
+	 * Test the case where we are not currently connected and the thread gets interrupted
+	 * prior to a connection becoming available
+	 */
+	@Test
+	public void testBlockUntilConnectedInterrupt()
+	{	
+		//Kill the server
+		CloseableUtils.closeQuietly(server);
+		
+        final CuratorFramework client = CuratorFrameworkFactory.builder().
+                connectString(server.getConnectString()).
+                retryPolicy(new RetryOneTime(1)).
+                build();
+        
+        try
+        {
+        	client.start();
+        	
+        	final Thread threadToInterrupt = Thread.currentThread();
+        	
+        	Timer timer = new Timer();
+        	timer.schedule(new TimerTask() {
+				
+				@Override
+				public void run() {
+					threadToInterrupt.interrupt();
+				}
+			}, 3000);
+        	
+        	client.blockUntilConnected(5, TimeUnit.SECONDS);
+        	Assert.fail("Expected interruption did not occur");
+        }
+        catch(InterruptedException e)
+        {
+        	//This is expected
+        }
+        finally
+        {
+        	CloseableUtils.closeQuietly(client);
+        }
+	}	
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/63d0401c/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index c1157c9..21e8cca 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -22,6 +22,7 @@ package org.apache.curator.framework.recipes.leader;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
+
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
@@ -31,6 +32,7 @@ import org.apache.curator.framework.recipes.locks.LockInternalsSorter;
 import org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -38,6 +40,7 @@ import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import java.io.Closeable;
 import java.io.EOFException;
 import java.io.IOException;
@@ -45,6 +48,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -144,6 +148,14 @@ public class LeaderLatch implements Closeable
         this.id = Preconditions.checkNotNull(id, "id cannot be null");
         this.closeMode = Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
     }
+    
+    private CountDownLatch startLatch;
+    
+    public LeaderLatch(CuratorFramework client, String latchPath,
+    		CountDownLatch startLatch) {
+    	this(client, latchPath);
+        this.startLatch = startLatch;
+    }
 
     /**
      * Add this instance to the leadership election and attempt to acquire leadership.
@@ -154,8 +166,30 @@ public class LeaderLatch implements Closeable
     {
         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
 
-        client.getConnectionStateListenable().addListener(listener);
-        reset();
+        //Block until connected
+        final ExecutorService executor = ThreadUtils.newSingleThreadExecutor("");
+        executor.submit(new Runnable()
+        {
+			
+			@Override
+			public void run()
+			{
+		        try
+		        {
+		        	client.blockUntilConnected();
+			        
+			        client.getConnectionStateListenable().addListener(listener);		        	
+		        	reset();
+		        }
+		        catch(Exception ex)
+		        {
+		        	log.error("An error occurred checking resetting leadership.", ex);
+		        } finally {
+		        	//Shutdown the executor
+		        	executor.shutdown();
+		        }
+			}
+        });
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/curator/blob/63d0401c/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index d3b00bc..35d8809 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -22,10 +22,12 @@ package org.apache.curator.framework.recipes.leader;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryNTimes;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.TestingServer;
@@ -33,6 +35,7 @@ import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
+
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -42,6 +45,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class TestLeaderLatch extends BaseClassForTests
@@ -531,57 +535,45 @@ public class TestLeaderLatch extends BaseClassForTests
         Timing timing = new Timing();
         CuratorFramework client = CuratorFrameworkFactory.newClient(
                 server.getConnectString(), timing.session(),
-                timing.connection(), new RetryOneTime(1));
+                timing.connection(), new RetryNTimes(5, 1000));
 
         client.start();
 
-        final CountDownLatch sessionLostCount = new CountDownLatch(1);
-
-        // Need to ensure that we've actually lost the connection completely
-        // before starting. Otherwise it's possible that the server will start
-        // during retries of the inital commands to create the latch zNodes
-        client.getConnectionStateListenable().addListener(
-                new ConnectionStateListener()
-                {
-                    @Override
-                    public void stateChanged(CuratorFramework client,
-                            ConnectionState newState)
-                    {
-                        if (newState == ConnectionState.LOST)
-                        {
-                            sessionLostCount.countDown();
-                        }
-                    }
-                });
-
         final LeaderLatch leader = new LeaderLatch(client, PATH_NAME);
         final CountDownLatch leaderCounter = new CountDownLatch(1);
+        final AtomicInteger leaderCount = new AtomicInteger(0);
+        final AtomicInteger notLeaderCount = new AtomicInteger(0);        
         leader.addListener(new LeaderLatchListener()
         {
             @Override
             public void isLeader()
             {
                 leaderCounter.countDown();
+                leaderCount.incrementAndGet();
             }
 
             @Override
             public void notLeader()
             {
+            	notLeaderCount.incrementAndGet();
             }
 
         });
 
         try
         {
-            leader.start();
-
-            timing.awaitLatch(sessionLostCount);
+        	leader.start();
+         
+        	//Wait for a while before starting the test server
+        	Thread.sleep(5000);
 
             // Start the new server
             server = new TestingServer(server.getPort());
 
-            timing.awaitLatch(leaderCounter);
-
+            Assert.assertTrue(timing.awaitLatch(leaderCounter), "Not elected leader");
+            
+            Assert.assertEquals(leaderCount.get(), 1, "Elected too many times");
+            Assert.assertEquals(notLeaderCount.get(), 0, "Unelected too many times");            
         }
         catch (Exception e)
         {
@@ -593,7 +585,6 @@ public class TestLeaderLatch extends BaseClassForTests
             CloseableUtils.closeQuietly(client);
             CloseableUtils.closeQuietly(server);
         }
-
     }    
 
     private enum Mode


[14/18] git commit: Now that start() uses AfterConnectionEstablished, no longer correct/needed to handle CONNECTED in state change

Posted by ra...@apache.org.
Now that start() uses AfterConnectionEstablished, no longer correct/needed to handle CONNECTED in state change


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/fab79a45
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/fab79a45
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/fab79a45

Branch: refs/heads/master
Commit: fab79a4577d3af80260d25128cc56f26c7011dbe
Parents: 04cefb4
Author: randgalt <ra...@apache.org>
Authored: Mon Jun 16 14:41:13 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jun 16 14:41:13 2014 -0500

----------------------------------------------------------------------
 .../framework/recipes/leader/LeaderLatch.java   | 100 ++++++++++---------
 1 file changed, 51 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/fab79a45/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index dce3f5e..9d70645 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -44,7 +44,6 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
@@ -156,9 +155,7 @@ public class LeaderLatch implements Closeable
     {
         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
 
-        AfterConnectionEstablished.execute
-        (
-            client, new Runnable()
+        AfterConnectionEstablished.execute(client, new Runnable()
             {
                 @Override
                 public void run()
@@ -173,8 +170,7 @@ public class LeaderLatch implements Closeable
                         log.error("An error occurred checking resetting leadership.", e);
                     }
                 }
-            }
-        );
+            });
     }
 
     /**
@@ -196,7 +192,6 @@ public class LeaderLatch implements Closeable
      * instances must eventually be closed.
      *
      * @param closeMode allows the default close mode to be overridden at the time the latch is closed.
-     *
      * @throws IOException errors
      */
     public void close(CloseMode closeMode) throws IOException
@@ -218,28 +213,28 @@ public class LeaderLatch implements Closeable
 
             switch ( closeMode )
             {
-                case NOTIFY_LEADER:
-                {
-                    setLeadership(false);
-                    listeners.clear();
-                    break;
-                }
+            case NOTIFY_LEADER:
+            {
+                setLeadership(false);
+                listeners.clear();
+                break;
+            }
 
-                default:
-                {
-                    listeners.clear();
-                    setLeadership(false);
-                    break;
-                }
+            default:
+            {
+                listeners.clear();
+                setLeadership(false);
+                break;
+            }
             }
         }
     }
 
     /**
      * Attaches a listener to this LeaderLatch
-     * <p>
+     * <p/>
      * Attaching the same listener multiple times is a noop from the second time on.
-     * <p>
+     * <p/>
      * All methods for the listener are run using the provided Executor.  It is common to pass in a single-threaded
      * executor so that you can be certain that listener methods are called in sequence, but if you are fine with
      * them being called out of order you are welcome to use multiple threads.
@@ -253,15 +248,15 @@ public class LeaderLatch implements Closeable
 
     /**
      * Attaches a listener to this LeaderLatch
-     * <p>
+     * <p/>
      * Attaching the same listener multiple times is a noop from the second time on.
-     * <p>
+     * <p/>
      * All methods for the listener are run using the provided Executor.  It is common to pass in a single-threaded
      * executor so that you can be certain that listener methods are called in sequence, but if you are fine with
      * them being called out of order you are welcome to use multiple threads.
      *
      * @param listener the listener to attach
-     * @param executor     An executor to run the methods for the listener on.
+     * @param executor An executor to run the methods for the listener on.
      */
     public void addListener(LeaderLatchListener listener, Executor executor)
     {
@@ -282,7 +277,7 @@ public class LeaderLatch implements Closeable
      * <p>Causes the current thread to wait until this instance acquires leadership
      * unless the thread is {@linkplain Thread#interrupt interrupted} or {@linkplain #close() closed}.</p>
      * <p>If this instance already is the leader then this method returns immediately.</p>
-     *
+     * <p/>
      * <p>Otherwise the current
      * thread becomes disabled for thread scheduling purposes and lies
      * dormant until one of three things happen:</p>
@@ -324,10 +319,10 @@ public class LeaderLatch implements Closeable
      * <p>Causes the current thread to wait until this instance acquires leadership
      * unless the thread is {@linkplain Thread#interrupt interrupted},
      * the specified waiting time elapses or the instance is {@linkplain #close() closed}.</p>
-     *
+     * <p/>
      * <p>If this instance already is the leader then this method returns immediately
      * with the value {@code true}.</p>
-     *
+     * <p/>
      * <p>Otherwise the current
      * thread becomes disabled for thread scheduling purposes and lies
      * dormant until one of four things happen:</p>
@@ -338,7 +333,7 @@ public class LeaderLatch implements Closeable
      * <li>The specified waiting time elapses.</li>
      * <li>The instance is {@linkplain #close() closed}</li>
      * </ul>
-     *
+     * <p/>
      * <p>If the current thread:</p>
      * <ul>
      * <li>has its interrupted status set on entry to this method; or
@@ -346,7 +341,7 @@ public class LeaderLatch implements Closeable
      * </ul>
      * <p>then {@link InterruptedException} is thrown and the current thread's
      * interrupted status is cleared.</p>
-     *
+     * <p/>
      * <p>If the specified waiting time elapses or the instance is {@linkplain #close() closed}
      * then the value {@code false} is returned.  If the time is less than or equal to zero, the method
      * will not wait at all.</p>
@@ -354,7 +349,7 @@ public class LeaderLatch implements Closeable
      * @param timeout the maximum time to wait
      * @param unit    the time unit of the {@code timeout} argument
      * @return {@code true} if the count reached zero and {@code false}
-     *         if the waiting time elapsed before the count reached zero or the instances was closed
+     * if the waiting time elapsed before the count reached zero or the instances was closed
      * @throws InterruptedException if the current thread is interrupted
      *                              while waiting
      */
@@ -561,22 +556,35 @@ public class LeaderLatch implements Closeable
 
     private void handleStateChange(ConnectionState newState)
     {
-        if ( newState == ConnectionState.RECONNECTED )
+        switch ( newState )
         {
-            try
+            default:
             {
-                reset();
+                // NOP
+                break;
+            }
+
+            case RECONNECTED:
+            {
+                try
+                {
+                    reset();
+                }
+                catch ( Exception e )
+                {
+                    log.error("Could not reset leader latch", e);
+                    setLeadership(false);
+                }
+                break;
             }
-            catch (Exception e)
+
+            case SUSPENDED:
+            case LOST:
             {
-                log.error("Could not reset leader latch", e);
                 setLeadership(false);
+                break;
             }
         }
-        else
-        {
-            setLeadership(false);
-        }
     }
 
     private synchronized void setLeadership(boolean newValue)
@@ -585,9 +593,7 @@ public class LeaderLatch implements Closeable
 
         if ( oldValue && !newValue )
         { // Lost leadership, was true, now false
-            listeners.forEach
-            (
-                new Function<LeaderLatchListener, Void>()
+            listeners.forEach(new Function<LeaderLatchListener, Void>()
                 {
                     @Override
                     public Void apply(LeaderLatchListener listener)
@@ -595,14 +601,11 @@ public class LeaderLatch implements Closeable
                         listener.notLeader();
                         return null;
                     }
-                }
-            );
+                });
         }
         else if ( !oldValue && newValue )
         { // Gained leadership, was false, now true
-            listeners.forEach
-            (
-                new Function<LeaderLatchListener, Void>()
+            listeners.forEach(new Function<LeaderLatchListener, Void>()
                 {
                     @Override
                     public Void apply(LeaderLatchListener input)
@@ -610,8 +613,7 @@ public class LeaderLatch implements Closeable
                         input.isLeader();
                         return null;
                     }
-                }
-            );
+                });
         }
 
         notifyAll();


[13/18] git commit: Moved the connection blocking code into ConnectionStateManager. It's cleaner and doesn't require a connection state listener

Posted by ra...@apache.org.
Moved the connection blocking code into ConnectionStateManager. It's cleaner and doesn't require a connection state listener.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/04cefb47
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/04cefb47
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/04cefb47

Branch: refs/heads/master
Commit: 04cefb47f18c9d4bd3a0eb897563dd5abb7c89c8
Parents: 1c94c7e
Author: randgalt <ra...@apache.org>
Authored: Mon Jun 16 14:35:54 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jun 16 14:35:54 2014 -0500

----------------------------------------------------------------------
 .../framework/imps/CuratorFrameworkImpl.java    | 289 +++++++-----------
 .../framework/state/ConnectionStateManager.java |  38 ++-
 .../framework/imps/TestBlockUntilConnected.java | 304 +++++++++----------
 .../recipes/AfterConnectionEstablished.java     |  14 +-
 .../framework/recipes/leader/LeaderLatch.java   |  21 +-
 .../recipes/leader/TestLeaderLatch.java         |  46 +--
 6 files changed, 336 insertions(+), 376 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/04cefb47/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 14473d8..23a3248 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -16,11 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.imps;
 
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
-
 import org.apache.curator.CuratorConnectionLossException;
 import org.apache.curator.CuratorZookeeperClient;
 import org.apache.curator.RetryLoop;
@@ -44,7 +44,6 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.util.Arrays;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
@@ -59,40 +58,40 @@ import java.util.concurrent.atomic.AtomicReference;
 public class CuratorFrameworkImpl implements CuratorFramework
 {
 
-    private final Logger                                                log = LoggerFactory.getLogger(getClass());
-    private final CuratorZookeeperClient                                client;
-    private final ListenerContainer<CuratorListener>                    listeners;
-    private final ListenerContainer<UnhandledErrorListener>             unhandledErrorListeners;
-    private final ThreadFactory                                         threadFactory;
-    private final BlockingQueue<OperationAndData<?>>                    backgroundOperations;
-    private final NamespaceImpl                                         namespace;
-    private final ConnectionStateManager                                connectionStateManager;
-    private final AtomicReference<AuthInfo>                             authInfo = new AtomicReference<AuthInfo>();
-    private final byte[]                                                defaultData;
-    private final FailedDeleteManager                                   failedDeleteManager;
-    private final CompressionProvider                                   compressionProvider;
-    private final ACLProvider                                           aclProvider;
-    private final NamespaceFacadeCache                                  namespaceFacadeCache;
-    private final NamespaceWatcherMap                                   namespaceWatcherMap = new NamespaceWatcherMap(this);
-    private final Object												connectionLock = new Object();
-
-    private volatile ExecutorService                                    executorService;
-    private final AtomicBoolean                                         logAsErrorConnectionErrors = new AtomicBoolean(false);
-
-    private static final boolean                                        LOG_ALL_CONNECTION_ISSUES_AS_ERROR_LEVEL = !Boolean.getBoolean(DebugUtils.PROPERTY_LOG_ONLY_FIRST_CONNECTION_ISSUE_AS_ERROR_LEVEL);
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final CuratorZookeeperClient client;
+    private final ListenerContainer<CuratorListener> listeners;
+    private final ListenerContainer<UnhandledErrorListener> unhandledErrorListeners;
+    private final ThreadFactory threadFactory;
+    private final BlockingQueue<OperationAndData<?>> backgroundOperations;
+    private final NamespaceImpl namespace;
+    private final ConnectionStateManager connectionStateManager;
+    private final AtomicReference<AuthInfo> authInfo = new AtomicReference<AuthInfo>();
+    private final byte[] defaultData;
+    private final FailedDeleteManager failedDeleteManager;
+    private final CompressionProvider compressionProvider;
+    private final ACLProvider aclProvider;
+    private final NamespaceFacadeCache namespaceFacadeCache;
+    private final NamespaceWatcherMap namespaceWatcherMap = new NamespaceWatcherMap(this);
+
+    private volatile ExecutorService executorService;
+    private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false);
+
+    private static final boolean LOG_ALL_CONNECTION_ISSUES_AS_ERROR_LEVEL = !Boolean.getBoolean(DebugUtils.PROPERTY_LOG_ONLY_FIRST_CONNECTION_ISSUE_AS_ERROR_LEVEL);
 
     interface DebugBackgroundListener
     {
-        void        listen(OperationAndData<?> data);
+        void listen(OperationAndData<?> data);
     }
-    volatile DebugBackgroundListener        debugListener = null;
 
-    private final AtomicReference<CuratorFrameworkState>                    state;
+    volatile DebugBackgroundListener debugListener = null;
+
+    private final AtomicReference<CuratorFrameworkState> state;
 
     private static class AuthInfo
     {
-        final String    scheme;
-        final byte[]    auth;
+        final String scheme;
+        final byte[] auth;
 
         private AuthInfo(String scheme, byte[] auth)
         {
@@ -113,37 +112,15 @@ public class CuratorFrameworkImpl implements CuratorFramework
     public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
     {
         ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
-        this.client = new CuratorZookeeperClient
-        (
-            localZookeeperFactory,
-            builder.getEnsembleProvider(),
-            builder.getSessionTimeoutMs(),
-            builder.getConnectionTimeoutMs(),
-            new Watcher()
+        this.client = new CuratorZookeeperClient(localZookeeperFactory, builder.getEnsembleProvider(), builder.getSessionTimeoutMs(), builder.getConnectionTimeoutMs(), new Watcher()
+        {
+            @Override
+            public void process(WatchedEvent watchedEvent)
             {
-                @Override
-                public void process(WatchedEvent watchedEvent)
-                {
-                    CuratorEvent event = new CuratorEventImpl
-                    (
-                        CuratorFrameworkImpl.this,
-                        CuratorEventType.WATCHED,
-                        watchedEvent.getState().getIntValue(),
-                        unfixForNamespace(watchedEvent.getPath()),
-                        null,
-                        null,
-                        null,
-                        null,
-                        null,
-                        watchedEvent,
-                        null
-                    );
-                    processEvent(event);
-                }
-            },
-            builder.getRetryPolicy(),
-            builder.canBeReadOnly()
-        );
+                CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null);
+                processEvent(event);
+            }
+        }, builder.getRetryPolicy(), builder.canBeReadOnly());
 
         listeners = new ListenerContainer<CuratorListener>();
         unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
@@ -155,7 +132,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         aclProvider = builder.getAclProvider();
         state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
 
-        byte[]      builderDefaultData = builder.getDefaultData();
+        byte[] builderDefaultData = builder.getDefaultData();
         defaultData = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0];
 
         if ( builder.getAuthScheme() != null )
@@ -165,23 +142,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
 
         failedDeleteManager = new FailedDeleteManager(this);
         namespaceFacadeCache = new NamespaceFacadeCache(this);
-        
-        //Add callback handler to determine connection state transitions
-    	getConnectionStateListenable().addListener(new ConnectionStateListener()
-    	{
-			
-			@Override
-			public void stateChanged(CuratorFramework client, ConnectionState newState)
-			{
-				if(newState.isConnected())
-				{
-					synchronized(connectionLock)
-					{
-						connectionLock.notifyAll();
-					}
-				}
-			}
-		});
     }
 
     private ZookeeperFactory makeZookeeperFactory(final ZookeeperFactory actualZookeeperFactory)
@@ -253,7 +213,19 @@ public class CuratorFrameworkImpl implements CuratorFramework
     }
 
     @Override
-    public void     start()
+    public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException
+    {
+        return connectionStateManager.blockUntilConnected(maxWaitTime, units);
+    }
+
+    @Override
+    public void blockUntilConnected() throws InterruptedException
+    {
+        blockUntilConnected(0, null);
+    }
+
+    @Override
+    public void start()
     {
         log.info("Starting");
         if ( !state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED) )
@@ -266,7 +238,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         try
         {
             connectionStateManager.start(); // ordering dependency - must be called before client.start()
-            
+
             final ConnectionStateListener listener = new ConnectionStateListener()
             {
                 @Override
@@ -278,16 +250,14 @@ public class CuratorFrameworkImpl implements CuratorFramework
                     }
                 }
             };
-            
+
             this.getConnectionStateListenable().addListener(listener);
 
             client.start();
 
             executorService = Executors.newFixedThreadPool(2, threadFactory);  // 1 for listeners, 1 for background ops
 
-            executorService.submit
-            (
-                new Callable<Object>()
+            executorService.submit(new Callable<Object>()
                 {
                     @Override
                     public Object call() throws Exception
@@ -295,8 +265,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
                         backgroundOperationsLoop();
                         return null;
                     }
-                }
-            );
+                });
         }
         catch ( Exception e )
         {
@@ -305,31 +274,28 @@ public class CuratorFrameworkImpl implements CuratorFramework
     }
 
     @Override
-    public void     close()
+    public void close()
     {
         log.debug("Closing");
         if ( state.compareAndSet(CuratorFrameworkState.STARTED, CuratorFrameworkState.STOPPED) )
         {
-            listeners.forEach
-                (
-                    new Function<CuratorListener, Void>()
+            listeners.forEach(new Function<CuratorListener, Void>()
+                {
+                    @Override
+                    public Void apply(CuratorListener listener)
                     {
-                        @Override
-                        public Void apply(CuratorListener listener)
+                        CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.CLOSING, 0, null, null, null, null, null, null, null, null);
+                        try
+                        {
+                            listener.eventReceived(CuratorFrameworkImpl.this, event);
+                        }
+                        catch ( Exception e )
                         {
-                            CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.CLOSING, 0, null, null, null, null, null, null, null, null);
-                            try
-                            {
-                                listener.eventReceived(CuratorFrameworkImpl.this, event);
-                            }
-                            catch ( Exception e )
-                            {
-                                log.error("Exception while sending Closing event", e);
-                            }
-                            return null;
+                            log.error("Exception while sending Closing event", e);
                         }
+                        return null;
                     }
-                );
+                });
 
             listeners.clear();
             unhandledErrorListeners.clear();
@@ -515,14 +481,14 @@ public class CuratorFrameworkImpl implements CuratorFramework
 
     <DATA_TYPE> void processBackgroundOperation(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent event)
     {
-        boolean     isInitialExecution = (event == null);
+        boolean isInitialExecution = (event == null);
         if ( isInitialExecution )
         {
             performBackgroundOperation(operationAndData);
             return;
         }
 
-        boolean     doQueueOperation = false;
+        boolean doQueueOperation = false;
         do
         {
             if ( RetryLoop.shouldRetry(event.getResultCode()) )
@@ -538,7 +504,8 @@ public class CuratorFrameworkImpl implements CuratorFramework
             }
 
             processEvent(event);
-        } while ( false );
+        }
+        while ( false );
 
         if ( doQueueOperation )
         {
@@ -560,7 +527,8 @@ public class CuratorFrameworkImpl implements CuratorFramework
 
         if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) || !(e instanceof KeeperException) )
         {
-            if ( e instanceof KeeperException.ConnectionLossException || e instanceof CuratorConnectionLossException ) {
+            if ( e instanceof KeeperException.ConnectionLossException )
+            {
                 if ( LOG_ALL_CONNECTION_ISSUES_AS_ERROR_LEVEL || logAsErrorConnectionErrors.compareAndSet(true, false) )
                 {
                     log.error(reason, e);
@@ -576,27 +544,24 @@ public class CuratorFrameworkImpl implements CuratorFramework
             }
         }
 
-        final String        localReason = reason;
-        unhandledErrorListeners.forEach
-            (
-                new Function<UnhandledErrorListener, Void>()
+        final String localReason = reason;
+        unhandledErrorListeners.forEach(new Function<UnhandledErrorListener, Void>()
+            {
+                @Override
+                public Void apply(UnhandledErrorListener listener)
                 {
-                    @Override
-                    public Void apply(UnhandledErrorListener listener)
-                    {
-                        listener.unhandledError(localReason, e);
-                        return null;
-                    }
+                    listener.unhandledError(localReason, e);
+                    return null;
                 }
-            );
+            });
     }
 
-    String    unfixForNamespace(String path)
+    String unfixForNamespace(String path)
     {
         return namespace.unfixForNamespace(path);
     }
 
-    String    fixForNamespace(String path)
+    String fixForNamespace(String path)
     {
         return namespace.fixForNamespace(path);
     }
@@ -723,8 +688,8 @@ public class CuratorFrameworkImpl implements CuratorFramework
                 sendToBackgroundCallback(operationAndData, event);
             }
 
-            KeeperException.Code    code = KeeperException.Code.get(event.getResultCode());
-            Exception               e = null;
+            KeeperException.Code code = KeeperException.Code.get(event.getResultCode());
+            Exception e = null;
             try
             {
                 e = (code != null) ? KeeperException.create(code) : null;
@@ -755,7 +720,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         }
     }
 
-    private<DATA_TYPE> void handleBackgroundOperationException(OperationAndData<DATA_TYPE> operationAndData, Throwable e)
+    private <DATA_TYPE> void handleBackgroundOperationException(OperationAndData<DATA_TYPE> operationAndData, Throwable e)
     {
         do
         {
@@ -788,14 +753,15 @@ public class CuratorFrameworkImpl implements CuratorFramework
             }
 
             logError("Background exception was not retry-able or retry gave up", e);
-        } while ( false );
+        }
+        while ( false );
     }
 
     private void backgroundOperationsLoop()
     {
         while ( !Thread.interrupted() )
         {
-            OperationAndData<?>         operationAndData;
+            OperationAndData<?> operationAndData;
             try
             {
                 operationAndData = backgroundOperations.take();
@@ -850,7 +816,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
                 }
                 else
                 {
-                	logError("Background retry gave up", e);
+                    logError("Background retry gave up", e);
                 }
             }
             else
@@ -867,70 +833,23 @@ public class CuratorFrameworkImpl implements CuratorFramework
             validateConnection(curatorEvent.getWatchedEvent().getState());
         }
 
-        listeners.forEach
-        (
-            new Function<CuratorListener, Void>()
+        listeners.forEach(new Function<CuratorListener, Void>()
+        {
+            @Override
+            public Void apply(CuratorListener listener)
             {
-                @Override
-                public Void apply(CuratorListener listener)
+                try
                 {
-                    try
-                    {
-                        TimeTrace trace = client.startTracer("EventListener");
-                        listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
-                        trace.commit();
-                    }
-                    catch ( Exception e )
-                    {
-                        logError("Event listener threw exception", e);
-                    }
-                    return null;
+                    TimeTrace trace = client.startTracer("EventListener");
+                    listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
+                    trace.commit();
                 }
+                catch ( Exception e )
+                {
+                    logError("Event listener threw exception", e);
+                }
+                return null;
             }
-        );
-    }
-    
-    @Override
-    public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException
-    {
-    	//Check if we're already connected
-    	ConnectionState currentConnectionState = connectionStateManager.getCurrentConnectionState();
-    	if(currentConnectionState != null && currentConnectionState.isConnected())
-    	{
-    		return true;
-    	}
-    	
-    	long startTime = System.currentTimeMillis();
-    	long maxWaitTimeMS = TimeUnit.MILLISECONDS.convert(maxWaitTime, units);
-    	
-    	for(;;)
-    	{
-    		synchronized(connectionLock)
-    		{
-    			currentConnectionState = connectionStateManager.getCurrentConnectionState();
-    	    	if(currentConnectionState != null && currentConnectionState.isConnected())
-    			{
-    				return true;
-    			}
-    			
-    			long waitTime = 0;
-    			if(maxWaitTime > 0)
-    			{
-    				waitTime = maxWaitTimeMS  - (System.currentTimeMillis() - startTime);
-    	   			
-    				//Timeout
-        			if(waitTime <= 0)
-        			{
-        				return false;
-        			}    					
-    			}
-    		
-    			connectionLock.wait(waitTime);
-    		}
-    	}
-    }
-    
-    public void blockUntilConnected() throws InterruptedException {
-    	blockUntilConnected(0, TimeUnit.SECONDS);
+        });
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/04cefb47/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index ba29994..fb312dc 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -33,6 +33,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -153,6 +154,7 @@ public class ConnectionStateManager implements Closeable
 
         currentConnectionState = ConnectionState.SUSPENDED;
         postState(ConnectionState.SUSPENDED);
+
         return true;
     }
 
@@ -188,15 +190,45 @@ public class ConnectionStateManager implements Closeable
 
         return true;
     }
-    
-    public synchronized ConnectionState getCurrentConnectionState()
+
+    public synchronized boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException
+    {
+        boolean hasMaxWait = (units != null);
+        long startTime = System.currentTimeMillis();
+
+        while ( !isConnected() )
+        {
+            long maxWaitTimeMS = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWaitTime, units) : 0;
+
+            if ( hasMaxWait )
+            {
+                long waitTime = maxWaitTimeMS - (System.currentTimeMillis() - startTime);
+                if ( waitTime <= 0 )
+                {
+                    return isConnected();
+                }
+
+                wait(waitTime);
+            }
+            else
+            {
+                wait();
+            }
+        }
+        return isConnected();
+    }
+
+    public synchronized boolean isConnected()
     {
-    	return currentConnectionState;
+        return (currentConnectionState != null) && currentConnectionState.isConnected();
     }
 
     private void postState(ConnectionState state)
     {
         log.info("State change: " + state);
+
+        notifyAll();
+
         while ( !eventQueue.offer(state) )
         {
             eventQueue.poll();

http://git-wip-us.apache.org/repos/asf/curator/blob/04cefb47/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
index 8dfb7d8..f649afb 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
@@ -16,12 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.framework.imps;
 
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+package org.apache.curator.framework.imps;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -34,202 +30,206 @@ import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 public class TestBlockUntilConnected extends BaseClassForTests
 {
-	/**
-	 * Test the case where we're already connected
-	 */
-	@Test
-	public void testBlockUntilConnectedCurrentlyConnected()
-	{		
-		Timing timing = new Timing();
+    /**
+     * Test the case where we're already connected
+     */
+    @Test
+    public void testBlockUntilConnectedCurrentlyConnected() throws Exception
+    {
+        Timing timing = new Timing();
         CuratorFramework client = CuratorFrameworkFactory.builder().
-                connectString(server.getConnectString()).
-                retryPolicy(new RetryOneTime(1)).
-                build();
-        
+            connectString(server.getConnectString()).
+            retryPolicy(new RetryOneTime(1)).
+            build();
+
         try
         {
-        	final CountDownLatch connectedLatch = new CountDownLatch(1);
-        	client.getConnectionStateListenable().addListener(new ConnectionStateListener()
-        	{
-				
-				@Override
-				public void stateChanged(CuratorFramework client, ConnectionState newState)
-				{
-					if(newState.isConnected())
-					{
-						connectedLatch.countDown();
-					}
-				}
-			});
-        	
-        	client.start();
-        	
-        	Assert.assertTrue(timing.awaitLatch(connectedLatch), "Timed out awaiting latch");
-        	Assert.assertTrue(client.blockUntilConnected(1, TimeUnit.SECONDS), "Not connected");
+            final CountDownLatch connectedLatch = new CountDownLatch(1);
+            client.getConnectionStateListenable().addListener(new ConnectionStateListener()
+            {
+                @Override
+                public void stateChanged(CuratorFramework client, ConnectionState newState)
+                {
+                    if ( newState.isConnected() )
+                    {
+                        connectedLatch.countDown();
+                    }
+                }
+            });
+
+            client.start();
+
+            Assert.assertTrue(timing.awaitLatch(connectedLatch), "Timed out awaiting latch");
+            Assert.assertTrue(client.blockUntilConnected(1, TimeUnit.SECONDS), "Not connected");
         }
-        catch(InterruptedException e)
+        catch ( InterruptedException e )
         {
-        	Assert.fail("Unexpected interruption");
+            Assert.fail("Unexpected interruption");
         }
         finally
         {
-        	CloseableUtils.closeQuietly(client);
+            CloseableUtils.closeQuietly(client);
         }
-	}
-	
-	/**
-	 * Test the case where we are not currently connected and never have been
-	 */
-	@Test
-	public void testBlockUntilConnectedCurrentlyNeverConnected()
-	{		
+    }
+
+    /**
+     * Test the case where we are not currently connected and never have been
+     */
+    @Test
+    public void testBlockUntilConnectedCurrentlyNeverConnected()
+    {
         CuratorFramework client = CuratorFrameworkFactory.builder().
-                connectString(server.getConnectString()).
-                retryPolicy(new RetryOneTime(1)).
-                build();
-        
+            connectString(server.getConnectString()).
+            retryPolicy(new RetryOneTime(1)).
+            build();
+
         try
         {
-        	client.start();
-        	Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Not connected");
+            client.start();
+            Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Not connected");
         }
-        catch(InterruptedException e)
+        catch ( InterruptedException e )
         {
-        	Assert.fail("Unexpected interruption");
+            Assert.fail("Unexpected interruption");
         }
         finally
         {
-        	CloseableUtils.closeQuietly(client);
+            CloseableUtils.closeQuietly(client);
         }
-	}
-	
-	/**
-	 * Test the case where we are not currently connected, but have been previously
-	 */
-	@Test
-	public void testBlockUntilConnectedCurrentlyAwaitingReconnect()
-	{		
-		Timing timing = new Timing();
+    }
+
+    /**
+     * Test the case where we are not currently connected, but have been previously
+     */
+    @Test
+    public void testBlockUntilConnectedCurrentlyAwaitingReconnect()
+    {
+        Timing timing = new Timing();
         CuratorFramework client = CuratorFrameworkFactory.builder().
-                connectString(server.getConnectString()).
-                retryPolicy(new RetryOneTime(1)).
-                build();
-        
+            connectString(server.getConnectString()).
+            retryPolicy(new RetryOneTime(1)).
+            build();
+
         final CountDownLatch lostLatch = new CountDownLatch(1);
         client.getConnectionStateListenable().addListener(new ConnectionStateListener()
         {
-			
-			@Override
-			public void stateChanged(CuratorFramework client, ConnectionState newState)
-			{
-				if(newState == ConnectionState.LOST)
-				{
-					lostLatch.countDown();
-				}
-			}
-		});
-        
+
+            @Override
+            public void stateChanged(CuratorFramework client, ConnectionState newState)
+            {
+                if ( newState == ConnectionState.LOST )
+                {
+                    lostLatch.countDown();
+                }
+            }
+        });
+
         try
         {
-        	client.start();
-        	
-        	//Block until we're connected
-        	Assert.assertTrue(client.blockUntilConnected(5,  TimeUnit.SECONDS), "Failed to connect");
-        	
-        	//Kill the server
-        	CloseableUtils.closeQuietly(server);
-        	
-        	//Wait until we hit the lost state
-        	Assert.assertTrue(timing.awaitLatch(lostLatch), "Failed to reach LOST state");
-        	
-        	server = new TestingServer(server.getPort(), server.getTempDirectory());
-        	
-        	Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Not connected");
+            client.start();
+
+            //Block until we're connected
+            Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Failed to connect");
+
+            //Kill the server
+            CloseableUtils.closeQuietly(server);
+
+            //Wait until we hit the lost state
+            Assert.assertTrue(timing.awaitLatch(lostLatch), "Failed to reach LOST state");
+
+            server = new TestingServer(server.getPort(), server.getTempDirectory());
+
+            Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Not connected");
         }
-        catch(Exception e)
+        catch ( Exception e )
         {
-        	Assert.fail("Unexpected exception " + e);
+            Assert.fail("Unexpected exception " + e);
         }
         finally
         {
-        	CloseableUtils.closeQuietly(client);
+            CloseableUtils.closeQuietly(client);
         }
-	}	
-	
-	/**
-	 * Test the case where we are not currently connected and time out before a
-	 * connection becomes available.
-	 */
-	@Test
-	public void testBlockUntilConnectedConnectTimeout()
-	{	
-		//Kill the server
-		CloseableUtils.closeQuietly(server);
-		
+    }
+
+    /**
+     * Test the case where we are not currently connected and time out before a
+     * connection becomes available.
+     */
+    @Test
+    public void testBlockUntilConnectedConnectTimeout()
+    {
+        //Kill the server
+        CloseableUtils.closeQuietly(server);
+
         CuratorFramework client = CuratorFrameworkFactory.builder().
-                connectString(server.getConnectString()).
-                retryPolicy(new RetryOneTime(1)).
-                build();
-        
+            connectString(server.getConnectString()).
+            retryPolicy(new RetryOneTime(1)).
+            build();
+
         try
         {
-        	client.start();
-        	Assert.assertFalse(client.blockUntilConnected(5, TimeUnit.SECONDS),
-        					   "Connected");
+            client.start();
+            Assert.assertFalse(client.blockUntilConnected(5, TimeUnit.SECONDS), "Connected");
         }
-        catch(InterruptedException e)
+        catch ( InterruptedException e )
         {
-        	Assert.fail("Unexpected interruption");
+            Assert.fail("Unexpected interruption");
         }
         finally
         {
-        	CloseableUtils.closeQuietly(client);
+            CloseableUtils.closeQuietly(client);
         }
-	}
-	
-	/**
-	 * Test the case where we are not currently connected and the thread gets interrupted
-	 * prior to a connection becoming available
-	 */
-	@Test
-	public void testBlockUntilConnectedInterrupt()
-	{	
-		//Kill the server
-		CloseableUtils.closeQuietly(server);
-		
+    }
+
+    /**
+     * Test the case where we are not currently connected and the thread gets interrupted
+     * prior to a connection becoming available
+     */
+    @Test
+    public void testBlockUntilConnectedInterrupt()
+    {
+        //Kill the server
+        CloseableUtils.closeQuietly(server);
+
         final CuratorFramework client = CuratorFrameworkFactory.builder().
-                connectString(server.getConnectString()).
-                retryPolicy(new RetryOneTime(1)).
-                build();
-        
+            connectString(server.getConnectString()).
+            retryPolicy(new RetryOneTime(1)).
+            build();
+
         try
         {
-        	client.start();
-        	
-        	final Thread threadToInterrupt = Thread.currentThread();
-        	
-        	Timer timer = new Timer();
-        	timer.schedule(new TimerTask() {
-				
-				@Override
-				public void run() {
-					threadToInterrupt.interrupt();
-				}
-			}, 3000);
-        	
-        	client.blockUntilConnected(5, TimeUnit.SECONDS);
-        	Assert.fail("Expected interruption did not occur");
+            client.start();
+
+            final Thread threadToInterrupt = Thread.currentThread();
+
+            Timer timer = new Timer();
+            timer.schedule(new TimerTask()
+            {
+
+                @Override
+                public void run()
+                {
+                    threadToInterrupt.interrupt();
+                }
+            }, 3000);
+
+            client.blockUntilConnected(5, TimeUnit.SECONDS);
+            Assert.fail("Expected interruption did not occur");
         }
-        catch(InterruptedException e)
+        catch ( InterruptedException e )
         {
-        	//This is expected
+            //This is expected
         }
         finally
         {
-        	CloseableUtils.closeQuietly(client);
+            CloseableUtils.closeQuietly(client);
         }
-	}	
+    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/04cefb47/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
index f37f7c0..41ba702 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
@@ -22,7 +22,6 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.utils.ThreadUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 
 /**
@@ -39,24 +38,23 @@ public class AfterConnectionEstablished
      * @param client             The curator client
      * @param runAfterConnection The logic to run
      */
-    public static <T> T execute(final CuratorFramework client, final Callable<T> runAfterConnection) throws Exception
+    public static void execute(final CuratorFramework client, final Runnable runAfterConnection) throws Exception
     {
         //Block until connected
-        final ExecutorService executor = ThreadUtils.newSingleThreadExecutor(runAfterConnection.getClass().getSimpleName());
-        Callable<T> internalCall = new Callable<T>()
+        final ExecutorService executor = ThreadUtils.newSingleThreadExecutor(ThreadUtils.getProcessName(runAfterConnection.getClass()));
+        Runnable internalCall = new Runnable()
         {
             @Override
-            public T call() throws Exception
+            public void run()
             {
                 try
                 {
                     client.blockUntilConnected();
-                    return runAfterConnection.call();
+                    runAfterConnection.run();
                 }
                 catch ( Exception e )
                 {
                     log.error("An error occurred blocking until a connection is available", e);
-                    throw e;
                 }
                 finally
                 {
@@ -64,7 +62,7 @@ public class AfterConnectionEstablished
                 }
             }
         };
-        return executor.submit(internalCall).get();
+        executor.submit(internalCall);
     }
 
     private AfterConnectionEstablished()

http://git-wip-us.apache.org/repos/asf/curator/blob/04cefb47/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index f4c1cef..dce3f5e 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -26,7 +26,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.listen.ListenerContainer;
-import org.apache.curator.framework.recipes.ExecuteAfterConnectionEstablished;
+import org.apache.curator.framework.recipes.AfterConnectionEstablished;
 import org.apache.curator.framework.recipes.locks.LockInternals;
 import org.apache.curator.framework.recipes.locks.LockInternalsSorter;
 import org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver;
@@ -156,17 +156,22 @@ public class LeaderLatch implements Closeable
     {
         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
 
-        ExecuteAfterConnectionEstablished.executeAfterConnectionEstablishedInBackground
+        AfterConnectionEstablished.execute
         (
-            client,
-            new Callable<Void>()
+            client, new Runnable()
             {
                 @Override
-                public Void call() throws Exception
+                public void run()
                 {
                     client.getConnectionStateListenable().addListener(listener);
-                    reset();
-                    return null;
+                    try
+                    {
+                        reset();
+                    }
+                    catch ( Exception e )
+                    {
+                        log.error("An error occurred checking resetting leadership.", e);
+                    }
                 }
             }
         );
@@ -556,7 +561,7 @@ public class LeaderLatch implements Closeable
 
     private void handleStateChange(ConnectionState newState)
     {
-        if (newState.isConnected())
+        if ( newState == ConnectionState.RECONNECTED )
         {
             try
             {

http://git-wip-us.apache.org/repos/asf/curator/blob/04cefb47/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index 35d8809..b97e708 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -22,7 +22,6 @@ package org.apache.curator.framework.recipes.leader;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.state.ConnectionState;
@@ -35,7 +34,6 @@ import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
-
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -215,6 +213,17 @@ public class TestLeaderLatch extends BaseClassForTests
     @Test
     public void testWaiting() throws Exception
     {
+        final int LOOPS = 10;
+        for ( int i = 0; i < LOOPS; ++i )
+        {
+            System.out.println("TRY #" + i);
+            internalTestWaitingOnce();
+            Thread.sleep(10);
+        }
+    }
+
+    private void internalTestWaitingOnce() throws Exception
+    {
         final int PARTICIPANT_QTY = 10;
 
         ExecutorService executorService = Executors.newFixedThreadPool(PARTICIPANT_QTY);
@@ -241,10 +250,10 @@ public class TestLeaderLatch extends BaseClassForTests
                             Assert.assertTrue(latch.await(timing.forWaiting().seconds(), TimeUnit.SECONDS));
                             Assert.assertTrue(thereIsALeader.compareAndSet(false, true));
                             Thread.sleep((int)(10 * Math.random()));
+                            thereIsALeader.set(false);
                         }
                         finally
                         {
-                            thereIsALeader.set(false);
                             latch.close();
                         }
                         return null;
@@ -259,7 +268,7 @@ public class TestLeaderLatch extends BaseClassForTests
         }
         finally
         {
-            executorService.shutdown();
+            executorService.shutdownNow();
             CloseableUtils.closeQuietly(client);
         }
     }
@@ -526,23 +535,21 @@ public class TestLeaderLatch extends BaseClassForTests
             CloseableUtils.closeQuietly(client);
         }
     }
-    
+
     @Test
     public void testNoServerAtStart()
-        {
+    {
         CloseableUtils.closeQuietly(server);
 
         Timing timing = new Timing();
-        CuratorFramework client = CuratorFrameworkFactory.newClient(
-                server.getConnectString(), timing.session(),
-                timing.connection(), new RetryNTimes(5, 1000));
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryNTimes(5, 1000));
 
         client.start();
 
         final LeaderLatch leader = new LeaderLatch(client, PATH_NAME);
         final CountDownLatch leaderCounter = new CountDownLatch(1);
         final AtomicInteger leaderCount = new AtomicInteger(0);
-        final AtomicInteger notLeaderCount = new AtomicInteger(0);        
+        final AtomicInteger notLeaderCount = new AtomicInteger(0);
         leader.addListener(new LeaderLatchListener()
         {
             @Override
@@ -555,27 +562,26 @@ public class TestLeaderLatch extends BaseClassForTests
             @Override
             public void notLeader()
             {
-            	notLeaderCount.incrementAndGet();
+                notLeaderCount.incrementAndGet();
             }
 
         });
 
         try
         {
-        	leader.start();
-         
-        	//Wait for a while before starting the test server
-        	Thread.sleep(5000);
+            leader.start();
+
+            timing.sleepABit();
 
             // Start the new server
-            server = new TestingServer(server.getPort());
+            server = new TestingServer(server.getPort(), server.getTempDirectory());
 
             Assert.assertTrue(timing.awaitLatch(leaderCounter), "Not elected leader");
-            
+
             Assert.assertEquals(leaderCount.get(), 1, "Elected too many times");
-            Assert.assertEquals(notLeaderCount.get(), 0, "Unelected too many times");            
+            Assert.assertEquals(notLeaderCount.get(), 0, "Unelected too many times");
         }
-        catch (Exception e)
+        catch ( Exception e )
         {
             Assert.fail("Unexpected exception", e);
         }
@@ -585,7 +591,7 @@ public class TestLeaderLatch extends BaseClassForTests
             CloseableUtils.closeQuietly(client);
             CloseableUtils.closeQuietly(server);
         }
-    }    
+    }
 
     private enum Mode
     {


[04/18] git commit: some edits on the rpc web doc

Posted by ra...@apache.org.
some edits on the rpc web doc


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/70c52544
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/70c52544
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/70c52544

Branch: refs/heads/master
Commit: 70c5254460ab35c8ad4db46ace83425a1c1bb2db
Parents: 6e98562
Author: randgalt <ra...@apache.org>
Authored: Sat Jun 7 15:01:01 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Jun 7 15:01:01 2014 -0500

----------------------------------------------------------------------
 curator-x-rpc/src/site/confluence/index.confluence | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/70c52544/curator-x-rpc/src/site/confluence/index.confluence
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/site/confluence/index.confluence b/curator-x-rpc/src/site/confluence/index.confluence
index ffba2de..b3686c3 100644
--- a/curator-x-rpc/src/site/confluence/index.confluence
+++ b/curator-x-rpc/src/site/confluence/index.confluence
@@ -1,6 +1,10 @@
 h1. Curator RPC Proxy
 
-NOTE: Curator RPC Proxy is in its own package in Maven Central: curator\-x\-rpc
+h2. Packaging
+
+Curator RPC Proxy is in its own package in Maven Central: curator\-x\-rpc
+
+h2. What Is a Curator RPC?
 
 The Curator RPC module implements a proxy that bridges non\-java environments with the Curator framework and recipes. It uses
 [[Apache Thrift|http://thrift.apache.org]] which supports a large set of languages and environments.
@@ -16,6 +20,7 @@ The benefits of Curator RPC are:
 * There are Thrift implementations for a large number of languages/environments
 
 h2. Thrift File
+
 The current Curator RPC Thrift File can be found here: [[https://raw.githubusercontent.com/apache/curator/master/curator-x-rpc/src/main/thrift/curator.thrift]]. Use
 this to generate code for the language/environment you need.
 


[10/18] git commit: CURATOR-110 - Fixed up the connection notifications so that they will only notify blocked clients when a connected state is reached, rather than any state.

Posted by ra...@apache.org.
CURATOR-110 - Fixed up the connection notifications so that they will
only notify blocked clients when a connected state is reached, rather
than any state.

Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/59bab738
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/59bab738
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/59bab738

Branch: refs/heads/master
Commit: 59bab738f56e3e5017bcba8984c1d8ca24e1e71a
Parents: e8138ed
Author: Cameron McKenzie <mc...@gmail.com>
Authored: Mon Jun 16 16:45:17 2014 +1000
Committer: Cameron McKenzie <mc...@gmail.com>
Committed: Mon Jun 16 16:45:17 2014 +1000

----------------------------------------------------------------------
 .../apache/curator/framework/imps/CuratorFrameworkImpl.java   | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/59bab738/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index d1de29f..14473d8 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -173,9 +173,12 @@ public class CuratorFrameworkImpl implements CuratorFramework
 			@Override
 			public void stateChanged(CuratorFramework client, ConnectionState newState)
 			{
-				synchronized(connectionLock)
+				if(newState.isConnected())
 				{
-					connectionLock.notifyAll();
+					synchronized(connectionLock)
+					{
+						connectionLock.notifyAll();
+					}
 				}
 			}
 		});


[11/18] git commit: 1. Added license. 2. ExecuteAfterConnectionEstablished should use a Callable and allow the exception to bubble up

Posted by ra...@apache.org.
1. Added license. 2. ExecuteAfterConnectionEstablished should use a Callable and allow the exception to bubble up


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/59076777
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/59076777
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/59076777

Branch: refs/heads/master
Commit: 59076777adfe42d96b0a92f775c8920033e1d975
Parents: 59bab73
Author: randgalt <ra...@apache.org>
Authored: Mon Jun 16 12:40:34 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jun 16 12:40:34 2014 -0500

----------------------------------------------------------------------
 .../framework/imps/TestBlockUntilConnected.java | 18 ++++
 .../ExecuteAfterConnectionEstablished.java      | 96 ++++++++++++--------
 .../framework/recipes/leader/LeaderLatch.java   | 32 +++----
 3 files changed, 89 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/59076777/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
index 996e5fc..8dfb7d8 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.curator.framework.imps;
 
 import java.util.Timer;

http://git-wip-us.apache.org/repos/asf/curator/blob/59076777/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java
index d213d37..408ed03 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java
@@ -1,53 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.curator.framework.recipes;
 
-import java.util.concurrent.ExecutorService;
-
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.utils.ThreadUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
 
 /**
  * Utility class to allow execution of logic once a ZooKeeper connection becomes available.
- *
  */
 public class ExecuteAfterConnectionEstablished
 {
-	private final static Logger log = LoggerFactory.getLogger(ExecuteAfterConnectionEstablished.class);
-	
-	/**
-	 * Spawns a new new background thread that will block until a connection is available and
-	 * then execute the 'runAfterConnection' logic
-	 * @param name The name of the spawned thread
-	 * @param client The curator client
-	 * @param runAfterConnection The logic to run
-	 */
-	public static void executeAfterConnectionEstablishedInBackground(String name,
-																     final CuratorFramework client,
-																     final Runnable runAfterConnection)
-	{
+    private final static Logger log = LoggerFactory.getLogger(ExecuteAfterConnectionEstablished.class);
+
+    /**
+     * Spawns a new background thread that will block until a connection is available and
+     * then execute the 'runAfterConnection' logic
+     *
+     * @param client             The curator client
+     * @param runAfterConnection The logic to run
+     */
+    public static <T> T executeAfterConnectionEstablishedInBackground(final CuratorFramework client, final Callable<T> runAfterConnection) throws Exception
+    {
         //Block until connected
-        final ExecutorService executor = ThreadUtils.newSingleThreadExecutor(name);
-        executor.submit(new Runnable()
+        final ExecutorService executor = ThreadUtils.newSingleThreadExecutor(runAfterConnection.getClass().getSimpleName());
+        Callable<T> internalCall = new Callable<T>()
         {
-			
-			@Override
-			public void run()
-			{
-				try
-				{
-		        	client.blockUntilConnected();			        
-			        runAfterConnection.run();
-				}
-				catch(Exception e)
-				{
-					log.error("An error occurred blocking until a connection is available", e);
-				}
-				finally
-				{
-					executor.shutdown();
-				}
-			}
-        });
-	}
+            @Override
+            public T call() throws Exception
+            {
+                try
+                {
+                    client.blockUntilConnected();
+                    return runAfterConnection.call();
+                }
+                catch ( Exception e )
+                {
+                    log.error("An error occurred blocking until a connection is available", e);
+                    throw e;
+                }
+                finally
+                {
+                    executor.shutdown();
+                }
+            }
+        };
+        return executor.submit(internalCall).get();
+    }
+
+    private ExecuteAfterConnectionEstablished()
+    {
+    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/59076777/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 13a9f21..f4c1cef 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -22,7 +22,6 @@ package org.apache.curator.framework.recipes.leader;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
-
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
@@ -33,7 +32,6 @@ import org.apache.curator.framework.recipes.locks.LockInternalsSorter;
 import org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -41,15 +39,14 @@ import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.io.Closeable;
 import java.io.EOFException;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -159,23 +156,20 @@ public class LeaderLatch implements Closeable
     {
         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
 
-        ExecuteAfterConnectionEstablished.executeAfterConnectionEstablishedInBackground(LeaderLatch.class.getName(),
-        		client, new Runnable()
-        {					
-        	@Override
-			public void run()
-			{
-        		try
-        		{
-        			client.getConnectionStateListenable().addListener(listener);		        	
-        			reset();
-        		}
-        		catch(Exception ex)
+        ExecuteAfterConnectionEstablished.executeAfterConnectionEstablishedInBackground
+        (
+            client,
+            new Callable<Void>()
+            {
+                @Override
+                public Void call() throws Exception
                 {
-                	log.error("An error occurred checking resetting leadership.", ex);
+                    client.getConnectionStateListenable().addListener(listener);
+                    reset();
+                    return null;
                 }
-			}
-		});     
+            }
+        );
     }
 
     /**


[07/18] git commit: added desc for rpc

Posted by ra...@apache.org.
added desc for rpc


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/d4af5c5b
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/d4af5c5b
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/d4af5c5b

Branch: refs/heads/master
Commit: d4af5c5bb48af8fa84075218eb1e1de924417ff6
Parents: a57be39
Author: randgalt <ra...@apache.org>
Authored: Mon Jun 9 13:27:19 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jun 9 13:27:19 2014 -0500

----------------------------------------------------------------------
 curator-x-rpc/pom.xml | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/d4af5c5b/curator-x-rpc/pom.xml
----------------------------------------------------------------------
diff --git a/curator-x-rpc/pom.xml b/curator-x-rpc/pom.xml
index 1bc3b90..f66cd76 100644
--- a/curator-x-rpc/pom.xml
+++ b/curator-x-rpc/pom.xml
@@ -10,6 +10,10 @@
     <artifactId>curator-x-rpc</artifactId>
     <version>2.5.1-SNAPSHOT</version>
 
+    <name>Curator RPC Proxy</name>
+    <description>A proxy that bridges non-java environments with the Curator framework and recipes</description>
+    <inceptionYear>2014</inceptionYear>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.curator</groupId>


[18/18] git commit: Merge branch 'CURATOR-110'

Posted by ra...@apache.org.
Merge branch 'CURATOR-110'


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/5d7d0c7f
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/5d7d0c7f
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/5d7d0c7f

Branch: refs/heads/master
Commit: 5d7d0c7f1430dde06cef8800bca3f035dd707c0d
Parents: d4af5c5 bac23d3
Author: randgalt <ra...@apache.org>
Authored: Tue Jun 17 18:02:55 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Jun 17 18:02:55 2014 -0500

----------------------------------------------------------------------
 .../org/apache/curator/utils/ThreadUtils.java   |   9 +
 .../curator/framework/CuratorFramework.java     |  20 ++
 .../framework/imps/CuratorFrameworkImpl.java    | 225 ++++++++----------
 .../framework/state/ConnectionState.java        |  45 +++-
 .../framework/state/ConnectionStateManager.java |  37 +++
 .../framework/imps/TestBlockUntilConnected.java | 235 +++++++++++++++++++
 .../recipes/AfterConnectionEstablished.java     |  74 ++++++
 .../framework/recipes/leader/LeaderLatch.java   | 153 +++++++-----
 .../recipes/leader/TestLeaderLatch.java         | 125 +++++++++-
 9 files changed, 739 insertions(+), 184 deletions(-)
----------------------------------------------------------------------



[02/18] git commit: CURATOR-110 - Fixed up formatting to Curator standards

Posted by ra...@apache.org.
CURATOR-110 - Fixed up formatting to Curator standards


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/0b7ae7e3
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/0b7ae7e3
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/0b7ae7e3

Branch: refs/heads/master
Commit: 0b7ae7e3672a9c817fe31144cd6b18d9a357f124
Parents: 1a63a10
Author: Cameron McKenzie <ca...@unico.com.au>
Authored: Tue Jun 3 09:01:31 2014 +1000
Committer: Cameron McKenzie <ca...@unico.com.au>
Committed: Tue Jun 3 09:01:31 2014 +1000

----------------------------------------------------------------------
 .../framework/state/ConnectionState.java        |  35 +-
 .../framework/recipes/leader/LeaderLatch.java   | 620 +++++++++----------
 .../recipes/leader/TestLeaderLatch.java         | 491 ++++++++-------
 3 files changed, 602 insertions(+), 544 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/0b7ae7e3/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
index b25ce38..7663e10 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
@@ -23,17 +23,18 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 /**
  * Represents state changes in the connection to ZK
  */
-public enum ConnectionState {
+public enum ConnectionState
+{
     /**
-     * Sent for the first successful connection to the server. NOTE: You will
-     * only get one of these messages for any CuratorFramework instance.
+     * Sent for the first successful connection to the server. NOTE: You will only
+     * get one of these messages for any CuratorFramework instance.
      */
     CONNECTED,
 
     /**
      * There has been a loss of connection. Leaders, locks, etc. should suspend
-     * until the connection is re-established. If the connection times-out you
-     * will receive a {@link #LOST} notice
+     * until the connection is re-established. If the connection times-out you will
+     * receive a {@link #LOST} notice
      */
     SUSPENDED,
 
@@ -43,30 +44,28 @@ public enum ConnectionState {
     RECONNECTED,
 
     /**
-     * The connection is confirmed to be lost. Close any locks, leaders, etc.
-     * and attempt to re-create them. NOTE: it is possible to get a
-     * {@link #RECONNECTED} state after this but you should still consider any
-     * locks, etc. as dirty/unstable
+     * The connection is confirmed to be lost. Close any locks, leaders, etc. and
+     * attempt to re-create them. NOTE: it is possible to get a {@link #RECONNECTED}
+     * state after this but you should still consider any locks, etc. as dirty/unstable
      */
     LOST,
 
     /**
-     * The connection has gone into read-only mode. This can only happen if you
-     * pass true for {@link CuratorFrameworkFactory.Builder#canBeReadOnly()}.
-     * See the ZooKeeper doc regarding read only connections: <a
-     * href="http://wiki.apache.org/hadoop/ZooKeeper/GSoCReadOnlyMode"
-     * >http://wiki.apache.org/hadoop/ZooKeeper/GSoCReadOnlyMode</a>. The
-     * connection will remain in read only mode until another state change is
-     * sent.
+     * The connection has gone into read-only mode. This can only happen if you pass true
+     * for {@link CuratorFrameworkFactory.Builder#canBeReadOnly()}. See the ZooKeeper doc
+     * regarding read only connections:
+     * <a href="http://wiki.apache.org/hadoop/ZooKeeper/GSoCReadOnlyMode">http://wiki.apache.org/hadoop/ZooKeeper/GSoCReadOnlyMode</a>.
+     * The connection will remain in read only mode until another state change is sent.
      */
     READ_ONLY;
-
+    
     /**
      * Check if this state indicates that Curator has a connection to ZooKeeper
      * 
      * @return True if connected, false otherwise
      */
-    public boolean isConnected() {
+    public boolean isConnected()
+    {
         return this == CONNECTED || this == RECONNECTED || this == READ_ONLY;
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/0b7ae7e3/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index d09ed1b..c1157c9 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -19,17 +19,9 @@
 
 package org.apache.curator.framework.recipes.leader;
 
-import java.io.Closeable;
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
@@ -46,60 +38,71 @@ import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * <p>
- * Abstraction to select a "leader" amongst multiple contenders in a group of
- * JMVs connected to a Zookeeper cluster. If a group of N thread/processes
- * contend for leadership one will randomly be assigned leader until it releases
- * leadership at which time another one from the group will randomly be chosen
+ * Abstraction to select a "leader" amongst multiple contenders in a group of JVMs connected to
+ * a Zookeeper cluster. If a group of N thread/processes contend for leadership one will
+ * randomly be assigned leader until it releases leadership at which time another one from the
+ * group will randomly be chosen
  * </p>
  */
-public class LeaderLatch implements Closeable {
+public class LeaderLatch implements Closeable
+{
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final CuratorFramework client;
     private final String latchPath;
     private final String id;
-    private final AtomicReference<State> state = new AtomicReference<State>(
-            State.LATENT);
+    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
     private final AtomicBoolean hasLeadership = new AtomicBoolean(false);
     private final AtomicReference<String> ourPath = new AtomicReference<String>();
     private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();
     private final CloseMode closeMode;
 
-    private final ConnectionStateListener listener = new ConnectionStateListener() {
+    private final ConnectionStateListener listener = new ConnectionStateListener()
+    {
         @Override
-        public void stateChanged(CuratorFramework client,
-                ConnectionState newState) {
+        public void stateChanged(CuratorFramework client, ConnectionState newState)
+        {
             handleStateChange(newState);
         }
     };
 
     private static final String LOCK_NAME = "latch-";
 
-    private static final LockInternalsSorter sorter = new LockInternalsSorter() {
+    private static final LockInternalsSorter sorter = new LockInternalsSorter()
+    {
         @Override
-        public String fixForSorting(String str, String lockName) {
-            return StandardLockInternalsDriver.standardFixForSorting(str,
-                    lockName);
+        public String fixForSorting(String str, String lockName)
+        {
+            return StandardLockInternalsDriver.standardFixForSorting(str, lockName);
         }
     };
 
-    public enum State {
-        LATENT, STARTED, CLOSED
+    public enum State
+    {
+        LATENT,
+        STARTED,
+        CLOSED
     }
 
     /**
      * How to handle listeners when the latch is closed
      */
-    public enum CloseMode {
+    public enum CloseMode
+    {
         /**
-         * When the latch is closed, listeners will *not* be notified (default
-         * behavior)
+         * When the latch is closed, listeners will *not* be notified (default behavior)
          */
         SILENT,
 
@@ -110,116 +113,105 @@ public class LeaderLatch implements Closeable {
     }
 
     /**
-     * @param client
-     *            the client
-     * @param latchPath
-     *            the path for this leadership group
+     * @param client    the client
+     * @param latchPath the path for this leadership group
      */
-    public LeaderLatch(CuratorFramework client, String latchPath) {
+    public LeaderLatch(CuratorFramework client, String latchPath)
+    {
         this(client, latchPath, "", CloseMode.SILENT);
     }
 
     /**
-     * @param client
-     *            the client
-     * @param latchPath
-     *            the path for this leadership group
-     * @param id
-     *            participant ID
+     * @param client    the client
+     * @param latchPath the path for this leadership group
+     * @param id        participant ID
      */
-    public LeaderLatch(CuratorFramework client, String latchPath, String id) {
+    public LeaderLatch(CuratorFramework client, String latchPath, String id)
+    {
         this(client, latchPath, id, CloseMode.SILENT);
     }
 
     /**
-     * @param client
-     *            the client
-     * @param latchPath
-     *            the path for this leadership group
-     * @param id
-     *            participant ID
-     * @param closeMode
-     *            behaviour of listener on explicit close.
+     * @param client    the client
+     * @param latchPath the path for this leadership group
+     * @param id        participant ID
+     * @param closeMode behaviour of listener on explicit close.
      */
-    public LeaderLatch(CuratorFramework client, String latchPath, String id,
-            CloseMode closeMode) {
-        this.client = Preconditions.checkNotNull(client,
-                "client cannot be null");
-        this.latchPath = Preconditions.checkNotNull(latchPath,
-                "mutexPath cannot be null");
+    public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode)
+    {
+        this.client = Preconditions.checkNotNull(client, "client cannot be null");
+        this.latchPath = Preconditions.checkNotNull(latchPath, "mutexPath cannot be null");
         this.id = Preconditions.checkNotNull(id, "id cannot be null");
-        this.closeMode = Preconditions.checkNotNull(closeMode,
-                "closeMode cannot be null");
+        this.closeMode = Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
     }
 
     /**
-     * Add this instance to the leadership election and attempt to acquire
-     * leadership.
-     * 
-     * @throws Exception
-     *             errors
+     * Add this instance to the leadership election and attempt to acquire leadership.
+     *
+     * @throws Exception errors
      */
-    public void start() throws Exception {
-        Preconditions.checkState(
-                state.compareAndSet(State.LATENT, State.STARTED),
-                "Cannot be started more than once");
+    public void start() throws Exception
+    {
+        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
 
         client.getConnectionStateListenable().addListener(listener);
         reset();
     }
 
     /**
-     * Remove this instance from the leadership election. If this instance is
-     * the leader, leadership is released. IMPORTANT: the only way to release
-     * leadership is by calling close(). All LeaderLatch instances must
-     * eventually be closed.
-     * 
-     * @throws IOException
-     *             errors
+     * Remove this instance from the leadership election. If this instance is the leader, leadership
+     * is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch
+     * instances must eventually be closed.
+     *
+     * @throws IOException errors
      */
     @Override
-    public void close() throws IOException {
+    public void close() throws IOException
+    {
         close(closeMode);
     }
 
     /**
-     * Remove this instance from the leadership election. If this instance is
-     * the leader, leadership is released. IMPORTANT: the only way to release
-     * leadership is by calling close(). All LeaderLatch instances must
-     * eventually be closed.
-     * 
-     * @param closeMode
-     *            allows the default close mode to be overridden at the time the
-     *            latch is closed.
-     * 
-     * @throws IOException
-     *             errors
+     * Remove this instance from the leadership election. If this instance is the leader, leadership
+     * is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch
+     * instances must eventually be closed.
+     *
+     * @param closeMode allows the default close mode to be overridden at the time the latch is closed.
+     *
+     * @throws IOException errors
      */
-    public void close(CloseMode closeMode) throws IOException {
-        Preconditions.checkState(
-                state.compareAndSet(State.STARTED, State.CLOSED),
-                "Already closed or has not been started");
+    public void close(CloseMode closeMode) throws IOException
+    {
+        Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
         Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
 
-        try {
+        try
+        {
             setNode(null);
-        } catch (Exception e) {
+        }
+        catch ( Exception e )
+        {
             throw new IOException(e);
-        } finally {
+        }
+        finally
+        {
             client.getConnectionStateListenable().removeListener(listener);
 
-            switch (closeMode) {
-            case NOTIFY_LEADER: {
-                setLeadership(false);
-                listeners.clear();
-                break;
-            }
+            switch ( closeMode )
+            {
+                case NOTIFY_LEADER:
+                {
+                    setLeadership(false);
+                    listeners.clear();
+                    break;
+                }
 
-            default: {
-                listeners.clear();
-                setLeadership(false);
-                break;
-            }
+                default:
+                {
+                    listeners.clear();
+                    setLeadership(false);
+                    break;
+                }
             }
         }
     }
@@ -227,160 +219,134 @@ public class LeaderLatch implements Closeable {
     /**
      * Attaches a listener to this LeaderLatch
      * <p>
-     * Attaching the same listener multiple times is a noop from the second time
-     * on.
+     * Attaching the same listener multiple times is a noop from the second time on.
      * <p>
-     * All methods for the listener are run using the provided Executor. It is
-     * common to pass in a single-threaded executor so that you can be certain
-     * that listener methods are called in sequence, but if you are fine with
+     * All methods for the listener are run using the provided Executor.  It is common to pass in a single-threaded
+     * executor so that you can be certain that listener methods are called in sequence, but if you are fine with
      * them being called out of order you are welcome to use multiple threads.
-     * 
-     * @param listener
-     *            the listener to attach
+     *
+     * @param listener the listener to attach
      */
-    public void addListener(LeaderLatchListener listener) {
+    public void addListener(LeaderLatchListener listener)
+    {
         listeners.addListener(listener);
     }
 
     /**
      * Attaches a listener to this LeaderLatch
      * <p>
-     * Attaching the same listener multiple times is a noop from the second time
-     * on.
+     * Attaching the same listener multiple times is a noop from the second time on.
      * <p>
-     * All methods for the listener are run using the provided Executor. It is
-     * common to pass in a single-threaded executor so that you can be certain
-     * that listener methods are called in sequence, but if you are fine with
+     * All methods for the listener are run using the provided Executor.  It is common to pass in a single-threaded
+     * executor so that you can be certain that listener methods are called in sequence, but if you are fine with
      * them being called out of order you are welcome to use multiple threads.
-     * 
-     * @param listener
-     *            the listener to attach
-     * @param executor
-     *            An executor to run the methods for the listener on.
+     *
+     * @param listener the listener to attach
+     * @param executor     An executor to run the methods for the listener on.
      */
-    public void addListener(LeaderLatchListener listener, Executor executor) {
+    public void addListener(LeaderLatchListener listener, Executor executor)
+    {
         listeners.addListener(listener, executor);
     }
 
     /**
      * Removes a given listener from this LeaderLatch
-     * 
-     * @param listener
-     *            the listener to remove
+     *
+     * @param listener the listener to remove
      */
-    public void removeListener(LeaderLatchListener listener) {
+    public void removeListener(LeaderLatchListener listener)
+    {
         listeners.removeListener(listener);
     }
 
     /**
-     * <p>
-     * Causes the current thread to wait until this instance acquires leadership
-     * unless the thread is {@linkplain Thread#interrupt interrupted} or
-     * {@linkplain #close() closed}.
-     * </p>
-     * <p>
-     * If this instance already is the leader then this method returns
-     * immediately.
-     * </p>
-     * 
-     * <p>
-     * Otherwise the current thread becomes disabled for thread scheduling
-     * purposes and lies dormant until one of three things happen:
-     * </p>
+     * <p>Causes the current thread to wait until this instance acquires leadership
+     * unless the thread is {@linkplain Thread#interrupt interrupted} or {@linkplain #close() closed}.</p>
+     * <p>If this instance already is the leader then this method returns immediately.</p>
+     *
+     * <p>Otherwise the current
+     * thread becomes disabled for thread scheduling purposes and lies
+     * dormant until one of three things happen:</p>
      * <ul>
      * <li>This instance becomes the leader</li>
-     * <li>Some other thread {@linkplain Thread#interrupt interrupts} the
-     * current thread</li>
+     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
+     * the current thread</li>
      * <li>The instance is {@linkplain #close() closed}</li>
      * </ul>
-     * <p>
-     * If the current thread:
-     * </p>
+     * <p>If the current thread:</p>
      * <ul>
      * <li>has its interrupted status set on entry to this method; or
      * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
      * </ul>
-     * <p>
-     * then {@link InterruptedException} is thrown and the current thread's
-     * interrupted status is cleared.
-     * </p>
-     * 
-     * @throws InterruptedException
-     *             if the current thread is interrupted while waiting
-     * @throws EOFException
-     *             if the instance is {@linkplain #close() closed} while waiting
+     * <p>then {@link InterruptedException} is thrown and the current thread's
+     * interrupted status is cleared.</p>
+     *
+     * @throws InterruptedException if the current thread is interrupted
+     *                              while waiting
+     * @throws EOFException         if the instance is {@linkplain #close() closed}
+     *                              while waiting
      */
-    public void await() throws InterruptedException, EOFException {
-        synchronized (this) {
-            while ((state.get() == State.STARTED) && !hasLeadership.get()) {
+    public void await() throws InterruptedException, EOFException
+    {
+        synchronized(this)
+        {
+            while ( (state.get() == State.STARTED) && !hasLeadership.get() )
+            {
                 wait();
             }
         }
-        if (state.get() != State.STARTED) {
+        if ( state.get() != State.STARTED )
+        {
             throw new EOFException();
         }
     }
 
     /**
-     * <p>
-     * Causes the current thread to wait until this instance acquires leadership
-     * unless the thread is {@linkplain Thread#interrupt interrupted}, the
-     * specified waiting time elapses or the instance is {@linkplain #close()
-     * closed}.
-     * </p>
-     * 
-     * <p>
-     * If this instance already is the leader then this method returns
-     * immediately with the value {@code true}.
-     * </p>
-     * 
-     * <p>
-     * Otherwise the current thread becomes disabled for thread scheduling
-     * purposes and lies dormant until one of four things happen:
-     * </p>
+     * <p>Causes the current thread to wait until this instance acquires leadership
+     * unless the thread is {@linkplain Thread#interrupt interrupted},
+     * the specified waiting time elapses or the instance is {@linkplain #close() closed}.</p>
+     *
+     * <p>If this instance already is the leader then this method returns immediately
+     * with the value {@code true}.</p>
+     *
+     * <p>Otherwise the current
+     * thread becomes disabled for thread scheduling purposes and lies
+     * dormant until one of four things happen:</p>
      * <ul>
      * <li>This instance becomes the leader</li>
-     * <li>Some other thread {@linkplain Thread#interrupt interrupts} the
-     * current thread</li>
+     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
+     * the current thread</li>
      * <li>The specified waiting time elapses.</li>
      * <li>The instance is {@linkplain #close() closed}</li>
      * </ul>
-     * 
-     * <p>
-     * If the current thread:
-     * </p>
+     *
+     * <p>If the current thread:</p>
      * <ul>
      * <li>has its interrupted status set on entry to this method; or
      * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
      * </ul>
-     * <p>
-     * then {@link InterruptedException} is thrown and the current thread's
-     * interrupted status is cleared.
-     * </p>
-     * 
-     * <p>
-     * If the specified waiting time elapses or the instance is
-     * {@linkplain #close() closed} then the value {@code false} is returned. If
-     * the time is less than or equal to zero, the method will not wait at all.
-     * </p>
-     * 
-     * @param timeout
-     *            the maximum time to wait
-     * @param unit
-     *            the time unit of the {@code timeout} argument
-     * @return {@code true} if the count reached zero and {@code false} if the
-     *         waiting time elapsed before the count reached zero or the
-     *         instances was closed
-     * @throws InterruptedException
-     *             if the current thread is interrupted while waiting
+     * <p>then {@link InterruptedException} is thrown and the current thread's
+     * interrupted status is cleared.</p>
+     *
+     * <p>If the specified waiting time elapses or the instance is {@linkplain #close() closed}
+     * then the value {@code false} is returned.  If the time is less than or equal to zero, the method
+     * will not wait at all.</p>
+     *
+     * @param timeout the maximum time to wait
+     * @param unit    the time unit of the {@code timeout} argument
+     * @return {@code true} if this instance acquired leadership and {@code false}
+     *         if the waiting time elapsed before leadership was acquired or the instance was closed
+     * @throws InterruptedException if the current thread is interrupted
+     *                              while waiting
      */
-    public boolean await(long timeout, TimeUnit unit)
-            throws InterruptedException {
+    public boolean await(long timeout, TimeUnit unit) throws InterruptedException
+    {
         long waitNanos = TimeUnit.NANOSECONDS.convert(timeout, unit);
 
-        synchronized (this) {
-            while ((waitNanos > 0) && (state.get() == State.STARTED)
-                    && !hasLeadership.get()) {
+        synchronized(this)
+        {
+            while ( (waitNanos > 0) && (state.get() == State.STARTED) && !hasLeadership.get() )
+            {
                 long startNanos = System.nanoTime();
                 TimeUnit.NANOSECONDS.timedWait(this, waitNanos);
                 long elapsed = System.nanoTime() - startNanos;
@@ -392,23 +358,24 @@ public class LeaderLatch implements Closeable {
 
     /**
      * Return this instance's participant Id
-     * 
+     *
      * @return participant Id
      */
-    public String getId() {
+    public String getId()
+    {
         return id;
     }
 
     /**
-     * Returns this instances current state, this is the only way to verify that
-     * the object has been closed before closing again. If you try to close a
-     * latch multiple times, the close() method will throw an
-     * IllegalArgumentException which is often not caught and ignored
-     * (CloseableUtils.closeQuietly() only looks for IOException).
-     * 
+     * Returns this instance's current state. This is the only way to verify that the object has been closed before
+     * closing again.  If you try to close a latch multiple times, the close() method will throw an
+     * IllegalArgumentException which is often not caught and ignored (CloseableUtils.closeQuietly() only looks for
+     * IOException).
+     *
      * @return the state of the current instance
      */
-    public State getState() {
+    public State getState()
+    {
         return state.get();
     }
 
@@ -419,17 +386,16 @@ public class LeaderLatch implements Closeable {
      * <p>
      * <p>
      * <B>NOTE</B> - this method polls the ZK server. Therefore it can possibly
-     * return a value that does not match {@link #hasLeadership()} as
-     * hasLeadership uses a local field of the class.
+     * return a value that does not match {@link #hasLeadership()} as hasLeadership
+     * uses a local field of the class.
      * </p>
-     * 
+     *
      * @return participants
-     * @throws Exception
-     *             ZK errors, interruptions, etc.
+     * @throws Exception ZK errors, interruptions, etc.
      */
-    public Collection<Participant> getParticipants() throws Exception {
-        Collection<String> participantNodes = LockInternals
-                .getParticipantNodes(client, latchPath, LOCK_NAME, sorter);
+    public Collection<Participant> getParticipants() throws Exception
+    {
+        Collection<String> participantNodes = LockInternals.getParticipantNodes(client, latchPath, LOCK_NAME, sorter);
         return LeaderSelector.getParticipants(client, participantNodes);
     }
 
@@ -441,26 +407,26 @@ public class LeaderLatch implements Closeable {
      * <p>
      * <p>
      * <B>NOTE</B> - this method polls the ZK server. Therefore it can possibly
-     * return a value that does not match {@link #hasLeadership()} as
-     * hasLeadership uses a local field of the class.
+     * return a value that does not match {@link #hasLeadership()} as hasLeadership
+     * uses a local field of the class.
      * </p>
-     * 
+     *
      * @return leader
-     * @throws Exception
-     *             ZK errors, interruptions, etc.
+     * @throws Exception ZK errors, interruptions, etc.
      */
-    public Participant getLeader() throws Exception {
-        Collection<String> participantNodes = LockInternals
-                .getParticipantNodes(client, latchPath, LOCK_NAME, sorter);
+    public Participant getLeader() throws Exception
+    {
+        Collection<String> participantNodes = LockInternals.getParticipantNodes(client, latchPath, LOCK_NAME, sorter);
         return LeaderSelector.getLeader(client, participantNodes);
     }
 
     /**
      * Return true if leadership is currently held by this instance
-     * 
+     *
      * @return true/false
      */
-    public boolean hasLeadership() {
+    public boolean hasLeadership()
+    {
         return (state.get() == State.STARTED) && hasLeadership.get();
     }
 
@@ -468,95 +434,105 @@ public class LeaderLatch implements Closeable {
     volatile CountDownLatch debugResetWaitLatch = null;
 
     @VisibleForTesting
-    void reset() throws Exception {
+    void reset() throws Exception
+    {
         setLeadership(false);
         setNode(null);
 
-        BackgroundCallback callback = new BackgroundCallback() {
+        BackgroundCallback callback = new BackgroundCallback()
+        {
             @Override
-            public void processResult(CuratorFramework client,
-                    CuratorEvent event) throws Exception {
-                if (debugResetWaitLatch != null) {
+            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+            {
+                if ( debugResetWaitLatch != null )
+                {
                     debugResetWaitLatch.await();
                     debugResetWaitLatch = null;
                 }
 
-                if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
+                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+                {
                     setNode(event.getName());
-                    if (state.get() == State.CLOSED) {
+                    if ( state.get() == State.CLOSED )
+                    {
                         setNode(null);
-                    } else {
+                    }
+                    else
+                    {
                         getChildren();
                     }
-                } else {
-                    log.error("getChildren() failed. rc = "
-                            + event.getResultCode());
+                }
+                else
+                {
+                    log.error("getChildren() failed. rc = " + event.getResultCode());
                 }
             }
         };
-        client.create()
-                .creatingParentsIfNeeded()
-                .withProtection()
-                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
-                .inBackground(callback)
-                .forPath(ZKPaths.makePath(latchPath, LOCK_NAME),
-                        LeaderSelector.getIdBytes(id));
+        client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
     }
 
-    private void checkLeadership(List<String> children) throws Exception {
+    private void checkLeadership(List<String> children) throws Exception
+    {
         final String localOurPath = ourPath.get();
-        List<String> sortedChildren = LockInternals.getSortedChildren(
-                LOCK_NAME, sorter, children);
-        int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths
-                .getNodeFromPath(localOurPath)) : -1;
-        if (ourIndex < 0) {
+        List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
+        int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
+        if ( ourIndex < 0 )
+        {
             log.error("Can't find our node. Resetting. Index: " + ourIndex);
             reset();
-        } else if (ourIndex == 0) {
+        }
+        else if ( ourIndex == 0 )
+        {
             setLeadership(true);
-        } else {
+        }
+        else
+        {
             String watchPath = sortedChildren.get(ourIndex - 1);
-            Watcher watcher = new Watcher() {
+            Watcher watcher = new Watcher()
+            {
                 @Override
-                public void process(WatchedEvent event) {
-                    if ((state.get() == State.STARTED)
-                            && (event.getType() == Event.EventType.NodeDeleted)
-                            && (localOurPath != null)) {
-                        try {
+                public void process(WatchedEvent event)
+                {
+                    if ( (state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null) )
+                    {
+                        try
+                        {
                             getChildren();
-                        } catch (Exception ex) {
-                            log.error(
-                                    "An error occurred checking the leadership.",
-                                    ex);
+                        }
+                        catch ( Exception ex )
+                        {
+                            log.error("An error occurred checking the leadership.", ex);
                         }
                     }
                 }
             };
 
-            BackgroundCallback callback = new BackgroundCallback() {
+            BackgroundCallback callback = new BackgroundCallback()
+            {
                 @Override
-                public void processResult(CuratorFramework client,
-                        CuratorEvent event) throws Exception {
-                    if (event.getResultCode() == KeeperException.Code.NONODE
-                            .intValue()) {
+                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+                {
+                    if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
+                    {
                         // previous node is gone - reset
                         reset();
                     }
                 }
             };
-            // use getData() instead of exists() to avoid leaving unneeded
-            // watchers which is a type of resource leak
-            client.getData().usingWatcher(watcher).inBackground(callback)
-                    .forPath(ZKPaths.makePath(latchPath, watchPath));
+            // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
+            client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));
         }
     }
 
-    private void getChildren() throws Exception {
-        BackgroundCallback callback = new BackgroundCallback() {
+    private void getChildren() throws Exception
+    {
+        BackgroundCallback callback = new BackgroundCallback()
+        {
             @Override
-            public void processResult(CuratorFramework client,
-                    CuratorEvent event) throws Exception {
-                if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
+            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+            {
+                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+                {
                     checkLeadership(event.getChildren());
                 }
             }
@@ -564,47 +540,69 @@ public class LeaderLatch implements Closeable {
         client.getChildren().inBackground(callback).forPath(latchPath);
     }
 
-    private void handleStateChange(ConnectionState newState) {
-        if (newState.isConnected()) {
-            try {
+    private void handleStateChange(ConnectionState newState)
+    {
+        if (newState.isConnected())
+        {
+            try
+            {
                 reset();
-            } catch (Exception e) {
+            }
+            catch (Exception e)
+            {
                 log.error("Could not reset leader latch", e);
                 setLeadership(false);
             }
-        } else {
+        }
+        else
+        {
             setLeadership(false);
         }
     }
 
-    private synchronized void setLeadership(boolean newValue) {
+    private synchronized void setLeadership(boolean newValue)
+    {
         boolean oldValue = hasLeadership.getAndSet(newValue);
 
-        if (oldValue && !newValue) { // Lost leadership, was true, now false
-            listeners.forEach(new Function<LeaderLatchListener, Void>() {
-                @Override
-                public Void apply(LeaderLatchListener listener) {
-                    listener.notLeader();
-                    return null;
+        if ( oldValue && !newValue )
+        { // Lost leadership, was true, now false
+            listeners.forEach
+            (
+                new Function<LeaderLatchListener, Void>()
+                {
+                    @Override
+                    public Void apply(LeaderLatchListener listener)
+                    {
+                        listener.notLeader();
+                        return null;
+                    }
                 }
-            });
-        } else if (!oldValue && newValue) { // Gained leadership, was false, now
-                                            // true
-            listeners.forEach(new Function<LeaderLatchListener, Void>() {
-                @Override
-                public Void apply(LeaderLatchListener input) {
-                    input.isLeader();
-                    return null;
+            );
+        }
+        else if ( !oldValue && newValue )
+        { // Gained leadership, was false, now true
+            listeners.forEach
+            (
+                new Function<LeaderLatchListener, Void>()
+                {
+                    @Override
+                    public Void apply(LeaderLatchListener input)
+                    {
+                        input.isLeader();
+                        return null;
+                    }
                 }
-            });
+            );
         }
 
         notifyAll();
     }
 
-    private void setNode(String newValue) throws Exception {
+    private void setNode(String newValue) throws Exception
+    {
         String oldPath = ourPath.getAndSet(newValue);
-        if (oldPath != null) {
+        if ( oldPath != null )
+        {
             client.delete().guaranteed().inBackground().forPath(oldPath);
         }
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/0b7ae7e3/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index 108f118..d3b00bc 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -19,17 +19,9 @@
 
 package org.apache.curator.framework.recipes.leader;
 
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.state.ConnectionState;
@@ -41,29 +33,36 @@ import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-public class TestLeaderLatch extends BaseClassForTests {
+public class TestLeaderLatch extends BaseClassForTests
+{
     private static final String PATH_NAME = "/one/two/me";
     private static final int MAX_LOOPS = 5;
 
     @Test
-    public void testResetRace() throws Exception {
+    public void testResetRace() throws Exception
+    {
         Timing timing = new Timing();
         LeaderLatch latch = null;
-        CuratorFramework client = CuratorFrameworkFactory.newClient(
-                server.getConnectString(), timing.session(),
-                timing.connection(), new RetryOneTime(1));
-        try {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        try
+        {
             client.start();
             latch = new LeaderLatch(client, PATH_NAME);
 
             latch.debugResetWaitLatch = new CountDownLatch(1);
-            latch.start(); // will call reset()
-            latch.reset(); // should not result in two nodes
+            latch.start();  // will call reset()
+            latch.reset();  // should not result in two nodes
 
             timing.sleepABit();
 
@@ -71,21 +70,22 @@ public class TestLeaderLatch extends BaseClassForTests {
 
             timing.sleepABit();
 
-            Assert.assertEquals(client.getChildren().forPath(PATH_NAME).size(),
-                    1);
-        } finally {
+            Assert.assertEquals(client.getChildren().forPath(PATH_NAME).size(), 1);
+        }
+        finally
+        {
             CloseableUtils.closeQuietly(latch);
             CloseableUtils.closeQuietly(client);
         }
     }
 
     @Test
-    public void testCreateDeleteRace() throws Exception {
+    public void testCreateDeleteRace() throws Exception
+    {
         Timing timing = new Timing();
-        CuratorFramework client = CuratorFrameworkFactory.newClient(
-                server.getConnectString(), timing.session(),
-                timing.connection(), new RetryOneTime(1));
-        try {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        try
+        {
             client.start();
             LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
 
@@ -100,40 +100,43 @@ public class TestLeaderLatch extends BaseClassForTests {
 
             timing.sleepABit();
 
-            Assert.assertEquals(client.getChildren().forPath(PATH_NAME).size(),
-                    0);
+            Assert.assertEquals(client.getChildren().forPath(PATH_NAME).size(), 0);
 
-        } finally {
+        }
+        finally
+        {
             CloseableUtils.closeQuietly(client);
         }
     }
 
     @Test
-    public void testLostConnection() throws Exception {
+    public void testLostConnection() throws Exception
+    {
         final int PARTICIPANT_QTY = 10;
 
         List<LeaderLatch> latches = Lists.newArrayList();
 
         final Timing timing = new Timing();
-        final CuratorFramework client = CuratorFrameworkFactory.newClient(
-                server.getConnectString(), timing.session(),
-                timing.connection(), new RetryOneTime(1));
-        try {
+        final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        try
+        {
             client.start();
 
             final CountDownLatch countDownLatch = new CountDownLatch(1);
-            client.getConnectionStateListenable().addListener(
-                    new ConnectionStateListener() {
-                        @Override
-                        public void stateChanged(CuratorFramework client,
-                                ConnectionState newState) {
-                            if (newState == ConnectionState.LOST) {
-                                countDownLatch.countDown();
-                            }
-                        }
-                    });
+            client.getConnectionStateListenable().addListener(new ConnectionStateListener()
+            {
+                @Override
+                public void stateChanged(CuratorFramework client, ConnectionState newState)
+                {
+                    if ( newState == ConnectionState.LOST )
+                    {
+                        countDownLatch.countDown();
+                    }
+                }
+            });
 
-            for (int i = 0; i < PARTICIPANT_QTY; ++i) {
+            for ( int i = 0; i < PARTICIPANT_QTY; ++i )
+            {
                 LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
                 latch.start();
                 latches.add(latch);
@@ -148,12 +151,13 @@ public class TestLeaderLatch extends BaseClassForTests {
 
             Assert.assertEquals(getLeaders(latches).size(), 0);
 
-            server = new TestingServer(server.getPort(),
-                    server.getTempDirectory());
-            Assert.assertEquals(waitForALeader(latches, timing).size(), 1); // should
-                                                                            // reconnect
-        } finally {
-            for (LeaderLatch latch : latches) {
+            server = new TestingServer(server.getPort(), server.getTempDirectory());
+            Assert.assertEquals(waitForALeader(latches, timing).size(), 1); // should reconnect
+        }
+        finally
+        {
+            for ( LeaderLatch latch : latches )
+            {
                 CloseableUtils.closeQuietly(latch);
             }
             CloseableUtils.closeQuietly(client);
@@ -161,20 +165,21 @@ public class TestLeaderLatch extends BaseClassForTests {
     }
 
     @Test
-    public void testCorrectWatching() throws Exception {
+    public void testCorrectWatching() throws Exception
+    {
         final int PARTICIPANT_QTY = 10;
         final int PARTICIPANT_ID = 2;
 
         List<LeaderLatch> latches = Lists.newArrayList();
 
         final Timing timing = new Timing();
-        final CuratorFramework client = CuratorFrameworkFactory.newClient(
-                server.getConnectString(), timing.session(),
-                timing.connection(), new RetryOneTime(1));
-        try {
+        final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        try
+        {
             client.start();
 
-            for (int i = 0; i < PARTICIPANT_QTY; ++i) {
+            for ( int i = 0; i < PARTICIPANT_QTY; ++i )
+            {
                 LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
                 latch.start();
                 latches.add(latch);
@@ -182,20 +187,20 @@ public class TestLeaderLatch extends BaseClassForTests {
 
             waitForALeader(latches, timing);
 
-            // we need to close a Participant that doesn't be actual leader
-            // (first Participant) nor the last
+            //we need to close a Participant that is neither the actual leader (first Participant) nor the last
             latches.get(PARTICIPANT_ID).close();
 
-            // As the previous algorithm assumed that if the watched node is
-            // deleted gets the leadership
-            // we need to ensure that the PARTICIPANT_ID-1 is not getting
-            // (wrongly) elected as leader.
+            //The previous algorithm assumed that a participant gets the leadership as soon as the node it watches is deleted,
+            //so we need to ensure that participant PARTICIPANT_ID-1 is not (wrongly) elected as leader.
             Assert.assertTrue(!latches.get(PARTICIPANT_ID - 1).hasLeadership());
-        } finally {
-            // removes the already closed participant
+        }
+        finally
+        {
+            //removes the already closed participant
             latches.remove(PARTICIPANT_ID);
 
-            for (LeaderLatch latch : latches) {
+            for ( LeaderLatch latch : latches )
+            {
                 CloseableUtils.closeQuietly(latch);
             }
             CloseableUtils.closeQuietly(client);
@@ -204,35 +209,37 @@ public class TestLeaderLatch extends BaseClassForTests {
     }
 
     @Test
-    public void testWaiting() throws Exception {
+    public void testWaiting() throws Exception
+    {
         final int PARTICIPANT_QTY = 10;
 
-        ExecutorService executorService = Executors
-                .newFixedThreadPool(PARTICIPANT_QTY);
-        ExecutorCompletionService<Void> service = new ExecutorCompletionService<Void>(
-                executorService);
+        ExecutorService executorService = Executors.newFixedThreadPool(PARTICIPANT_QTY);
+        ExecutorCompletionService<Void> service = new ExecutorCompletionService<Void>(executorService);
 
         final Timing timing = new Timing();
-        final CuratorFramework client = CuratorFrameworkFactory.newClient(
-                server.getConnectString(), timing.session(),
-                timing.connection(), new RetryOneTime(1));
-        try {
+        final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        try
+        {
             client.start();
 
             final AtomicBoolean thereIsALeader = new AtomicBoolean(false);
-            for (int i = 0; i < PARTICIPANT_QTY; ++i) {
-                service.submit(new Callable<Void>() {
+            for ( int i = 0; i < PARTICIPANT_QTY; ++i )
+            {
+                service.submit(new Callable<Void>()
+                {
                     @Override
-                    public Void call() throws Exception {
+                    public Void call() throws Exception
+                    {
                         LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
-                        try {
+                        try
+                        {
                             latch.start();
-                            Assert.assertTrue(latch.await(timing.forWaiting()
-                                    .seconds(), TimeUnit.SECONDS));
-                            Assert.assertTrue(thereIsALeader.compareAndSet(
-                                    false, true));
-                            Thread.sleep((int) (10 * Math.random()));
-                        } finally {
+                            Assert.assertTrue(latch.await(timing.forWaiting().seconds(), TimeUnit.SECONDS));
+                            Assert.assertTrue(thereIsALeader.compareAndSet(false, true));
+                            Thread.sleep((int)(10 * Math.random()));
+                        }
+                        finally
+                        {
                             thereIsALeader.set(false);
                             latch.close();
                         }
@@ -241,58 +248,68 @@ public class TestLeaderLatch extends BaseClassForTests {
                 });
             }
 
-            for (int i = 0; i < PARTICIPANT_QTY; ++i) {
+            for ( int i = 0; i < PARTICIPANT_QTY; ++i )
+            {
                 service.take().get();
             }
-        } finally {
+        }
+        finally
+        {
             executorService.shutdown();
             CloseableUtils.closeQuietly(client);
         }
     }
 
     @Test
-    public void testBasic() throws Exception {
+    public void testBasic() throws Exception
+    {
         basic(Mode.START_IMMEDIATELY);
     }
 
     @Test
-    public void testBasicAlt() throws Exception {
+    public void testBasicAlt() throws Exception
+    {
         basic(Mode.START_IN_THREADS);
     }
 
     @Test
-    public void testCallbackSanity() throws Exception {
+    public void testCallbackSanity() throws Exception
+    {
         final int PARTICIPANT_QTY = 10;
         final CountDownLatch timesSquare = new CountDownLatch(PARTICIPANT_QTY);
         final AtomicLong masterCounter = new AtomicLong(0);
         final AtomicLong notLeaderCounter = new AtomicLong(0);
 
         Timing timing = new Timing();
-        CuratorFramework client = CuratorFrameworkFactory.newClient(
-                server.getConnectString(), timing.session(),
-                timing.connection(), new RetryOneTime(1));
-        ExecutorService exec = Executors
-                .newSingleThreadExecutor(new ThreadFactoryBuilder()
-                        .setDaemon(true).setNameFormat("callbackSanity-%s")
-                        .build());
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        ExecutorService exec = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("callbackSanity-%s").build());
 
         List<LeaderLatch> latches = Lists.newArrayList();
-        for (int i = 0; i < PARTICIPANT_QTY; ++i) {
+        for ( int i = 0; i < PARTICIPANT_QTY; ++i )
+        {
             final LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
-            latch.addListener(new LeaderLatchListener() {
+            latch.addListener(new LeaderLatchListener()
+            {
                 boolean beenLeader = false;
 
                 @Override
-                public void isLeader() {
-                    if (!beenLeader) {
+                public void isLeader()
+                {
+                    if ( !beenLeader )
+                    {
                         masterCounter.incrementAndGet();
                         beenLeader = true;
-                        try {
+                        try
+                        {
                             latch.reset();
-                        } catch (Exception e) {
+                        }
+                        catch ( Exception e )
+                        {
                             throw Throwables.propagate(e);
                         }
-                    } else {
+                    }
+                    else
+                    {
                         masterCounter.incrementAndGet();
                         CloseableUtils.closeQuietly(latch);
                         timesSquare.countDown();
@@ -300,17 +317,20 @@ public class TestLeaderLatch extends BaseClassForTests {
                 }
 
                 @Override
-                public void notLeader() {
+                public void notLeader()
+                {
                     notLeaderCounter.incrementAndGet();
                 }
             }, exec);
             latches.add(latch);
         }
 
-        try {
+        try
+        {
             client.start();
 
-            for (LeaderLatch latch : latches) {
+            for ( LeaderLatch latch : latches )
+            {
                 latch.start();
             }
 
@@ -318,12 +338,17 @@ public class TestLeaderLatch extends BaseClassForTests {
 
             Assert.assertEquals(masterCounter.get(), PARTICIPANT_QTY * 2);
             Assert.assertEquals(notLeaderCounter.get(), PARTICIPANT_QTY);
-            for (LeaderLatch latch : latches) {
+            for ( LeaderLatch latch : latches )
+            {
                 Assert.assertEquals(latch.getState(), LeaderLatch.State.CLOSED);
             }
-        } finally {
-            for (LeaderLatch latch : latches) {
-                if (latch.getState() != LeaderLatch.State.CLOSED) {
+        }
+        finally
+        {
+            for ( LeaderLatch latch : latches )
+            {
+                if ( latch.getState() != LeaderLatch.State.CLOSED )
+                {
                     CloseableUtils.closeQuietly(latch);
                 }
             }
@@ -332,7 +357,8 @@ public class TestLeaderLatch extends BaseClassForTests {
     }
 
     @Test
-    public void testCallbackNotifyLeader() throws Exception {
+    public void testCallbackNotifyLeader() throws Exception
+    {
         final int PARTICIPANT_QTY = 10;
         final int SILENT_QTY = 3;
 
@@ -341,35 +367,37 @@ public class TestLeaderLatch extends BaseClassForTests {
         final AtomicLong notLeaderCounter = new AtomicLong(0);
 
         Timing timing = new Timing();
-        CuratorFramework client = CuratorFrameworkFactory.newClient(
-                server.getConnectString(), timing.session(),
-                timing.connection(), new RetryOneTime(1));
-        ExecutorService exec = Executors
-                .newSingleThreadExecutor(new ThreadFactoryBuilder()
-                        .setDaemon(true)
-                        .setNameFormat("callbackNotifyLeader-%s").build());
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        ExecutorService exec = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("callbackNotifyLeader-%s").build());
 
         List<LeaderLatch> latches = Lists.newArrayList();
-        for (int i = 0; i < PARTICIPANT_QTY; ++i) {
-            LeaderLatch.CloseMode closeMode = i < SILENT_QTY ? LeaderLatch.CloseMode.SILENT
-                    : LeaderLatch.CloseMode.NOTIFY_LEADER;
+        for ( int i = 0; i < PARTICIPANT_QTY; ++i )
+        {
+            LeaderLatch.CloseMode closeMode = i < SILENT_QTY ? LeaderLatch.CloseMode.SILENT : LeaderLatch.CloseMode.NOTIFY_LEADER;
 
-            final LeaderLatch latch = new LeaderLatch(client, PATH_NAME, "",
-                    closeMode);
-            latch.addListener(new LeaderLatchListener() {
+            final LeaderLatch latch = new LeaderLatch(client, PATH_NAME, "", closeMode);
+            latch.addListener(new LeaderLatchListener()
+            {
                 boolean beenLeader = false;
 
                 @Override
-                public void isLeader() {
-                    if (!beenLeader) {
+                public void isLeader()
+                {
+                    if ( !beenLeader )
+                    {
                         masterCounter.incrementAndGet();
                         beenLeader = true;
-                        try {
+                        try
+                        {
                             latch.reset();
-                        } catch (Exception e) {
+                        }
+                        catch ( Exception e )
+                        {
                             throw Throwables.propagate(e);
                         }
-                    } else {
+                    }
+                    else
+                    {
                         masterCounter.incrementAndGet();
                         CloseableUtils.closeQuietly(latch);
                         timesSquare.countDown();
@@ -377,31 +405,38 @@ public class TestLeaderLatch extends BaseClassForTests {
                 }
 
                 @Override
-                public void notLeader() {
+                public void notLeader()
+                {
                     notLeaderCounter.incrementAndGet();
                 }
             }, exec);
             latches.add(latch);
         }
 
-        try {
+        try
+        {
             client.start();
 
-            for (LeaderLatch latch : latches) {
+            for ( LeaderLatch latch : latches )
+            {
                 latch.start();
             }
 
             timesSquare.await();
 
             Assert.assertEquals(masterCounter.get(), PARTICIPANT_QTY * 2);
-            Assert.assertEquals(notLeaderCounter.get(), PARTICIPANT_QTY * 2
-                    - SILENT_QTY);
-            for (LeaderLatch latch : latches) {
+            Assert.assertEquals(notLeaderCounter.get(), PARTICIPANT_QTY * 2 - SILENT_QTY);
+            for ( LeaderLatch latch : latches )
+            {
                 Assert.assertEquals(latch.getState(), LeaderLatch.State.CLOSED);
             }
-        } finally {
-            for (LeaderLatch latch : latches) {
-                if (latch.getState() != LeaderLatch.State.CLOSED) {
+        }
+        finally
+        {
+            for ( LeaderLatch latch : latches )
+            {
+                if ( latch.getState() != LeaderLatch.State.CLOSED )
+                {
                     CloseableUtils.closeQuietly(latch);
                 }
             }
@@ -410,42 +445,47 @@ public class TestLeaderLatch extends BaseClassForTests {
     }
 
     @Test
-    public void testCallbackDontNotify() throws Exception {
+    public void testCallbackDontNotify() throws Exception
+    {
         final AtomicLong masterCounter = new AtomicLong(0);
         final AtomicLong notLeaderCounter = new AtomicLong(0);
 
         Timing timing = new Timing();
-        CuratorFramework client = CuratorFrameworkFactory.newClient(
-                server.getConnectString(), timing.session(),
-                timing.connection(), new RetryOneTime(1));
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
 
         final LeaderLatch leader = new LeaderLatch(client, PATH_NAME);
-        final LeaderLatch notifiedLeader = new LeaderLatch(client, PATH_NAME,
-                "", LeaderLatch.CloseMode.NOTIFY_LEADER);
+        final LeaderLatch notifiedLeader = new LeaderLatch(client, PATH_NAME, "", LeaderLatch.CloseMode.NOTIFY_LEADER);
 
-        leader.addListener(new LeaderLatchListener() {
+        leader.addListener(new LeaderLatchListener()
+        {
             @Override
-            public void isLeader() {
+            public void isLeader()
+            {
             }
 
             @Override
-            public void notLeader() {
+            public void notLeader()
+            {
                 masterCounter.incrementAndGet();
             }
         });
 
-        notifiedLeader.addListener(new LeaderLatchListener() {
+        notifiedLeader.addListener(new LeaderLatchListener()
+        {
             @Override
-            public void isLeader() {
+            public void isLeader()
+            {
             }
 
             @Override
-            public void notLeader() {
+            public void notLeader()
+            {
                 notLeaderCounter.incrementAndGet();
             }
         });
 
-        try {
+        try
+        {
             client.start();
 
             leader.start();
@@ -464,24 +504,28 @@ public class TestLeaderLatch extends BaseClassForTests {
             leader.close(LeaderLatch.CloseMode.NOTIFY_LEADER);
 
             Assert.assertEquals(leader.getState(), LeaderLatch.State.CLOSED);
-            Assert.assertEquals(notifiedLeader.getState(),
-                    LeaderLatch.State.CLOSED);
+            Assert.assertEquals(notifiedLeader.getState(), LeaderLatch.State.CLOSED);
 
             Assert.assertEquals(masterCounter.get(), 1);
             Assert.assertEquals(notLeaderCounter.get(), 0);
-        } finally {
-            if (leader.getState() != LeaderLatch.State.CLOSED) {
+        }
+        finally
+        {
+            if ( leader.getState() != LeaderLatch.State.CLOSED )
+            {
                 CloseableUtils.closeQuietly(leader);
             }
-            if (notifiedLeader.getState() != LeaderLatch.State.CLOSED) {
+            if ( notifiedLeader.getState() != LeaderLatch.State.CLOSED )
+            {
                 CloseableUtils.closeQuietly(notifiedLeader);
             }
             CloseableUtils.closeQuietly(client);
         }
     }
-
+    
     @Test
-    public void testNoServerAtStart() {
+    public void testNoServerAtStart()
+        {
         CloseableUtils.closeQuietly(server);
 
         Timing timing = new Timing();
@@ -497,12 +541,14 @@ public class TestLeaderLatch extends BaseClassForTests {
         // before starting. Otherwise it's possible that the server will start
         // during retries of the inital commands to create the latch zNodes
         client.getConnectionStateListenable().addListener(
-                new ConnectionStateListener() {
-
+                new ConnectionStateListener()
+                {
                     @Override
                     public void stateChanged(CuratorFramework client,
-                            ConnectionState newState) {
-                        if (newState == ConnectionState.LOST) {
+                            ConnectionState newState)
+                    {
+                        if (newState == ConnectionState.LOST)
+                        {
                             sessionLostCount.countDown();
                         }
                     }
@@ -510,20 +556,23 @@ public class TestLeaderLatch extends BaseClassForTests {
 
         final LeaderLatch leader = new LeaderLatch(client, PATH_NAME);
         final CountDownLatch leaderCounter = new CountDownLatch(1);
-        leader.addListener(new LeaderLatchListener() {
-
+        leader.addListener(new LeaderLatchListener()
+        {
             @Override
-            public void isLeader() {
+            public void isLeader()
+            {
                 leaderCounter.countDown();
             }
 
             @Override
-            public void notLeader() {
+            public void notLeader()
+            {
             }
 
         });
 
-        try {
+        try
+        {
             leader.start();
 
             timing.awaitLatch(sessionLostCount);
@@ -533,47 +582,58 @@ public class TestLeaderLatch extends BaseClassForTests {
 
             timing.awaitLatch(leaderCounter);
 
-        } catch (Exception e) {
+        }
+        catch (Exception e)
+        {
             Assert.fail("Unexpected exception", e);
-        } finally {
+        }
+        finally
+        {
             CloseableUtils.closeQuietly(leader);
             CloseableUtils.closeQuietly(client);
             CloseableUtils.closeQuietly(server);
         }
 
-    }
+    }    
 
-    private enum Mode {
-        START_IMMEDIATELY, START_IN_THREADS
+    private enum Mode
+    {
+        START_IMMEDIATELY,
+        START_IN_THREADS
     }
 
-    private void basic(Mode mode) throws Exception {
-        final int PARTICIPANT_QTY = 1;// 0;
+    private void basic(Mode mode) throws Exception
+    {
+        final int PARTICIPANT_QTY = 1;//0;
 
         List<LeaderLatch> latches = Lists.newArrayList();
 
         Timing timing = new Timing();
-        CuratorFramework client = CuratorFrameworkFactory.newClient(
-                server.getConnectString(), timing.session(),
-                timing.connection(), new RetryOneTime(1));
-        try {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        try
+        {
             client.start();
 
-            for (int i = 0; i < PARTICIPANT_QTY; ++i) {
+            for ( int i = 0; i < PARTICIPANT_QTY; ++i )
+            {
                 LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
-                if (mode == Mode.START_IMMEDIATELY) {
+                if ( mode == Mode.START_IMMEDIATELY )
+                {
                     latch.start();
                 }
                 latches.add(latch);
             }
-            if (mode == Mode.START_IN_THREADS) {
-                ExecutorService service = Executors.newFixedThreadPool(latches
-                        .size());
-                for (final LeaderLatch latch : latches) {
-                    service.submit(new Callable<Object>() {
+            if ( mode == Mode.START_IN_THREADS )
+            {
+                ExecutorService service = Executors.newFixedThreadPool(latches.size());
+                for ( final LeaderLatch latch : latches )
+                {
+                    service.submit(new Callable<Object>()
+                    {
                         @Override
-                        public Object call() throws Exception {
-                            Thread.sleep((int) (100 * Math.random()));
+                        public Object call() throws Exception
+                        {
+                            Thread.sleep((int)(100 * Math.random()));
                             latch.start();
                             return null;
                         }
@@ -582,38 +642,36 @@ public class TestLeaderLatch extends BaseClassForTests {
                 service.shutdown();
             }
 
-            while (latches.size() > 0) {
+            while ( latches.size() > 0 )
+            {
                 List<LeaderLatch> leaders = waitForALeader(latches, timing);
-                Assert.assertEquals(leaders.size(), 1); // there can only be one
-                                                        // leader
+                Assert.assertEquals(leaders.size(), 1); // there can only be one leader
                 LeaderLatch theLeader = leaders.get(0);
-                if (mode == Mode.START_IMMEDIATELY) {
-                    Assert.assertEquals(latches.indexOf(theLeader), 0); // assert
-                                                                        // ordering
-                                                                        // -
-                                                                        // leadership
-                                                                        // should
-                                                                        // advance
-                                                                        // in
-                                                                        // start
-                                                                        // order
+                if ( mode == Mode.START_IMMEDIATELY )
+                {
+                    Assert.assertEquals(latches.indexOf(theLeader), 0); // assert ordering - leadership should advance in start order
                 }
                 theLeader.close();
                 latches.remove(theLeader);
             }
-        } finally {
-            for (LeaderLatch latch : latches) {
+        }
+        finally
+        {
+            for ( LeaderLatch latch : latches )
+            {
                 CloseableUtils.closeQuietly(latch);
             }
             CloseableUtils.closeQuietly(client);
         }
     }
 
-    private List<LeaderLatch> waitForALeader(List<LeaderLatch> latches,
-            Timing timing) throws InterruptedException {
-        for (int i = 0; i < MAX_LOOPS; ++i) {
+    private List<LeaderLatch> waitForALeader(List<LeaderLatch> latches, Timing timing) throws InterruptedException
+    {
+        for ( int i = 0; i < MAX_LOOPS; ++i )
+        {
             List<LeaderLatch> leaders = getLeaders(latches);
-            if (leaders.size() != 0) {
+            if ( leaders.size() != 0 )
+            {
                 return leaders;
             }
             timing.sleepABit();
@@ -621,10 +679,13 @@ public class TestLeaderLatch extends BaseClassForTests {
         return Lists.newArrayList();
     }
 
-    private List<LeaderLatch> getLeaders(Collection<LeaderLatch> latches) {
+    private List<LeaderLatch> getLeaders(Collection<LeaderLatch> latches)
+    {
         List<LeaderLatch> leaders = Lists.newArrayList();
-        for (LeaderLatch latch : latches) {
-            if (latch.hasLeadership()) {
+        for ( LeaderLatch latch : latches )
+        {
+            if ( latch.hasLeadership() )
+            {
                 leaders.add(latch);
             }
         }


[12/18] git commit: Utility to get a name for the thread

Posted by ra...@apache.org.
Utility to get a name for the thread


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/1c94c7ef
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/1c94c7ef
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/1c94c7ef

Branch: refs/heads/master
Commit: 1c94c7efa3b5256e14862495055805b4612ca4f4
Parents: 5907677
Author: randgalt <ra...@apache.org>
Authored: Mon Jun 16 14:07:24 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jun 16 14:07:24 2014 -0500

----------------------------------------------------------------------
 .../org/apache/curator/utils/ThreadUtils.java   |  9 +++
 .../recipes/AfterConnectionEstablished.java     | 73 ++++++++++++++++++++
 .../ExecuteAfterConnectionEstablished.java      | 73 --------------------
 3 files changed, 82 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/1c94c7ef/curator-client/src/main/java/org/apache/curator/utils/ThreadUtils.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/ThreadUtils.java b/curator-client/src/main/java/org/apache/curator/utils/ThreadUtils.java
index f238124..9665dfe 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/ThreadUtils.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/ThreadUtils.java
@@ -53,4 +53,13 @@ public class ThreadUtils
             .setDaemon(true)
             .build();
     }
+
+    public static String getProcessName(Class<?> clazz)
+    {
+        if ( clazz.isAnonymousClass() )
+        {
+            return getProcessName(clazz.getEnclosingClass());
+        }
+        return clazz.getSimpleName();
+    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/1c94c7ef/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
new file mode 100644
index 0000000..f37f7c0
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ThreadUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Utility class to allow execution of logic once a ZooKeeper connection becomes available.
+ */
+public class AfterConnectionEstablished
+{
+    private final static Logger log = LoggerFactory.getLogger(AfterConnectionEstablished.class);
+
+    /**
+     * Spawns a new background thread that will block until a connection is available and
+     * then execute the 'runAfterConnection' logic
+     *
+     * @param client             The curator client
+     * @param runAfterConnection The logic to run
+     */
+    public static <T> T execute(final CuratorFramework client, final Callable<T> runAfterConnection) throws Exception
+    {
+        //Block until connected
+        final ExecutorService executor = ThreadUtils.newSingleThreadExecutor(runAfterConnection.getClass().getSimpleName());
+        Callable<T> internalCall = new Callable<T>()
+        {
+            @Override
+            public T call() throws Exception
+            {
+                try
+                {
+                    client.blockUntilConnected();
+                    return runAfterConnection.call();
+                }
+                catch ( Exception e )
+                {
+                    log.error("An error occurred blocking until a connection is available", e);
+                    throw e;
+                }
+                finally
+                {
+                    executor.shutdown();
+                }
+            }
+        };
+        return executor.submit(internalCall).get();
+    }
+
+    private AfterConnectionEstablished()
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/1c94c7ef/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java
deleted file mode 100644
index 408ed03..0000000
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.recipes;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.utils.ThreadUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-
-/**
- * Utility class to allow execution of logic once a ZooKeeper connection becomes available.
- */
-public class ExecuteAfterConnectionEstablished
-{
-    private final static Logger log = LoggerFactory.getLogger(ExecuteAfterConnectionEstablished.class);
-
-    /**
-     * Spawns a new new background thread that will block until a connection is available and
-     * then execute the 'runAfterConnection' logic
-     *
-     * @param client             The curator client
-     * @param runAfterConnection The logic to run
-     */
-    public static <T> T executeAfterConnectionEstablishedInBackground(final CuratorFramework client, final Callable<T> runAfterConnection) throws Exception
-    {
-        //Block until connected
-        final ExecutorService executor = ThreadUtils.newSingleThreadExecutor(runAfterConnection.getClass().getSimpleName());
-        Callable<T> internalCall = new Callable<T>()
-        {
-            @Override
-            public T call() throws Exception
-            {
-                try
-                {
-                    client.blockUntilConnected();
-                    return runAfterConnection.call();
-                }
-                catch ( Exception e )
-                {
-                    log.error("An error occurred blocking until a connection is available", e);
-                    throw e;
-                }
-                finally
-                {
-                    executor.shutdown();
-                }
-            }
-        };
-        return executor.submit(internalCall).get();
-    }
-
-    private ExecuteAfterConnectionEstablished()
-    {
-    }
-}


[17/18] git commit: Added testProperCloseWithoutConnectionEstablished

Posted by ra...@apache.org.
Added testProperCloseWithoutConnectionEstablished


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/bac23d3a
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/bac23d3a
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/bac23d3a

Branch: refs/heads/master
Commit: bac23d3af7abf4bf25d5acfca7c5f2cb53b739d0
Parents: 03e736c
Author: randgalt <ra...@apache.org>
Authored: Tue Jun 17 17:11:53 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Jun 17 17:11:53 2014 -0500

----------------------------------------------------------------------
 .../framework/recipes/leader/LeaderLatch.java   | 18 +++++--
 .../recipes/leader/TestLeaderLatch.java         | 49 ++++++++++++++++++++
 2 files changed, 62 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/bac23d3a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 6f7636a..af3f895 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -200,11 +200,7 @@ public class LeaderLatch implements Closeable
         Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
         Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
 
-        Future<?> localStartTask = startTask.getAndSet(null);
-        if ( localStartTask != null )
-        {
-            localStartTask.cancel(true);
-        }
+        cancelStartTask();
 
         try
         {
@@ -237,6 +233,18 @@ public class LeaderLatch implements Closeable
         }
     }
 
+    @VisibleForTesting
+    protected boolean cancelStartTask()
+    {
+        Future<?> localStartTask = startTask.getAndSet(null);
+        if ( localStartTask != null )
+        {
+            localStartTask.cancel(true);
+            return true;
+        }
+        return false;
+    }
+
     /**
      * Attaches a listener to this LeaderLatch
      * <p/>

http://git-wip-us.apache.org/repos/asf/curator/blob/bac23d3a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index f4fb1c7..fecb9e3 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -52,6 +52,55 @@ public class TestLeaderLatch extends BaseClassForTests
     private static final int MAX_LOOPS = 5;
 
     @Test
+    public void testProperCloseWithoutConnectionEstablished() throws Exception
+    {
+        server.stop();
+
+        Timing timing = new Timing();
+        LeaderLatch latch = null;
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+
+            final AtomicBoolean resetCalled = new AtomicBoolean(false);
+            final CountDownLatch cancelStartTaskLatch = new CountDownLatch(1);
+            latch = new LeaderLatch(client, PATH_NAME)
+            {
+                @Override
+                void reset() throws Exception
+                {
+                    resetCalled.set(true);
+                    super.reset();
+                }
+
+                @Override
+                protected boolean cancelStartTask()
+                {
+                    if ( super.cancelStartTask() )
+                    {
+                        cancelStartTaskLatch.countDown();
+                        return true;
+                    }
+                    return false;
+                }
+            };
+
+            latch.start();
+            latch.close();
+            latch = null;
+
+            Assert.assertTrue(timing.awaitLatch(cancelStartTaskLatch));
+            Assert.assertFalse(resetCalled.get());
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(latch);
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
     public void testResetRace() throws Exception
     {
         Timing timing = new Timing();


[06/18] git commit: some doc edits

Posted by ra...@apache.org.
some doc edits


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/a57be393
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/a57be393
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/a57be393

Branch: refs/heads/master
Commit: a57be393bab39fcc20b09331cc2a736127885823
Parents: 122476a
Author: randgalt <ra...@apache.org>
Authored: Mon Jun 9 11:10:16 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jun 9 11:10:16 2014 -0500

----------------------------------------------------------------------
 .../src/site/confluence/configuration.confluence       |  2 +-
 curator-x-rpc/src/site/confluence/reference.confluence |  6 +++---
 curator-x-rpc/src/site/confluence/usage.confluence     | 13 +++++++------
 3 files changed, 11 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/a57be393/curator-x-rpc/src/site/confluence/configuration.confluence
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/site/confluence/configuration.confluence b/curator-x-rpc/src/site/confluence/configuration.confluence
index 939ee47..56f6cfe 100644
--- a/curator-x-rpc/src/site/confluence/configuration.confluence
+++ b/curator-x-rpc/src/site/confluence/configuration.confluence
@@ -120,7 +120,7 @@ h2. Connection
 
 h2. Retry
 
-The retry policy configuration depends on what type is used. There 3 types supported:
+The retry policy configuration depends on what type is used. There are three types supported:
 
 ||Name||Type||Default Value||Description||
 |type|string|n/a|*exponential\-backoff*|

http://git-wip-us.apache.org/repos/asf/curator/blob/a57be393/curator-x-rpc/src/site/confluence/reference.confluence
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/site/confluence/reference.confluence b/curator-x-rpc/src/site/confluence/reference.confluence
index a7b1ce5..68c3692 100644
--- a/curator-x-rpc/src/site/confluence/reference.confluence
+++ b/curator-x-rpc/src/site/confluence/reference.confluence
@@ -6,16 +6,16 @@ h2. CuratorService
 
 ||API||Arguments||Return Value||Description||
 |newCuratorProjection|connectionName|CuratorProjection|Allocates a projection to a configured CuratorFramework instance in the RPC server. "connectionName" is the name of a [[configured|configuration.html]] connection.|
-|closeCuratorProjection|CuratorProjection|void|Close a CuratorProjection. Also closes any recipes, etc. create for the projection.|
+|closeCuratorProjection|CuratorProjection|void|Close a CuratorProjection. Also closes any recipes, etc. created for the projection.|
 |pingCuratorProjection|CuratorProjection|void|Keeps the CuratorProjection from timing out. NOTE: your [[EventService|events.html]] event loop will do this for you.|
 |createNode|CreateSpec|Created path name|Create a ZNode|
 |deleteNode|DeleteSpec|void|Delete a ZNode|
 |getData|GetDataSpec|bytes|Return a ZNode's data|
 |setData|SetDataSpec|Stat|Set a ZNode's data|
-|exists|ExistsSpec|Stat|Check is a ZNode exists|
+|exists|ExistsSpec|Stat|Check if a ZNode exists|
 |getChildren|GetChildrenSpec|List of nodes|Get the child nodes for a ZNode|
 |sync|path and async context|void|Do a ZooKeeper sync|
-|closeGenericProjection|id|void|Closes any projection|
+|closeGenericProjection|id|void|Closes any projection. All projections have an "id" field. This is the value to pass.|
 |acquireLock|path, maxWaitMs|optional lock projection|Acquire a lock for the given path. Will wait at most maxWaitMs to acquire the lock. If the acquisition fails, result will be null.|
 |startLeaderSelector|path, participantId, waitForLeadershipMs|LeaderResult|Start a leader selector on the given path. The instance will be assigned the specified participantId. If waitForLeadershipMs is non\-zero, the method will block for that amount of time waiting for leadership.|
 |getLeaderParticipants|leaderProjection|List of Participant|Return the participants in a leader selector|

http://git-wip-us.apache.org/repos/asf/curator/blob/a57be393/curator-x-rpc/src/site/confluence/usage.confluence
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/site/confluence/usage.confluence b/curator-x-rpc/src/site/confluence/usage.confluence
index 36fb04e..047ae5e 100644
--- a/curator-x-rpc/src/site/confluence/usage.confluence
+++ b/curator-x-rpc/src/site/confluence/usage.confluence
@@ -72,15 +72,16 @@ h2. Usage
 Once initialized, use recipes/APIs as needed. Here is an example of using the lock recipe:
 
 {code}
-optionalLockId = client.acquireLock(curatorProjection, "/mylock", 10000)
-if optionalLockId.lockProjection == null {
+optionalLock = client.acquireLock(curatorProjection, "/mylock", 10000)
+if optionalLock.lockProjection == null {
     // lock attempt failed. Throw exception, etc.
 }
-lockId = optionalLockId.lockProjection
+lockProjection = optionalLock.lockProjection
 
-// you now own the lock
-
-client.closeGenericProjection(curatorProjection, lockId.id)
+try
+    // you now own the lock
+finally
+    client.closeGenericProjection(curatorProjection, lockProjection.id)
 {code}
 
 Here is an example of using the path cache:


[15/18] git commit: AfterConnectionEstablished needs to return the future from the service so that clients can cancel the action if needed. Added this to LeaderLatch

Posted by ra...@apache.org.
AfterConnectionEstablished needs to return the future from the service so that clients can cancel the action if needed. Added this to LeaderLatch, which now keeps the returned future for its start task and cancels it on close.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/5954e66f
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/5954e66f
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/5954e66f

Branch: refs/heads/master
Commit: 5954e66fa3108c39b3f2915583def5e51915846f
Parents: fab79a4
Author: randgalt <ra...@apache.org>
Authored: Tue Jun 17 14:38:14 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Jun 17 14:38:14 2014 -0500

----------------------------------------------------------------------
 .../recipes/AfterConnectionEstablished.java     |  7 ++-
 .../framework/recipes/leader/LeaderLatch.java   | 51 ++++++++++++++------
 .../recipes/leader/TestLeaderLatch.java         |  2 +
 3 files changed, 44 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/5954e66f/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
index 41ba702..65c6ace 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.recipes;
 
 import org.apache.curator.framework.CuratorFramework;
@@ -23,6 +24,7 @@ import org.apache.curator.utils.ThreadUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 /**
  * Utility class to allow execution of logic once a ZooKeeper connection becomes available.
@@ -37,8 +39,9 @@ public class AfterConnectionEstablished
      *
      * @param client             The curator client
      * @param runAfterConnection The logic to run
+     * @return future of the task so it can be canceled, etc. if needed
      */
-    public static void execute(final CuratorFramework client, final Runnable runAfterConnection) throws Exception
+    public static Future<?> execute(final CuratorFramework client, final Runnable runAfterConnection) throws Exception
     {
         //Block until connected
         final ExecutorService executor = ThreadUtils.newSingleThreadExecutor(ThreadUtils.getProcessName(runAfterConnection.getClass()));
@@ -62,7 +65,7 @@ public class AfterConnectionEstablished
                 }
             }
         };
-        executor.submit(internalCall);
+        return executor.submit(internalCall);
     }
 
     private AfterConnectionEstablished()

http://git-wip-us.apache.org/repos/asf/curator/blob/5954e66f/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 9d70645..6f7636a 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -46,6 +46,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -69,6 +70,7 @@ public class LeaderLatch implements Closeable
     private final AtomicReference<String> ourPath = new AtomicReference<String>();
     private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();
     private final CloseMode closeMode;
+    private final AtomicReference<Future<?>> startTask = new AtomicReference<Future<?>>();
 
     private final ConnectionStateListener listener = new ConnectionStateListener()
     {
@@ -155,22 +157,21 @@ public class LeaderLatch implements Closeable
     {
         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
 
-        AfterConnectionEstablished.execute(client, new Runnable()
-            {
-                @Override
-                public void run()
+        startTask.set(AfterConnectionEstablished.execute(client, new Runnable()
                 {
-                    client.getConnectionStateListenable().addListener(listener);
-                    try
-                    {
-                        reset();
-                    }
-                    catch ( Exception e )
+                    @Override
+                    public void run()
                     {
-                        log.error("An error occurred checking resetting leadership.", e);
+                        try
+                        {
+                            internalStart();
+                        }
+                        finally
+                        {
+                            startTask.set(null);
+                        }
                     }
-                }
-            });
+                }));
     }
 
     /**
@@ -194,11 +195,17 @@ public class LeaderLatch implements Closeable
      * @param closeMode allows the default close mode to be overridden at the time the latch is closed.
      * @throws IOException errors
      */
-    public void close(CloseMode closeMode) throws IOException
+    public synchronized void close(CloseMode closeMode) throws IOException
     {
         Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
         Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
 
+        Future<?> localStartTask = startTask.getAndSet(null);
+        if ( localStartTask != null )
+        {
+            localStartTask.cancel(true);
+        }
+
         try
         {
             setNode(null);
@@ -485,6 +492,22 @@ public class LeaderLatch implements Closeable
         client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
     }
 
+    private synchronized void internalStart()
+    {
+        if ( state.get() == State.STARTED )
+        {
+            client.getConnectionStateListenable().addListener(listener);
+            try
+            {
+                reset();
+            }
+            catch ( Exception e )
+            {
+                log.error("An error occurred checking resetting leadership.", e);
+            }
+        }
+    }
+
     private void checkLeadership(List<String> children) throws Exception
     {
         final String localOurPath = ourPath.get();

http://git-wip-us.apache.org/repos/asf/curator/blob/5954e66f/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index b97e708..f4fb1c7 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -89,6 +89,8 @@ public class TestLeaderLatch extends BaseClassForTests
         try
         {
             client.start();
+            client.create().creatingParentsIfNeeded().forPath(PATH_NAME);
+
             LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
 
             latch.debugResetWaitLatch = new CountDownLatch(1);