You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/02/07 17:58:46 UTC

[curator] branch CURATOR-505 updated (7f1c0dd -> a162f5b)

This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a change to branch CURATOR-505
in repository https://gitbox.apache.org/repos/asf/curator.git.


    from 7f1c0dd  CURATOR-505
     new 96f186a  CURATOR-505 - interim checking - refactoring, simplifications, more testing, and documentation
     new a162f5b  CURATOR-505 - decoration of ConnectionStateListeners is now automatic (a backdoor is provided)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/curator/framework/CuratorFramework.java |  10 --
 .../curator/framework/CuratorFrameworkFactory.java |   2 +-
 .../framework/imps/CuratorFrameworkImpl.java       |  18 ++--
 .../curator/framework/imps/EnsembleTracker.java    |   6 ++
 .../framework/listen/MappingListenerContainer.java | 112 +++++++++++++++++++++
 .../curator/framework/state/CircuitBreaker.java    |  24 ++---
 .../CircuitBreakingConnectionStateListener.java    |  11 +-
 .../framework/state/ConnectionStateListener.java   |  15 ++-
 .../state/ConnectionStateListenerDecorator.java    |   6 +-
 .../framework/state/ConnectionStateManager.java    |  35 ++++---
 .../framework/state/TestCircuitBreaker.java        |  42 ++++++--
 ...TestCircuitBreakingConnectionStateListener.java |  86 ++++++++++++++--
 .../curator/framework/recipes/cache/NodeCache.java |  56 +++++------
 .../framework/recipes/cache/PathChildrenCache.java |  10 +-
 .../curator/framework/recipes/cache/TreeCache.java |  11 +-
 .../framework/recipes/leader/LeaderLatch.java      |  11 +-
 .../framework/recipes/leader/LeaderSelector.java   |   8 +-
 .../framework/recipes/nodes/PersistentNode.java    |  21 ++--
 .../framework/recipes/shared/SharedValue.java      |  40 ++++----
 .../framework/recipes/leader/TestLeaderLatch.java  |  28 +++---
 .../x/discovery/details/ServiceCacheImpl.java      | 109 ++++++++++----------
 .../x/discovery/details/ServiceDiscoveryImpl.java  |  45 +++++----
 src/site/confluence/utilities.confluence           |  28 ++++++
 23 files changed, 493 insertions(+), 241 deletions(-)
 create mode 100644 curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerContainer.java


[curator] 02/02: CURATOR-505 - decoration of ConnectionStateListeners is now automatic (a backdoor is provided)

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch CURATOR-505
in repository https://gitbox.apache.org/repos/asf/curator.git

commit a162f5b4606277e87f93b93e47c32237490846e1
Author: randgalt <ra...@apache.org>
AuthorDate: Thu Feb 7 12:58:41 2019 -0500

    CURATOR-505 - decoration of ConnectionStateListeners is now automatic (a backdoor is provided)
---
 .../apache/curator/framework/CuratorFramework.java |  20 ----
 .../framework/imps/CuratorFrameworkImpl.java       |  29 ++----
 .../curator/framework/imps/EnsembleTracker.java    |   6 ++
 .../framework/listen/MappingListenerContainer.java | 112 +++++++++++++++++++++
 .../framework/state/ConnectionStateListener.java   |  15 ++-
 .../framework/state/ConnectionStateManager.java    |  35 ++++---
 .../curator/framework/recipes/cache/NodeCache.java |  56 +++++------
 .../framework/recipes/cache/PathChildrenCache.java |  10 +-
 .../curator/framework/recipes/cache/TreeCache.java |  11 +-
 .../framework/recipes/leader/LeaderLatch.java      |  11 +-
 .../framework/recipes/leader/LeaderSelector.java   |   8 +-
 .../framework/recipes/nodes/PersistentNode.java    |  21 ++--
 .../framework/recipes/shared/SharedValue.java      |  40 ++++----
 .../x/discovery/details/ServiceCacheImpl.java      | 109 ++++++++++----------
 .../x/discovery/details/ServiceDiscoveryImpl.java  |  45 +++++----
 src/site/confluence/errors.confluence              |   3 +-
 src/site/confluence/utilities.confluence           |  18 +---
 17 files changed, 321 insertions(+), 228 deletions(-)

diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 2657781..3737faa 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -30,7 +30,6 @@ import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.schema.SchemaSet;
 import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
 import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.curator.framework.state.ConnectionStateListenerDecorator;
 import org.apache.curator.utils.EnsurePath;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
@@ -358,23 +357,4 @@ public interface CuratorFramework extends Closeable
      * @since 4.1.0
      */
     CompletableFuture<Void> runSafe(Runnable runnable);
-
-    /**
-     * Uses the configured {@link org.apache.curator.framework.state.ConnectionStateListenerDecorator}
-     * to decorate the given listener. You should always decorate connection state listeners via
-     * this method. See the Curator recipes for examples.
-     *
-     * @param actual listener to decorate
-     * @return decorated listener
-     */
-    ConnectionStateListener decorateConnectionStateListener(ConnectionStateListener actual);
-
-    /**
-     * Returns a facade of the current instance that uses the given connection state listener
-     * decorator instead of the configured one
-     *
-     * @param newDecorator decorator to use
-     * @return facade
-     */
-    CuratorFramework usingConnectionStateListenerDecorator(ConnectionStateListenerDecorator newDecorator);
 }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index f210021..d9c3424 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -41,7 +41,6 @@ import org.apache.curator.framework.schema.SchemaSet;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
 import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.curator.framework.state.ConnectionStateListenerDecorator;
 import org.apache.curator.framework.state.ConnectionStateManager;
 import org.apache.curator.utils.DebugUtils;
 import org.apache.curator.utils.EnsurePath;
@@ -91,7 +90,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
     private final SchemaSet schemaSet;
     private final boolean zk34CompatibilityMode;
     private final Executor runSafeService;
-    private final ConnectionStateListenerDecorator connectionStateListenerDecorator;
 
     private volatile ExecutorService executorService;
     private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false);
@@ -141,7 +139,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         namespace = new NamespaceImpl(this, builder.getNamespace());
         threadFactory = getThreadFactory(builder);
         maxCloseWaitMs = builder.getMaxCloseWaitMs();
-        connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent());
+        connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent(), builder.getConnectionStateListenerDecorator());
         compressionProvider = builder.getCompressionProvider();
         aclProvider = builder.getAclProvider();
         state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
@@ -149,7 +147,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
         connectionStateErrorPolicy = Preconditions.checkNotNull(builder.getConnectionStateErrorPolicy(), "errorPolicy cannot be null");
         schemaSet = Preconditions.checkNotNull(builder.getSchemaSet(), "schemaSet cannot be null");
         zk34CompatibilityMode = builder.isZk34CompatibilityMode();
-        connectionStateListenerDecorator = builder.getConnectionStateListenerDecorator();
 
         byte[] builderDefaultData = builder.getDefaultData();
         defaultData = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0];
