Posted to commits@curator.apache.org by ra...@apache.org on 2019/08/12 14:34:14 UTC

[curator] 01/01: CURATOR-533

This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch CURATOR-505-improve-circuit-breaker-to-shared
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 066ff40e2e99fadffb1a95be72c934369e3c1011
Author: randgalt <ra...@apache.org>
AuthorDate: Sun Jul 28 01:01:13 2019 -0500

    CURATOR-533
    
    CURATOR-505 introduced circuit breaking behavior via CircuitBreakingConnectionStateListener and ConnectionStateListenerDecorator. Elastic has been using it successfully but reports that the implementation can be improved. The existing implementation creates a new CircuitBreaker for each ConnectionStateListener set in a Curator client, which is not ideal. Instead, a single shared CircuitBreaker should be used per Curator client.
    
    Unfortunately, the best way to do this is to remove the ConnectionStateListenerDecorator semantics and use a different mechanism. This Issue proposes to do this and remove ConnectionStateListenerDecorator. This is a breaking change but given the short amount of time it's been in Curator it's unlikely that it's been widely adopted.
    
    In this commit, ConnectionStateListenerDecorator is removed in favor of ConnectionStateListenerManagerFactory. ConnectionStateManager uses this factory to create the container that holds registered ConnectionStateListeners. A new CircuitBreakingManager now manages the circuit breaking behavior using a shared CircuitBreaker.
---
 .../curator/framework/CuratorFrameworkFactory.java |  20 ++--
 .../framework/imps/CuratorFrameworkImpl.java       |   4 +-
 .../curator/framework/imps/EnsembleTracker.java    |   2 +-
 .../curator/framework/listen/ListenerManager.java  |   5 +
 .../framework/listen/StandardListenerManager.java  |   2 +-
 ...tenerManager.java => UnaryListenerManager.java} |  26 +-----
 .../curator/framework/state/CircuitBreaker.java    |  20 +++-
 .../CircuitBreakingConnectionStateListener.java    |  37 ++++++--
 .../framework/state/CircuitBreakingManager.java    |  91 ++++++++++++++++++
 .../framework/state/ConnectionStateListener.java   |  10 +-
 .../state/ConnectionStateListenerDecorator.java    |  81 ----------------
 .../ConnectionStateListenerManagerFactory.java     |  69 ++++++++++++++
 .../framework/state/ConnectionStateManager.java    |  14 +--
 .../framework/state/TestCircuitBreaker.java        |   6 +-
 .../framework/recipes/leader/LeaderLatch.java      |   3 +-
 .../framework/recipes/leader/TestLeaderLatch.java  | 104 ++++++++++++++++-----
 src/site/confluence/utilities.confluence           |  16 ++--
 17 files changed, 335 insertions(+), 175 deletions(-)
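
The shared-breaker design described in the commit message can be pictured with a small, self-contained model. This is only a sketch under stated assumptions: SharedBreakerSketch, Manager, Listener, publish and delayElapsed are hypothetical names, not Curator classes or methods. The RetryPolicy-driven delay is replaced by an explicit delayElapsed() call, and the real listener's special handling of SUSPENDED to LOST and of repeated retries is omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of "one breaker shared by all listeners".
// None of these types are part of Curator.
class SharedBreakerSketch {
    enum State { CONNECTED, SUSPENDED, LOST, RECONNECTED }

    interface Listener {
        void stateChanged(State newState);

        // Mirrors the idea of doNotProxy(): such listeners bypass the breaker.
        default boolean doNotProxy() { return false; }
    }

    static final class Manager {
        private final List<Listener> proxied = new ArrayList<>();
        private final List<Listener> direct = new ArrayList<>();
        private boolean open = false;   // single breaker state for every proxied listener
        private State openedBy = null;  // state that opened the breaker
        private State held = null;      // latest state seen while the breaker was open

        void addListener(Listener listener) {
            (listener.doNotProxy() ? direct : proxied).add(listener);
        }

        // Called once per raw connection event.
        void publish(State state) {
            direct.forEach(l -> l.stateChanged(state)); // never gated
            if (open) {
                held = state;                           // remember only the latest
                return;
            }
            if (state == State.SUSPENDED || state == State.LOST) {
                open = true;                            // first bad event opens the breaker
                openedBy = state;
            }
            proxied.forEach(l -> l.stateChanged(state));
        }

        // Assumption: stands in for the retry delay elapsing with the breaker closing.
        void delayElapsed() {
            open = false;
            State latest = held;
            held = null;
            if (latest != null && latest != openedBy) {
                proxied.forEach(l -> l.stateChanged(latest));
            }
        }
    }

    public static void main(String[] args) {
        Manager manager = new Manager();
        List<State> seen = new ArrayList<>();
        manager.addListener(seen::add);
        manager.publish(State.CONNECTED);
        manager.publish(State.SUSPENDED);   // opens the breaker, still delivered
        manager.publish(State.RECONNECTED); // held back
        manager.publish(State.SUSPENDED);   // held back
        manager.publish(State.RECONNECTED); // held back
        manager.delayElapsed();             // delivers the latest held state
        System.out.println(seen);           // prints [CONNECTED, SUSPENDED, RECONNECTED]
    }
}
```

Because the open/closed state lives in the manager rather than in each listener wrapper, every proxied listener sees the same filtered sequence of events, which is the property the commit message argues for.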

diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
index a5c08ff..887a2aa 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
@@ -36,7 +36,7 @@ import org.apache.curator.framework.imps.GzipCompressionProvider;
 import org.apache.curator.framework.schema.SchemaSet;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
-import org.apache.curator.framework.state.ConnectionStateListenerDecorator;
+import org.apache.curator.framework.state.ConnectionStateListenerManagerFactory;
 import org.apache.curator.framework.state.StandardConnectionStateErrorPolicy;
 import org.apache.curator.utils.DefaultZookeeperFactory;
 import org.apache.curator.utils.ZookeeperFactory;
@@ -153,7 +153,7 @@ public class CuratorFrameworkFactory
         private boolean zk34CompatibilityMode = isZK34();
         private int waitForShutdownTimeoutMs = 0;
         private Executor runSafeService = null;
-        private ConnectionStateListenerDecorator connectionStateListenerDecorator = ConnectionStateListenerDecorator.standard;
+        private ConnectionStateListenerManagerFactory connectionStateListenerManagerFactory = ConnectionStateListenerManagerFactory.standard;
 
         /**
          * Apply the current values and build a new CuratorFramework
@@ -498,18 +498,16 @@ public class CuratorFrameworkFactory
         }
 
         /**
-         * Sets the connection state listener decorator. For example,
-         * you can set {@link org.apache.curator.framework.state.CircuitBreakingConnectionStateListener}s
-         * via this mechanism by using {@link org.apache.curator.framework.state.ConnectionStateListenerDecorator#circuitBreaking(org.apache.curator.RetryPolicy)}
-         * or {@link org.apache.curator.framework.state.ConnectionStateListenerDecorator#circuitBreaking(org.apache.curator.RetryPolicy, java.util.concurrent.ScheduledExecutorService)}
+         * Sets the connection state listener manager factory. For example,
+         * you can set {@link org.apache.curator.framework.state.ConnectionStateListenerManagerFactory#circuitBreaking(org.apache.curator.RetryPolicy)}
          *
-         * @param connectionStateListenerDecorator decorator to use
+         * @param connectionStateListenerManagerFactory manager factory to use
          * @return this
          * @since 4.2.0
          */
-        public Builder connectionStateListenerDecorator(ConnectionStateListenerDecorator connectionStateListenerDecorator)
+        public Builder connectionStateListenerManagerFactory(ConnectionStateListenerManagerFactory connectionStateListenerManagerFactory)
         {
-            this.connectionStateListenerDecorator = Objects.requireNonNull(connectionStateListenerDecorator, "connectionStateListenerFactory cannot be null");
+            this.connectionStateListenerManagerFactory = Objects.requireNonNull(connectionStateListenerManagerFactory, "connectionStateListenerManagerFactory cannot be null");
             return this;
         }
 
@@ -660,9 +658,9 @@ public class CuratorFrameworkFactory
             return canBeReadOnly;
         }
 
-        public ConnectionStateListenerDecorator getConnectionStateListenerDecorator()
+        public ConnectionStateListenerManagerFactory getConnectionStateListenerManagerFactory()
         {
-            return connectionStateListenerDecorator;
+            return connectionStateListenerManagerFactory;
         }
 
         private Builder()
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index d9c3424..e003bf0 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -139,7 +139,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         namespace = new NamespaceImpl(this, builder.getNamespace());
         threadFactory = getThreadFactory(builder);
         maxCloseWaitMs = builder.getMaxCloseWaitMs();
-        connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent(), builder.getConnectionStateListenerDecorator());
+        connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent(), builder.getConnectionStateListenerManagerFactory());
         compressionProvider = builder.getCompressionProvider();
         aclProvider = builder.getAclProvider();
         state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
@@ -327,7 +327,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
                 }
 
                 @Override
-                public boolean doNotDecorate()
+                public boolean doNotProxy()
                 {
                     return true;
                 }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
index 8ca63f6..b2c55f6 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
@@ -74,7 +74,7 @@ public class EnsembleTracker implements Closeable, CuratorWatcher
         }
 
         @Override
-        public boolean doNotDecorate()
+        public boolean doNotProxy()
         {
             return true;
         }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerManager.java b/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerManager.java
index cab0426..85bc8f9 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerManager.java
@@ -41,4 +41,9 @@ public interface ListenerManager<K, V> extends Listenable<K>
      * @param function function to call for each listener
      */
     void forEach(Consumer<V> function);
+
+    default boolean isEmpty()
+    {
+        return size() == 0;
+    }
 }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java b/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java
index e07fe47..8213967 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java
@@ -26,7 +26,7 @@ import java.util.function.UnaryOperator;
 /**
  * Non mapping version of a listener container
  */
-public class StandardListenerManager<T> implements ListenerManager<T, T>
+public class StandardListenerManager<T> implements UnaryListenerManager<T>
 {
     private final ListenerManager<T, T> container;
 
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerManager.java b/curator-framework/src/main/java/org/apache/curator/framework/listen/UnaryListenerManager.java
similarity index 63%
copy from curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerManager.java
copy to curator-framework/src/main/java/org/apache/curator/framework/listen/UnaryListenerManager.java
index cab0426..54497f4 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/listen/UnaryListenerManager.java
@@ -18,27 +18,9 @@
  */
 package org.apache.curator.framework.listen;
 
-import java.util.function.Consumer;
-
-public interface ListenerManager<K, V> extends Listenable<K>
+/**
+ * A {@link ListenerManager} that doesn't do any mapping
+ */
+public interface UnaryListenerManager<T> extends ListenerManager<T, T>
 {
-    /**
-     * Remove all listeners
-     */
-    void clear();
-
-    /**
-     * Return the number of listeners
-     *
-     * @return number
-     */
-    int size();
-
-    /**
-     * Utility - apply the given function to each listener. The function receives
-     * the listener as an argument.
-     *
-     * @param function function to call for each listener
-     */
-    void forEach(Consumer<V> function);
 }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java
index 03e44f8..c207128 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java
@@ -20,12 +20,12 @@ package org.apache.curator.framework.state;
 
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.RetrySleeper;
+import org.apache.curator.utils.ThreadUtils;
 import java.time.Duration;
 import java.util.Objects;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-// must be guarded by sync
 class CircuitBreaker
 {
     private final RetryPolicy retryPolicy;
@@ -35,12 +35,18 @@ class CircuitBreaker
     private int retryCount = 0;
     private long startNanos = 0;
 
-    CircuitBreaker(RetryPolicy retryPolicy, ScheduledExecutorService service)
+    static CircuitBreaker build(RetryPolicy retryPolicy)
     {
-        this.retryPolicy = Objects.requireNonNull(retryPolicy, "retryPolicy cannot be null");
-        this.service = Objects.requireNonNull(service, "service cannot be null");
+        return new CircuitBreaker(retryPolicy, ThreadUtils.newSingleThreadScheduledExecutor("CircuitBreakingConnectionStateListener"));
+    }
+
+    static CircuitBreaker build(RetryPolicy retryPolicy, ScheduledExecutorService service)
+    {
+        return new CircuitBreaker(retryPolicy, service);
     }
 
+    // IMPORTANT - all methods below MUST be guarded by synchronization
+
     boolean isOpen()
     {
         return isOpen;
@@ -96,4 +102,10 @@ class CircuitBreaker
         startNanos = 0;
         return wasOpen;
     }
+
+    private CircuitBreaker(RetryPolicy retryPolicy, ScheduledExecutorService service)
+    {
+        this.retryPolicy = Objects.requireNonNull(retryPolicy, "retryPolicy cannot be null");
+        this.service = Objects.requireNonNull(service, "service cannot be null");
+    }
 }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java
index 24eba01..beec66f 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java
@@ -20,7 +20,6 @@ package org.apache.curator.framework.state;
 
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.utils.ThreadUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.Objects;
@@ -28,14 +27,14 @@ import java.util.concurrent.ScheduledExecutorService;
 
 /**
  * <p>
- *     A decorator/proxy for connection state listeners that adds circuit breaking behavior. During network
+ *     A proxy for connection state listeners that adds circuit breaking behavior. During network
  *     outages ZooKeeper can become very noisy sending connection/disconnection events in rapid succession.
  *     Curator recipes respond to these messages by resetting state, etc. E.g. LeaderLatch must delete
  *     its lock node and try to recreate it in order to try to re-obtain leadership, etc.
  * </p>
  *
  * <p>
- *     This noisy herding can be avoided by using the circuit breaking listener decorator. When it
+ *     This noisy herding can be avoided by using the circuit breaking listener. When it
  *     receives {@link org.apache.curator.framework.state.ConnectionState#SUSPENDED}, the circuit
  *     becomes "open" (based on the provided {@link org.apache.curator.RetryPolicy}) and will ignore
  *     future connection state changes until RetryPolicy timeout has elapsed. Note: however, if the connection
@@ -44,10 +43,10 @@ import java.util.concurrent.ScheduledExecutorService;
  * </p>
  *
  * <p>
- *     When the circuit decorator is closed, all connection state changes are forwarded to the managed
+ *     When the circuit is closed, all connection state changes are forwarded to the managed
  *     listener. When the first disconnected state is received, the circuit becomes open. The state change
  *     that caused the circuit to open is sent to the managed listener and the RetryPolicy will be used to
- *     get a delay amount. While the delay is active, the decorator will store state changes but will not
+ *     get a delay amount. While the delay is active, the circuit breaker will store state changes but will not
  *     forward them to the managed listener (except, however, the first time the state changes from SUSPENDED to LOST).
  *     When the delay elapses, if the connection has been restored, the circuit closes and forwards the
  *     new state to the managed listener. If the connection has not been restored, the RetryPolicy is checked
@@ -55,6 +54,23 @@ import java.util.concurrent.ScheduledExecutorService;
  *     RetryPolicy indicates that retries are exhausted then the circuit closes - if the current state
  *     is different than the state that caused the circuit to open it is forwarded to the managed listener.
  * </p>
+ *
+ * <p>
+ *     <strong>NOTE:</strong> You should not use this listener directly. Instead, set {@link org.apache.curator.framework.state.ConnectionStateListenerManagerFactory#circuitBreaking(org.apache.curator.RetryPolicy)}
+ *     in the {@link org.apache.curator.framework.CuratorFrameworkFactory.Builder#connectionStateListenerManagerFactory(ConnectionStateListenerManagerFactory)}.
+ * </p>
+ *
+ * <p>
+ *     E.g.
+ * <code><pre>
+ * ConnectionStateListenerManagerFactory factory = ConnectionStateListenerManagerFactory.circuitBreaking(...retry policy for circuit breaking...);
+ * CuratorFramework client = CuratorFrameworkFactory.builder()
+ *     .connectionStateListenerManagerFactory(factory)
+ *     ... etc ...
+ *     .build();
+ * // all connection state listeners set for "client" will get circuit breaking behavior
+ * </pre></code>
+ * </p>
  */
 public class CircuitBreakingConnectionStateListener implements ConnectionStateListener
 {
@@ -77,7 +93,7 @@ public class CircuitBreakingConnectionStateListener implements ConnectionStateLi
      */
     public CircuitBreakingConnectionStateListener(CuratorFramework client, ConnectionStateListener listener, RetryPolicy retryPolicy)
     {
-        this(client, listener, retryPolicy, ThreadUtils.newSingleThreadScheduledExecutor("CircuitBreakingConnectionStateListener"));
+        this(client, listener, CircuitBreaker.build(retryPolicy));
     }
 
     /**
@@ -88,9 +104,14 @@ public class CircuitBreakingConnectionStateListener implements ConnectionStateLi
      */
     public CircuitBreakingConnectionStateListener(CuratorFramework client, ConnectionStateListener listener, RetryPolicy retryPolicy, ScheduledExecutorService service)
     {
-        this.client = client;
+        this(client, listener, CircuitBreaker.build(retryPolicy, service));
+    }
+
+    CircuitBreakingConnectionStateListener(CuratorFramework client, ConnectionStateListener listener, CircuitBreaker circuitBreaker)
+    {
+        this.client = Objects.requireNonNull(client, "client cannot be null");
         this.listener = Objects.requireNonNull(listener, "listener cannot be null");
-        circuitBreaker = new CircuitBreaker(retryPolicy, service);
+        this.circuitBreaker = Objects.requireNonNull(circuitBreaker, "circuitBreaker cannot be null");
         reset();
     }
 
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingManager.java
new file mode 100644
index 0000000..0d9f7eb
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingManager.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.state;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.listen.StandardListenerManager;
+import org.apache.curator.framework.listen.UnaryListenerManager;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+class CircuitBreakingManager implements UnaryListenerManager<ConnectionStateListener>
+{
+    private final StandardListenerManager<ConnectionStateListener> mainContainer = StandardListenerManager.standard();
+    private final StandardListenerManager<ConnectionStateListener> doNotProxyContainer = StandardListenerManager.standard();
+    private final CircuitBreakingConnectionStateListener masterListener;
+
+    CircuitBreakingManager(CuratorFramework client, CircuitBreaker circuitBreaker)
+    {
+        ConnectionStateListener masterStateChanged = (__, newState) -> mainContainer.forEach(listener -> listener.stateChanged(client, newState));
+        masterListener = new CircuitBreakingConnectionStateListener(client, masterStateChanged, circuitBreaker);
+    }
+
+    @Override
+    public void clear()
+    {
+        doNotProxyContainer.clear();
+        mainContainer.clear();
+    }
+
+    @Override
+    public int size()
+    {
+        return mainContainer.size() + doNotProxyContainer.size();
+    }
+
+    @Override
+    public void forEach(Consumer<ConnectionStateListener> function)
+    {
+        doNotProxyContainer.forEach(function);
+        function.accept(masterListener);
+    }
+
+    @Override
+    public void addListener(ConnectionStateListener listener)
+    {
+        if ( listener.doNotProxy() )
+        {
+            doNotProxyContainer.addListener(listener);
+        }
+        else
+        {
+            mainContainer.addListener(listener);
+        }
+    }
+
+    @Override
+    public void addListener(ConnectionStateListener listener, Executor executor)
+    {
+        if ( listener.doNotProxy() )
+        {
+            doNotProxyContainer.addListener(listener, executor);
+        }
+        else
+        {
+            mainContainer.addListener(listener, executor);
+        }
+    }
+
+    @Override
+    public void removeListener(ConnectionStateListener listener)
+    {
+        mainContainer.removeListener(listener);
+        doNotProxyContainer.removeListener(listener);
+    }
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListener.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListener.java
index 71635d0..d626e3f 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListener.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListener.java
@@ -31,14 +31,14 @@ public interface ConnectionStateListener
     void stateChanged(CuratorFramework client, ConnectionState newState);
 
     /**
-     * Normally, ConnectionStateListeners are decorated via the configured
-     * {@link org.apache.curator.framework.state.ConnectionStateListenerDecorator}. For certain
-     * critical cases, however, this is not desired. If your listener returns <code>true</code>
-     * for doNotDecorate(), it will not be passed through the decorator.
+     * ConnectionStateListener managers set via {@link org.apache.curator.framework.CuratorFrameworkFactory.Builder#connectionStateListenerManagerFactory(ConnectionStateListenerManagerFactory)}
+     * are allowed to proxy (etc.) ConnectionStateListeners as needed. If this method returns <code>true</code>
+     * the ConnectionStateListener manager must <em>not</em> proxy the listener as it's a vital internal
+     * listener used by Curator.
      *
      * @return true/false
      */
-    default boolean doNotDecorate()
+    default boolean doNotProxy()
     {
         return false;
     }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java
deleted file mode 100644
index b95c4b3..0000000
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.state;
-
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import java.util.concurrent.ScheduledExecutorService;
-
-/**
- * <p>
- *     Allows for the enhancement of the {@link org.apache.curator.framework.state.ConnectionStateListener} instances
- *     used with Curator. Client code that sets a ConnectionStateListener should always wrap it using the configured
- *     ConnectionStateListenerDecorator. All Curator recipes do this.
- * </p>
- *
- * <p>
- *     E.g.
- *
- * <code><pre>
- * CuratorFramework client ...
- * ConnectionStateListener listener = ...
- * ConnectionStateListener decoratedListener = client.decorateConnectionStateListener(listener);
- *
- * ...
- *
- * client.getConnectionStateListenable().addListener(decoratedListener);
- *
- * // later, to remove...
- * client.getConnectionStateListenable().removeListener(decoratedListener);
- * </pre></code>
- * </p>
- */
-@FunctionalInterface
-public interface ConnectionStateListenerDecorator
-{
-    ConnectionStateListener decorateListener(CuratorFramework client, ConnectionStateListener actual);
-
-    /**
-     * Pass through - does no decoration
-     */
-    ConnectionStateListenerDecorator standard = (__, actual) -> actual;
-
-    /**
-     * Decorates the listener with circuit breaking behavior using {@link org.apache.curator.framework.state.CircuitBreakingConnectionStateListener}
-     *
-     * @param retryPolicy the circuit breaking policy to use
-     * @return new decorator
-     */
-    static ConnectionStateListenerDecorator circuitBreaking(RetryPolicy retryPolicy)
-    {
-        return (client, actual) -> new CircuitBreakingConnectionStateListener(client, actual, retryPolicy);
-    }
-
-    /**
-     * Decorates the listener with circuit breaking behavior using {@link org.apache.curator.framework.state.CircuitBreakingConnectionStateListener}
-     *
-     * @param retryPolicy the circuit breaking policy to use
-     * @param service the scheduler to use
-     * @return new decorator
-     */
-    static ConnectionStateListenerDecorator circuitBreaking(RetryPolicy retryPolicy, ScheduledExecutorService service)
-    {
-        return (client, actual) -> new CircuitBreakingConnectionStateListener(client, actual, retryPolicy, service);
-    }
-}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerManagerFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerManagerFactory.java
new file mode 100644
index 0000000..8c6497a
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerManagerFactory.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.state;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.listen.StandardListenerManager;
+import org.apache.curator.framework.listen.UnaryListenerManager;
+import java.util.concurrent.ScheduledExecutorService;
+
+@FunctionalInterface
+public interface ConnectionStateListenerManagerFactory
+{
+    /**
+     * Create a new listener manager
+     *
+     * @param client curator client
+     * @return manager
+     */
+    UnaryListenerManager<ConnectionStateListener> newManager(CuratorFramework client);
+
+    /**
+     * Pass through
+     */
+    ConnectionStateListenerManagerFactory standard = (__) -> StandardListenerManager.standard();
+
+    /**
+     * Listeners set in this manager receive circuit breaking behavior using {@link org.apache.curator.framework.state.CircuitBreakingConnectionStateListener}
+     * as a master listener that proxies to any listener registered by client code (unless the listener returns true
+     * for {@link ConnectionStateListener#doNotProxy()}).
+     *
+     * @param retryPolicy the circuit breaking policy to use
+     * @return new listener manager factory
+     */
+    static ConnectionStateListenerManagerFactory circuitBreaking(RetryPolicy retryPolicy)
+    {
+        return client -> new CircuitBreakingManager(client, CircuitBreaker.build(retryPolicy));
+    }
+
+    /**
+     * Listeners set in this manager receive circuit breaking behavior using {@link org.apache.curator.framework.state.CircuitBreakingConnectionStateListener}
+     * as a master listener that proxies to any listener registered by client code (unless the listener returns true
+     * for {@link ConnectionStateListener#doNotProxy()}).
+     *
+     * @param retryPolicy the circuit breaking policy to use
+     * @param service the scheduler to use
+     * @return new listener manager factory
+     */
+    static ConnectionStateListenerManagerFactory circuitBreaking(RetryPolicy retryPolicy, ScheduledExecutorService service)
+    {
+        return client -> new CircuitBreakingManager(client, CircuitBreaker.build(retryPolicy, service));
+    }
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 55e17c8..46325d2 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -22,7 +22,7 @@ package org.apache.curator.framework.state;
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.listen.Listenable;
-import org.apache.curator.framework.listen.StandardListenerManager;
+import org.apache.curator.framework.listen.UnaryListenerManager;
 import org.apache.curator.utils.Compatibility;
 import org.apache.curator.utils.ThreadUtils;
 import org.slf4j.Logger;
@@ -71,7 +71,7 @@ public class ConnectionStateManager implements Closeable
     private final AtomicBoolean initialConnectMessageSent = new AtomicBoolean(false);
     private final ExecutorService service;
     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
-    private final StandardListenerManager<ConnectionStateListener> listeners;
+    private final UnaryListenerManager<ConnectionStateListener> listeners;
 
     // guarded by sync
     private ConnectionState currentConnectionState;
@@ -93,7 +93,7 @@ public class ConnectionStateManager implements Closeable
      */
     public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs, int sessionExpirationPercent)
     {
-        this(client, threadFactory, sessionTimeoutMs, sessionExpirationPercent, ConnectionStateListenerDecorator.standard);
+        this(client, threadFactory, sessionTimeoutMs, sessionExpirationPercent, ConnectionStateListenerManagerFactory.standard);
     }
 
     /**
@@ -101,9 +101,9 @@ public class ConnectionStateManager implements Closeable
      * @param threadFactory thread factory to use or null for a default
      * @param sessionTimeoutMs the ZK session timeout in milliseconds
      * @param sessionExpirationPercent percentage of negotiated session timeout to use when simulating a session timeout. 0 means don't simulate at all
-     * @param connectionStateListenerDecorator the decorator to use
+     * @param managerFactory manager factory to use
      */
-    public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs, int sessionExpirationPercent, ConnectionStateListenerDecorator connectionStateListenerDecorator)
+    public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs, int sessionExpirationPercent, ConnectionStateListenerManagerFactory managerFactory)
     {
         this.client = client;
         this.sessionTimeoutMs = sessionTimeoutMs;
@@ -113,7 +113,7 @@ public class ConnectionStateManager implements Closeable
             threadFactory = ThreadUtils.newThreadFactory("ConnectionStateManager");
         }
         service = Executors.newSingleThreadExecutor(threadFactory);
-        listeners = StandardListenerManager.mappingStandard(listener -> listener.doNotDecorate() ? listener : connectionStateListenerDecorator.decorateListener(client, listener));
+        listeners = managerFactory.newManager(client);
     }
 
     /**
@@ -272,7 +272,7 @@ public class ConnectionStateManager implements Closeable
                 final ConnectionState newState = eventQueue.poll(pollMaxMs, TimeUnit.MILLISECONDS);
                 if ( newState != null )
                 {
-                    if ( listeners.size() == 0 )
+                    if ( listeners.isEmpty() )
                     {
                         log.warn("There are no ConnectionStateListeners registered.");
                     }
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java b/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java
index f4096cb..bee917e 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java
@@ -58,7 +58,7 @@ public class TestCircuitBreaker
         final int retryQty = 1;
         final Duration delay = Duration.ofSeconds(10);
 
-        CircuitBreaker circuitBreaker = new CircuitBreaker(new RetryNTimes(retryQty, (int)delay.toMillis()), service);
+        CircuitBreaker circuitBreaker = CircuitBreaker.build(new RetryNTimes(retryQty, (int)delay.toMillis()), service);
         AtomicInteger counter = new AtomicInteger(0);
 
         Assert.assertTrue(circuitBreaker.tryToOpen(counter::incrementAndGet));
@@ -79,7 +79,7 @@ public class TestCircuitBreaker
     @Test
     public void testVariousOpenRetryFails()
     {
-        CircuitBreaker circuitBreaker = new CircuitBreaker(new RetryForever(1), service);
+        CircuitBreaker circuitBreaker = CircuitBreaker.build(new RetryForever(1), service);
         Assert.assertFalse(circuitBreaker.tryToRetry(() -> {}));
         Assert.assertTrue(circuitBreaker.tryToOpen(() -> {}));
         Assert.assertFalse(circuitBreaker.tryToOpen(() -> {}));
@@ -91,7 +91,7 @@ public class TestCircuitBreaker
     public void testWithRetryUntilElapsed()
     {
         RetryPolicy retryPolicy = new RetryUntilElapsed(10000, 10000);
-        CircuitBreaker circuitBreaker = new CircuitBreaker(retryPolicy, service);
+        CircuitBreaker circuitBreaker = CircuitBreaker.build(retryPolicy, service);
         Assert.assertTrue(circuitBreaker.tryToOpen(() -> {}));
         Assert.assertEquals(lastDelay[0], Duration.ofMillis(10000));
     }
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 7107efa..a0b2187 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -622,7 +622,8 @@ public class LeaderLatch implements Closeable
         client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null));
     }
 
-    private void handleStateChange(ConnectionState newState)
+    @VisibleForTesting
+    protected void handleStateChange(ConnectionState newState)
     {
         switch ( newState )
         {
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index 9717302..3d9e9b7 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -28,7 +28,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.curator.framework.state.ConnectionStateListenerDecorator;
+import org.apache.curator.framework.state.ConnectionStateListenerManagerFactory;
 import org.apache.curator.framework.state.SessionConnectionStateErrorPolicy;
 import org.apache.curator.framework.state.StandardConnectionStateErrorPolicy;
 import org.apache.curator.retry.RetryForever;
@@ -43,6 +43,7 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
@@ -50,53 +51,114 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 public class TestLeaderLatch extends BaseClassForTests
 {
     private static final String PATH_NAME = "/one/two/me";
     private static final int MAX_LOOPS = 5;
 
+    private static class Holder
+    {
+        final BlockingQueue<ConnectionState> stateChanges = new LinkedBlockingQueue<>();
+        final CountDownLatch isLockedLatch = new CountDownLatch(1);
+        volatile LeaderLatch latch;
+    }
+
     @Test
     public void testWithCircuitBreaker() throws Exception
     {
+        final int threadQty = 5;
+
+        ExecutorService executorService = Executors.newFixedThreadPool(threadQty);
+        List<Holder> holders = Collections.emptyList();
         Timing2 timing = new Timing2();
-        ConnectionStateListenerDecorator decorator = ConnectionStateListenerDecorator.circuitBreaking(new RetryForever(timing.multiple(2).milliseconds()));
-        try ( CuratorFramework client = CuratorFrameworkFactory.builder()
+        ConnectionStateListenerManagerFactory managerFactory = ConnectionStateListenerManagerFactory.circuitBreaking(new RetryForever(timing.multiple(2).milliseconds()));
+        CuratorFramework client = CuratorFrameworkFactory.builder()
             .connectString(server.getConnectString())
             .retryPolicy(new RetryOneTime(1))
-            .connectionStateListenerDecorator(decorator)
+            .connectionStateListenerManagerFactory(managerFactory)
             .connectionTimeoutMs(timing.connection())
             .sessionTimeoutMs(timing.session())
-            .build() )
-        {
+            .build();
+        try {
             client.start();
-            AtomicInteger resetCount = new AtomicInteger(0);
-            try ( LeaderLatch latch = new LeaderLatch(client, "/foo/bar")
+            client.create().forPath("/hey");
+
+            Semaphore lostSemaphore = new Semaphore(0);
+            ConnectionStateListener unProxiedListener = new ConnectionStateListener()
             {
                 @Override
-                void reset() throws Exception
+                public void stateChanged(CuratorFramework client, ConnectionState newState)
                 {
-                    resetCount.incrementAndGet();
-                    super.reset();
+                    if ( newState == ConnectionState.LOST )
+                    {
+                        lostSemaphore.release();
+                    }
                 }
-            } )
-            {
-                latch.start();
-                Assert.assertTrue(latch.await(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
 
-                for ( int i = 0; i < 5; ++i )
+                @Override
+                public boolean doNotProxy()
                 {
-                    server.stop();
-                    server.restart();
-                    timing.sleepABit();
+                    return true;
                 }
-                Assert.assertTrue(latch.await(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
-                Assert.assertEquals(resetCount.get(), 2);
+            };
+            client.getConnectionStateListenable().addListener(unProxiedListener);
+
+            holders = IntStream.range(0, threadQty)
+                .mapToObj(index -> {
+                    Holder holder = new Holder();
+                    holder.latch = new LeaderLatch(client, "/foo/bar/" + index)
+                    {
+                        @Override
+                        protected void handleStateChange(ConnectionState newState)
+                        {
+                            holder.stateChanges.offer(newState);
+                            super.handleStateChange(newState);
+                        }
+                    };
+                    return holder;
+                })
+                .collect(Collectors.toList());
+
+            holders.forEach(holder -> {
+                executorService.submit(() -> {
+                    holder.latch.start();
+                    Assert.assertTrue(holder.latch.await(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
+                    holder.isLockedLatch.countDown();
+                    return null;
+                });
+                timing.awaitLatch(holder.isLockedLatch);
+            });
+
+            for ( int i = 0; i < 4; ++i )   // note: 4 is an arbitrary number of loops to simulate disconnections
+            {
+                server.stop();
+                Assert.assertTrue(timing.acquireSemaphore(lostSemaphore));
+                server.restart();
+                timing.sleepABit();
             }
+
+            for ( Holder holder : holders )
+            {
+                Assert.assertTrue(holder.latch.await(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
+                Assert.assertEquals(timing.takeFromQueue(holder.stateChanges), ConnectionState.SUSPENDED);
+                Assert.assertEquals(timing.takeFromQueue(holder.stateChanges), ConnectionState.LOST);
+                Assert.assertEquals(timing.takeFromQueue(holder.stateChanges), ConnectionState.RECONNECTED);
+            }
+        }
+        finally
+        {
+            holders.forEach(holder -> CloseableUtils.closeQuietly(holder.latch));
+            CloseableUtils.closeQuietly(client);
+            executorService.shutdownNow();
         }
     }
 
diff --git a/src/site/confluence/utilities.confluence b/src/site/confluence/utilities.confluence
index 720d8d9..2bd7ac1 100644
--- a/src/site/confluence/utilities.confluence
+++ b/src/site/confluence/utilities.confluence
@@ -19,13 +19,13 @@ h2. Circuit Breaking ConnectionStateListener
 During network outages ZooKeeper can become very noisy sending connection/disconnection events in rapid succession. Curator recipes respond to these
 messages by resetting state, etc. E.g. LeaderLatch must delete its lock node and try to recreate it in order to try to re\-obtain leadership, etc.
 
-This noisy herding can be avoided by using the circuit breaking listener decorator. When it receives ConnectionState.SUSPENDED, the circuit becomes "open"
+This noisy herding can be avoided by using the circuit breaking listener. When it receives ConnectionState.SUSPENDED, the circuit becomes "open"
 (based on the provided RetryPolicy) and will ignore future connection state changes until RetryPolicy timeout has elapsed. Note: however, if the connection
 goes from ConnectionState.SUSPENDED to ConnectionState.LOST the first LOST state is sent.
 
-When the circuit decorator is closed, all connection state changes are forwarded to the managed listener. When the first disconnected state is received, the
+When the circuit is closed, all connection state changes are forwarded to the managed listener. When the first disconnected state is received, the
 circuit becomes open. The state change that caused the circuit to open is sent to the managed listener and the RetryPolicy will be used to get a delay amount.
-While the delay is active, the decorator will store state changes but will not forward them to the managed listener (except, however, the first time the state
+While the delay is active, the circuit breaker will store state changes but will not forward them to the managed listener (except for the first time the state
 changes from SUSPENDED to LOST). When the delay elapses, if the connection has been restored, the circuit closes and forwards the new state to the managed listener.
 If the connection has not been restored, the RetryPolicy is checked again. If the RetryPolicy indicates another retry is allowed the process repeats. If, however,
 the RetryPolicy indicates that retries are exhausted then the circuit closes \- if the current state is different than the state that caused the circuit to open it is
@@ -34,12 +34,12 @@ forwarded to the managed listener.
 You can enable the Circuit Breaking ConnectionStateListener during creation of your CuratorFramework instance. E.g.
 
 {code}
-ConnectionStateListenerDecorator decorator = ConnectionStateListenerDecorator.circuitBreaking(...);
+ConnectionStateListenerManagerFactory factory = ConnectionStateListenerManagerFactory.circuitBreaking(...retry policy for circuit breaking...);
 CuratorFramework client = CuratorFrameworkFactory.builder()
-    ...
-    .connectionStateListenerDecorator(decorator)
-    ...
-    .build();
+   .connectionStateListenerManagerFactory(factory)
+   ... etc ...
+   .build();
+// all connection state listeners set for "client" will get circuit breaking behavior
 {code}
 
 h2. Locker