@@ -236,11 +233,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
 
     protected CuratorFrameworkImpl(CuratorFrameworkImpl parent)
     {
-        this(parent, parent.connectionStateListenerDecorator);
-    }
-
-    private CuratorFrameworkImpl(CuratorFrameworkImpl parent, ConnectionStateListenerDecorator connectionStateListenerDecorator)
-    {
         client = parent.client;
         listeners = parent.listeners;
         unhandledErrorListeners = parent.unhandledErrorListeners;
@@ -265,7 +257,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
         zk34CompatibilityMode = parent.zk34CompatibilityMode;
         ensembleTracker = null;
         runSafeService = parent.runSafeService;
-        this.connectionStateListenerDecorator = connectionStateListenerDecorator;
     }
 
     @Override
@@ -334,6 +325,12 @@ public class CuratorFrameworkImpl implements CuratorFramework
                         logAsErrorConnectionErrors.set(true);
                     }
                 }
+
+                @Override
+                public boolean doNotDecorate()
+                {
+                    return true;
+                }
             };
 
             this.getConnectionStateListenable().addListener(listener);
@@ -598,18 +595,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
         return schemaSet;
     }
 
-    @Override
-    public ConnectionStateListener decorateConnectionStateListener(ConnectionStateListener actual)
-    {
-        return connectionStateListenerDecorator.decorateListener(this, actual);
-    }
-
-    @Override
-    public CuratorFramework usingConnectionStateListenerDecorator(ConnectionStateListenerDecorator newDecorator)
-    {
-        return new CuratorFrameworkImpl(this, newDecorator);
-    }
-
     ACLProvider getAclProvider()
     {
         return aclProvider;
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
index 7d8fe19..8ca63f6 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
@@ -72,6 +72,12 @@ public class EnsembleTracker implements Closeable, CuratorWatcher
                 }
             }
         }
+
+        @Override
+        public boolean doNotDecorate()
+        {
+            return true;
+        }
     };
 
     private enum State
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerContainer.java b/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerContainer.java
new file mode 100644
index 0000000..3a37ecb
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerContainer.java
@@ -0,0 +1,112 @@
+package org.apache.curator.framework.listen;
+
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.curator.utils.ThreadUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * Upgraded version of {@link org.apache.curator.framework.listen.ListenerContainer} that
+ * doesn't leak Guava's internals and also supports mapping/wrapping of listeners
+ */
+public class MappingListenerContainer<K, V> implements Listenable<K>
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final Map<K, ListenerEntry<V>> listeners = Maps.newConcurrentMap();
+    private final Function<K, V> mapper;
+
+    /**
+     * Returns a new standard version that does no mapping
+     *
+     * @return new container
+     */
+    public static <T> MappingListenerContainer<T, T> nonMapping()
+    {
+        return new MappingListenerContainer<>(Function.identity());
+    }
+
+    /**
+     * Returns a new container that wraps listeners using the given mapper
+     *
+     * @param mapper listener mapper/wrapper
+     * @return new container
+     */
+    public static <K, V> MappingListenerContainer<K, V> mapping(Function<K, V> mapper)
+    {
+        return new MappingListenerContainer<>(mapper);
+    }
+
+    @Override
+    public void addListener(K listener)
+    {
+        addListener(listener, MoreExecutors.directExecutor());
+    }
+
+    @Override
+    public void addListener(K listener, Executor executor)
+    {
+        V mapped = mapper.apply(listener);
+        listeners.put(listener, new ListenerEntry<V>(mapped, executor));
+    }
+
+    @Override
+    public void removeListener(K listener)
+    {
+        if ( listener != null )
+        {
+            listeners.remove(listener);
+        }
+    }
+
+    /**
+     * Remove all listeners
+     */
+    public void clear()
+    {
+        listeners.clear();
+    }
+
+    /**
+     * Return the number of listeners
+     *
+     * @return number
+     */
+    public int size()
+    {
+        return listeners.size();
+    }
+
+    /**
+     * Utility - apply the given function to each listener. The function receives
+     * the listener as an argument.
+     *
+     * @param function function to call for each listener
+     */
+    public void forEach(Consumer<V> function)
+    {
+        for ( ListenerEntry<V> entry : listeners.values() )
+        {
+            entry.executor.execute(() -> {
+                try
+                {
+                    function.accept(entry.listener);
+                }
+                catch ( Throwable e )
+                {
+                    ThreadUtils.checkInterrupted(e);
+                    log.error(String.format("Listener (%s) threw an exception", entry.listener), e);
+                }
+            });
+        }
+    }
+
+    private MappingListenerContainer(Function<K, V> mapper)
+    {
+        this.mapper = mapper;
+    }
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListener.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListener.java
index 075e6ec..71635d0 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListener.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListener.java
@@ -28,5 +28,18 @@ public interface ConnectionStateListener
      * @param client the client
      * @param newState the new state
      */
-    public void stateChanged(CuratorFramework client, ConnectionState newState);
+    void stateChanged(CuratorFramework client, ConnectionState newState);
+
+    /**
+     * Normally, ConnectionStateListeners are decorated via the configured
+     * {@link org.apache.curator.framework.state.ConnectionStateListenerDecorator}. For certain
+     * critical cases, however, this is not desired. If your listener returns <code>true</code>
+     * for doNotDecorate(), it will not be passed through the decorator.
+     *
+     * @return true/false
+     */
+    default boolean doNotDecorate()
+    {
+        return false;
+    }
 }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 5e28b3d..3654f61 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -19,10 +19,10 @@
 
 package org.apache.curator.framework.state;
 
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.MappingListenerContainer;
 import org.apache.curator.utils.Compatibility;
 import org.apache.curator.utils.ThreadUtils;
 import org.slf4j.Logger;
@@ -68,10 +68,10 @@ public class ConnectionStateManager implements Closeable
     private final CuratorFramework client;
     private final int sessionTimeoutMs;
     private final int sessionExpirationPercent;
-    private final ListenerContainer<ConnectionStateListener> listeners = new ListenerContainer<ConnectionStateListener>();
     private final AtomicBoolean initialConnectMessageSent = new AtomicBoolean(false);
     private final ExecutorService service;
     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+    private final MappingListenerContainer<ConnectionStateListener, ConnectionStateListener> listeners;
 
     // guarded by sync
     private ConnectionState currentConnectionState;
@@ -93,6 +93,18 @@ public class ConnectionStateManager implements Closeable
      */
     public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs, int sessionExpirationPercent)
     {
+        this(client, threadFactory, sessionTimeoutMs, sessionExpirationPercent, ConnectionStateListenerDecorator.standard);
+    }
+
+    /**
+     * @param client        the client
+     * @param threadFactory thread factory to use or null for a default
+     * @param sessionTimeoutMs the ZK session timeout in milliseconds
+     * @param sessionExpirationPercent percentage of negotiated session timeout to use when simulating a session timeout. 0 means don't simulate at all
+     * @param connectionStateListenerDecorator the decorator to use
+     */
+    public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs, int sessionExpirationPercent, ConnectionStateListenerDecorator connectionStateListenerDecorator)
+    {
         this.client = client;
         this.sessionTimeoutMs = sessionTimeoutMs;
         this.sessionExpirationPercent = sessionExpirationPercent;
@@ -101,6 +113,7 @@ public class ConnectionStateManager implements Closeable
             threadFactory = ThreadUtils.newThreadFactory("ConnectionStateManager");
         }
         service = Executors.newSingleThreadExecutor(threadFactory);
+        listeners = MappingListenerContainer.mapping(listener -> listener.doNotDecorate() ? listener : connectionStateListenerDecorator.decorateListener(client, listener));
     }
 
     /**
@@ -138,8 +151,9 @@ public class ConnectionStateManager implements Closeable
      * Return the listenable
      *
      * @return listenable
+     * @since 4.2.0 return type has changed from ListenerContainer to Listenable
      */
-    public ListenerContainer<ConnectionStateListener> getListenable()
+    public Listenable<ConnectionStateListener> getListenable()
     {
         return listeners;
     }
@@ -263,18 +277,7 @@ public class ConnectionStateManager implements Closeable
                         log.warn("There are no ConnectionStateListeners registered.");
                     }
 
-                    listeners.forEach
-                        (
-                            new Function<ConnectionStateListener, Void>()
-                            {
-                                @Override
-                                public Void apply(ConnectionStateListener listener)
-                                {
-                                    listener.stateChanged(client, newState);
-                                    return null;
-                                }
-                            }
-                        );
+                    listeners.forEach(listener -> listener.stateChanged(client, newState));
                 }
                 else if ( sessionExpirationPercent > 0 )
                 {
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
index 1ba88c3..9687e1b 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
@@ -64,7 +64,32 @@ public class NodeCache implements Closeable
     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
     private final ListenerContainer<NodeCacheListener> listeners = new ListenerContainer<NodeCacheListener>();
     private final AtomicBoolean isConnected = new AtomicBoolean(true);
-    private volatile ConnectionStateListener connectionStateListener;
+    private ConnectionStateListener connectionStateListener = new ConnectionStateListener()
+    {
+        @Override
+        public void stateChanged(CuratorFramework client, ConnectionState newState)
+        {
+            if ( (newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED) )
+            {
+                if ( isConnected.compareAndSet(false, true) )
+                {
+                    try
+                    {
+                        reset();
+                    }
+                    catch ( Exception e )
+                    {
+                        ThreadUtils.checkInterrupted(e);
+                        log.error("Trying to reset after reconnection", e);
+                    }
+                }
+            }
+            else
+            {
+                isConnected.set(false);
+            }
+        }
+    };
 
     private Watcher watcher = new Watcher()
     {
@@ -118,8 +143,6 @@ public class NodeCache implements Closeable
         this.client = client.newWatcherRemoveCuratorFramework();
         this.path = PathUtils.validatePath(path);
         this.dataIsCompressed = dataIsCompressed;
-
-        connectionStateListener = client.decorateConnectionStateListener((__, newState) -> handleStateChange(newState));
     }
 
     public CuratorFramework getClient()
@@ -173,7 +196,7 @@ public class NodeCache implements Closeable
             // has something to do with Guava's cache and circular references
             connectionStateListener = null;
             watcher = null;
-        }
+        }        
     }
 
     /**
@@ -325,7 +348,7 @@ public class NodeCache implements Closeable
             }
         }
     }
-
+    
     /**
      * Default behavior is just to log the exception
      *
@@ -335,27 +358,4 @@ public class NodeCache implements Closeable
     {
         log.error("", e);
     }
-
-    private void handleStateChange(ConnectionState newState)
-    {
-        if ( (newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED) )
-        {
-            if ( isConnected.compareAndSet(false, true) )
-            {
-                try
-                {
-                    reset();
-                }
-                catch ( Exception e )
-                {
-                    ThreadUtils.checkInterrupted(e);
-                    log.error("Trying to reset after reconnection", e);
-                }
-            }
-        }
-        else
-        {
-            isConnected.set(false);
-        }
-    }
 }
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index 14608ba..bdc73cc 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -128,7 +128,14 @@ public class PathChildrenCache implements Closeable
     @VisibleForTesting
     volatile Exchanger<Object> rebuildTestExchanger;
 
-    private volatile ConnectionStateListener connectionStateListener;
+    private volatile ConnectionStateListener connectionStateListener = new ConnectionStateListener()
+    {
+        @Override
+        public void stateChanged(CuratorFramework client, ConnectionState newState)
+        {
+            handleStateChange(newState);
+        }
+    };
     public static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("PathChildrenCache");
 
     /**
@@ -218,7 +225,6 @@ public class PathChildrenCache implements Closeable
         this.dataIsCompressed = dataIsCompressed;
         this.executorService = executorService;
         ensureContainers = new EnsureContainers(client, path);
-        connectionStateListener = client.decorateConnectionStateListener((__, newState) -> handleStateChange(newState));
     }
 
     /**
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
index a3b1d23..f42c1d5 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -534,7 +534,15 @@ public class TreeCache implements Closeable
     private final ListenerContainer<TreeCacheListener> listeners = new ListenerContainer<TreeCacheListener>();
     private final ListenerContainer<UnhandledErrorListener> errorListeners = new ListenerContainer<UnhandledErrorListener>();
     private final AtomicReference<TreeState> treeState = new AtomicReference<TreeState>(TreeState.LATENT);
-    private final ConnectionStateListener connectionStateListener;
+
+    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
+    {
+        @Override
+        public void stateChanged(CuratorFramework client, ConnectionState newState)
+        {
+            handleStateChange(newState);
+        }
+    };
 
     static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("TreeCache");
 
@@ -578,7 +586,6 @@ public class TreeCache implements Closeable
         this.maxDepth = maxDepth;
         this.disableZkWatches = disableZkWatches;
         this.executorService = Preconditions.checkNotNull(executorService, "executorService cannot be null");
-        connectionStateListener = client.decorateConnectionStateListener((__, newState) -> handleStateChange(newState));
     }
 
     /**
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 22cf3af..bb8aa73 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -74,7 +74,15 @@ public class LeaderLatch implements Closeable
     private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();
     private final CloseMode closeMode;
     private final AtomicReference<Future<?>> startTask = new AtomicReference<Future<?>>();
-    private final ConnectionStateListener listener;
+
+    private final ConnectionStateListener listener = new ConnectionStateListener()
+    {
+        @Override
+        public void stateChanged(CuratorFramework client, ConnectionState newState)
+        {
+            handleStateChange(newState);
+        }
+    };
 
     private static final String LOCK_NAME = "latch-";
 
@@ -141,7 +149,6 @@ public class LeaderLatch implements Closeable
         this.latchPath = PathUtils.validatePath(latchPath);
         this.id = Preconditions.checkNotNull(id, "id cannot be null");
         this.closeMode = Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
-        listener = client.decorateConnectionStateListener((__, newState) -> handleStateChange(newState));
     }
 
     /**
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
index 108d66e..0bb448a 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
@@ -26,7 +26,6 @@ import com.google.common.collect.Lists;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
 import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.CloseableExecutorService;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.zookeeper.KeeperException;
@@ -69,7 +68,6 @@ public class LeaderSelector implements Closeable
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final CuratorFramework client;
     private final LeaderSelectorListener listener;
-    private final ConnectionStateListener connectionStateListener;
     private final CloseableExecutorService executorService;
     private final InterProcessMutex mutex;
     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
@@ -147,8 +145,6 @@ public class LeaderSelector implements Closeable
         this.listener = new WrappedListener(this, listener);
         hasLeadership = false;
 
-        connectionStateListener = client.decorateConnectionStateListener(listener);
-
         this.executorService = executorService;
         mutex = new InterProcessMutex(client, leaderPath)
         {
@@ -219,7 +215,7 @@ public class LeaderSelector implements Closeable
         Preconditions.checkState(!executorService.isShutdown(), "Already started");
         Preconditions.checkState(!hasLeadership, "Already has leadership");
 
-        client.getConnectionStateListenable().addListener(connectionStateListener);
+        client.getConnectionStateListenable().addListener(listener);
         requeue();
     }
 
@@ -275,7 +271,7 @@ public class LeaderSelector implements Closeable
     {
         Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
 
-        client.getConnectionStateListenable().removeListener(connectionStateListener);
+        client.getConnectionStateListenable().removeListener(listener);
         executorService.close();
         ourTask.set(null);
     }
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
index 293f46e..81e8dd9 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
@@ -145,7 +145,17 @@ public class PersistentNode implements Closeable
             }
         }
     };
-    private final ConnectionStateListener connectionStateListener;
+    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
+    {
+        @Override
+        public void stateChanged(CuratorFramework dummy, ConnectionState newState)
+        {
+            if ( (newState == ConnectionState.RECONNECTED) && isActive() )
+            {
+                createNode();
+            }
+        }
+    };
 
     @VisibleForTesting
     volatile CountDownLatch debugCreateNodeLatch = null;
@@ -203,7 +213,6 @@ public class PersistentNode implements Closeable
         };
 
         this.data.set(Arrays.copyOf(data, data.length));
-        connectionStateListener = client.decorateConnectionStateListener((__, newState) -> handleStateChange(newState));
     }
 
     private void processBackgroundCallbackClosedState(CuratorEvent event)
@@ -545,12 +554,4 @@ public class PersistentNode implements Closeable
     {
         return authFailure.get();
     }
-
-    private void handleStateChange(ConnectionState newState)
-    {
-        if ( (newState == ConnectionState.RECONNECTED) && isActive() )
-        {
-            createNode();
-        }
-    }
 }
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
index 5f3e183..5d7abce 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
@@ -73,7 +73,26 @@ public class SharedValue implements Closeable, SharedValueReader
         }
     };
 
-    private final ConnectionStateListener connectionStateListener;
+    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
+    {
+        @Override
+        public void stateChanged(CuratorFramework client, ConnectionState newState)
+        {
+            notifyListenerOfStateChanged(newState);
+            if ( newState.isConnected() )
+            {
+                try
+                {
+                    readValueAndNotifyListenersInBackground();
+                }
+                catch ( Exception e )
+                {
+                    ThreadUtils.checkInterrupted(e);
+                    log.error("Could not read value after reconnect", e);
+                }
+            }
+        }
+    };
 
     private enum State
     {
@@ -94,7 +113,6 @@ public class SharedValue implements Closeable, SharedValueReader
         this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
         this.watcher = new SharedValueCuratorWatcher();
         currentValue = new AtomicReference<VersionedValue<byte[]>>(new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
-        connectionStateListener = client.decorateConnectionStateListener((__, newState) -> handleStateChange(newState));
     }
 
     @VisibleForTesting
@@ -106,7 +124,6 @@ public class SharedValue implements Closeable, SharedValueReader
         // inject watcher for testing
         this.watcher = watcher;
         currentValue = new AtomicReference<VersionedValue<byte[]>>(new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
-        connectionStateListener = client.decorateConnectionStateListener((__, newState) -> handleStateChange(newState));
     }
 
     @Override
@@ -317,21 +334,4 @@ public class SharedValue implements Closeable, SharedValueReader
                 }
             );
     }
-
-    private void handleStateChange(ConnectionState newState)
-    {
-        notifyListenerOfStateChanged(newState);
-        if ( newState.isConnected() )
-        {
-            try
-            {
-                readValueAndNotifyListenersInBackground();
-            }
-            catch ( Exception e )
-            {
-                ThreadUtils.checkInterrupted(e);
-                log.error("Could not read value after reconnect", e);
-            }
-        }
-    }
 }
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
index 4270116..d1a31ad 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.curator.x.discovery.details;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -24,15 +23,14 @@ import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.curator.utils.CloseableExecutorService;
+import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.listen.ListenerContainer;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
-import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.curator.utils.CloseableExecutorService;
-import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.curator.x.discovery.ServiceCache;
 import org.apache.curator.x.discovery.ServiceInstance;
@@ -47,16 +45,17 @@ import java.util.concurrent.atomic.AtomicReference;
 
 public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheListener
 {
-    private final ListenerContainer<ServiceCacheListener> listenerContainer = new ListenerContainer<ServiceCacheListener>();
-    private final ServiceDiscoveryImpl<T> discovery;
-    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
-    private final PathChildrenCache cache;
-    private final ConcurrentMap<String, ServiceInstance<T>> instances = Maps.newConcurrentMap();
-    private final ConcurrentMap<ServiceCacheListener, ConnectionStateListener> connectionStateListeners = Maps.newConcurrentMap();
+    private final ListenerContainer<ServiceCacheListener>           listenerContainer = new ListenerContainer<ServiceCacheListener>();
+    private final ServiceDiscoveryImpl<T>                           discovery;
+    private final AtomicReference<State>                            state = new AtomicReference<State>(State.LATENT);
+    private final PathChildrenCache                                 cache;
+    private final ConcurrentMap<String, ServiceInstance<T>>         instances = Maps.newConcurrentMap();
 
     private enum State
     {
-        LATENT, STARTED, STOPPED
+        LATENT,
+        STARTED,
+        STOPPED
     }
 
     private static CloseableExecutorService convertThreadFactory(ThreadFactory threadFactory)
@@ -124,15 +123,18 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
     {
         Preconditions.checkState(state.compareAndSet(State.STARTED, State.STOPPED), "Already closed or has not been started");
 
-        listenerContainer.forEach(new Function<ServiceCacheListener, Void>()
-        {
-            @Override
-            public Void apply(ServiceCacheListener listener)
-            {
-                discovery.getClient().getConnectionStateListenable().removeListener(unwrap(listener));
-                return null;
-            }
-        });
+        listenerContainer.forEach
+            (
+                new Function<ServiceCacheListener, Void>()
+                {
+                    @Override
+                    public Void apply(ServiceCacheListener listener)
+                    {
+                        discovery.getClient().getConnectionStateListenable().removeListener(listener);
+                        return null;
+                    }
+                }
+            );
         listenerContainer.clear();
 
         CloseableUtils.closeQuietly(cache);
@@ -144,56 +146,59 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
     public void addListener(ServiceCacheListener listener)
     {
         listenerContainer.addListener(listener);
-        discovery.getClient().getConnectionStateListenable().addListener(wrap(listener));
+        discovery.getClient().getConnectionStateListenable().addListener(listener);
     }
 
     @Override
     public void addListener(ServiceCacheListener listener, Executor executor)
     {
         listenerContainer.addListener(listener, executor);
-        discovery.getClient().getConnectionStateListenable().addListener(wrap(listener), executor);
+        discovery.getClient().getConnectionStateListenable().addListener(listener, executor);
     }
 
     @Override
     public void removeListener(ServiceCacheListener listener)
     {
         listenerContainer.removeListener(listener);
-        discovery.getClient().getConnectionStateListenable().removeListener(unwrap(listener));
+        discovery.getClient().getConnectionStateListenable().removeListener(listener);
     }
 
     @Override
     public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
     {
-        boolean notifyListeners = false;
+        boolean         notifyListeners = false;
         switch ( event.getType() )
         {
-        case CHILD_ADDED:
-        case CHILD_UPDATED:
-        {
-            addInstance(event.getData(), false);
-            notifyListeners = true;
-            break;
-        }
+            case CHILD_ADDED:
+            case CHILD_UPDATED:
+            {
+                addInstance(event.getData(), false);
+                notifyListeners = true;
+                break;
+            }
 
-        case CHILD_REMOVED:
-        {
-            instances.remove(instanceIdFromData(event.getData()));
-            notifyListeners = true;
-            break;
-        }
+            case CHILD_REMOVED:
+            {
+                instances.remove(instanceIdFromData(event.getData()));
+                notifyListeners = true;
+                break;
+            }
         }
 
         if ( notifyListeners )
         {
-            listenerContainer.forEach(new Function<ServiceCacheListener, Void>()
-            {
-                @Override
-                public Void apply(ServiceCacheListener listener)
+            listenerContainer.forEach
+            (
+                new Function<ServiceCacheListener, Void>()
                 {
-                    listener.cacheChanged();
-                    return null;
+                    @Override
+                    public Void apply(ServiceCacheListener listener)
+                    {
+                        listener.cacheChanged();
+                        return null;
+                    }
                 }
-            });
+            );
         }
     }
 
@@ -204,8 +209,8 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
 
     private void addInstance(ChildData childData, boolean onlyIfAbsent) throws Exception
     {
-        String instanceId = instanceIdFromData(childData);
-        ServiceInstance<T> serviceInstance = discovery.getSerializer().deserialize(childData.getData());
+        String                  instanceId = instanceIdFromData(childData);
+        ServiceInstance<T>      serviceInstance = discovery.getSerializer().deserialize(childData.getData());
         if ( onlyIfAbsent )
         {
             instances.putIfAbsent(instanceId, serviceInstance);
@@ -216,16 +221,4 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
         }
         cache.clearDataBytes(childData.getPath(), childData.getStat().getVersion());
     }
-
-    private ConnectionStateListener wrap(ServiceCacheListener listener)
-    {
-        ConnectionStateListener wrapped = discovery.getClient().decorateConnectionStateListener(listener);
-        connectionStateListeners.put(listener, wrapped);
-        return wrapped;
-    }
-
-    private ConnectionStateListener unwrap(ServiceCacheListener listener)
-    {
-        return connectionStateListeners.remove(listener);
-    }
 }
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
index 2e10095..476705c 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
@@ -65,7 +65,29 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
     private final Collection<ServiceCache<T>> caches = Sets.newSetFromMap(Maps.<ServiceCache<T>, Boolean>newConcurrentMap());
     private final Collection<ServiceProvider<T>> providers = Sets.newSetFromMap(Maps.<ServiceProvider<T>, Boolean>newConcurrentMap());
     private final boolean watchInstances;
-    private final ConnectionStateListener connectionStateListener;
+    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
+    {
+        @Override
+        public void stateChanged(CuratorFramework client, ConnectionState newState)
+        {
+            if ( (newState == ConnectionState.RECONNECTED) || (newState == ConnectionState.CONNECTED) )
+            {
+                try
+                {
+                    log.debug("Re-registering due to reconnection");
+                    reRegisterServices();
+                }
+                catch (InterruptedException ex)
+                {
+                    Thread.currentThread().interrupt();
+                }
+                catch ( Exception e )
+                {
+                    log.error("Could not re-register instances after reconnection", e);
+                }
+            }
+        }
+    };
 
     private static class Entry<T>
     {
@@ -97,7 +119,6 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
             entry.cache = makeNodeCache(thisInstance);
             services.put(thisInstance.getId(), entry);
         }
-        connectionStateListener = client.decorateConnectionStateListener((__, newState) -> handleStateChange(newState));
     }
 
     /**
@@ -509,24 +530,4 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
             }
         }
     }
-
-    private void handleStateChange(ConnectionState newState)
-    {
-        if ( (newState == ConnectionState.RECONNECTED) || (newState == ConnectionState.CONNECTED) )
-        {
-            try
-            {
-                log.debug("Re-registering due to reconnection");
-                reRegisterServices();
-            }
-            catch (InterruptedException ex)
-            {
-                Thread.currentThread().interrupt();
-            }
-            catch ( Exception e )
-            {
-                log.error("Could not re-register instances after reconnection", e);
-            }
-        }
-    }
 }
diff --git a/src/site/confluence/errors.confluence b/src/site/confluence/errors.confluence
index 97f23fd..b4f6643 100644
--- a/src/site/confluence/errors.confluence
+++ b/src/site/confluence/errors.confluence
@@ -19,8 +19,7 @@ in a retry mechanism. Thus, the following guarantees can be made:
 h2. Notifications
 Curator exposes several listenable interfaces for clients to monitor the state of the ZooKeeper connection.
 
-{{ConnectionStateListener}} (note see [[Utilities|utilities.html]] for details on properly decorating listeners) is called when there are connection
-disruptions. Clients can monitor these changes and take
+{{ConnectionStateListener}} is called when there are connection disruptions. Clients can monitor these changes and take
 appropriate action. These are the possible state changes:
 
 |CONNECTED|Sent for the first successful connection to the server. NOTE: You will only get one of these messages for any CuratorFramework instance.|
diff --git a/src/site/confluence/utilities.confluence b/src/site/confluence/utilities.confluence
index 1971c3c..720d8d9 100644
--- a/src/site/confluence/utilities.confluence
+++ b/src/site/confluence/utilities.confluence
@@ -31,8 +31,7 @@ If the connection has not been restored, the RetryPolicy is checked again. If th
 the RetryPolicy indicates that retries are exhausted then the circuit closes \- if the current state is different than the state that caused the circuit to open it is
 forwarded to the managed listener.
 
-You can enable the Circuit Breaking ConnectionStateListener during creation of your CuratorFramework instance. All Curator recipes will decorate
-their ConnectionStateListeners using the configured decorator. E.g.
+You can enable the Circuit Breaking ConnectionStateListener during creation of your CuratorFramework instance. E.g.
 
 {code}
 ConnectionStateListenerDecorator decorator = ConnectionStateListenerDecorator.circuitBreaking(...);
@@ -43,21 +42,6 @@ CuratorFramework client = CuratorFrameworkFactory.builder()
     .build();
 {code}
 
-If you are setting a ConnectionStateListener you should always "decorate" it by calling {{decorateConnectionStateListener()}}.
-
-{code}
-CuratorFramework client ...
-ConnectionStateListener listener = ...
-ConnectionStateListener decoratedListener = client.decorateConnectionStateListener(listener);
-
-...
-
-client.getConnectionStateListenable().addListener(decoratedListener);
-
-// later, to remove...
-client.getConnectionStateListenable().removeListener(decoratedListener);
-{code}
-
 h2. Locker
 
 Curator's Locker uses Java 7's try\-with\-resources feature to making using Curator locks safer:


[curator] 01/02: CURATOR-505 - interim checking - refactoring, simplifications, more testing, and documentation

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch CURATOR-505
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 96f186abc3e4977eaac580054bd8a3cdfa79d22a
Author: randgalt <ra...@apache.org>
AuthorDate: Thu Feb 7 12:10:52 2019 -0500

    CURATOR-505 - interim checking - refactoring, simplifications, more testing, and documentation
---
 .../apache/curator/framework/CuratorFramework.java | 10 +++
 .../curator/framework/CuratorFrameworkFactory.java |  2 +-
 .../framework/imps/CuratorFrameworkImpl.java       | 13 +++-
 .../curator/framework/state/CircuitBreaker.java    | 24 +++---
 .../CircuitBreakingConnectionStateListener.java    | 11 ++-
 .../state/ConnectionStateListenerDecorator.java    |  6 +-
 .../framework/state/TestCircuitBreaker.java        | 42 ++++++++---
 ...TestCircuitBreakingConnectionStateListener.java | 86 ++++++++++++++++++++--
 .../framework/recipes/leader/TestLeaderLatch.java  | 28 +++----
 src/site/confluence/errors.confluence              |  3 +-
 src/site/confluence/utilities.confluence           | 44 +++++++++++
 11 files changed, 214 insertions(+), 55 deletions(-)

diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 6513716..2657781 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -30,6 +30,7 @@ import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.schema.SchemaSet;
 import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.framework.state.ConnectionStateListenerDecorator;
 import org.apache.curator.utils.EnsurePath;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
@@ -367,4 +368,13 @@ public interface CuratorFramework extends Closeable
      * @return decorated listener
      */
     ConnectionStateListener decorateConnectionStateListener(ConnectionStateListener actual);
+
+    /**
+     * Returns a facade of the current instance that uses the given connection state listener
+     * decorator instead of the configured one
+     *
+     * @param newDecorator decorator to use
+     * @return facade
+     */
+    CuratorFramework usingConnectionStateListenerDecorator(ConnectionStateListenerDecorator newDecorator);
 }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
index 0cc6e0d..283a093 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
@@ -508,7 +508,7 @@ public class CuratorFrameworkFactory
          * @return this
          * @since 4.2.0
          */
-        public Builder connectionStateListenerFactory(ConnectionStateListenerDecorator connectionStateListenerDecorator)
+        public Builder connectionStateListenerDecorator(ConnectionStateListenerDecorator connectionStateListenerDecorator)
         {
             this.connectionStateListenerDecorator = Objects.requireNonNull(connectionStateListenerDecorator, "connectionStateListenerFactory cannot be null");
             return this;
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 81cae74..f210021 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -236,6 +236,11 @@ public class CuratorFrameworkImpl implements CuratorFramework
 
     protected CuratorFrameworkImpl(CuratorFrameworkImpl parent)
     {
+        this(parent, parent.connectionStateListenerDecorator);
+    }
+
+    private CuratorFrameworkImpl(CuratorFrameworkImpl parent, ConnectionStateListenerDecorator connectionStateListenerDecorator)
+    {
         client = parent.client;
         listeners = parent.listeners;
         unhandledErrorListeners = parent.unhandledErrorListeners;
@@ -260,7 +265,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         zk34CompatibilityMode = parent.zk34CompatibilityMode;
         ensembleTracker = null;
         runSafeService = parent.runSafeService;
-        connectionStateListenerDecorator = parent.connectionStateListenerDecorator;
+        this.connectionStateListenerDecorator = connectionStateListenerDecorator;
     }
 
     @Override
@@ -599,6 +604,12 @@ public class CuratorFrameworkImpl implements CuratorFramework
         return connectionStateListenerDecorator.decorateListener(this, actual);
     }
 
+    @Override
+    public CuratorFramework usingConnectionStateListenerDecorator(ConnectionStateListenerDecorator newDecorator)
+    {
+        return new CuratorFrameworkImpl(this, newDecorator);
+    }
+
     ACLProvider getAclProvider()
     {
         return aclProvider;
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java
index ad48a15..504edbc 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java
@@ -14,7 +14,7 @@ class CircuitBreaker
 
     private boolean isOpen = false;
     private int retryCount = 0;
-    private long openStartNanos = 0;
+    private long startNanos = 0;
 
     CircuitBreaker(RetryPolicy retryPolicy, ScheduledExecutorService service)
     {
@@ -41,13 +41,13 @@ class CircuitBreaker
 
         isOpen = true;
         retryCount = 0;
-        openStartNanos = System.nanoTime();
-        if ( !tryToRetry(completion) )
+        startNanos = System.nanoTime();
+        if ( tryToRetry(completion) )
         {
-            close();
-            return false;
+            return true;
         }
-        return true;
+        close();
+        return false;
     }
 
     boolean tryToRetry(Runnable completion)
@@ -59,13 +59,13 @@ class CircuitBreaker
 
         long[] sleepTimeNanos = new long[]{0L};
         RetrySleeper retrySleeper = (time, unit) -> sleepTimeNanos[0] = unit.toNanos(time);
-        if ( !retryPolicy.allowRetry(retryCount, System.nanoTime() - openStartNanos, retrySleeper) )
+        if ( retryPolicy.allowRetry(retryCount, System.nanoTime() - startNanos, retrySleeper) )
         {
-            return false;
+            ++retryCount;
+            service.schedule(completion, sleepTimeNanos[0], TimeUnit.NANOSECONDS);
+            return true;
         }
-        ++retryCount;
-        service.schedule(completion, sleepTimeNanos[0], TimeUnit.NANOSECONDS);
-        return true;
+        return false;
     }
 
     boolean close()
@@ -73,7 +73,7 @@ class CircuitBreaker
         boolean wasOpen = isOpen;
         retryCount = 0;
         isOpen = false;
-        openStartNanos = 0;
+        startNanos = 0;
         return wasOpen;
     }
 }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java
index 12efad9..dba651a 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java
@@ -13,7 +13,7 @@ import java.util.concurrent.ScheduledExecutorService;
  *     A decorator/proxy for connection state listeners that adds circuit breaking behavior. During network
  *     outages ZooKeeper can become very noisy sending connection/disconnection events in rapid succession.
  *     Curator recipes respond to these messages by resetting state, etc. E.g. LeaderLatch must delete
- *     its lock node and try to recreated it in order to try to re-obtain leadership, etc.
+ *     its lock node and try to recreate it in order to try to re-obtain leadership, etc.
  * </p>
  *
  * <p>
@@ -114,7 +114,7 @@ public class CircuitBreakingConnectionStateListener implements ConnectionStateLi
                 log.debug("Could not open circuit breaker. State: {}", newState);
             }
         }
-        callListener(circuitInitialState);
+        callListener(newState);
     }
 
     private synchronized void handleOpenStateChange(ConnectionState newState)
@@ -126,11 +126,10 @@ public class CircuitBreakingConnectionStateListener implements ConnectionStateLi
         }
         else
         {
-            circuitLostHasBeenSent = true;
-            circuitInitialState = ConnectionState.LOST;
-            circuitLastState = newState;
             log.debug("Circuit is open. State changed to LOST. Sending to listener.");
-            callListener(circuitInitialState);
+            circuitLostHasBeenSent = true;
+            circuitLastState = circuitInitialState = ConnectionState.LOST;
+            callListener(ConnectionState.LOST);
         }
     }
 
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java
index 0f11c46..0ac808b 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java
@@ -17,14 +17,14 @@ import java.util.concurrent.ScheduledExecutorService;
  * <code><pre>
  * CuratorFramework client ...
  * ConnectionStateListener listener = ...
- * ConnectionStateListener wrappedListener = client.wrapConnectionStateListener(listener);
+ * ConnectionStateListener decoratedListener = client.decorateConnectionStateListener(listener);
  *
  * ...
  *
- * client.getConnectionStateListenable().addListener(wrappedListener);
+ * client.getConnectionStateListenable().addListener(decoratedListener);
  *
  * // later, to remove...
- * client.getConnectionStateListenable().removeListener(wrappedListener);
+ * client.getConnectionStateListenable().removeListener(decoratedListener);
  * </pre></code>
  * </p>
  */
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java b/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java
index 77ec20f..e2daa96 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java
@@ -1,7 +1,9 @@
 package org.apache.curator.framework.state;
 
+import org.apache.curator.retry.RetryForever;
 import org.apache.curator.retry.RetryNTimes;
 import org.testng.Assert;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.Test;
 import java.time.Duration;
 import java.time.temporal.ChronoUnit;
@@ -12,23 +14,30 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 public class TestCircuitBreaker
 {
+    private Duration[] lastDelay = new Duration[]{Duration.ZERO};
+    private ScheduledThreadPoolExecutor service = new ScheduledThreadPoolExecutor(1)
+    {
+        @Override
+        public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
+        {
+            lastDelay[0] = Duration.of(unit.toNanos(delay), ChronoUnit.NANOS);
+            command.run();
+            return null;
+        }
+    };
+
+    @AfterClass
+    public void tearDown()
+    {
+        service.shutdownNow();
+    }
+
     @Test
     public void testBasic()
     {
         final int retryQty = 1;
         final Duration delay = Duration.ofSeconds(10);
 
-        Duration[] lastDelay = new Duration[]{Duration.ZERO};
-        ScheduledThreadPoolExecutor service = new ScheduledThreadPoolExecutor(1)
-        {
-            @Override
-            public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
-            {
-                lastDelay[0] = Duration.of(unit.toNanos(delay), ChronoUnit.NANOS);
-                command.run();
-                return null;
-            }
-        };
         CircuitBreaker circuitBreaker = new CircuitBreaker(new RetryNTimes(retryQty, (int)delay.toMillis()), service);
         AtomicInteger counter = new AtomicInteger(0);
 
@@ -46,4 +55,15 @@ public class TestCircuitBreaker
         Assert.assertEquals(circuitBreaker.getRetryCount(), 0);
         Assert.assertFalse(circuitBreaker.close());
     }
+
+    @Test
+    public void testVariousOpenRetryFails()
+    {
+        CircuitBreaker circuitBreaker = new CircuitBreaker(new RetryForever(1), service);
+        Assert.assertFalse(circuitBreaker.tryToRetry(() -> {}));
+        Assert.assertTrue(circuitBreaker.tryToOpen(() -> {}));
+        Assert.assertFalse(circuitBreaker.tryToOpen(() -> {}));
+        Assert.assertTrue(circuitBreaker.close());
+        Assert.assertFalse(circuitBreaker.close());
+    }
 }
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreakingConnectionStateListener.java b/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreakingConnectionStateListener.java
index 36a1954..1712eed 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreakingConnectionStateListener.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreakingConnectionStateListener.java
@@ -8,7 +8,8 @@ import org.apache.curator.retry.RetryForever;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.compatibility.Timing2;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -20,7 +21,7 @@ public class TestCircuitBreakingConnectionStateListener
     private final CuratorFramework dummyClient = CuratorFrameworkFactory.newClient("foo", new RetryOneTime(1));
     private final Timing2 timing = new Timing2();
     private final Timing2 retryTiming = timing.multiple(.25);
-    private final ScheduledThreadPoolExecutor service = new ScheduledThreadPoolExecutor(1);
+    private volatile ScheduledThreadPoolExecutor service;
 
     private static class RecordingListener implements ConnectionStateListener
     {
@@ -49,7 +50,13 @@ public class TestCircuitBreakingConnectionStateListener
         }
     }
 
-    @AfterClass
+    @BeforeMethod
+    public void setup()
+    {
+        service = new ScheduledThreadPoolExecutor(1);
+    }
+
+    @AfterMethod
     public void tearDown()
     {
         service.shutdownNow();
@@ -60,7 +67,7 @@ public class TestCircuitBreakingConnectionStateListener
     {
         RecordingListener recordingListener = new RecordingListener();
         TestRetryPolicy retryPolicy = new TestRetryPolicy();
-        final CircuitBreakingConnectionStateListener listener = new CircuitBreakingConnectionStateListener(dummyClient, recordingListener, retryPolicy, service);
+        CircuitBreakingConnectionStateListener listener = new CircuitBreakingConnectionStateListener(dummyClient, recordingListener, retryPolicy, service);
 
         listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
         Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.RECONNECTED);
@@ -94,8 +101,11 @@ public class TestCircuitBreakingConnectionStateListener
         TestRetryPolicy retryPolicy = new TestRetryPolicy();
         CircuitBreakingConnectionStateListener listener = new CircuitBreakingConnectionStateListener(dummyClient, recordingListener, retryPolicy, service);
 
-        listener.stateChanged(dummyClient, ConnectionState.LOST);
-        listener.stateChanged(dummyClient, ConnectionState.LOST);   // second LOST ignored
+        synchronized(listener)  // don't let retry policy run while we're pushing state changes
+        {
+            listener.stateChanged(dummyClient, ConnectionState.LOST);
+            listener.stateChanged(dummyClient, ConnectionState.LOST);   // second LOST ignored
+        }
         Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.LOST);
         Assert.assertTrue(recordingListener.stateChanges.isEmpty());
 
@@ -112,6 +122,7 @@ public class TestCircuitBreakingConnectionStateListener
 
         listener.stateChanged(dummyClient, ConnectionState.LOST);
         Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.LOST);
+        Assert.assertFalse(listener.isOpen());
         listener.stateChanged(dummyClient, ConnectionState.LOST);
         Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.LOST);
         Assert.assertFalse(listener.isOpen());
@@ -122,7 +133,7 @@ public class TestCircuitBreakingConnectionStateListener
     {
         RecordingListener recordingListener = new RecordingListener();
         RetryPolicy retryOnce = new RetryOneTime(retryTiming.milliseconds());
-        final CircuitBreakingConnectionStateListener listener = new CircuitBreakingConnectionStateListener(dummyClient, recordingListener, retryOnce, service);
+        CircuitBreakingConnectionStateListener listener = new CircuitBreakingConnectionStateListener(dummyClient, recordingListener, retryOnce, service);
 
         synchronized(listener)  // don't let retry policy run while we're pushing state changes
         {
@@ -134,4 +145,65 @@ public class TestCircuitBreakingConnectionStateListener
         Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.SUSPENDED);
         Assert.assertFalse(listener.isOpen());
     }
+
+    @Test
+    public void testSuspendedToLostRatcheting() throws Exception
+    {
+        RecordingListener recordingListener = new RecordingListener();
+        RetryPolicy retryInfinite = new RetryForever(Integer.MAX_VALUE);
+        CircuitBreakingConnectionStateListener listener = new CircuitBreakingConnectionStateListener(dummyClient, recordingListener, retryInfinite, service);
+
+        listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
+        Assert.assertFalse(listener.isOpen());
+        Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.RECONNECTED);
+
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        Assert.assertTrue(listener.isOpen());
+        Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.SUSPENDED);
+
+        listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
+        listener.stateChanged(dummyClient, ConnectionState.READ_ONLY);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
+        listener.stateChanged(dummyClient, ConnectionState.READ_ONLY);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        Assert.assertTrue(recordingListener.stateChanges.isEmpty());
+        Assert.assertTrue(listener.isOpen());
+
+        listener.stateChanged(dummyClient, ConnectionState.LOST);
+        Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.LOST);
+        Assert.assertTrue(listener.isOpen());
+
+        listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
+        listener.stateChanged(dummyClient, ConnectionState.READ_ONLY);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
+        listener.stateChanged(dummyClient, ConnectionState.READ_ONLY);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        Assert.assertTrue(recordingListener.stateChanges.isEmpty());
+        Assert.assertTrue(listener.isOpen());
+
+        listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
+        listener.stateChanged(dummyClient, ConnectionState.READ_ONLY);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.LOST);
+        listener.stateChanged(dummyClient, ConnectionState.LOST);
+        listener.stateChanged(dummyClient, ConnectionState.LOST);
+        listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
+        listener.stateChanged(dummyClient, ConnectionState.READ_ONLY);
+        listener.stateChanged(dummyClient, ConnectionState.LOST);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.LOST);
+        Assert.assertTrue(recordingListener.stateChanges.isEmpty());
+        Assert.assertTrue(listener.isOpen());
+    }
 }
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index 439f6c8..bf2abd4 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -64,18 +64,18 @@ public class TestLeaderLatch extends BaseClassForTests
     public void testWithCircuitBreaker() throws Exception
     {
         Timing2 timing = new Timing2();
-        ConnectionStateListenerDecorator factory = ConnectionStateListenerDecorator.circuitBreaking(new RetryForever(timing.milliseconds()));
+        ConnectionStateListenerDecorator decorator = ConnectionStateListenerDecorator.circuitBreaking(new RetryForever(timing.multiple(2).milliseconds()));
         try ( CuratorFramework client = CuratorFrameworkFactory.builder()
             .connectString(server.getConnectString())
             .retryPolicy(new RetryOneTime(1))
-            .connectionStateListenerFactory(factory)
+            .connectionStateListenerDecorator(decorator)
             .connectionTimeoutMs(timing.connection())
             .sessionTimeoutMs(timing.session())
             .build() )
         {
             client.start();
             AtomicInteger resetCount = new AtomicInteger(0);
-            LeaderLatch latch = new LeaderLatch(client, "/foo/bar")
+            try ( LeaderLatch latch = new LeaderLatch(client, "/foo/bar")
             {
                 @Override
                 void reset() throws Exception
@@ -83,18 +83,20 @@ public class TestLeaderLatch extends BaseClassForTests
                     resetCount.incrementAndGet();
                     super.reset();
                 }
-            };
-            latch.start();
-            Assert.assertTrue(latch.await(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
-
-            for ( int i = 0; i < 5; ++i )
+            } )
             {
-                server.stop();
-                server.restart();
-                timing.sleepABit();
+                latch.start();
+                Assert.assertTrue(latch.await(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
+
+                for ( int i = 0; i < 5; ++i )
+                {
+                    server.stop();
+                    server.restart();
+                    timing.sleepABit();
+                }
+                Assert.assertTrue(latch.await(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
+                Assert.assertEquals(resetCount.get(), 2);
             }
-            Assert.assertTrue(latch.await(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
-            Assert.assertEquals(resetCount.get(), 2);
         }
     }
 
diff --git a/src/site/confluence/errors.confluence b/src/site/confluence/errors.confluence
index b4f6643..97f23fd 100644
--- a/src/site/confluence/errors.confluence
+++ b/src/site/confluence/errors.confluence
@@ -19,7 +19,8 @@ in a retry mechanism. Thus, the following guarantees can be made:
 h2. Notifications
 Curator exposes several listenable interfaces for clients to monitor the state of the ZooKeeper connection.
 
-{{ConnectionStateListener}} is called when there are connection disruptions. Clients can monitor these changes and take
+{{ConnectionStateListener}} (note see [[Utilities|utilities.html]] for details on properly decorating listeners) is called when there are connection
+disruptions. Clients can monitor these changes and take
 appropriate action. These are the possible state changes:
 
 |CONNECTED|Sent for the first successful connection to the server. NOTE: You will only get one of these messages for any CuratorFramework instance.|
diff --git a/src/site/confluence/utilities.confluence b/src/site/confluence/utilities.confluence
index 3a62fa5..1971c3c 100644
--- a/src/site/confluence/utilities.confluence
+++ b/src/site/confluence/utilities.confluence
@@ -14,6 +14,50 @@ Various static methods to help with using ZooKeeper ZNode paths:
 * getSortedChildren: Return the children of the given path sorted by sequence number
 * makePath: Given a parent path and a child node, create a combined full path
 
+h2. Circuit Breaking ConnectionStateListener
+
+During network outages ZooKeeper can become very noisy sending connection/disconnection events in rapid succession. Curator recipes respond to these
+messages by resetting state, etc. E.g. LeaderLatch must delete its lock node and try to recreate it in order to try to re\-obtain leadership, etc.
+
+This noisy herding can be avoided by using the circuit breaking listener decorator. When it receives ConnectionState.SUSPENDED, the circuit becomes "open"
+(based on the provided RetryPolicy) and will ignore future connection state changes until RetryPolicy timeout has elapsed. Note: however, if the connection
+goes from ConnectionState.SUSPENDED to ConnectionState.LOST the first LOST state is sent.
+
+When the circuit decorator is closed, all connection state changes are forwarded to the managed listener. When the first disconnected state is received, the
+circuit becomes open. The state change that caused the circuit to open is sent to the managed listener and the RetryPolicy will be used to get a delay amount.
+While the delay is active, the decorator will store state changes but will not forward them to the managed listener (except, however, the first time the state
+changes from SUSPENDED to LOST). When the delay elapses, if the connection has been restored, the circuit closes and forwards the new state to the managed listener.
+If the connection has not been restored, the RetryPolicy is checked again. If the RetryPolicy indicates another retry is allowed the process repeats. If, however,
+the RetryPolicy indicates that retries are exhausted then the circuit closes \- if the current state is different than the state that caused the circuit to open it is
+forwarded to the managed listener.
+
+You can enable the Circuit Breaking ConnectionStateListener during creation of your CuratorFramework instance. All Curator recipes will decorate
+their ConnectionStateListeners using the configured decorator. E.g.
+
+{code}
+ConnectionStateListenerDecorator decorator = ConnectionStateListenerDecorator.circuitBreaking(...);
+CuratorFramework client = CuratorFrameworkFactory.builder()
+    ...
+    .connectionStateListenerDecorator(decorator)
+    ...
+    .build();
+{code}
+
+If you are setting a ConnectionStateListener you should always "decorate" it by calling {{decorateConnectionStateListener()}}.
+
+{code}
+CuratorFramework client ...
+ConnectionStateListener listener = ...
+ConnectionStateListener decoratedListener = client.decorateConnectionStateListener(listener);
+
+...
+
+client.getConnectionStateListenable().addListener(decoratedListener);
+
+// later, to remove...
+client.getConnectionStateListenable().removeListener(decoratedListener);
+{code}
+
 h2. Locker
 
 Curator's Locker uses Java 7's try\-with\-resources feature to making using Curator locks safer: