You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2020/02/12 22:10:17 UTC

[curator] branch CURATOR-558-pt1-remove-zk40-etc updated (e943f7a -> 45a6665)

This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a change to branch CURATOR-558-pt1-remove-zk40-etc
in repository https://gitbox.apache.org/repos/asf/curator.git.


    omit e943f7a  CURATOR-558 - uses CuratorTestBase so that retries occur and a short session timeout for faster execution
    omit b74bbb9  CURATOR-558
     add 75226fb  CURATOR-551 (#345)
     new 5a04ba4  CURATOR-558
     new 45a6665  CURATOR-558 - uses CuratorTestBase so that retries occur and a short session timeout for faster execution

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (e943f7a)
            \
             N -- N -- N   refs/heads/CURATOR-558-pt1-remove-zk40-etc (45a6665)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/curator/ConnectionState.java   | 25 ++++----
 .../main/java/org/apache/curator/HandleHolder.java | 70 ++++++----------------
 .../src/main/java/org/apache/curator/Helper.java   | 36 +++++++----
 3 files changed, 58 insertions(+), 73 deletions(-)
 copy curator-framework/src/main/java/org/apache/curator/framework/imps/PathAndBytes.java => curator-client/src/main/java/org/apache/curator/Helper.java (56%)


[curator] 01/02: CURATOR-558

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch CURATOR-558-pt1-remove-zk40-etc
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 5a04ba4c0861a60192e1cd3e1d10875c4b03ea72
Author: randgalt <ra...@apache.org>
AuthorDate: Sun Feb 2 11:34:34 2020 -0500

    CURATOR-558
    
    Pt1 of change
    
    * Remove the ZK 3.4 compatibility module and code
    * Remove the deprecated ListenerContainer that leaks Guava classes into our APIs
    * Remove Exhibitor support
    * Various minor changes/cleanups
---
 .../exhibitor/DefaultExhibitorRestClient.java      |  68 ---
 .../exhibitor/ExhibitorEnsembleProvider.java       | 335 -----------
 .../ensemble/exhibitor/ExhibitorRestClient.java    |  34 --
 .../curator/ensemble/exhibitor/Exhibitors.java     |  66 ---
 .../org/apache/curator/utils/Compatibility.java    | 101 ----
 .../curator/utils/InjectSessionExpiration.java     |  77 ---
 .../test/java/org/apache/curator/BasicTests.java   |   4 +-
 .../apache/curator/TestSessionFailRetryLoop.java   |  10 +-
 .../exhibitor/TestExhibitorEnsembleProvider.java   | 227 --------
 .../apache/curator/framework/CuratorFramework.java |   7 -
 .../curator/framework/CuratorFrameworkFactory.java |  22 -
 .../apache/curator/framework/SafeIsTtlMode.java    |  44 --
 .../framework/imps/CompatibleCreateCallback.java   |  26 -
 .../curator/framework/imps/CreateBuilderImpl.java  |  78 +--
 .../apache/curator/framework/imps/CreateZK35.java  |  47 --
 .../framework/imps/CuratorFrameworkImpl.java       |  83 +--
 .../imps/CuratorMultiTransactionImpl.java          |  18 +-
 .../framework/imps/WatcherRemovalManager.java      |   7 +-
 .../framework/listen/ListenerContainer.java        | 112 ----
 .../framework/listen/MappingListenerManager.java   |   2 +-
 .../framework/state/ConnectionStateManager.java    |   3 +-
 .../curator/framework/imps/TestCleanState.java     |   7 -
 .../framework/imps/TestCreateReturningStat.java    |   2 -
 .../imps/TestEnabledSessionExpiredState.java       |   3 +-
 .../curator/framework/imps/TestFramework.java      |   2 -
 .../curator/framework/imps/TestFrameworkEdges.java |  13 +-
 .../framework/imps/TestReconfiguration.java        |   2 -
 .../curator/framework/imps/TestRemoveWatches.java  |   2 -
 .../curator/framework/imps/TestTtlNodes.java       |   2 -
 .../framework/imps/TestWatcherRemovalManager.java  |   2 -
 .../curator/framework/recipes/cache/NodeCache.java |  35 +-
 .../framework/recipes/cache/PathChildrenCache.java |  39 +-
 .../curator/framework/recipes/cache/TreeCache.java |  49 +-
 .../framework/recipes/leader/LeaderLatch.java      |  25 +-
 .../framework/recipes/locks/ChildReaper.java       | 307 ----------
 .../curator/framework/recipes/locks/Reaper.java    | 380 ------------
 .../framework/recipes/nodes/PersistentNode.java    |  41 +-
 .../recipes/queue/DistributedDelayQueue.java       |   4 +-
 .../recipes/queue/DistributedIdQueue.java          |   4 +-
 .../recipes/queue/DistributedPriorityQueue.java    |   4 +-
 .../framework/recipes/queue/DistributedQueue.java  |  63 +-
 .../curator/framework/recipes/queue/QueueBase.java |   4 +-
 .../framework/recipes/shared/SharedValue.java      |  52 +-
 .../recipes/shared/SharedValueReader.java          |   4 +-
 .../framework/recipes/cache/TestNodeCache.java     |   4 +-
 .../recipes/cache/TestPathChildrenCache.java       |   4 +-
 .../framework/recipes/cache/TestTreeCache.java     |   6 +-
 .../recipes/leader/TestLeaderSelector.java         |   4 +-
 .../framework/recipes/locks/TestChildReaper.java   | 351 -----------
 .../recipes/locks/TestInterProcessMutex.java       |   4 +-
 .../recipes/locks/TestInterProcessMutexBase.java   |   4 +-
 .../recipes/locks/TestInterProcessSemaphore.java   |  47 --
 .../framework/recipes/locks/TestReaper.java        | 647 ---------------------
 .../recipes/nodes/TestPersistentEphemeralNode.java |  10 +-
 .../recipes/nodes/TestPersistentTtlNode.java       |   2 -
 .../test/compatibility/CuratorTestBase.java        |   1 -
 .../test/compatibility/Zk35MethodInterceptor.java  |  56 --
 .../x/async/modeled/details/ModeledCacheImpl.java  |  21 +-
 .../x/discovery/details/ServiceCacheImpl.java      |  79 +--
 .../x/discovery/details/TestServiceDiscovery.java  |   8 +-
 pom.xml                                            |   6 -
 src/site/confluence/exhibitor.confluence           |  24 -
 ...y.confluence => zk-compatibility-34.confluence} |  20 +-
 src/site/site.xml                                  |   7 +-
 64 files changed, 237 insertions(+), 3485 deletions(-)

diff --git a/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/DefaultExhibitorRestClient.java b/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/DefaultExhibitorRestClient.java
deleted file mode 100644
index 01b9525..0000000
--- a/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/DefaultExhibitorRestClient.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.ensemble.exhibitor;
-
-import org.apache.curator.utils.CloseableUtils;
-import java.io.BufferedInputStream;
-import java.io.InputStream;
-import java.net.HttpURLConnection;
-import java.net.URI;
-
-@SuppressWarnings("UnusedDeclaration")
-public class DefaultExhibitorRestClient implements ExhibitorRestClient
-{
-    private final boolean useSsl;
-
-    public DefaultExhibitorRestClient()
-    {
-        this(false);
-    }
-
-    public DefaultExhibitorRestClient(boolean useSsl)
-    {
-        this.useSsl = useSsl;
-    }
-
-    @Override
-    public String getRaw(String hostname, int port, String uriPath, String mimeType) throws Exception
-    {
-        URI                 uri = new URI(useSsl ? "https" : "http", null, hostname, port, uriPath, null, null);
-        HttpURLConnection   connection = (HttpURLConnection)uri.toURL().openConnection();
-        connection.addRequestProperty("Accept", mimeType);
-        StringBuilder       str = new StringBuilder();
-        InputStream         in = new BufferedInputStream(connection.getInputStream());
-        try
-        {
-            for(;;)
-            {
-                int     b = in.read();
-                if ( b < 0 )
-                {
-                    break;
-                }
-                str.append((char)(b & 0xff));
-            }
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(in);
-        }
-        return str.toString();
-    }
-}
diff --git a/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/ExhibitorEnsembleProvider.java b/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/ExhibitorEnsembleProvider.java
deleted file mode 100644
index eb4f8bd..0000000
--- a/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/ExhibitorEnsembleProvider.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.ensemble.exhibitor;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.curator.RetryLoop;
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.ensemble.EnsembleProvider;
-import org.apache.curator.utils.ThreadUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Ensemble provider that polls a cluster of Exhibitor (https://github.com/Netflix/exhibitor)
- * instances for the connection string.
- * If the set of instances should change, new ZooKeeper connections will use the new connection
- * string.
- */
-public class ExhibitorEnsembleProvider implements EnsembleProvider
-{
-    private final Logger log = LoggerFactory.getLogger(getClass());
-    private final AtomicReference<Exhibitors> exhibitors = new AtomicReference<Exhibitors>();
-    private final AtomicReference<Exhibitors> masterExhibitors = new AtomicReference<Exhibitors>();
-    private final ExhibitorRestClient restClient;
-    private final String restUriPath;
-    private final int pollingMs;
-    private final RetryPolicy retryPolicy;
-    private final ScheduledExecutorService service = ThreadUtils.newSingleThreadScheduledExecutor("ExhibitorEnsembleProvider");
-    private final Random random = new Random();
-    private final AtomicReference<String> connectionString = new AtomicReference<String>("");
-    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
-
-    private static final String MIME_TYPE = "application/x-www-form-urlencoded";
-
-    private static final String VALUE_PORT = "port";
-    private static final String VALUE_COUNT = "count";
-    private static final String VALUE_SERVER_PREFIX = "server";
-
-    private enum State
-    {
-        LATENT,
-        STARTED,
-        CLOSED
-    }
-
-    /**
-     * @param exhibitors the current set of exhibitor instances (can be changed later via {@link #setExhibitors(Exhibitors)})
-     * @param restClient the rest client to use (use {@link DefaultExhibitorRestClient} for most cases)
-     * @param restUriPath the path of the REST call used to get the server set. Usually: <code>/exhibitor/v1/cluster/list</code>
-     * @param pollingMs how ofter to poll the exhibitors for the list
-     * @param retryPolicy retry policy to use when connecting to the exhibitors
-     */
-    public ExhibitorEnsembleProvider(Exhibitors exhibitors, ExhibitorRestClient restClient, String restUriPath, int pollingMs, RetryPolicy retryPolicy)
-    {
-        this.exhibitors.set(exhibitors);
-        this.masterExhibitors.set(exhibitors);
-        this.restClient = restClient;
-        this.restUriPath = restUriPath;
-        this.pollingMs = pollingMs;
-        this.retryPolicy = retryPolicy;
-    }
-
-    /**
-     * Change the set of exhibitors to poll
-     *
-     * @param newExhibitors new set
-     */
-    public void     setExhibitors(Exhibitors newExhibitors)
-    {
-        exhibitors.set(newExhibitors);
-        masterExhibitors.set(newExhibitors);
-    }
-
-    /**
-     * Can be called prior to starting the Curator instance to set the current connection string
-     *
-     * @throws Exception errors
-     */
-    public void     pollForInitialEnsemble() throws Exception
-    {
-        Preconditions.checkState(state.get() == State.LATENT, "Cannot be called after start()");
-        poll();
-    }
-
-    @Override
-    public void start() throws Exception
-    {
-        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
-
-        service.scheduleWithFixedDelay
-            (
-                new Runnable()
-                {
-                    @Override
-                    public void run()
-                    {
-                        poll();
-                    }
-                },
-                pollingMs,
-                pollingMs,
-                TimeUnit.MILLISECONDS
-            );
-    }
-
-    @Override
-    public void close() throws IOException
-    {
-        Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
-
-        service.shutdownNow();
-    }
-
-    @Override
-    public String getConnectionString()
-    {
-        return connectionString.get();
-    }
-
-    @Override
-    public void setConnectionString(String connectionString)
-    {
-        log.info("setConnectionString received. Ignoring. " + connectionString);
-    }
-
-    @Override
-    public boolean updateServerListEnabled()
-    {
-        return false;
-    }
-
-    @VisibleForTesting
-    protected void poll()
-    {
-        Exhibitors              localExhibitors = exhibitors.get();
-        Map<String, String>     values = queryExhibitors(localExhibitors);
-
-        int                     count = getCountFromValues(values);
-        if ( count == 0 )
-        {
-            log.warn("0 count returned from Exhibitors. Using backup connection values.");
-            values = useBackup(localExhibitors);
-            count = getCountFromValues(values);
-        }
-
-        if ( count > 0 )
-        {
-            int                     port = Integer.parseInt(values.get(VALUE_PORT));
-            StringBuilder           newConnectionString = new StringBuilder();
-            List<String>            newHostnames = Lists.newArrayList();
-
-            for ( int i = 0; i < count; ++i )
-            {
-                if ( newConnectionString.length() > 0 )
-                {
-                    newConnectionString.append(",");
-                }
-                String      server = values.get(VALUE_SERVER_PREFIX + i);
-                newConnectionString.append(server).append(":").append(port);
-                newHostnames.add(server);
-            }
-
-            String newConnectionStringValue = newConnectionString.toString();
-            if ( !newConnectionStringValue.equals(connectionString.get()) )
-            {
-                log.info(String.format("Connection string has changed. Old value (%s), new value (%s)", connectionString.get(), newConnectionStringValue));
-            }
-            Exhibitors newExhibitors = new Exhibitors
-            (
-                newHostnames,
-                localExhibitors.getRestPort(),
-                new Exhibitors.BackupConnectionStringProvider()
-                {
-                    @Override
-                    public String getBackupConnectionString() throws Exception
-                    {
-                        return masterExhibitors.get().getBackupConnectionString();  // this may be overloaded by clients. Make sure there is always a method call to get the value.
-                    }
-                }
-            );
-            connectionString.set(newConnectionStringValue);
-            exhibitors.set(newExhibitors);
-        }
-    }
-
-    private int getCountFromValues(Map<String, String> values)
-    {
-        try
-        {
-            return Integer.parseInt(values.get(VALUE_COUNT));
-        }
-        catch ( NumberFormatException e )
-        {
-            // ignore
-        }
-        return 0;
-    }
-
-    private Map<String, String> useBackup(Exhibitors localExhibitors)
-    {
-        Map<String, String>     values = newValues();
-
-        try
-        {
-            String      backupConnectionString = localExhibitors.getBackupConnectionString();
-
-            int         thePort = -1;
-            int         count = 0;
-            for ( String spec : backupConnectionString.split(",") )
-            {
-                spec = spec.trim();
-                String[]        parts = spec.split(":");
-                if ( parts.length == 2 )
-                {
-                    String      hostname = parts[0];
-                    int         port = Integer.parseInt(parts[1]);
-                    if ( thePort < 0 )
-                    {
-                        thePort = port;
-                    }
-                    else if ( port != thePort )
-                    {
-                        log.warn("Inconsistent port in connection component: " + spec);
-                    }
-                    values.put(VALUE_SERVER_PREFIX + count, hostname);
-                    ++count;
-                }
-                else
-                {
-                    log.warn("Bad backup connection component: " + spec);
-                }
-            }
-            values.put(VALUE_COUNT, Integer.toString(count));
-            values.put(VALUE_PORT, Integer.toString(thePort));
-        }
-        catch ( Exception e )
-        {
-            ThreadUtils.checkInterrupted(e);
-            log.error("Couldn't get backup connection string", e);
-        }
-        return values;
-    }
-
-    private Map<String, String> newValues()
-    {
-        Map<String, String>     values = Maps.newHashMap();
-        values.put(VALUE_COUNT, "0");
-        return values;
-    }
-
-    private static Map<String, String> decodeExhibitorList(String str) throws UnsupportedEncodingException
-    {
-        Map<String, String>     values = Maps.newHashMap();
-        for ( String spec : str.split("&") )
-        {
-            String[]        parts = spec.split("=");
-            if ( parts.length == 2 )
-            {
-                values.put(parts[0], URLDecoder.decode(parts[1], "UTF-8"));
-            }
-        }
-
-        return values;
-    }
-
-    private Map<String, String> queryExhibitors(Exhibitors localExhibitors)
-    {
-        Map<String, String> values = newValues();
-
-        long                    start = System.currentTimeMillis();
-        int                     retries = 0;
-        boolean                 done = false;
-        while ( !done )
-        {
-            List<String> hostnames = Lists.newArrayList(localExhibitors.getHostnames());
-            if ( hostnames.size() == 0 )
-            {
-                done = true;
-            }
-            else
-            {
-                String          hostname = hostnames.get(random.nextInt(hostnames.size()));
-                try
-                {
-                    String                  encoded = restClient.getRaw(hostname, localExhibitors.getRestPort(), restUriPath, MIME_TYPE);
-                    values.putAll(decodeExhibitorList(encoded));
-                    done = true;
-                }
-                catch ( Throwable e )
-                {
-                    ThreadUtils.checkInterrupted(e);
-                    if ( retryPolicy.allowRetry(retries++, System.currentTimeMillis() - start, RetryLoop.getDefaultRetrySleeper()) )
-                    {
-                        log.warn("Couldn't get servers from Exhibitor. Retrying.", e);
-                    }
-                    else
-                    {
-                        log.error("Couldn't get servers from Exhibitor. Giving up.", e);
-                        done = true;
-                    }
-                }
-            }
-        }
-
-        return values;
-    }
-}
diff --git a/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/ExhibitorRestClient.java b/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/ExhibitorRestClient.java
deleted file mode 100644
index 007f652..0000000
--- a/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/ExhibitorRestClient.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.ensemble.exhibitor;
-
-public interface ExhibitorRestClient
-{
-    /**
-     * Connect to the given Exhibitor and return the raw result
-     *
-     * @param hostname host to connect to
-     * @param port connect port
-     * @param uriPath path
-     * @param mimeType Accept mime type
-     * @return raw result
-     * @throws Exception errors
-     */
-    public String getRaw(String hostname, int port, String uriPath, String mimeType) throws Exception;
-}
diff --git a/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/Exhibitors.java b/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/Exhibitors.java
deleted file mode 100644
index 24e0c14..0000000
--- a/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/Exhibitors.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.ensemble.exhibitor;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import java.util.Collection;
-
-/**
- * POJO for specifying the cluster of Exhibitor instances
- */
-public class Exhibitors
-{
-    private final Collection<String> hostnames;
-    private final int restPort;
-    private final BackupConnectionStringProvider backupConnectionStringProvider;
-
-    public interface BackupConnectionStringProvider
-    {
-        public String getBackupConnectionString() throws Exception;
-    }
-
-    /**
-     * @param hostnames set of Exhibitor instance host names
-     * @param restPort the REST port used to connect to Exhibitor
-     * @param backupConnectionStringProvider in case an Exhibitor instance can't be contacted, returns the fixed
-     *                               connection string to use as a backup
-     */
-    public Exhibitors(Collection<String> hostnames, int restPort, BackupConnectionStringProvider backupConnectionStringProvider)
-    {
-        this.backupConnectionStringProvider = Preconditions.checkNotNull(backupConnectionStringProvider, "backupConnectionStringProvider cannot be null");
-        this.hostnames = ImmutableList.copyOf(hostnames);
-        this.restPort = restPort;
-    }
-
-    public Collection<String> getHostnames()
-    {
-        return hostnames;
-    }
-
-    public int getRestPort()
-    {
-        return restPort;
-    }
-
-    public String getBackupConnectionString() throws Exception
-    {
-        return backupConnectionStringProvider.getBackupConnectionString();
-    }
-}
diff --git a/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java b/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java
deleted file mode 100644
index 1ee2301..0000000
--- a/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.utils;
-
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.lang.reflect.Method;
-
-/**
- * Utils to help with ZK 3.4.x compatibility
- */
-public class Compatibility
-{
-    private static final boolean hasZooKeeperAdmin;
-    private static final Method queueEventMethod;
-    private static final Logger logger = LoggerFactory.getLogger(Compatibility.class);
-
-    static
-    {
-        boolean localHasZooKeeperAdmin;
-        try
-        {
-            Class.forName("org.apache.zookeeper.admin.ZooKeeperAdmin");
-            localHasZooKeeperAdmin = true;
-        }
-        catch ( ClassNotFoundException e )
-        {
-            localHasZooKeeperAdmin = false;
-            logger.info("Running in ZooKeeper 3.4.x compatibility mode");
-        }
-        hasZooKeeperAdmin = localHasZooKeeperAdmin;
-
-        Method localQueueEventMethod;
-        try
-        {
-            Class<?> testableClass = Class.forName("org.apache.zookeeper.Testable");
-            localQueueEventMethod = testableClass.getMethod("queueEvent", WatchedEvent.class);
-        }
-        catch ( ReflectiveOperationException ignore )
-        {
-            localQueueEventMethod = null;
-            LoggerFactory.getLogger(Compatibility.class).info("Using emulated InjectSessionExpiration");
-        }
-        queueEventMethod = localQueueEventMethod;
-    }
-
-    /**
-     * Return true if the classpath ZooKeeper library is 3.4.x
-     *
-     * @return true/false
-     */
-    public static boolean isZK34()
-    {
-        return !hasZooKeeperAdmin;
-    }
-
-    /**
-     * For ZooKeeper 3.5.x, use the supported <code>zooKeeper.getTestable().injectSessionExpiration()</code>.
-     * For ZooKeeper 3.4.x do the equivalent via reflection
-     *
-     * @param zooKeeper client
-     */
-    public static void injectSessionExpiration(ZooKeeper zooKeeper)
-    {
-        if ( isZK34() || (queueEventMethod == null) )
-        {
-            InjectSessionExpiration.injectSessionExpiration(zooKeeper);
-        }
-        else
-        {
-            try
-            {
-                WatchedEvent event = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null);
-                queueEventMethod.invoke(zooKeeper.getTestable(), event);
-            }
-            catch ( Exception e )
-            {
-                logger.error("Could not call Testable.queueEvent()", e);
-            }
-        }
-    }
-}
diff --git a/curator-client/src/main/java/org/apache/curator/utils/InjectSessionExpiration.java b/curator-client/src/main/java/org/apache/curator/utils/InjectSessionExpiration.java
deleted file mode 100644
index 8ad2b5d..0000000
--- a/curator-client/src/main/java/org/apache/curator/utils/InjectSessionExpiration.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.utils;
-
-import org.apache.zookeeper.ClientCnxn;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-
-// reflective version of zooKeeper.getTestable().injectSessionExpiration();
-@SuppressWarnings("JavaReflectionMemberAccess")
-public class InjectSessionExpiration
-{
-    private static final Field cnxnField;
-    private static final Field eventThreadField;
-    private static final Method queueEventMethod;
-    static
-    {
-        Field localCnxnField;
-        Field localEventThreadField;
-        Method localQueueEventMethod;
-        try
-        {
-            Class<?> eventThreadClass = Class.forName("org.apache.zookeeper.ClientCnxn$EventThread");
-
-            localCnxnField = ZooKeeper.class.getDeclaredField("cnxn");
-            localCnxnField.setAccessible(true);
-            localEventThreadField = ClientCnxn.class.getDeclaredField("eventThread");
-            localEventThreadField.setAccessible(true);
-            localQueueEventMethod = eventThreadClass.getDeclaredMethod("queueEvent", WatchedEvent.class);
-            localQueueEventMethod.setAccessible(true);
-        }
-        catch ( ReflectiveOperationException e )
-        {
-            throw new RuntimeException("Could not access internal ZooKeeper fields", e);
-        }
-        cnxnField = localCnxnField;
-        eventThreadField = localEventThreadField;
-        queueEventMethod = localQueueEventMethod;
-    }
-
-    public static void injectSessionExpiration(ZooKeeper zooKeeper)
-    {
-        try
-        {
-            WatchedEvent event = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null);
-
-            ClientCnxn clientCnxn = (ClientCnxn)cnxnField.get(zooKeeper);
-            Object eventThread = eventThreadField.get(clientCnxn);
-            queueEventMethod.invoke(eventThread, event);
-
-            // we used to set the state field to CLOSED here and a few other things but this resulted in CURATOR-498
-        }
-        catch ( ReflectiveOperationException e )
-        {
-            throw new RuntimeException("Could not inject session expiration using reflection", e);
-        }
-    }
-}
diff --git a/curator-client/src/test/java/org/apache/curator/BasicTests.java b/curator-client/src/test/java/org/apache/curator/BasicTests.java
index f951fb5..96b1ad0 100644
--- a/curator-client/src/test/java/org/apache/curator/BasicTests.java
+++ b/curator-client/src/test/java/org/apache/curator/BasicTests.java
@@ -21,8 +21,8 @@ package org.apache.curator;
 import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Compatibility;
 import org.apache.curator.test.Timing;
-import org.apache.curator.utils.Compatibility;
 import org.apache.curator.utils.ZookeeperFactory;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -100,7 +100,7 @@ public class BasicTests extends BaseClassForTests
                                 // ignore
                             }
 
-                            Compatibility.injectSessionExpiration(client.getZooKeeper());
+                            client.getZooKeeper().getTestable().injectSessionExpiration();
 
                             Assert.assertTrue(timing.awaitLatch(latch));
                         }
diff --git a/curator-client/src/test/java/org/apache/curator/TestSessionFailRetryLoop.java b/curator-client/src/test/java/org/apache/curator/TestSessionFailRetryLoop.java
index 7c9c963..e661930 100644
--- a/curator-client/src/test/java/org/apache/curator/TestSessionFailRetryLoop.java
+++ b/curator-client/src/test/java/org/apache/curator/TestSessionFailRetryLoop.java
@@ -20,9 +20,9 @@ package org.apache.curator;
 
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Compatibility;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.test.Timing;
-import org.apache.curator.utils.Compatibility;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.concurrent.Callable;
@@ -57,7 +57,7 @@ public class TestSessionFailRetryLoop extends BaseClassForTests
                                 if ( firstTime.compareAndSet(true, false) )
                                 {
                                     Assert.assertNull(client.getZooKeeper().exists("/foo/bar", false));
-                                    Compatibility.injectSessionExpiration(client.getZooKeeper());
+                                    client.getZooKeeper().getTestable().injectSessionExpiration();
                                     client.getZooKeeper();
                                     client.blockUntilConnectedOrTimedOut();
                                 }
@@ -131,7 +131,7 @@ public class TestSessionFailRetryLoop extends BaseClassForTests
                                     if ( firstTime.compareAndSet(true, false) )
                                     {
                                         Assert.assertNull(client.getZooKeeper().exists("/foo/bar", false));
-                                        Compatibility.injectSessionExpiration(client.getZooKeeper());
+                                        client.getZooKeeper().getTestable().injectSessionExpiration();
                                         client.getZooKeeper();
                                         client.blockUntilConnectedOrTimedOut();
                                     }
@@ -196,7 +196,7 @@ public class TestSessionFailRetryLoop extends BaseClassForTests
                                 public Void call() throws Exception
                                 {
                                     Assert.assertNull(client.getZooKeeper().exists("/foo/bar", false));
-                                    Compatibility.injectSessionExpiration(client.getZooKeeper());
+                                    client.getZooKeeper().getTestable().injectSessionExpiration();
 
                                     timing.sleepABit();
 
@@ -258,7 +258,7 @@ public class TestSessionFailRetryLoop extends BaseClassForTests
                                     public Void call() throws Exception
                                     {
                                         Assert.assertNull(client.getZooKeeper().exists("/foo/bar", false));
-                                        Compatibility.injectSessionExpiration(client.getZooKeeper());
+                                        client.getZooKeeper().getTestable().injectSessionExpiration();
 
                                         client.getZooKeeper();
                                         client.blockUntilConnectedOrTimedOut();
diff --git a/curator-client/src/test/java/org/apache/curator/ensemble/exhibitor/TestExhibitorEnsembleProvider.java b/curator-client/src/test/java/org/apache/curator/ensemble/exhibitor/TestExhibitorEnsembleProvider.java
deleted file mode 100644
index 60c817a..0000000
--- a/curator-client/src/test/java/org/apache/curator/ensemble/exhibitor/TestExhibitorEnsembleProvider.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.ensemble.exhibitor;
-
-import com.google.common.collect.Lists;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.CuratorZookeeperClient;
-import org.apache.curator.RetryLoop;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.curator.retry.RetryOneTime;
-import org.apache.curator.test.TestingServer;
-import org.apache.curator.test.Timing;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.Stat;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-import java.io.IOException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicReference;
-
-public class TestExhibitorEnsembleProvider extends BaseClassForTests
-{
-    private static final Exhibitors.BackupConnectionStringProvider dummyConnectionStringProvider = new Exhibitors.BackupConnectionStringProvider()
-    {
-        @Override
-        public String getBackupConnectionString() throws Exception
-        {
-            return null;
-        }
-    };
-
-    @Test
-    public void     testExhibitorFailures() throws Exception
-    {
-        final AtomicReference<String>   backupConnectionString = new AtomicReference<String>("backup1:1");
-        final AtomicReference<String>   connectionString = new AtomicReference<String>("count=1&port=2&server0=localhost");
-        Exhibitors                      exhibitors = new Exhibitors
-        (
-            Lists.newArrayList("foo", "bar"),
-            1000,
-            new Exhibitors.BackupConnectionStringProvider()
-            {
-                @Override
-                public String getBackupConnectionString()
-                {
-                    return backupConnectionString.get();
-                }
-            }
-        );
-        ExhibitorRestClient             mockRestClient = new ExhibitorRestClient()
-        {
-            @Override
-            public String getRaw(String hostname, int port, String uriPath, String mimeType) throws Exception
-            {
-                String localConnectionString = connectionString.get();
-                if ( localConnectionString == null )
-                {
-                    throw new IOException();
-                }
-                return localConnectionString;
-            }
-        };
-
-        final Semaphore             semaphore = new Semaphore(0);
-        ExhibitorEnsembleProvider   provider = new ExhibitorEnsembleProvider(exhibitors, mockRestClient, "/foo", 10, new RetryOneTime(1))
-        {
-            @Override
-            protected void poll()
-            {
-                super.poll();
-                semaphore.release();
-            }
-        };
-        provider.pollForInitialEnsemble();
-        try
-        {
-            provider.start();
-
-            Assert.assertEquals(provider.getConnectionString(), "localhost:2");
-
-            connectionString.set(null);
-            semaphore.drainPermits();
-            semaphore.acquire();    // wait for next poll
-            Assert.assertEquals(provider.getConnectionString(), "backup1:1");
-
-            backupConnectionString.set("backup2:2");
-            semaphore.drainPermits();
-            semaphore.acquire();    // wait for next poll
-            Assert.assertEquals(provider.getConnectionString(), "backup2:2");
-
-            connectionString.set("count=1&port=3&server0=localhost3");
-            semaphore.drainPermits();
-            semaphore.acquire();    // wait for next poll
-            Assert.assertEquals(provider.getConnectionString(), "localhost3:3");
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(provider);
-        }
-    }
-
-    @Test
-    public void     testChanging() throws Exception
-    {
-        TestingServer               secondServer = new TestingServer();
-        try
-        {
-            String                          mainConnectionString = "count=1&port=" + server.getPort() + "&server0=localhost";
-            String                          secondConnectionString = "count=1&port=" + secondServer.getPort() + "&server0=localhost";
-
-            final Semaphore                 semaphore = new Semaphore(0);
-            final AtomicReference<String>   connectionString = new AtomicReference<String>(mainConnectionString);
-            Exhibitors                      exhibitors = new Exhibitors(Lists.newArrayList("foo", "bar"), 1000, dummyConnectionStringProvider);
-            ExhibitorRestClient             mockRestClient = new ExhibitorRestClient()
-            {
-                @Override
-                public String getRaw(String hostname, int port, String uriPath, String mimeType) throws Exception
-                {
-                    semaphore.release();
-                    return connectionString.get();
-                }
-            };
-            ExhibitorEnsembleProvider   provider = new ExhibitorEnsembleProvider(exhibitors, mockRestClient, "/foo", 10, new RetryOneTime(1));
-            provider.pollForInitialEnsemble();
-
-            Timing                          timing = new Timing().multiple(4);
-            final CuratorZookeeperClient    client = new CuratorZookeeperClient(provider, timing.session(), timing.connection(), null, new RetryOneTime(2));
-            client.start();
-            try
-            {
-                RetryLoop.callWithRetry
-                (
-                    client,
-                    new Callable<Object>()
-                    {
-                        @Override
-                        public Object call() throws Exception
-                        {
-                            client.getZooKeeper().create("/test", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-                            return null;
-                        }
-                    }
-                );
-
-                connectionString.set(secondConnectionString);
-                semaphore.drainPermits();
-                semaphore.acquire();
-
-                server.stop();  // create situation where the current zookeeper gets a sys-disconnected
-
-                Stat        stat = RetryLoop.callWithRetry
-                (
-                    client,
-                    new Callable<Stat>()
-                    {
-                        @Override
-                        public Stat call() throws Exception
-                        {
-                            return client.getZooKeeper().exists("/test", false);
-                        }
-                    }
-                );
-                Assert.assertNull(stat);    // it's a different server so should be null
-            }
-            finally
-            {
-                client.close();
-            }
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(secondServer);
-        }
-    }
-
-    @Test
-    public void     testSimple() throws Exception
-    {
-        Exhibitors                  exhibitors = new Exhibitors(Lists.newArrayList("foo", "bar"), 1000, dummyConnectionStringProvider);
-        ExhibitorRestClient         mockRestClient = new ExhibitorRestClient()
-        {
-            @Override
-            public String getRaw(String hostname, int port, String uriPath, String mimeType) throws Exception
-            {
-                return "count=1&port=" + server.getPort() + "&server0=localhost";
-            }
-        };
-        ExhibitorEnsembleProvider   provider = new ExhibitorEnsembleProvider(exhibitors, mockRestClient, "/foo", 10, new RetryOneTime(1));
-        provider.pollForInitialEnsemble();
-
-        Timing                      timing = new Timing();
-        CuratorZookeeperClient      client = new CuratorZookeeperClient(provider, timing.session(), timing.connection(), null, new ExponentialBackoffRetry(timing.milliseconds(), 3));
-        client.start();
-        try
-        {
-            client.blockUntilConnectedOrTimedOut();
-            client.getZooKeeper().exists("/", false);
-        }
-        catch ( Exception e )
-        {
-            Assert.fail("provider.getConnectionString(): " + provider.getConnectionString() + " server.getPort(): " + server.getPort(), e);
-        }
-        finally
-        {
-            client.close();
-        }
-    }
-}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 3737faa..8b39ebd 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -325,13 +325,6 @@ public interface CuratorFramework extends Closeable
     SchemaSet getSchemaSet();
 
     /**
-     * Return true if this instance is running in ZK 3.4.x compatibility mode
-     *
-     * @return true/false
-     */
-    boolean isZk34CompatibilityMode();
-
-    /**
      * Calls {@link #notifyAll()} on the given object after first synchronizing on it. This is
      * done from the {@link #runSafe(Runnable)} thread.
      *
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
index 887a2aa..9c075bb 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
@@ -54,8 +54,6 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import org.apache.curator.CuratorZookeeperClient;
 
-import static org.apache.curator.utils.Compatibility.isZK34;
-
 /**
  * Factory methods for creating framework-style clients
  */
@@ -150,7 +148,6 @@ public class CuratorFrameworkFactory
         private ConnectionStateErrorPolicy connectionStateErrorPolicy = new StandardConnectionStateErrorPolicy();
         private ConnectionHandlingPolicy connectionHandlingPolicy = new StandardConnectionHandlingPolicy();
         private SchemaSet schemaSet = SchemaSet.getDefaultSchemaSet();
-        private boolean zk34CompatibilityMode = isZK34();
         private int waitForShutdownTimeoutMs = 0;
         private Executor runSafeService = null;
         private ConnectionStateListenerManagerFactory connectionStateListenerManagerFactory = ConnectionStateListenerManagerFactory.standard;
@@ -395,20 +392,6 @@ public class CuratorFrameworkFactory
         }
 
         /**
-         * If mode is true, create a ZooKeeper 3.4.x compatible client. IMPORTANT: If the client
-         * library used is ZooKeeper 3.4.x <code>zk34CompatibilityMode</code> is enabled by default.
-         *
-         * @since 3.5.0
-         * @param mode true/false
-         * @return this
-         */
-        public Builder zk34CompatibilityMode(boolean mode)
-        {
-            this.zk34CompatibilityMode = mode;
-            return this;
-        }
-
-        /**
          * Set a timeout for {@link CuratorZookeeperClient#close(int)}  }.
          * The default is 0, which means that this feature is disabled.
          *
@@ -591,11 +574,6 @@ public class CuratorFrameworkFactory
             return schemaSet;
         }
 
-        public boolean isZk34CompatibilityMode()
-        {
-            return zk34CompatibilityMode;
-        }
-
         @Deprecated
         public String getAuthScheme()
         {
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/SafeIsTtlMode.java b/curator-framework/src/main/java/org/apache/curator/framework/SafeIsTtlMode.java
deleted file mode 100644
index e499a7b..0000000
--- a/curator-framework/src/main/java/org/apache/curator/framework/SafeIsTtlMode.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework;
-
-import org.apache.curator.utils.Compatibility;
-import org.apache.zookeeper.CreateMode;
-
-public class SafeIsTtlMode
-{
-    private static class Internal
-    {
-        private static final Internal instance = new Internal();
-
-        public boolean isTtl(CreateMode mode)
-        {
-            return mode.isTTL();
-        }
-    }
-
-    public static boolean isTtl(CreateMode mode)
-    {
-        return !Compatibility.isZK34() && Internal.instance.isTtl(mode);
-    }
-
-    private SafeIsTtlMode()
-    {
-    }
-}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CompatibleCreateCallback.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CompatibleCreateCallback.java
deleted file mode 100644
index 30ca391..0000000
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CompatibleCreateCallback.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.imps;
-
-import org.apache.zookeeper.data.Stat;
-
-interface CompatibleCreateCallback
-{
-    void processResult(int rc, String path, Object ctx, String name, Stat stat);
-}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
index a02a6be..ee5c541 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
@@ -124,7 +124,6 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro
     @Override
     public CreateBuilderMain withTtl(long ttl)
     {
-        Preconditions.checkState(!client.isZk34CompatibilityMode(), "TTLs are not support when running in ZooKeeper 3.4 compatibility mode");
         this.ttl = ttl;
         return this;
     }
@@ -182,14 +181,7 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro
                 }
 
                 String fixedPath = client.fixForNamespace(path);
-                if ( client.isZk34CompatibilityMode() )
-                {
-                    transaction.add(Op.create(fixedPath, data, acling.getAclList(path), createMode), OperationType.CREATE, path);
-                }
-                else
-                {
-                    transaction.add(Op.create(fixedPath, data, acling.getAclList(path), createMode, ttl), OperationType.CREATE, path);
-                }
+                transaction.add(Op.create(fixedPath, data, acling.getAclList(path), createMode, ttl), OperationType.CREATE, path);
                 return context;
             }
         };
@@ -636,10 +628,11 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro
             final OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("CreateBuilderImpl-Background");
             final byte[] data = operationAndData.getData().getData();
 
-            final CompatibleCreateCallback mainCallback = new CompatibleCreateCallback()
+            AsyncCallback.Create2Callback callback = new AsyncCallback.Create2Callback()
             {
                 @Override
-                public void processResult(int rc, String path, Object ctx, String name, Stat stat) {
+                public void processResult(int rc, String path, Object ctx, String name, Stat stat)
+                {
                     trace.setReturnCode(rc).setRequestBytesLength(data).setPath(path).commit();
 
                     if ( (stat != null) && (storingStat != null) )
@@ -661,41 +654,16 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro
                     }
                 }
             };
-
-            if ( client.isZk34CompatibilityMode() )
-            {
-                AsyncCallback.StringCallback stringCallback = new AsyncCallback.StringCallback()
-                {
-                    @Override
-                    public void processResult(int rc, String path, Object ctx, String name)
-                    {
-                        mainCallback.processResult(rc, path, ctx, name, null);
-                    }
-                };
-                client.getZooKeeper().create
-                    (
-                        operationAndData.getData().getPath(),
-                        data,
-                        acling.getAclList(operationAndData.getData().getPath()),
-                        createMode,
-                        stringCallback,
-                        backgrounding.getContext()
-                    );
-            }
-            else
-            {
-                CreateZK35.create
-                    (
-                        client.getZooKeeper(),
-                        operationAndData.getData().getPath(),
-                        data,
-                        acling.getAclList(operationAndData.getData().getPath()),
-                        createMode,
-                        mainCallback,
-                        backgrounding.getContext(),
-                        ttl
-                    );
-            }
+            client.getZooKeeper().create
+                (
+                    operationAndData.getData().getPath(),
+                    data,
+                    acling.getAclList(operationAndData.getData().getPath()),
+                    createMode,
+                    callback,
+                    backgrounding.getContext(),
+                    ttl
+                );
         }
         catch ( Throwable e )
         {
@@ -1171,28 +1139,14 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro
                         {
                             try
                             {
-                                if ( client.isZk34CompatibilityMode() )
-                                {
-                                    createdPath = client.getZooKeeper().create(path, data, aclList, createMode);
-                                }
-                                else
-                                {
-                                    createdPath = client.getZooKeeper().create(path, data, aclList, createMode, storingStat, ttl);
-                                }
+                                createdPath = client.getZooKeeper().create(path, data, aclList, createMode, storingStat, ttl);
                             }
                             catch ( KeeperException.NoNodeException e )
                             {
                                 if ( createParentsIfNeeded )
                                 {
                                     ZKPaths.mkdirs(client.getZooKeeper(), path, false, acling.getACLProviderForParents(), createParentsAsContainers);
-                                    if ( client.isZk34CompatibilityMode() )
-                                    {
-                                        createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode);
-                                    }
-                                    else
-                                    {
-                                        createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode, storingStat, ttl);
-                                    }
+                                    createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode, storingStat, ttl);
                                 }
                                 else
                                 {
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateZK35.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateZK35.java
deleted file mode 100644
index a32e614..0000000
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateZK35.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.imps;
-
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
-import java.util.List;
-
-// keep reference to AsyncCallback.Create2Callback in separate class for ZK 3.4 compatibility
-class CreateZK35
-{
-    static void create(ZooKeeper zooKeeper, String path, byte data[], List<ACL> acl, CreateMode createMode, final CompatibleCreateCallback compatibleCallback, Object ctx, long ttl)
-    {
-        AsyncCallback.Create2Callback callback = new AsyncCallback.Create2Callback()
-        {
-            @Override
-            public void processResult(int rc, String path, Object ctx, String name, Stat stat)
-            {
-                compatibleCallback.processResult(rc, path, ctx, name, stat);
-            }
-        };
-        zooKeeper.create(path, data, acl, createMode, callback, ctx, ttl);
-    }
-
-    private CreateZK35()
-    {
-    }
-}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index e003bf0..90f740d 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -20,7 +20,6 @@
 package org.apache.curator.framework.imps;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import org.apache.curator.CuratorConnectionLossException;
@@ -36,7 +35,7 @@ import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
 import org.apache.curator.framework.api.transaction.CuratorTransaction;
 import org.apache.curator.framework.api.transaction.TransactionOp;
 import org.apache.curator.framework.listen.Listenable;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.StandardListenerManager;
 import org.apache.curator.framework.schema.SchemaSet;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
@@ -67,8 +66,8 @@ public class CuratorFrameworkImpl implements CuratorFramework
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final CuratorZookeeperClient client;
-    private final ListenerContainer<CuratorListener> listeners;
-    private final ListenerContainer<UnhandledErrorListener> unhandledErrorListeners;
+    private final StandardListenerManager<CuratorListener> listeners;
+    private final StandardListenerManager<UnhandledErrorListener> unhandledErrorListeners;
     private final ThreadFactory threadFactory;
     private final int maxCloseWaitMs;
     private final BlockingQueue<OperationAndData<?>> backgroundOperations;
@@ -88,7 +87,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
     private final InternalConnectionHandler internalConnectionHandler;
     private final EnsembleTracker ensembleTracker;
     private final SchemaSet schemaSet;
-    private final boolean zk34CompatibilityMode;
     private final Executor runSafeService;
 
     private volatile ExecutorService executorService;
@@ -132,8 +130,8 @@ public class CuratorFrameworkImpl implements CuratorFramework
             );
 
         internalConnectionHandler = new StandardInternalConnectionHandler();
-        listeners = new ListenerContainer<CuratorListener>();
-        unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
+        listeners = StandardListenerManager.standard();
+        unhandledErrorListeners = StandardListenerManager.standard();
         backgroundOperations = new DelayQueue<OperationAndData<?>>();
         forcedSleepOperations = new LinkedBlockingQueue<>();
         namespace = new NamespaceImpl(this, builder.getNamespace());
@@ -146,7 +144,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
         useContainerParentsIfAvailable = builder.useContainerParentsIfAvailable();
         connectionStateErrorPolicy = Preconditions.checkNotNull(builder.getConnectionStateErrorPolicy(), "errorPolicy cannot be null");
         schemaSet = Preconditions.checkNotNull(builder.getSchemaSet(), "schemaSet cannot be null");
-        zk34CompatibilityMode = builder.isZk34CompatibilityMode();
 
         byte[] builderDefaultData = builder.getDefaultData();
         defaultData = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0];
@@ -156,7 +153,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         failedRemoveWatcherManager = new FailedRemoveWatchManager(this);
         namespaceFacadeCache = new NamespaceFacadeCache(this);
 
-        ensembleTracker = zk34CompatibilityMode ? null : new EnsembleTracker(this, builder.getEnsembleProvider());
+        ensembleTracker = new EnsembleTracker(this, builder.getEnsembleProvider());
 
         runSafeService = makeRunSafeService(builder);
     }
@@ -254,7 +251,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
         connectionStateErrorPolicy = parent.connectionStateErrorPolicy;
         internalConnectionHandler = parent.internalConnectionHandler;
         schemaSet = parent.schemaSet;
-        zk34CompatibilityMode = parent.zk34CompatibilityMode;
         ensembleTracker = null;
         runSafeService = parent.runSafeService;
     }
@@ -368,22 +364,17 @@ public class CuratorFrameworkImpl implements CuratorFramework
         log.debug("Closing");
         if ( state.compareAndSet(CuratorFrameworkState.STARTED, CuratorFrameworkState.STOPPED) )
         {
-            listeners.forEach(new Function<CuratorListener, Void>()
+            listeners.forEach(listener ->
             {
-                @Override
-                public Void apply(CuratorListener listener)
+                CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.CLOSING, 0, null, null, null, null, null, null, null, null, null);
+                try
                 {
-                    CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.CLOSING, 0, null, null, null, null, null, null, null, null, null);
-                    try
-                    {
-                        listener.eventReceived(CuratorFrameworkImpl.this, event);
-                    }
-                    catch ( Exception e )
-                    {
-                        ThreadUtils.checkInterrupted(e);
-                        log.error("Exception while sending Closing event", e);
-                    }
-                    return null;
+                    listener.eventReceived(CuratorFrameworkImpl.this, event);
+                }
+                catch ( Exception e )
+                {
+                    ThreadUtils.checkInterrupted(e);
+                    log.error("Exception while sending Closing event", e);
                 }
             });
 
@@ -498,14 +489,12 @@ public class CuratorFrameworkImpl implements CuratorFramework
     @Override
     public ReconfigBuilder reconfig()
     {
-        Preconditions.checkState(!isZk34CompatibilityMode(), "reconfig/config APIs are not support when running in ZooKeeper 3.4 compatibility mode");
         return new ReconfigBuilderImpl(this);
     }
 
     @Override
     public GetConfigBuilder getConfig()
     {
-        Preconditions.checkState(!isZk34CompatibilityMode(), "reconfig/config APIs are not support when running in ZooKeeper 3.4 compatibility mode");
         return new GetConfigBuilderImpl(this);
     }
 
@@ -567,7 +556,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
     @Override
     public RemoveWatchesBuilder watches()
     {
-        Preconditions.checkState(!isZk34CompatibilityMode(), "Remove watches APIs are not support when running in ZooKeeper 3.4 compatibility mode");
         return new RemoveWatchesBuilderImpl(this);
     }
 
@@ -705,15 +693,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         }
 
         final String localReason = reason;
-        unhandledErrorListeners.forEach(new Function<UnhandledErrorListener, Void>()
-        {
-            @Override
-            public Void apply(UnhandledErrorListener listener)
-            {
-                listener.unhandledError(localReason, e);
-                return null;
-            }
-        });
+        unhandledErrorListeners.forEach(l -> l.unhandledError(localReason, e));
 
         if ( debugUnhandledErrorListener != null )
         {
@@ -824,12 +804,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
         connectionStateManager.addStateChange(newConnectionState);
     }
 
-    @Override
-    public boolean isZk34CompatibilityMode()
-    {
-        return zk34CompatibilityMode;
-    }
-
     EnsembleTracker getEnsembleTracker()
     {
         return ensembleTracker;
@@ -1037,23 +1011,18 @@ public class CuratorFrameworkImpl implements CuratorFramework
             validateConnection(curatorEvent.getWatchedEvent().getState());
         }
 
-        listeners.forEach(new Function<CuratorListener, Void>()
+        listeners.forEach(listener ->
         {
-            @Override
-            public Void apply(CuratorListener listener)
+            try
             {
-                try
-                {
-                    OperationTrace trace = client.startAdvancedTracer("EventListener");
-                    listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
-                    trace.commit();
-                }
-                catch ( Exception e )
-                {
-                    ThreadUtils.checkInterrupted(e);
-                    logError("Event listener threw exception", e);
-                }
-                return null;
+                OperationTrace trace = client.startAdvancedTracer("EventListener");
+                listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
+                trace.commit();
+            }
+            catch ( Exception e )
+            {
+                ThreadUtils.checkInterrupted(e);
+                logError("Event listener threw exception", e);
             }
         });
     }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionImpl.java
index 9057934..bdab158 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionImpl.java
@@ -35,7 +35,6 @@ import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
 import org.apache.curator.framework.schema.Schema;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.OpResult;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.proto.CreateRequest;
@@ -136,22 +135,7 @@ public class CuratorMultiTransactionImpl implements
             if ( (curatorOp.get().getType() == ZooDefs.OpCode.create) || (curatorOp.get().getType() == ZooDefs.OpCode.createContainer) )
             {
                 CreateRequest createRequest = (CreateRequest)curatorOp.get().toRequestRecord();
-                CreateMode createMode;
-                if ( client.isZk34CompatibilityMode() )
-                {
-                    try
-                    {
-                        createMode = CreateMode.fromFlag(createRequest.getFlags());
-                    }
-                    catch ( KeeperException.BadArgumentsException dummy )
-                    {
-                        createMode = CreateMode.PERSISTENT;
-                    }
-                }
-                else
-                {
-                    createMode = CreateMode.fromFlag(createRequest.getFlags(), CreateMode.PERSISTENT);
-                }
+                CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags(), CreateMode.PERSISTENT);
                 schema.validateCreate(createMode, createRequest.getPath(), createRequest.getData(), createRequest.getAcl());
             }
             else if ( (curatorOp.get().getType() == ZooDefs.OpCode.delete) || (curatorOp.get().getType() == ZooDefs.OpCode.deleteContainer) )
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java
index bdb5428..b9c9044 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java
@@ -53,16 +53,11 @@ public class WatcherRemovalManager
 
     void removeWatchers()
     {
-        if ( client.isZk34CompatibilityMode() )
-        {
-            return;
-        }
-
         List<NamespaceWatcher> localEntries = Lists.newArrayList(entries);
         while ( localEntries.size() > 0 )
         {
             NamespaceWatcher watcher = localEntries.remove(0);
-            if ( entries.remove(watcher) && !client.isZk34CompatibilityMode() )
+            if ( entries.remove(watcher) )
             {
                 try
                 {
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerContainer.java b/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerContainer.java
deleted file mode 100644
index 9139439..0000000
--- a/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerContainer.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.listen;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.MoreExecutors;
-import org.apache.curator.utils.ThreadUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.Map;
-import java.util.concurrent.Executor;
-
-/**
- * Abstracts an object that has listeners
- *
- * @deprecated Prefer {@link MappingListenerManager} and
- * {@link StandardListenerManager}
- */
-@Deprecated
-public class ListenerContainer<T> implements Listenable<T>
-{
-    private final Logger                        log = LoggerFactory.getLogger(getClass());
-    private final Map<T, ListenerEntry<T>>      listeners = Maps.newConcurrentMap();
-
-    @Override
-    public void addListener(T listener)
-    {
-        addListener(listener, MoreExecutors.directExecutor());
-    }
-
-    @Override
-    public void addListener(T listener, Executor executor)
-    {
-        listeners.put(listener, new ListenerEntry<T>(listener, executor));
-    }
-
-    @Override
-    public void removeListener(T listener)
-    {
-        if ( listener != null )
-        {
-            listeners.remove(listener);
-        }
-    }
-
-    /**
-     * Remove all listeners
-     */
-    public void     clear()
-    {
-        listeners.clear();
-    }
-
-    /**
-     * Return the number of listeners
-     *
-     * @return number
-     */
-    public int      size()
-    {
-        return listeners.size();
-    }
-
-    /**
-     * Utility - apply the given function to each listener. The function receives
-     * the listener as an argument.
-     *
-     * @param function function to call for each listener
-     */
-    public void     forEach(final Function<T, Void> function)
-    {
-        for ( final ListenerEntry<T> entry : listeners.values() )
-        {
-            entry.executor.execute
-            (
-                new Runnable()
-                {
-                    @Override
-                    public void run()
-                    {
-                        try
-                        {
-                            function.apply(entry.listener);
-                        }
-                        catch ( Throwable e )
-                        {
-                            ThreadUtils.checkInterrupted(e);
-                            log.error(String.format("Listener (%s) threw an exception", entry.listener), e);
-                        }
-                    }
-                }
-            );
-        }
-    }
-}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerManager.java b/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerManager.java
index bd9f51a..5a6c9c7 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerManager.java
@@ -28,7 +28,7 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 
 /**
- * Upgraded version of {@link org.apache.curator.framework.listen.ListenerContainer} that
+ * Upgraded version of ListenerContainer that
  * doesn't leak Guava's internals and also supports mapping/wrapping of listeners
  */
 public class MappingListenerManager<K, V> implements ListenerManager<K, V>
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 46325d2..7285431 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -23,7 +23,6 @@ import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.listen.UnaryListenerManager;
-import org.apache.curator.utils.Compatibility;
 import org.apache.curator.utils.ThreadUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -308,7 +307,7 @@ public class ConnectionStateManager implements Closeable
                 log.warn(String.format("Session timeout has elapsed while SUSPENDED. Injecting a session expiration. Elapsed ms: %d. Adjusted session timeout ms: %d", elapsedMs, useSessionTimeoutMs));
                 try
                 {
-                    Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+                    client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
                 }
                 catch ( Exception e )
                 {
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCleanState.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
index d80e053..b4791ff 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
@@ -22,7 +22,6 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.test.WatchersDebug;
 import org.apache.curator.test.compatibility.Timing2;
 import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.utils.Compatibility;
 import org.apache.zookeeper.ZooKeeper;
 import java.util.concurrent.Callable;
 
@@ -35,12 +34,6 @@ public class TestCleanState
             return;
         }
 
-        if ( Compatibility.isZK34() )
-        {
-            CloseableUtils.closeQuietly(client);
-            return;
-        }
-
         try
         {
             Timing2 timing = new Timing2();
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCreateReturningStat.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCreateReturningStat.java
index 034791d..10b237f 100755
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCreateReturningStat.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCreateReturningStat.java
@@ -26,7 +26,6 @@ import org.apache.curator.framework.api.CuratorEventType;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.test.Timing;
-import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.zookeeper.data.Stat;
 import org.testng.Assert;
@@ -34,7 +33,6 @@ import org.testng.annotations.Test;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 
-@Test(groups = Zk35MethodInterceptor.zk35Group)
 public class TestCreateReturningStat extends CuratorTestBase
 {
     private CuratorFramework createClient()
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
index 773d9c9..1656351 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
@@ -27,7 +27,6 @@ import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.compatibility.Timing2;
 import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.utils.Compatibility;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -127,7 +126,7 @@ public class TestEnabledSessionExpiredState extends BaseClassForTests
     {
         Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED);
 
-        Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+        client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
 
         Assert.assertEquals(states.poll(timing.forSessionSleep().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.LOST);
         Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED);
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
index fe49ad7..8b22812 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
@@ -32,7 +32,6 @@ import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
 import org.apache.curator.test.compatibility.Timing2;
-import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.EnsurePath;
 import org.apache.curator.utils.ZKPaths;
@@ -77,7 +76,6 @@ public class TestFramework extends BaseClassForTests
         super.teardown();
     }
 
-    @Test(groups = Zk35MethodInterceptor.zk35Group)
     public void testWaitForShutdownTimeoutMs() throws Exception
     {
         final BlockingQueue<Integer> timeoutQueue = new ArrayBlockingQueue<>(1);
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
index 7c6d156..c091fad 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
@@ -24,7 +24,6 @@ import org.apache.curator.RetryPolicy;
 import org.apache.curator.RetrySleeper;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.SafeIsTtlMode;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CreateBuilder;
 import org.apache.curator.framework.api.CuratorEvent;
@@ -37,16 +36,15 @@ import org.apache.curator.retry.RetryForever;
 import org.apache.curator.retry.RetryNTimes;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Compatibility;
 import org.apache.curator.test.InstanceSpec;
 import org.apache.curator.test.TestingCluster;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.compatibility.Timing2;
 import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.utils.Compatibility;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
@@ -63,7 +61,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class TestFrameworkEdges extends BaseClassForTests
@@ -92,7 +89,7 @@ public class TestFrameworkEdges extends BaseClassForTests
                 }
             };
             client.checkExists().usingWatcher(watcher).forPath("/foobar");
-            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+            client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
             Assert.assertTrue(timing.awaitLatch(expiredLatch));
         }
     }
@@ -327,7 +324,7 @@ public class TestFrameworkEdges extends BaseClassForTests
             final String TEST_PATH = "/a/b/c/test-";
             long ttl = timing.forWaiting().milliseconds() * 1000;
             CreateBuilder firstCreateBuilder = client.create();
-            if ( SafeIsTtlMode.isTtl(mode) )
+            if ( mode.isTTL() )
             {
                 firstCreateBuilder.withTtl(ttl);
             }
@@ -343,7 +340,7 @@ public class TestFrameworkEdges extends BaseClassForTests
 
             CreateBuilderImpl createBuilder = (CreateBuilderImpl)client.create();
             createBuilder.withProtection();
-            if ( SafeIsTtlMode.isTtl(mode) )
+            if ( mode.isTTL() )
             {
                 createBuilder.withTtl(ttl);
             }
@@ -532,7 +529,7 @@ public class TestFrameworkEdges extends BaseClassForTests
             };
 
             client.checkExists().usingWatcher(watcher).forPath("/sessionTest");
-            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+            client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
             Assert.assertTrue(timing.awaitLatch(sessionDiedLatch));
             Assert.assertNotNull(client.checkExists().forPath("/sessionTest"));
         }
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
index c6ff2bb..1ff2805 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
@@ -32,7 +32,6 @@ import org.apache.curator.test.TestingCluster;
 import org.apache.curator.test.TestingZooKeeperServer;
 import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.test.compatibility.Timing2;
-import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -57,7 +56,6 @@ import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 
-@Test(groups = Zk35MethodInterceptor.zk35Group)
 public class TestReconfiguration extends CuratorTestBase
 {
     private final Timing2 timing = new Timing2();
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
index 63d8931..82f2cf4 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
@@ -32,7 +32,6 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.test.Timing;
-import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
@@ -45,7 +44,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-@Test(groups = Zk35MethodInterceptor.zk35Group)
 public class TestRemoveWatches extends CuratorTestBase
 {
     private AtomicReference<ConnectionState> registerConnectionStateListener(CuratorFramework client)
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestTtlNodes.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestTtlNodes.java
index e2156df..fc37e41 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestTtlNodes.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestTtlNodes.java
@@ -25,7 +25,6 @@ import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.test.Timing;
-import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.zookeeper.CreateMode;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -34,7 +33,6 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 import java.util.concurrent.CountDownLatch;
 
-@Test(groups = Zk35MethodInterceptor.zk35Group)
 public class TestTtlNodes extends CuratorTestBase
 {
     @BeforeClass
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java
index 74aac1d..8b17576 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java
@@ -27,7 +27,6 @@ import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.test.Timing;
 import org.apache.curator.test.WatchersDebug;
-import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -37,7 +36,6 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 
-@Test(groups = Zk35MethodInterceptor.zk35Group)
 public class TestWatcherRemovalManager extends CuratorTestBase
 {
     @Test
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
index e7778ff..c654f4f 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
@@ -19,7 +19,6 @@
 package org.apache.curator.framework.recipes.cache;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 
@@ -27,7 +26,8 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.WatcherRemoveCuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.PathUtils;
@@ -62,7 +62,7 @@ public class NodeCache implements Closeable
     private final boolean dataIsCompressed;
     private final AtomicReference<ChildData> data = new AtomicReference<ChildData>(null);
     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
-    private final ListenerContainer<NodeCacheListener> listeners = new ListenerContainer<NodeCacheListener>();
+    private final StandardListenerManager<NodeCacheListener> listeners = StandardListenerManager.standard();
     private final AtomicBoolean isConnected = new AtomicBoolean(true);
     private ConnectionStateListener connectionStateListener = new ConnectionStateListener()
     {
@@ -204,7 +204,7 @@ public class NodeCache implements Closeable
      *
      * @return listenable
      */
-    public ListenerContainer<NodeCacheListener> getListenable()
+    public Listenable<NodeCacheListener> getListenable()
     {
         Preconditions.checkState(state.get() != State.CLOSED, "Closed");
 
@@ -314,26 +314,17 @@ public class NodeCache implements Closeable
         ChildData   previousData = data.getAndSet(newData);
         if ( !Objects.equal(previousData, newData) )
         {
-            listeners.forEach
-            (
-                new Function<NodeCacheListener, Void>()
+            listeners.forEach(listener -> {
+                try
                 {
-                    @Override
-                    public Void apply(NodeCacheListener listener)
-                    {
-                        try
-                        {
-                            listener.nodeChanged();
-                        }
-                        catch ( Exception e )
-                        {
-                            ThreadUtils.checkInterrupted(e);
-                            log.error("Calling listener", e);
-                        }
-                        return null;
-                    }
+                    listener.nodeChanged();
+                }
+                catch ( Exception e )
+                {
+                    ThreadUtils.checkInterrupted(e);
+                    log.error("Calling listener", e);
                 }
-            );
+            });
 
             if ( rebuildTestExchanger != null )
             {
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index bdc73cc..14c889a 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -20,7 +20,6 @@
 package org.apache.curator.framework.recipes.cache;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
@@ -31,7 +30,8 @@ import org.apache.curator.framework.EnsureContainers;
 import org.apache.curator.framework.WatcherRemoveCuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.CloseableExecutorService;
@@ -74,7 +74,7 @@ public class PathChildrenCache implements Closeable
     private final CloseableExecutorService executorService;
     private final boolean cacheData;
     private final boolean dataIsCompressed;
-    private final ListenerContainer<PathChildrenCacheListener> listeners = new ListenerContainer<PathChildrenCacheListener>();
+    private final StandardListenerManager<PathChildrenCacheListener> listeners = StandardListenerManager.standard();
     private final ConcurrentMap<String, ChildData> currentData = Maps.newConcurrentMap();
     private final AtomicReference<Map<String, ChildData>> initialSet = new AtomicReference<Map<String, ChildData>>();
     private final Set<Operation> operationsQuantizer = Sets.newSetFromMap(Maps.<Operation, Boolean>newConcurrentMap());
@@ -394,7 +394,7 @@ public class PathChildrenCache implements Closeable
      *
      * @return listenable
      */
-    public ListenerContainer<PathChildrenCacheListener> getListenable()
+    public Listenable<PathChildrenCacheListener> getListenable()
     {
         return listeners;
     }
@@ -526,26 +526,17 @@ public class PathChildrenCache implements Closeable
 
     void callListeners(final PathChildrenCacheEvent event)
     {
-        listeners.forEach
-            (
-                new Function<PathChildrenCacheListener, Void>()
-                {
-                    @Override
-                    public Void apply(PathChildrenCacheListener listener)
-                    {
-                        try
-                        {
-                            listener.childEvent(client, event);
-                        }
-                        catch ( Exception e )
-                        {
-                            ThreadUtils.checkInterrupted(e);
-                            handleException(e);
-                        }
-                        return null;
-                    }
-                }
-            );
+        listeners.forEach(listener -> {
+            try
+            {
+                listener.childEvent(client, event);
+            }
+            catch ( Exception e )
+            {
+                ThreadUtils.checkInterrupted(e);
+                handleException(e);
+            }
+        });
     }
 
     void getDataAndStat(final String fullPath) throws Exception
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
index f42c1d5..ad14dab 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -20,7 +20,6 @@
 package org.apache.curator.framework.recipes.cache;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
@@ -33,7 +32,7 @@ import org.apache.curator.framework.api.Pathable;
 import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.api.Watchable;
 import org.apache.curator.framework.listen.Listenable;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.StandardListenerManager;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.PathUtils;
@@ -531,8 +530,8 @@ public class TreeCache implements Closeable
     private final boolean cacheData;
     private final boolean dataIsCompressed;
     private final int maxDepth;
-    private final ListenerContainer<TreeCacheListener> listeners = new ListenerContainer<TreeCacheListener>();
-    private final ListenerContainer<UnhandledErrorListener> errorListeners = new ListenerContainer<UnhandledErrorListener>();
+    private final StandardListenerManager<TreeCacheListener> listeners = StandardListenerManager.standard();
+    private final StandardListenerManager<UnhandledErrorListener> errorListeners = StandardListenerManager.standard();
     private final AtomicReference<TreeState> treeState = new AtomicReference<TreeState>(TreeState.LATENT);
 
     private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
@@ -750,21 +749,16 @@ public class TreeCache implements Closeable
 
     private void callListeners(final TreeCacheEvent event)
     {
-        listeners.forEach(new Function<TreeCacheListener, Void>()
+        listeners.forEach(listener ->
         {
-            @Override
-            public Void apply(TreeCacheListener listener)
+            try
             {
-                try
-                {
-                    listener.childEvent(client, event);
-                }
-                catch ( Exception e )
-                {
-                    ThreadUtils.checkInterrupted(e);
-                    handleException(e);
-                }
-                return null;
+                listener.childEvent(client, event);
+            }
+            catch ( Exception e )
+            {
+                ThreadUtils.checkInterrupted(e);
+                handleException(e);
             }
         });
     }
@@ -780,21 +774,16 @@ public class TreeCache implements Closeable
         }
         else
         {
-            errorListeners.forEach(new Function<UnhandledErrorListener, Void>()
+            errorListeners.forEach(listener ->
             {
-                @Override
-                public Void apply(UnhandledErrorListener listener)
+                try
                 {
-                    try
-                    {
-                        listener.unhandledError("", e);
-                    }
-                    catch ( Exception e )
-                    {
-                        ThreadUtils.checkInterrupted(e);
-                        LOG.error("Exception handling exception", e);
-                    }
-                    return null;
+                    listener.unhandledError("", e);
+                }
+                catch ( Exception e2 )
+                {
+                    ThreadUtils.checkInterrupted(e2);
+                    LOG.error("Exception handling exception", e2);
                 }
             });
         }
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 79b2601..7d9ca3c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -20,13 +20,12 @@
 package org.apache.curator.framework.recipes.leader;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.WatcherRemoveCuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.StandardListenerManager;
 import org.apache.curator.framework.recipes.AfterConnectionEstablished;
 import org.apache.curator.framework.recipes.locks.LockInternals;
 import org.apache.curator.framework.recipes.locks.LockInternalsSorter;
@@ -71,7 +70,7 @@ public class LeaderLatch implements Closeable
     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
     private final AtomicBoolean hasLeadership = new AtomicBoolean(false);
     private final AtomicReference<String> ourPath = new AtomicReference<String>();
-    private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();
+    private final StandardListenerManager<LeaderLatchListener> listeners = StandardListenerManager.standard();
     private final CloseMode closeMode;
     private final AtomicReference<Future<?>> startTask = new AtomicReference<Future<?>>();
 
@@ -682,27 +681,11 @@ public class LeaderLatch implements Closeable
 
         if ( oldValue && !newValue )
         { // Lost leadership, was true, now false
-            listeners.forEach(new Function<LeaderLatchListener, Void>()
-                {
-                    @Override
-                    public Void apply(LeaderLatchListener listener)
-                    {
-                        listener.notLeader();
-                        return null;
-                    }
-                });
+            listeners.forEach(LeaderLatchListener::notLeader);
         }
         else if ( !oldValue && newValue )
         { // Gained leadership, was false, now true
-            listeners.forEach(new Function<LeaderLatchListener, Void>()
-                {
-                    @Override
-                    public Void apply(LeaderLatchListener input)
-                    {
-                        input.isLeader();
-                        return null;
-                    }
-                });
+            listeners.forEach(LeaderLatchListener::isLeader);
         }
 
         notifyAll();
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
deleted file mode 100644
index 3c8b56a..0000000
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
+++ /dev/null
@@ -1,307 +0,0 @@
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.recipes.locks;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.leader.LeaderLatch;
-import org.apache.curator.utils.CloseableScheduledExecutorService;
-import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.utils.PathUtils;
-import org.apache.curator.utils.ThreadUtils;
-import org.apache.curator.utils.ZKPaths;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Utility to reap empty child nodes of a parent node. Periodically calls getChildren on
- * the node and adds empty nodes to an internally managed {@link Reaper}
- *
- * @deprecated Since 2.9.0 - Reaper/ChildReaper are no longer needed. Use {@link CreateMode#CONTAINER}.
- * Also, all Curator recipes create container parents.
- */
-@Deprecated
-public class ChildReaper implements Closeable
-{
-    private final Logger log = LoggerFactory.getLogger(getClass());
-    private final Reaper reaper;
-    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
-    private final CuratorFramework client;
-    private final Collection<String> paths = Sets.newConcurrentHashSet();
-    private volatile Iterator<String> pathIterator = null;
-    private final Reaper.Mode mode;
-    private final CloseableScheduledExecutorService executor;
-    private final int reapingThresholdMs;
-    private final LeaderLatch leaderLatch;
-    private final Set<String> lockSchema;
-    private final AtomicInteger maxChildren = new AtomicInteger(-1);
-
-    private volatile Future<?> task;
-
-    private enum State
-    {
-        LATENT,
-        STARTED,
-        CLOSED
-    }
-
-    /**
-     * @param client the client
-     * @param path path to reap children from
-     * @param mode reaping mode
-     */
-    public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode)
-    {
-        this(client, path, mode, newExecutorService(), Reaper.DEFAULT_REAPING_THRESHOLD_MS, null);
-    }
-
-    /**
-     * @param client the client
-     * @param path path to reap children from
-     * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
-     * @param mode reaping mode
-     */
-    public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, int reapingThresholdMs)
-    {
-        this(client, path, mode, newExecutorService(), reapingThresholdMs, null);
-    }
-
-    /**
-     * @param client the client
-     * @param path path to reap children from
-     * @param executor executor to use for background tasks
-     * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
-     * @param mode reaping mode
-     */
-    public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, ScheduledExecutorService executor, int reapingThresholdMs)
-    {
-        this(client, path, mode, executor, reapingThresholdMs, null);
-    }
-
-    /**
-     * @param client the client
-     * @param path path to reap children from
-     * @param executor executor to use for background tasks
-     * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
-     * @param mode reaping mode
-     * @param leaderPath if not null, uses a leader selection so that only 1 reaper is active in the cluster
-     */
-    public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, ScheduledExecutorService executor, int reapingThresholdMs, String leaderPath)
-    {
-        this(client, path, mode, executor, reapingThresholdMs, leaderPath, Collections.<String>emptySet());
-    }
-
-
-    /**
-     * @param client the client
-     * @param path path to reap children from
-     * @param executor executor to use for background tasks
-     * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
-     * @param mode reaping mode
-     * @param leaderPath if not null, uses a leader selection so that only 1 reaper is active in the cluster
-     * @param lockSchema a set of the possible subnodes of the children of path that must be reaped in addition to the child nodes
-     */
-    public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, ScheduledExecutorService executor, int reapingThresholdMs, String leaderPath, Set<String> lockSchema)
-    {
-        this.client = client;
-        this.mode = mode;
-        this.executor = new CloseableScheduledExecutorService(executor);
-        this.reapingThresholdMs = reapingThresholdMs;
-        if (leaderPath != null)
-        {
-            leaderLatch = new LeaderLatch(client, leaderPath);
-        }
-        else
-        {
-            leaderLatch = null;
-        }
-        this.reaper = new Reaper(client, executor, reapingThresholdMs, leaderLatch);
-        this.lockSchema = lockSchema;
-        addPath(path);
-    }
-
-    /**
-     * The reaper must be started
-     *
-     * @throws Exception errors
-     */
-    public void start() throws Exception
-    {
-        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
-
-        task = executor.scheduleWithFixedDelay
-        (
-            new Runnable()
-            {
-                @Override
-                public void run()
-                {
-                    doWork();
-                }
-            },
-            reapingThresholdMs,
-            reapingThresholdMs,
-            TimeUnit.MILLISECONDS
-        );
-        if (leaderLatch != null)
-        {
-            leaderLatch.start();
-        }
-        reaper.start();
-    }
-
-    @Override
-    public void close() throws IOException
-    {
-        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
-        {
-            CloseableUtils.closeQuietly(reaper);
-            if (leaderLatch != null)
-            {
-                CloseableUtils.closeQuietly(leaderLatch);
-            }
-            task.cancel(true);
-        }
-    }
-
-    /**
-     * Add a path to reap children from
-     *
-     * @param path the path
-     * @return this for chaining
-     */
-    public ChildReaper addPath(String path)
-    {
-        paths.add(PathUtils.validatePath(path));
-        return this;
-    }
-
-    /**
-     * Remove a path from reaping
-     *
-     * @param path the path
-     * @return true if the path existed and was removed
-     */
-    public boolean removePath(String path)
-    {
-        return paths.remove(PathUtils.validatePath(path));
-    }
-
-    /**
-     * If a node has so many children that {@link CuratorFramework#getChildren()} will fail
-     * (due to jute.maxbuffer) it can cause connection instability. Set the max number of
-     * children here to prevent the path from being queried in these cases. The number should usually
-     * be: average-node-name-length/1000000
-     *
-     * @param maxChildren max children
-     */
-    public void setMaxChildren(int maxChildren)
-    {
-        this.maxChildren.set(maxChildren);
-    }
-
-    public static ScheduledExecutorService newExecutorService()
-    {
-        return ThreadUtils.newFixedThreadScheduledPool(2, "ChildReaper");
-    }
-
-    @VisibleForTesting
-    protected void warnMaxChildren(String path, Stat stat)
-    {
-        log.warn(String.format("Skipping %s as it has too many children: %d", path, stat.getNumChildren()));
-    }
-
-    private void doWork()
-    {
-        if ( shouldDoWork() )
-        {
-            if ( (pathIterator == null) || !pathIterator.hasNext() )
-            {
-                pathIterator = paths.iterator();
-            }
-            while ( pathIterator.hasNext() )
-            {
-                String path = pathIterator.next();
-                try
-                {
-                    int maxChildren = this.maxChildren.get();
-                    if ( maxChildren > 0 )
-                    {
-                        Stat stat = client.checkExists().forPath(path);
-                        if ( (stat != null) && (stat.getNumChildren() > maxChildren) )
-                        {
-                            warnMaxChildren(path, stat);
-                            continue;
-                        }
-                    }
-
-                    List<String> children = client.getChildren().forPath(path);
-                    log.info(String.format("Found %d children for %s", children.size(), path));
-                    for ( String name : children )
-                    {
-                        String childPath = ZKPaths.makePath(path, name);
-                        addPathToReaperIfEmpty(childPath);
-                        for ( String subNode : lockSchema )
-                        {
-                            addPathToReaperIfEmpty(ZKPaths.makePath(childPath, subNode));
-                        }
-
-                    }
-                }
-                catch ( Exception e )
-                {
-                    ThreadUtils.checkInterrupted(e);
-                    log.error("Could not get children for path: " + path, e);
-                }
-            }
-        }
-    }
-
-    private void addPathToReaperIfEmpty(String path) throws Exception
-    {
-        Stat stat = client.checkExists().forPath(path);
-        if ( (stat != null) && (stat.getNumChildren() == 0) )
-        {
-            log.info("Adding " + path);
-            reaper.addPath(path, mode);
-        }
-    }
-
-    private boolean shouldDoWork()
-    {
-        return this.leaderLatch == null || this.leaderLatch.hasLeadership();
-    }
-}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
deleted file mode 100644
index 2522154..0000000
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.recipes.locks;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.leader.LeaderLatch;
-import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
-import org.apache.curator.utils.CloseableScheduledExecutorService;
-import org.apache.curator.utils.ThreadUtils;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Utility to clean up parent lock nodes so that they don't stay around as garbage
- *
- * @deprecated Since 2.9.0 - Reaper/ChildReaper are no longer needed. Use {@link CreateMode#CONTAINER}.
- * Also, all Curator recipes create container parents.
- */
-@Deprecated
-public class Reaper implements Closeable
-{
-    private final Logger log = LoggerFactory.getLogger(getClass());
-    private final CuratorFramework client;
-    private final CloseableScheduledExecutorService executor;
-    private final int reapingThresholdMs;
-    private final Map<String, PathHolder> activePaths = Maps.newConcurrentMap();
-    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
-    private final LeaderLatch leaderLatch;
-    private final AtomicBoolean reapingIsActive = new AtomicBoolean(true);
-    private final boolean ownsLeaderLatch;
-
-    private enum State
-    {
-        LATENT,
-        STARTED,
-        CLOSED
-    }
-
-    static final int DEFAULT_REAPING_THRESHOLD_MS = (int)TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES);
-
-    @VisibleForTesting
-    static final int EMPTY_COUNT_THRESHOLD = 3;
-
-    @VisibleForTesting
-    class PathHolder implements Runnable
-    {
-        final String path;
-        final Mode mode;
-        final int emptyCount;
-
-        @Override
-        public void run()
-        {
-            reap(this);
-        }
-
-        private PathHolder(String path, Mode mode, int emptyCount)
-        {
-            this.path = path;
-            this.mode = mode;
-            this.emptyCount = emptyCount;
-        }
-    }
-
-    public enum Mode
-    {
-        /**
-         * Reap forever, or until removePath is called for the path
-         */
-        REAP_INDEFINITELY,
-
-        /**
-         * Reap until the Reaper succeeds in deleting the path
-         */
-        REAP_UNTIL_DELETE,
-
-        /**
-         * Reap until the path no longer exists
-         */
-        REAP_UNTIL_GONE
-    }
-
-    /**
-     * Uses the default reaping threshold of 5 minutes and creates an internal thread pool
-     *
-     * @param client client
-     */
-    public Reaper(CuratorFramework client)
-    {
-        this(client, newExecutorService(), DEFAULT_REAPING_THRESHOLD_MS, (String) null);
-    }
-
-    /**
-     * Uses the given reaping threshold and creates an internal thread pool
-     *
-     * @param client             client
-     * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
-     */
-    public Reaper(CuratorFramework client, int reapingThresholdMs)
-    {
-        this(client, newExecutorService(), reapingThresholdMs, (String) null);
-    }
-
-    /**
-     * @param client             client
-     * @param executor           thread pool
-     * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
-     */
-    public Reaper(CuratorFramework client, ScheduledExecutorService executor, int reapingThresholdMs)
-    {
-        this(client, executor, reapingThresholdMs, (String) null);
-    }
-
-    /**
-     * @param client             client
-     * @param executor           thread pool
-     * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
-     * @param leaderPath         if not null, uses a leader selection so that only 1 reaper is active in the cluster
-     */
-    public Reaper(CuratorFramework client, ScheduledExecutorService executor, int reapingThresholdMs, String leaderPath)
-    {
-        this(client, executor, reapingThresholdMs, makeLeaderLatchIfPathNotNull(client, leaderPath), true);
-    }
-
-    /**
-     * @param client             client
-     * @param executor           thread pool
-     * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
-     * @param leaderLatch        a pre-created leader latch to ensure only 1 reaper is active in the cluster
-     */
-    public Reaper(CuratorFramework client, ScheduledExecutorService executor, int reapingThresholdMs, LeaderLatch leaderLatch)
-    {
-        this(client, executor, reapingThresholdMs, leaderLatch, false);
-    }
-
-    /**
-     * @param client             client
-     * @param executor           thread pool
-     * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
-     * @param leaderLatch        a pre-created leader latch to ensure only 1 reaper is active in the cluster
-     * @param ownsLeaderLatch    indicates whether or not the reaper owns the leader latch (if it exists) and thus should start/stop it
-     * */
-    private Reaper(CuratorFramework client, ScheduledExecutorService executor, int reapingThresholdMs, LeaderLatch leaderLatch, boolean ownsLeaderLatch)
-    {
-        this.client = client;
-        this.executor = new CloseableScheduledExecutorService(executor);
-        this.reapingThresholdMs = reapingThresholdMs / EMPTY_COUNT_THRESHOLD;
-        this.leaderLatch = leaderLatch;
-        if (leaderLatch != null)
-        {
-            addListenerToLeaderLatch(leaderLatch);
-        }
-        this.ownsLeaderLatch = ownsLeaderLatch;
-    }
-
-
-    /**
-     * Add a path (using Mode.REAP_INDEFINITELY) to be checked by the reaper. The path will be checked periodically
-     * until the reaper is closed.
-     *
-     * @param path path to check
-     */
-    public void addPath(String path)
-    {
-        addPath(path, Mode.REAP_INDEFINITELY);
-    }
-
-    /**
-     * Add a path to be checked by the reaper. The path will be checked periodically
-     * until the reaper is closed, or until the point specified by the Mode
-     *
-     * @param path path to check
-     * @param mode reaping mode
-     */
-    public void addPath(String path, Mode mode)
-    {
-        PathHolder pathHolder = new PathHolder(path, mode, 0);
-        activePaths.put(path, pathHolder);
-        schedule(pathHolder, reapingThresholdMs);
-    }
-
-    /**
-     * Stop reaping the given path
-     *
-     * @param path path to remove
-     * @return true if the path was removed
-     */
-    public boolean removePath(String path)
-    {
-        return activePaths.remove(path) != null;
-    }
-
-    /**
-     * The reaper must be started
-     *
-     * @throws Exception errors
-     */
-    public void start() throws Exception
-    {
-        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
-
-        if ( leaderLatch != null && ownsLeaderLatch)
-        {
-            leaderLatch.start();
-        }
-    }
-
-    @Override
-    public void close() throws IOException
-    {
-        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
-        {
-            executor.close();
-            if ( leaderLatch != null && ownsLeaderLatch )
-            {
-                leaderLatch.close();
-            }
-        }
-    }
-
-    @VisibleForTesting
-    protected Future<?> schedule(PathHolder pathHolder, int reapingThresholdMs)
-    {
-        if ( reapingIsActive.get() )
-        {
-            return executor.schedule(pathHolder, reapingThresholdMs, TimeUnit.MILLISECONDS);
-        }
-        return null;
-    }
-
-    @VisibleForTesting
-    protected void reap(PathHolder holder)
-    {
-        if ( !activePaths.containsKey(holder.path) )
-        {
-            return;
-        }
-
-        boolean addBack = true;
-        int newEmptyCount = 0;
-        try
-        {
-            Stat stat = client.checkExists().forPath(holder.path);
-            if ( stat != null ) // otherwise already deleted
-            {
-                if ( stat.getNumChildren() == 0 )
-                {
-                    if ( (holder.emptyCount + 1) >= EMPTY_COUNT_THRESHOLD )
-                    {
-                        try
-                        {
-                            client.delete().forPath(holder.path);
-                            log.info("Reaping path: " + holder.path);
-                            if ( holder.mode == Mode.REAP_UNTIL_DELETE || holder.mode == Mode.REAP_UNTIL_GONE )
-                            {
-                                addBack = false;
-                            }
-                        }
-                        catch ( KeeperException.NoNodeException ignore )
-                        {
-                            // Node must have been deleted by another process/thread
-                            if ( holder.mode == Mode.REAP_UNTIL_GONE )
-                            {
-                                addBack = false;
-                            }
-                        }
-                        catch ( KeeperException.NotEmptyException ignore )
-                        {
-                            // ignore - it must have been re-used
-                        }
-                    }
-                    else
-                    {
-                        newEmptyCount = holder.emptyCount + 1;
-                    }
-                }
-            }
-            else
-            {
-                if ( holder.mode == Mode.REAP_UNTIL_GONE )
-                {
-                    addBack = false;
-                }
-            }
-        }
-        catch ( Exception e )
-        {
-            ThreadUtils.checkInterrupted(e);
-            log.error("Trying to reap: " + holder.path, e);
-        }
-
-        if ( !addBack )
-        {
-            activePaths.remove(holder.path);
-        }
-        else if ( !Thread.currentThread().isInterrupted() && (state.get() == State.STARTED) && activePaths.containsKey(holder.path) )
-        {
-            activePaths.put(holder.path, holder);
-            schedule(new PathHolder(holder.path, holder.mode, newEmptyCount), reapingThresholdMs);
-        }
-    }
-
-    /**
-     * Allocate an executor service for the reaper
-     *
-     * @return service
-     */
-    public static ScheduledExecutorService newExecutorService()
-    {
-        return ThreadUtils.newSingleThreadScheduledExecutor("Reaper");
-    }
-
-    private void addListenerToLeaderLatch(LeaderLatch leaderLatch)
-    {
-
-        LeaderLatchListener listener = new LeaderLatchListener()
-        {
-            @Override
-            public void isLeader()
-            {
-                reapingIsActive.set(true);
-                for ( PathHolder holder : activePaths.values() )
-                {
-                    schedule(holder, reapingThresholdMs);
-                }
-            }
-
-            @Override
-            public void notLeader()
-            {
-                reapingIsActive.set(false);
-            }
-        };
-        leaderLatch.addListener(listener);
-
-        reapingIsActive.set(leaderLatch.hasLeadership());
-    }
-
-    private static LeaderLatch makeLeaderLatchIfPathNotNull(CuratorFramework client, String leaderPath)
-    {
-        if (leaderPath == null)
-        {
-            return null;
-        }
-        else
-        {
-            return new LeaderLatch(client, leaderPath);
-        }
-    }
-}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
index 81e8dd9..f7b4653 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
@@ -19,11 +19,9 @@
 
 package org.apache.curator.framework.recipes.nodes;
 
-import com.google.common.base.Function;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.SafeIsTtlMode;
 import org.apache.curator.framework.WatcherRemoveCuratorFramework;
 import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
 import org.apache.curator.framework.api.BackgroundCallback;
@@ -32,7 +30,8 @@ import org.apache.curator.framework.api.CreateBuilderMain;
 import org.apache.curator.framework.api.CreateModable;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.CuratorWatcher;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.PathUtils;
@@ -77,7 +76,7 @@ public class PersistentNode implements Closeable
     private final BackgroundCallback backgroundCallback;
     private final boolean useProtection;
     private final AtomicReference<CreateModable<ACLBackgroundPathAndBytesable<String>>> createMethod = new AtomicReference<CreateModable<ACLBackgroundPathAndBytesable<String>>>(null);
-    private final ListenerContainer<PersistentNodeListener> listeners = new ListenerContainer<PersistentNodeListener>();
+    private final StandardListenerManager<PersistentNodeListener> listeners = StandardListenerManager.standard();
     private final CuratorWatcher watcher = new CuratorWatcher()
     {
         @Override
@@ -362,7 +361,7 @@ public class PersistentNode implements Closeable
      *
      * @return listenable
      */
-    public ListenerContainer<PersistentNodeListener> getListenable()
+    public Listenable<PersistentNodeListener> getListenable()
     {
         return listeners;
     }
@@ -462,7 +461,7 @@ public class PersistentNode implements Closeable
             CreateModable<ACLBackgroundPathAndBytesable<String>> localCreateMethod = createMethod.get();
             if ( localCreateMethod == null )
             {
-                CreateBuilderMain createBuilder = SafeIsTtlMode.isTtl(mode) ? client.create().withTtl(ttl) : client.create();
+                CreateBuilderMain createBuilder = mode.isTTL() ? client.create().withTtl(ttl) : client.create();
                 CreateModable<ACLBackgroundPathAndBytesable<String>> tempCreateMethod = useProtection ? createBuilder.creatingParentContainersIfNeeded().withProtection() : createBuilder.creatingParentContainersIfNeeded();
                 createMethod.compareAndSet(null, tempCreateMethod);
                 localCreateMethod = createMethod.get();
@@ -523,25 +522,17 @@ public class PersistentNode implements Closeable
     private void notifyListeners()
     {
         final String path = getActualPath();
-        listeners.forEach(
-             new Function<PersistentNodeListener, Void>()
-             {
-                 @Override
-                 public Void apply(PersistentNodeListener listener)
-                 {
-                     try
-                    {
-                        listener.nodeCreated(path);
-                    }
-                    catch ( Exception e )
-                    {
-                        ThreadUtils.checkInterrupted(e);
-                        log.error("From PersistentNode listener", e);
-                    }
-                    return null;
-                }
-             }
-        );
+        listeners.forEach(listener -> {
+            try
+            {
+                listener.nodeCreated(path);
+            }
+            catch ( Exception e )
+            {
+                ThreadUtils.checkInterrupted(e);
+                log.error("From PersistentNode listener", e);
+            }
+        });
     }
 
     private boolean isActive()
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java
index 8f321b3..5327a09 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java
@@ -21,7 +21,7 @@ package org.apache.curator.framework.recipes.queue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.Listenable;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collections;
@@ -215,7 +215,7 @@ public class DistributedDelayQueue<T> implements Closeable, QueueBase<T>
      * @return put listener container
      */
     @Override
-    public ListenerContainer<QueuePutListener<T>> getPutListenerContainer()
+    public Listenable<QueuePutListener<T>> getPutListenerContainer()
     {
         return queue.getPutListenerContainer();
     }
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedIdQueue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedIdQueue.java
index 15045aa..45b84c4 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedIdQueue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedIdQueue.java
@@ -21,7 +21,7 @@ package org.apache.curator.framework.recipes.queue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.Listenable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.io.IOException;
@@ -105,7 +105,7 @@ public class DistributedIdQueue<T> implements QueueBase<T>
     }
 
     @Override
-    public ListenerContainer<QueuePutListener<T>> getPutListenerContainer()
+    public Listenable<QueuePutListener<T>> getPutListenerContainer()
     {
         return queue.getPutListenerContainer();
     }
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedPriorityQueue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedPriorityQueue.java
index 4b70ec5..be61de2 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedPriorityQueue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedPriorityQueue.java
@@ -21,7 +21,7 @@ package org.apache.curator.framework.recipes.queue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.Listenable;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.Executor;
@@ -176,7 +176,7 @@ public class DistributedPriorityQueue<T> implements Closeable, QueueBase<T>
      * @return put listener container
      */
     @Override
-    public ListenerContainer<QueuePutListener<T>> getPutListenerContainer()
+    public Listenable<QueuePutListener<T>> getPutListenerContainer()
     {
         return queue.getPutListenerContainer();
     }
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java
index 14d1266..0df2930 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java
@@ -19,18 +19,18 @@
 package org.apache.curator.framework.recipes.queue;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-
-import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.CuratorEventType;
 import org.apache.curator.framework.imps.CuratorFrameworkState;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
 import org.apache.curator.framework.recipes.leader.LeaderSelector;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.PathUtils;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
@@ -50,7 +50,6 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-import org.apache.curator.utils.PathUtils;
 
 /**
  * <p>An implementation of the Distributed Queue ZK recipe. Items put into the queue
@@ -81,7 +80,7 @@ public class DistributedQueue<T> implements QueueBase<T>
     private final boolean isProducerOnly;
     private final String lockPath;
     private final AtomicReference<ErrorMode> errorMode = new AtomicReference<ErrorMode>(ErrorMode.REQUEUE);
-    private final ListenerContainer<QueuePutListener<T>> putListenerContainer = new ListenerContainer<QueuePutListener<T>>();
+    private final StandardListenerManager<QueuePutListener<T>> putListenerContainer = StandardListenerManager.standard();
     private final AtomicInteger lastChildCount = new AtomicInteger(0);
     private final int maxItems;
     private final int finalFlushMs;
@@ -233,7 +232,7 @@ public class DistributedQueue<T> implements QueueBase<T>
      * @return put listener container
      */
     @Override
-    public ListenerContainer<QueuePutListener<T>> getPutListenerContainer()
+    public Listenable<QueuePutListener<T>> getPutListenerContainer()
     {
         return putListenerContainer;
     }
@@ -407,25 +406,16 @@ public class DistributedQueue<T> implements QueueBase<T>
             putCount.decrementAndGet();
             putCount.notifyAll();
         }
-        putListenerContainer.forEach
-        (
-            new Function<QueuePutListener<T>, Void>()
+        putListenerContainer.forEach(listener -> {
+            if ( item != null )
             {
-                @Override
-                public Void apply(QueuePutListener<T> listener)
-                {
-                    if ( item != null )
-                    {
-                        listener.putCompleted(item);
-                    }
-                    else
-                    {
-                        listener.putMultiCompleted(givenMultiItem);
-                    }
-                    return null;
-                }
+                listener.putCompleted(item);
             }
-        );
+            else
+            {
+                listener.putMultiCompleted(givenMultiItem);
+            }
+        });
     }
 
     private void doPutInBackground(final T item, String path, final MultiItem<T> givenMultiItem, byte[] bytes) throws Exception
@@ -449,25 +439,16 @@ public class DistributedQueue<T> implements QueueBase<T>
                     }
                 }
 
-                putListenerContainer.forEach
-                (
-                    new Function<QueuePutListener<T>, Void>()
+                putListenerContainer.forEach(listener -> {
+                    if ( item != null )
                     {
-                        @Override
-                        public Void apply(QueuePutListener<T> listener)
-                        {
-                            if ( item != null )
-                            {
-                                listener.putCompleted(item);
-                            }
-                            else
-                            {
-                                listener.putMultiCompleted(givenMultiItem);
-                            }
-                            return null;
-                        }
+                        listener.putCompleted(item);
                     }
-                );
+                    else
+                    {
+                        listener.putMultiCompleted(givenMultiItem);
+                    }
+                });
             }
         };
         internalCreateNode(path, bytes, callback);
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/QueueBase.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/QueueBase.java
index 0a21ce8..f9fee13 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/QueueBase.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/QueueBase.java
@@ -18,7 +18,7 @@
  */
 package org.apache.curator.framework.recipes.queue;
 
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.Listenable;
 import java.io.Closeable;
 import java.util.concurrent.TimeUnit;
 
@@ -36,7 +36,7 @@ public interface QueueBase<T> extends Closeable
      *
      * @return put listener container
      */
-    ListenerContainer<QueuePutListener<T>> getPutListenerContainer();
+    Listenable<QueuePutListener<T>> getPutListenerContainer();
 
     /**
      * Used when the queue is created with a {@link QueueBuilder#lockPath(String)}. Determines
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
index 5d7abce..d605234 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
@@ -20,14 +20,14 @@
 package org.apache.curator.framework.recipes.shared;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.WatcherRemoveCuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.CuratorWatcher;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.PathUtils;
@@ -52,7 +52,7 @@ public class SharedValue implements Closeable, SharedValueReader
 	private static final int UNINITIALIZED_VERSION = -1;
 
     private final Logger log = LoggerFactory.getLogger(getClass());
-    private final ListenerContainer<SharedValueListener> listeners = new ListenerContainer<SharedValueListener>();
+    private final StandardListenerManager<SharedValueListener> listeners = StandardListenerManager.standard();
     private final WatcherRemoveCuratorFramework client;
     private final String path;
     private final byte[] seedValue;
@@ -235,7 +235,7 @@ public class SharedValue implements Closeable, SharedValueReader
      *
      * @return listenable
      */
-    public ListenerContainer<SharedValueListener> getListenable()
+    public Listenable<SharedValueListener> getListenable()
     {
         return listeners;
     }
@@ -297,41 +297,21 @@ public class SharedValue implements Closeable, SharedValueReader
     private void notifyListeners()
     {
         final byte[] localValue = getValue();
-        listeners.forEach
-            (
-                new Function<SharedValueListener, Void>()
-                {
-                    @Override
-                    public Void apply(SharedValueListener listener)
-                    {
-                        try
-                        {
-                            listener.valueHasChanged(SharedValue.this, localValue);
-                        }
-                        catch ( Exception e )
-                        {
-                            ThreadUtils.checkInterrupted(e);
-                            log.error("From SharedValue listener", e);
-                        }
-                        return null;
-                    }
-                }
-            );
+        listeners.forEach(listener -> {
+            try
+            {
+                listener.valueHasChanged(SharedValue.this, localValue);
+            }
+            catch ( Exception e )
+            {
+                ThreadUtils.checkInterrupted(e);
+                log.error("From SharedValue listener", e);
+            }
+        });
     }
 
     private void notifyListenerOfStateChanged(final ConnectionState newState)
     {
-        listeners.forEach
-            (
-                new Function<SharedValueListener, Void>()
-                {
-                    @Override
-                    public Void apply(SharedValueListener listener)
-                    {
-                        listener.stateChanged(client, newState);
-                        return null;
-                    }
-                }
-            );
+        listeners.forEach(listener -> listener.stateChanged(client, newState));
     }
 }
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValueReader.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValueReader.java
index e298cca..a846c54 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValueReader.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValueReader.java
@@ -18,7 +18,7 @@
  */
 package org.apache.curator.framework.recipes.shared;
 
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.Listenable;
 
 /**
  * Abstracts a shared value and allows listening for changes to the value
@@ -44,5 +44,5 @@ public interface SharedValueReader
      *
      * @return listenable
      */
-    public ListenerContainer<SharedValueListener> getListenable();
+    public Listenable<SharedValueListener> getListenable();
 }
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java
index ff416d5..705cad5 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java
@@ -20,6 +20,7 @@ package org.apache.curator.framework.recipes.cache;
 
 import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Compatibility;
 import org.apache.curator.test.compatibility.Timing2;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
@@ -27,7 +28,6 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.Timing;
-import org.apache.curator.utils.Compatibility;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.concurrent.Callable;
@@ -196,7 +196,7 @@ public class TestNodeCache extends BaseClassForTests
                 }
             );
 
-            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+            client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
             Thread.sleep(timing.multiple(1.5).session());
 
             Assert.assertEquals(cache.getCurrentData().getData(), "start".getBytes());
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
index 78fabd5..3c65994 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
@@ -29,11 +29,11 @@ import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Compatibility;
 import org.apache.curator.test.ExecuteCalledWatchingExecutorService;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.utils.Compatibility;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.testng.Assert;
@@ -814,7 +814,7 @@ public class TestPathChildrenCache extends BaseClassForTests
             client.create().withMode(CreateMode.EPHEMERAL).forPath("/test/me", "data".getBytes());
             Assert.assertTrue(timing.awaitLatch(childAddedLatch));
 
-            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+            client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
             Assert.assertTrue(timing.awaitLatch(lostLatch));
             Assert.assertTrue(timing.awaitLatch(reconnectedLatch));
             Assert.assertTrue(timing.awaitLatch(removedLatch));
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
index 1e97ce2..7da3ecb 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
@@ -23,8 +23,8 @@ import com.google.common.collect.ImmutableSet;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
+import org.apache.curator.test.Compatibility;
 import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.utils.Compatibility;
 import org.apache.zookeeper.CreateMode;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -423,9 +423,9 @@ public class TestTreeCache extends BaseTestTreeCache
         client.create().withMode(CreateMode.EPHEMERAL).forPath("/test/me", "data".getBytes());
         assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/me");
 
-        Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
-        assertEvent(TreeCacheEvent.Type.NODE_REMOVED, "/test/me", "data".getBytes(), true);
+        client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
         assertEvent(TreeCacheEvent.Type.INITIALIZED, null, null, true);
+        assertEvent(TreeCacheEvent.Type.NODE_REMOVED, "/test/me", "data".getBytes(), true);
 
         assertNoMoreEvents();
     }
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
index b92c3a2..97a326e 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
@@ -30,11 +30,11 @@ import org.apache.curator.framework.state.SessionConnectionStateErrorPolicy;
 import org.apache.curator.framework.state.StandardConnectionStateErrorPolicy;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Compatibility;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.Timing;
 import org.apache.curator.test.compatibility.Timing2;
 import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.utils.Compatibility;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.Arrays;
@@ -531,7 +531,7 @@ public class TestLeaderSelector extends BaseClassForTests
 
             Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
 
-            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+            client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
 
             Assert.assertTrue(timing.awaitLatch(interruptedLatch));
             timing.sleepABit();
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java
deleted file mode 100644
index cafb9a3..0000000
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java
+++ /dev/null
@@ -1,351 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.recipes.locks;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.leader.LeaderLatch;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.curator.retry.RetryOneTime;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.test.TestingServer;
-import org.apache.curator.test.Timing;
-import org.apache.curator.utils.CloseableUtils;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-
-public class TestChildReaper extends BaseClassForTests
-{
-    @Test
-    public void testMaxChildren() throws Exception
-    {
-        server.close();
-
-        final int LARGE_QTY = 10000;
-
-        System.setProperty("jute.maxbuffer", "" + LARGE_QTY);
-        server = new TestingServer();
-        try
-        {
-            Timing timing = new Timing();
-            ChildReaper reaper = null;
-            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new ExponentialBackoffRetry(100, 3));
-            try
-            {
-                client.start();
-
-                for ( int i = 0; i < LARGE_QTY; ++i )
-                {
-                    if ( (i % 1000) == 0 )
-                    {
-                        System.out.println(i);
-                    }
-                    client.create().creatingParentsIfNeeded().forPath("/big/node-" + i);
-                }
-
-                try
-                {
-                    client.getChildren().forPath("/big");
-                    Assert.fail("Should have been a connection loss");
-                }
-                catch ( KeeperException.ConnectionLossException e )
-                {
-                    // expected
-                }
-
-                final CountDownLatch latch = new CountDownLatch(1);
-                reaper = new ChildReaper(client, "/big", Reaper.Mode.REAP_UNTIL_DELETE, 1)
-                {
-                    @Override
-                    protected void warnMaxChildren(String path, Stat stat)
-                    {
-                        latch.countDown();
-                        super.warnMaxChildren(path, stat);
-                    }
-                };
-                reaper.setMaxChildren(100);
-                reaper.start();
-                Assert.assertTrue(timing.awaitLatch(latch));
-            }
-            finally
-            {
-                CloseableUtils.closeQuietly(reaper);
-                CloseableUtils.closeQuietly(client);
-            }
-        }
-        finally
-        {
-            System.clearProperty("jute.maxbuffer");
-        }
-    }
-
-    @Test
-    public void testLargeNodes() throws Exception
-    {
-        server.close();
-
-        final int LARGE_QTY = 10000;
-        final int SMALL_QTY = 100;
-
-        System.setProperty("jute.maxbuffer", "" + LARGE_QTY);
-        server = new TestingServer();
-        try
-        {
-            Timing timing = new Timing();
-            ChildReaper reaper = null;
-            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new ExponentialBackoffRetry(100, 3));
-            try
-            {
-                client.start();
-
-                for ( int i = 0; i < LARGE_QTY; ++i )
-                {
-                    if ( (i % 1000) == 0 )
-                    {
-                        System.out.println(i);
-                    }
-                    client.create().creatingParentsIfNeeded().forPath("/big/node-" + i);
-
-                    if ( i < SMALL_QTY )
-                    {
-                        client.create().creatingParentsIfNeeded().forPath("/small/node-" + i);
-                    }
-                }
-
-                reaper = new ChildReaper(client, "/foo", Reaper.Mode.REAP_UNTIL_DELETE, 1);
-                reaper.start();
-
-                reaper.addPath("/big");
-                reaper.addPath("/small");
-
-                int count = -1;
-                for ( int i = 0; (i < 10) && (count != 0); ++i )
-                {
-                    timing.sleepABit();
-                    count = client.checkExists().forPath("/small").getNumChildren();
-                }
-                Assert.assertEquals(count, 0);
-            }
-            finally
-            {
-                CloseableUtils.closeQuietly(reaper);
-                CloseableUtils.closeQuietly(client);
-            }
-        }
-        finally
-        {
-            System.clearProperty("jute.maxbuffer");
-        }
-    }
-
-    @Test
-    public void testSomeNodes() throws Exception
-    {
-        Timing timing = new Timing();
-        ChildReaper reaper = null;
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        try
-        {
-            client.start();
-
-            Random r = new Random();
-            int nonEmptyNodes = 0;
-            for ( int i = 0; i < 10; ++i )
-            {
-                client.create().creatingParentsIfNeeded().forPath("/test/" + Integer.toString(i));
-                if ( r.nextBoolean() )
-                {
-                    client.create().forPath("/test/" + Integer.toString(i) + "/foo");
-                    ++nonEmptyNodes;
-                }
-            }
-
-            reaper = new ChildReaper(client, "/test", Reaper.Mode.REAP_UNTIL_DELETE, 1);
-            reaper.start();
-
-            timing.forWaiting().sleepABit();
-
-            Stat stat = client.checkExists().forPath("/test");
-            Assert.assertEquals(stat.getNumChildren(), nonEmptyNodes);
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(reaper);
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-
-    @Test
-    public void testSimple() throws Exception
-    {
-        Timing timing = new Timing();
-        ChildReaper reaper = null;
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        try
-        {
-            client.start();
-
-            for ( int i = 0; i < 10; ++i )
-            {
-                client.create().creatingParentsIfNeeded().forPath("/test/" + Integer.toString(i));
-            }
-
-            reaper = new ChildReaper(client, "/test", Reaper.Mode.REAP_UNTIL_DELETE, 1);
-            reaper.start();
-
-            timing.forWaiting().sleepABit();
-
-            Stat stat = client.checkExists().forPath("/test");
-            Assert.assertEquals(stat.getNumChildren(), 0);
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(reaper);
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-
-    @Test
-    public void testLeaderElection() throws Exception
-    {
-        Timing timing = new Timing();
-        ChildReaper reaper = null;
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        LeaderLatch otherLeader = null;
-        try
-        {
-            client.start();
-
-            for ( int i = 0; i < 10; ++i )
-            {
-                client.create().creatingParentsIfNeeded().forPath("/test/" + Integer.toString(i));
-            }
-
-            otherLeader = new LeaderLatch(client, "/test-leader");
-            otherLeader.start();
-            otherLeader.await();
-
-            reaper = new ChildReaper(client, "/test", Reaper.Mode.REAP_UNTIL_DELETE, ChildReaper.newExecutorService(), 1, "/test-leader");
-            reaper.start();
-
-            timing.forWaiting().sleepABit();
-
-            //Should not have reaped anything at this point since otherLeader is still leader
-            Stat stat = client.checkExists().forPath("/test");
-            Assert.assertEquals(stat.getNumChildren(), 10);
-
-            CloseableUtils.closeQuietly(otherLeader);
-
-            timing.forWaiting().sleepABit();
-
-            stat = client.checkExists().forPath("/test");
-            Assert.assertEquals(stat.getNumChildren(), 0);
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(reaper);
-            if ( otherLeader != null && otherLeader.getState() == LeaderLatch.State.STARTED )
-            {
-                CloseableUtils.closeQuietly(otherLeader);
-            }
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-
-    @Test
-    public void testMultiPath() throws Exception
-    {
-        Timing timing = new Timing();
-        ChildReaper reaper = null;
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        try
-        {
-            client.start();
-
-            for ( int i = 0; i < 10; ++i )
-            {
-                client.create().creatingParentsIfNeeded().forPath("/test1/" + Integer.toString(i));
-                client.create().creatingParentsIfNeeded().forPath("/test2/" + Integer.toString(i));
-                client.create().creatingParentsIfNeeded().forPath("/test3/" + Integer.toString(i));
-            }
-
-            reaper = new ChildReaper(client, "/test2", Reaper.Mode.REAP_UNTIL_DELETE, 1);
-            reaper.start();
-            reaper.addPath("/test1");
-
-            timing.forWaiting().sleepABit();
-
-            Stat stat = client.checkExists().forPath("/test1");
-            Assert.assertEquals(stat.getNumChildren(), 0);
-            stat = client.checkExists().forPath("/test2");
-            Assert.assertEquals(stat.getNumChildren(), 0);
-            stat = client.checkExists().forPath("/test3");
-            Assert.assertEquals(stat.getNumChildren(), 10);
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(reaper);
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-
-    @Test
-    public void testNamespace() throws Exception
-    {
-        Timing timing = new Timing();
-        ChildReaper reaper = null;
-        CuratorFramework client = CuratorFrameworkFactory.builder()
-            .connectString(server.getConnectString())
-            .sessionTimeoutMs(timing.session())
-            .connectionTimeoutMs(timing.connection())
-            .retryPolicy(new RetryOneTime(1))
-            .namespace("foo")
-            .build();
-        try
-        {
-            client.start();
-
-            for ( int i = 0; i < 10; ++i )
-            {
-                client.create().creatingParentsIfNeeded().forPath("/test/" + Integer.toString(i));
-            }
-
-            reaper = new ChildReaper(client, "/test", Reaper.Mode.REAP_UNTIL_DELETE, 1);
-            reaper.start();
-
-            timing.forWaiting().sleepABit();
-
-            Stat stat = client.checkExists().forPath("/test");
-            Assert.assertEquals(stat.getNumChildren(), 0);
-
-            stat = client.usingNamespace(null).checkExists().forPath("/foo/test");
-            Assert.assertNotNull(stat);
-            Assert.assertEquals(stat.getNumChildren(), 0);
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(reaper);
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java
index e9645e8..95b139d 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java
@@ -26,8 +26,8 @@ import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.framework.schema.Schema;
 import org.apache.curator.framework.schema.SchemaSet;
 import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.Compatibility;
 import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.utils.Compatibility;
 import org.apache.zookeeper.CreateMode;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -172,7 +172,7 @@ public class TestInterProcessMutex extends TestInterProcessMutexBase
             Assert.assertTrue(lock.isAcquiredInThisProcess());
 
             // Kill the session, check that lock node still exists
-            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+            client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
             Assert.assertNotNull(client.checkExists().forPath(LOCK_PATH));
 
             // Release the lock and verify that the actual lock node created no longer exists
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
index 0c62650..afebf8a 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
@@ -27,11 +27,11 @@ import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Compatibility;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.Timing;
 import org.apache.curator.test.compatibility.Timing2;
 import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.utils.Compatibility;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.KeeperException;
 import org.testng.Assert;
@@ -200,7 +200,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
                 );
 
             Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
-            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+            client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
             Assert.assertTrue(timing.forSessionSleep().acquireSemaphore(semaphore, 1));
         }
         finally
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
index 50f6bce..ce9f8fa 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
@@ -664,53 +664,6 @@ public class TestInterProcessSemaphore extends BaseClassForTests
     }
 
     @Test
-    public void testChildReaperCleansUpLockNodes() throws Exception
-    {
-        Timing timing = new Timing();
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        client.start();
-
-        ChildReaper childReaper = null;
-        try
-        {
-            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test/lock", 1);
-            semaphore.returnLease(semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS));
-
-            Assert.assertTrue(client.getChildren().forPath("/test").size() > 0);
-
-            childReaper = new ChildReaper(
-                client,
-                "/test",
-                Reaper.Mode.REAP_UNTIL_GONE,
-                ChildReaper.newExecutorService(),
-                1,
-                "/test-leader",
-                InterProcessSemaphoreV2.LOCK_SCHEMA
-            );
-            childReaper.start();
-
-            timing.forWaiting().sleepABit();
-
-            try
-            {
-                List<String> children = client.getChildren().forPath("/test");
-
-                Assert.assertEquals(children.size(), 0, "All children of /test should have been reaped");
-            }
-            catch ( KeeperException.NoNodeException ok )
-            {
-                // this is OK - if Container Nodes are used the "/test" path will go away - no point in updating the test for deprecated code
-            }
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(childReaper);
-            CloseableUtils.closeQuietly(client);
-        }
-
-    }
-
-    @Test
     public void testNoOrphanedNodes() throws Exception
     {
         final Timing timing = new Timing();
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java
deleted file mode 100644
index c47808f..0000000
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java
+++ /dev/null
@@ -1,647 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.curator.framework.recipes.locks;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.leader.LeaderLatch;
-import org.apache.curator.framework.recipes.leader.LeaderSelector;
-import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.retry.RetryOneTime;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.test.Timing;
-import org.apache.curator.utils.CloseableUtils;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.data.Stat;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-import java.io.IOException;
-import java.util.Queue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class TestReaper extends BaseClassForTests
-{
-    @Test
-    public void testUsingLeaderPath() throws Exception
-    {
-        final Timing timing = new Timing();
-        CuratorFramework client = makeClient(timing, null);
-        Reaper reaper1 = null;
-        Reaper reaper2 = null;
-        try
-        {
-            final AtomicInteger reaper1Count = new AtomicInteger();
-            reaper1 = new Reaper(client, Reaper.newExecutorService(), 1, "/reaper/leader")
-            {
-                @Override
-                protected void reap(PathHolder holder)
-                {
-                    reaper1Count.incrementAndGet();
-                    super.reap(holder);
-                }
-            };
-
-            final AtomicInteger reaper2Count = new AtomicInteger();
-            reaper2 = new Reaper(client, Reaper.newExecutorService(), 1, "/reaper/leader")
-            {
-                @Override
-                protected void reap(PathHolder holder)
-                {
-                    reaper2Count.incrementAndGet();
-                    super.reap(holder);
-                }
-            };
-
-            client.start();
-            client.create().creatingParentsIfNeeded().forPath("/one/two/three");
-
-            reaper1.start();
-            reaper2.start();
-
-            reaper1.addPath("/one/two/three");
-            reaper2.addPath("/one/two/three");
-
-            timing.sleepABit();
-
-            Assert.assertTrue((reaper1Count.get() == 0) || (reaper2Count.get() == 0));
-            Assert.assertTrue((reaper1Count.get() > 0) || (reaper2Count.get() > 0));
-
-            Reaper activeReaper;
-            AtomicInteger inActiveReaperCount;
-            if ( reaper1Count.get() > 0 )
-            {
-                activeReaper = reaper1;
-                inActiveReaperCount = reaper2Count;
-            }
-            else
-            {
-                activeReaper = reaper2;
-                inActiveReaperCount = reaper1Count;
-            }
-            Assert.assertEquals(inActiveReaperCount.get(), 0);
-            activeReaper.close();
-            timing.sleepABit();
-            Assert.assertTrue(inActiveReaperCount.get() > 0);
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(reaper1);
-            CloseableUtils.closeQuietly(reaper2);
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-
-    @Test
-    public void testUsingLeaderLatch() throws Exception
-    {
-        final Timing timing = new Timing();
-        CuratorFramework client = makeClient(timing, null);
-        Reaper reaper1 = null;
-        Reaper reaper2 = null;
-        LeaderLatch leaderLatch1 = null;
-        LeaderLatch leaderLatch2 = null;
-        try
-        {
-            final AtomicInteger reaper1Count = new AtomicInteger();
-            leaderLatch1 = new LeaderLatch(client, "/reaper/leader");
-            reaper1 = new Reaper(client, Reaper.newExecutorService(), 1, leaderLatch1)
-            {
-                @Override
-                protected void reap(PathHolder holder)
-                {
-                    reaper1Count.incrementAndGet();
-                    super.reap(holder);
-                }
-            };
-
-            final AtomicInteger reaper2Count = new AtomicInteger();
-            leaderLatch2 = new LeaderLatch(client, "/reaper/leader");
-            reaper2 = new Reaper(client, Reaper.newExecutorService(), 1, leaderLatch2)
-            {
-                @Override
-                protected void reap(PathHolder holder)
-                {
-                    reaper2Count.incrementAndGet();
-                    super.reap(holder);
-                }
-            };
-
-            client.start();
-            client.create().creatingParentsIfNeeded().forPath("/one/two/three");
-
-            leaderLatch1.start();
-            leaderLatch2.start();
-
-            reaper1.start();
-            reaper2.start();
-
-            reaper1.addPath("/one/two/three");
-            reaper2.addPath("/one/two/three");
-
-            timing.sleepABit();
-
-            Assert.assertTrue((reaper1Count.get() == 0) || (reaper2Count.get() == 0));
-            Assert.assertTrue((reaper1Count.get() > 0) || (reaper2Count.get() > 0));
-
-            Reaper activeReaper;
-            LeaderLatch activeLeaderLeatch;
-            AtomicInteger inActiveReaperCount;
-            if ( reaper1Count.get() > 0 )
-            {
-                activeReaper = reaper1;
-                activeLeaderLeatch = leaderLatch1;
-                inActiveReaperCount = reaper2Count;
-            }
-            else
-            {
-                activeReaper = reaper2;
-                activeLeaderLeatch = leaderLatch2;
-                inActiveReaperCount = reaper1Count;
-            }
-            Assert.assertEquals(inActiveReaperCount.get(), 0);
-            activeReaper.close();
-            activeLeaderLeatch.close();
-            timing.sleepABit();
-            Assert.assertTrue(inActiveReaperCount.get() > 0);
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(reaper1);
-            CloseableUtils.closeQuietly(reaper2);
-            if (leaderLatch1 != null && LeaderLatch.State.STARTED == leaderLatch1.getState()) {
-                CloseableUtils.closeQuietly(leaderLatch1);
-            }
-            if (leaderLatch2 != null && LeaderLatch.State.STARTED == leaderLatch2.getState()) {
-                CloseableUtils.closeQuietly(leaderLatch2);
-            }
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-
-    @Test
-    public void testUsingManualLeader() throws Exception
-    {
-        final Timing timing = new Timing();
-        final CuratorFramework client = makeClient(timing, null);
-        final CountDownLatch latch = new CountDownLatch(1);
-        LeaderSelectorListener listener = new LeaderSelectorListener()
-        {
-            @Override
-            public void takeLeadership(CuratorFramework client) throws Exception
-            {
-                Reaper reaper = new Reaper(client, 1);
-                try
-                {
-                    reaper.addPath("/one/two/three", Reaper.Mode.REAP_UNTIL_DELETE);
-                    reaper.start();
-
-                    timing.sleepABit();
-                    latch.countDown();
-                }
-                finally
-                {
-                    CloseableUtils.closeQuietly(reaper);
-                }
-            }
-
-            @Override
-            public void stateChanged(CuratorFramework client, ConnectionState newState)
-            {
-            }
-        };
-        LeaderSelector selector = new LeaderSelector(client, "/leader", listener);
-        try
-        {
-            client.start();
-            client.create().creatingParentsIfNeeded().forPath("/one/two/three");
-
-            Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
-
-            selector.start();
-            timing.awaitLatch(latch);
-
-            Assert.assertNull(client.checkExists().forPath("/one/two/three"));
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(selector);
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-
-    @Test
-    public void testSparseUseNoReap() throws Exception
-    {
-        final int THRESHOLD = 3000;
-
-        Timing timing = new Timing();
-        Reaper reaper = null;
-        CuratorFramework client = makeClient(timing, null);
-        try
-        {
-            client.start();
-            client.create().creatingParentsIfNeeded().forPath("/one/two/three");
-
-            Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
-
-            final Queue<Reaper.PathHolder> holders = new ConcurrentLinkedQueue<Reaper.PathHolder>();
-            final ExecutorService pool = Executors.newCachedThreadPool();
-            ScheduledExecutorService service = new ScheduledThreadPoolExecutor(1);
-
-            reaper = new Reaper(client, service, THRESHOLD)
-            {
-                @Override
-                protected Future<Void> schedule(final PathHolder pathHolder, int reapingThresholdMs)
-                {
-                    holders.add(pathHolder);
-                    final Future<?> f = super.schedule(pathHolder, reapingThresholdMs);
-                    pool.submit(new Callable<Void>()
-                        {
-                            @Override
-                            public Void call() throws Exception
-                            {
-                                f.get();
-                                holders.remove(pathHolder);
-                                return null;
-                            }
-                        }
-                               );
-                    return null;
-                }
-            };
-            reaper.start();
-            reaper.addPath("/one/two/three");
-
-            long start = System.currentTimeMillis();
-            boolean emptyCountIsCorrect = false;
-            while ( ((System.currentTimeMillis() - start) < timing.forWaiting().milliseconds()) && !emptyCountIsCorrect )   // need to loop as the Holder can go in/out of the Reaper's DelayQueue
-            {
-                for ( Reaper.PathHolder holder : holders )
-                {
-                    if ( holder.path.endsWith("/one/two/three") )
-                    {
-                        emptyCountIsCorrect = (holder.emptyCount > 0);
-                        break;
-                    }
-                }
-                Thread.sleep(1);
-            }
-            Assert.assertTrue(emptyCountIsCorrect);
-
-            client.create().forPath("/one/two/three/foo");
-
-            Thread.sleep(2 * (THRESHOLD / Reaper.EMPTY_COUNT_THRESHOLD));
-            Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
-            client.delete().forPath("/one/two/three/foo");
-
-            Thread.sleep(THRESHOLD);
-            timing.sleepABit();
-
-            Assert.assertNull(client.checkExists().forPath("/one/two/three"));
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(reaper);
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-
-    @Test
-    public void testReapUntilDelete() throws Exception
-    {
-        testReapUntilDelete(null);
-    }
-
-    @Test
-    public void testReapUntilDeleteNamespace() throws Exception
-    {
-        testReapUntilDelete("test");
-    }
-
-    @Test
-    public void testReapUntilGone() throws Exception
-    {
-        testReapUntilGone(null);
-    }
-
-    @Test
-    public void testReapUntilGoneNamespace() throws Exception
-    {
-        testReapUntilGone("test");
-    }
-
-    @Test
-    public void testRemove() throws Exception
-    {
-        testRemove(null);
-    }
-
-    @Test
-    public void testRemoveNamespace() throws Exception
-    {
-        testRemove("test");
-    }
-
-    @Test
-    public void testSimulationWithLocks() throws Exception
-    {
-        testSimulationWithLocks(null);
-    }
-
-    @Test
-    public void testSimulationWithLocksNamespace() throws Exception
-    {
-        testSimulationWithLocks("test");
-    }
-
-    @Test
-    public void testWithEphemerals() throws Exception
-    {
-        testWithEphemerals(null);
-    }
-
-    @Test
-    public void testWithEphemeralsNamespace() throws Exception
-    {
-        testWithEphemerals("test");
-    }
-
-    @Test
-    public void testBasic() throws Exception
-    {
-        testBasic(null);
-    }
-
-    @Test
-    public void testBasicNamespace() throws Exception
-    {
-        testBasic("test");
-    }
-
-    private void testReapUntilDelete(String namespace) throws Exception
-    {
-        Timing timing = new Timing();
-        Reaper reaper = null;
-        CuratorFramework client = makeClient(timing, namespace);
-        try
-        {
-            client.start();
-            client.create().creatingParentsIfNeeded().forPath("/one/two/three");
-
-            Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
-
-            reaper = new Reaper(client, 100);
-            reaper.start();
-
-            reaper.addPath("/one/two/three", Reaper.Mode.REAP_UNTIL_DELETE);
-            timing.sleepABit();
-
-            Assert.assertNull(client.checkExists().forPath("/one/two/three"));
-
-            client.create().forPath("/one/two/three");
-            timing.sleepABit();
-            Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(reaper);
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-
-    private void testReapUntilGone(String namespace) throws Exception
-    {
-        Timing timing = new Timing();
-        Reaper reaper = null;
-        CuratorFramework client = makeClient(timing, namespace);
-        try
-        {
-            client.start();
-
-            reaper = new Reaper(client, 100);
-            reaper.start();
-
-            reaper.addPath("/one/two/three", Reaper.Mode.REAP_UNTIL_GONE);
-            timing.sleepABit();
-
-            client.create().creatingParentsIfNeeded().forPath("/one/two/three");
-            Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
-
-            reaper.addPath("/one/two/three", Reaper.Mode.REAP_UNTIL_GONE);
-            timing.sleepABit();
-
-            Assert.assertNull(client.checkExists().forPath("/one/two/three"));
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(reaper);
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-
-    private CuratorFramework makeClient(Timing timing, String namespace) throws IOException
-    {
-        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().connectionTimeoutMs(timing.connection()).sessionTimeoutMs(timing.session()).connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1));
-        if ( namespace != null )
-        {
-            builder = builder.namespace(namespace);
-        }
-        return builder.build();
-    }
-
-    private void testRemove(String namespace) throws Exception
-    {
-        Timing timing = new Timing();
-        Reaper reaper = null;
-        CuratorFramework client = makeClient(timing, namespace);
-        try
-        {
-            client.start();
-            client.create().creatingParentsIfNeeded().forPath("/one/two/three");
-
-            Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
-
-            reaper = new Reaper(client, 100);
-            reaper.start();
-
-            reaper.addPath("/one/two/three");
-            timing.sleepABit();
-
-            Assert.assertNull(client.checkExists().forPath("/one/two/three"));
-
-            Assert.assertTrue(reaper.removePath("/one/two/three"));
-
-            client.create().forPath("/one/two/three");
-            timing.sleepABit();
-            Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(reaper);
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-
-    private void testSimulationWithLocks(String namespace) throws Exception
-    {
-        final int LOCK_CLIENTS = 10;
-        final int ITERATIONS = 250;
-        final int MAX_WAIT_MS = 10;
-
-        ExecutorService service = Executors.newFixedThreadPool(LOCK_CLIENTS);
-        ExecutorCompletionService<Object> completionService = new ExecutorCompletionService<Object>(service);
-
-        Timing timing = new Timing();
-        Reaper reaper = null;
-        final CuratorFramework client = makeClient(timing, namespace);
-        try
-        {
-            client.start();
-
-            reaper = new Reaper(client, MAX_WAIT_MS / 2);
-            reaper.start();
-            reaper.addPath("/a/b");
-
-            for ( int i = 0; i < LOCK_CLIENTS; ++i )
-            {
-                completionService.submit(new Callable<Object>()
-                    {
-                        @Override
-                        public Object call() throws Exception
-                        {
-                            final InterProcessMutex lock = new InterProcessMutex(client, "/a/b");
-                            for ( int i = 0; i < ITERATIONS; ++i )
-                            {
-                                lock.acquire();
-                                try
-                                {
-                                    Thread.sleep((int)(Math.random() * MAX_WAIT_MS));
-                                }
-                                finally
-                                {
-                                    lock.release();
-                                }
-                            }
-                            return null;
-                        }
-                    }
-                                        );
-            }
-
-            for ( int i = 0; i < LOCK_CLIENTS; ++i )
-            {
-                completionService.take().get();
-            }
-
-            Thread.sleep(timing.session());
-            timing.sleepABit();
-
-            Stat stat = client.checkExists().forPath("/a/b");
-            Assert.assertNull(stat, "Child qty: " + ((stat != null) ? stat.getNumChildren() : 0));
-        }
-        finally
-        {
-            service.shutdownNow();
-            CloseableUtils.closeQuietly(reaper);
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-
-    private void testWithEphemerals(String namespace) throws Exception
-    {
-        Timing timing = new Timing();
-        Reaper reaper = null;
-        CuratorFramework client2 = null;
-        CuratorFramework client = makeClient(timing, namespace);
-        try
-        {
-            client.start();
-            client.create().creatingParentsIfNeeded().forPath("/one/two/three");
-
-            Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
-
-            client2 = makeClient(timing, namespace);
-            client2.start();
-            for ( int i = 0; i < 10; ++i )
-            {
-                client2.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/one/two/three/foo-");
-            }
-
-            reaper = new Reaper(client, 100);
-            reaper.start();
-
-            reaper.addPath("/one/two/three");
-            timing.sleepABit();
-
-            Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
-
-            client2.close();    // should clear ephemerals
-            client2 = null;
-
-            Thread.sleep(timing.session());
-            timing.sleepABit();
-
-            Assert.assertNull(client.checkExists().forPath("/one/two/three"));
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(reaper);
-            CloseableUtils.closeQuietly(client2);
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-
-    private void testBasic(String namespace) throws Exception
-    {
-        Timing timing = new Timing();
-        Reaper reaper = null;
-        CuratorFramework client = makeClient(timing, namespace);
-        try
-        {
-            client.start();
-            client.create().creatingParentsIfNeeded().forPath("/one/two/three");
-
-            Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
-
-            reaper = new Reaper(client, 100);
-            reaper.start();
-
-            reaper.addPath("/one/two/three");
-            timing.sleepABit();
-
-            Assert.assertNull(client.checkExists().forPath("/one/two/three"));
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(reaper);
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
index 87585af..906282f 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
@@ -31,9 +31,9 @@ import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Compatibility;
 import org.apache.curator.test.compatibility.Timing2;
 import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.utils.Compatibility;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.WatchedEvent;
@@ -329,7 +329,7 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
             observer.checkExists().usingWatcher(deletedTrigger).forPath(node.getActualPath());
 
             node.debugCreateNodeLatch = new CountDownLatch(1);
-            Compatibility.injectSessionExpiration(curator.getZookeeperClient().getZooKeeper());
+            curator.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
 
             // Make sure the node got deleted
             assertTrue(deletedTrigger.firedWithin(timing.forSessionSleep().seconds(), TimeUnit.SECONDS));
@@ -359,7 +359,7 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
             observer.checkExists().usingWatcher(deletedTrigger).forPath(node.getActualPath());
 
             node.debugCreateNodeLatch = new CountDownLatch(1);
-            Compatibility.injectSessionExpiration(curator.getZookeeperClient().getZooKeeper());
+            curator.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
 
             // Make sure the node got deleted...
             assertTrue(deletedTrigger.firedWithin(timing.forSessionSleep().seconds(), TimeUnit.SECONDS));
@@ -400,7 +400,7 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
 
                 node.debugCreateNodeLatch = new CountDownLatch(1);
                 // Kill the session, thus cleaning up the node...
-                Compatibility.injectSessionExpiration(curator.getZookeeperClient().getZooKeeper());
+                curator.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
 
                 // Make sure the node ended up getting deleted...
                 assertTrue(deletionTrigger.firedWithin(timing.multiple(1.5).forSessionSleep().seconds(), TimeUnit.SECONDS));
@@ -443,7 +443,7 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
             Trigger deletedTrigger = Trigger.deletedOrSetData();
             observer.checkExists().usingWatcher(deletedTrigger).forPath(node.getActualPath());
 
-            Compatibility.injectSessionExpiration(nodeCreator.getZookeeperClient().getZooKeeper());
+            nodeCreator.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
 
             // Make sure the node got deleted...
             assertTrue(deletedTrigger.firedWithin(timing.forWaiting().seconds(), TimeUnit.SECONDS));
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java
index 360c876..b95df11 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java
@@ -26,7 +26,6 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.Timing;
 import org.apache.curator.test.compatibility.CuratorTestBase;
-import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.curator.utils.ZKPaths;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -38,7 +37,6 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode.BUILD_INITIAL_CACHE;
 
-@Test(groups = Zk35MethodInterceptor.zk35Group)
 public class TestPersistentTtlNode extends CuratorTestBase
 {
     private final Timing timing = new Timing();
diff --git a/curator-test/src/main/java/org/apache/curator/test/compatibility/CuratorTestBase.java b/curator-test/src/main/java/org/apache/curator/test/compatibility/CuratorTestBase.java
index a3c2a29..830db1f 100644
--- a/curator-test/src/main/java/org/apache/curator/test/compatibility/CuratorTestBase.java
+++ b/curator-test/src/main/java/org/apache/curator/test/compatibility/CuratorTestBase.java
@@ -21,7 +21,6 @@ package org.apache.curator.test.compatibility;
 import org.apache.curator.test.BaseClassForTests;
 import org.testng.annotations.Listeners;
 
-@Listeners(Zk35MethodInterceptor.class)
 public class CuratorTestBase extends BaseClassForTests
 {
     protected final Timing2 timing = new Timing2();
diff --git a/curator-test/src/main/java/org/apache/curator/test/compatibility/Zk35MethodInterceptor.java b/curator-test/src/main/java/org/apache/curator/test/compatibility/Zk35MethodInterceptor.java
deleted file mode 100644
index 8072b68..0000000
--- a/curator-test/src/main/java/org/apache/curator/test/compatibility/Zk35MethodInterceptor.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.test.compatibility;
-
-import org.apache.curator.test.Compatibility;
-import org.testng.IMethodInstance;
-import org.testng.IMethodInterceptor;
-import org.testng.ITestContext;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-public class Zk35MethodInterceptor implements IMethodInterceptor
-{
-    public static final String zk35Group = "zk35";
-
-    @Override
-    public List<IMethodInstance> intercept(List<IMethodInstance> methods, ITestContext context)
-    {
-        if ( !Compatibility.isZK34() )
-        {
-            return methods;
-        }
-
-        List<IMethodInstance> filteredMethods = new ArrayList<>();
-        for ( IMethodInstance method : methods )
-        {
-            if ( !isInGroup(method.getMethod().getGroups()) )
-            {
-                filteredMethods.add(method);
-            }
-        }
-        return filteredMethods;
-    }
-
-    private boolean isInGroup(String[] groups)
-    {
-        return (groups != null) && Arrays.asList(groups).contains(zk35Group);
-    }
-}
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
index b95e92d..5c015f4 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
@@ -20,7 +20,7 @@ package org.apache.curator.x.async.modeled.details;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.listen.Listenable;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.StandardListenerManager;
 import org.apache.curator.framework.recipes.cache.TreeCache;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
 import org.apache.curator.framework.recipes.cache.TreeCacheListener;
@@ -28,10 +28,10 @@ import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.x.async.api.CreateOption;
 import org.apache.curator.x.async.modeled.ModelSerializer;
 import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ZNode;
 import org.apache.curator.x.async.modeled.ZPath;
 import org.apache.curator.x.async.modeled.cached.ModeledCache;
 import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
-import org.apache.curator.x.async.modeled.ZNode;
 import org.apache.zookeeper.data.Stat;
 import java.util.AbstractMap;
 import java.util.Map;
@@ -45,7 +45,7 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
     private final TreeCache cache;
     private final Map<ZPath, Entry<T>> entries = new ConcurrentHashMap<>();
     private final ModelSerializer<T> serializer;
-    private final ListenerContainer<ModeledCacheListener<T>> listenerContainer = new ListenerContainer<>();
+    private final StandardListenerManager<ModeledCacheListener<T>> listenerContainer = StandardListenerManager.standard();
     private final ZPath basePath;
 
     private static final class Entry<T>
@@ -144,10 +144,7 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
         {
             ThreadUtils.checkInterrupted(e);
 
-            listenerContainer.forEach(l -> {
-                l.handleException(e);
-                return null;
-            });
+            listenerContainer.forEach(l -> l.handleException(e));
         }
     }
 
@@ -188,10 +185,7 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
 
         case INITIALIZED:
         {
-            listenerContainer.forEach(l -> {
-                l.initialized();
-                return null;
-            });
+            listenerContainer.forEach(ModeledCacheListener::initialized);
             break;
         }
 
@@ -203,9 +197,6 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
 
     private void accept(ModeledCacheListener.Type type, ZPath path, Stat stat, T model)
     {
-        listenerContainer.forEach(l -> {
-            l.accept(type, path, stat, model);
-            return null;
-        });
+        listenerContainer.forEach(l -> l.accept(type, path, stat, model));
     }
 }
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
index 05df301..c154836 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.x.discovery.details;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -23,14 +24,14 @@ import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.curator.utils.CloseableExecutorService;
-import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.StandardListenerManager;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.utils.CloseableExecutorService;
+import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.curator.x.discovery.ServiceCache;
 import org.apache.curator.x.discovery.ServiceInstance;
@@ -45,17 +46,15 @@ import java.util.concurrent.atomic.AtomicReference;
 
 public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheListener
 {
-    private final ListenerContainer<ServiceCacheListener>           listenerContainer = new ListenerContainer<ServiceCacheListener>();
-    private final ServiceDiscoveryImpl<T>                           discovery;
-    private final AtomicReference<State>                            state = new AtomicReference<State>(State.LATENT);
-    private final PathChildrenCache                                 cache;
-    private final ConcurrentMap<String, ServiceInstance<T>>         instances = Maps.newConcurrentMap();
+    private final StandardListenerManager<ServiceCacheListener> listenerContainer = StandardListenerManager.standard();
+    private final ServiceDiscoveryImpl<T> discovery;
+    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+    private final PathChildrenCache cache;
+    private final ConcurrentMap<String, ServiceInstance<T>> instances = Maps.newConcurrentMap();
 
     private enum State
     {
-        LATENT,
-        STARTED,
-        STOPPED
+        LATENT, STARTED, STOPPED
     }
 
     private static CloseableExecutorService convertThreadFactory(ThreadFactory threadFactory)
@@ -123,18 +122,7 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
     {
         Preconditions.checkState(state.compareAndSet(State.STARTED, State.STOPPED), "Already closed or has not been started");
 
-        listenerContainer.forEach
-            (
-                new Function<ServiceCacheListener, Void>()
-                {
-                    @Override
-                    public Void apply(ServiceCacheListener listener)
-                    {
-                        discovery.getClient().getConnectionStateListenable().removeListener(listener);
-                        return null;
-                    }
-                }
-            );
+        listenerContainer.forEach(l -> discovery.getClient().getConnectionStateListenable().removeListener(l));
         listenerContainer.clear();
 
         CloseableUtils.closeQuietly(cache);
@@ -166,39 +154,28 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
     @Override
     public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
     {
-        boolean         notifyListeners = false;
+        boolean notifyListeners = false;
         switch ( event.getType() )
         {
-            case CHILD_ADDED:
-            case CHILD_UPDATED:
-            {
-                addInstance(event.getData(), false);
-                notifyListeners = true;
-                break;
-            }
+        case CHILD_ADDED:
+        case CHILD_UPDATED:
+        {
+            addInstance(event.getData(), false);
+            notifyListeners = true;
+            break;
+        }
 
-            case CHILD_REMOVED:
-            {
-                instances.remove(instanceIdFromData(event.getData()));
-                notifyListeners = true;
-                break;
-            }
+        case CHILD_REMOVED:
+        {
+            instances.remove(instanceIdFromData(event.getData()));
+            notifyListeners = true;
+            break;
+        }
         }
 
         if ( notifyListeners )
         {
-            listenerContainer.forEach
-            (
-                new Function<ServiceCacheListener, Void>()
-                {
-                    @Override
-                    public Void apply(ServiceCacheListener listener)
-                    {
-                        listener.cacheChanged();
-                        return null;
-                    }
-                }
-            );
+            listenerContainer.forEach(ServiceCacheListener::cacheChanged);
         }
     }
 
@@ -209,8 +186,8 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
 
     private void addInstance(ChildData childData, boolean onlyIfAbsent) throws Exception
     {
-        String                  instanceId = instanceIdFromData(childData);
-        ServiceInstance<T>      serviceInstance = discovery.getSerializer().deserialize(childData.getData());
+        String instanceId = instanceIdFromData(childData);
+        ServiceInstance<T> serviceInstance = discovery.getSerializer().deserialize(childData.getData());
         if ( onlyIfAbsent )
         {
             instances.putIfAbsent(instanceId, serviceInstance);
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
index 54719a5..7eecf3c 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
@@ -25,9 +25,9 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Compatibility;
 import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.utils.Compatibility;
 import org.apache.curator.x.discovery.ServiceDiscovery;
 import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
 import org.apache.curator.x.discovery.ServiceInstance;
@@ -79,7 +79,7 @@ public class TestServiceDiscovery extends BaseClassForTests
             timing.acquireSemaphore(semaphore, 2);
             Assert.assertEquals(discovery.queryForInstances("test").size(), 2);
 
-            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+            client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
             server.stop();
 
             server.restart();
@@ -121,7 +121,7 @@ public class TestServiceDiscovery extends BaseClassForTests
             timing.acquireSemaphore(semaphore);
             Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
 
-            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+            client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
             server.stop();
 
             server.restart();
@@ -154,7 +154,7 @@ public class TestServiceDiscovery extends BaseClassForTests
 
             Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
 
-            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+            client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
             Thread.sleep(timing.multiple(1.5).session());
 
             Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
diff --git a/pom.xml b/pom.xml
index 43f2d561..b95a2bc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -317,7 +317,6 @@
         <module>curator-x-discovery</module>
         <module>curator-x-discovery-server</module>
         <module>curator-x-async</module>
-        <module>curator-test-zk34</module>
     </modules>
 
     <dependencyManagement>
@@ -873,11 +872,6 @@
                                 <relocation>
                                     <pattern>com.google</pattern>
                                     <shadedPattern>org.apache.curator.shaded.com.google</shadedPattern>
-                                    <excludes>
-                                        <exclude>com.google.common.base.Function</exclude>
-                                        <exclude>com.google.common.base.Predicate</exclude>
-                                        <exclude>com.google.common.reflect.TypeToken</exclude>
-                                    </excludes>
                                 </relocation>
                             </relocations>
                             <artifactSet>
diff --git a/src/site/confluence/exhibitor.confluence b/src/site/confluence/exhibitor.confluence
deleted file mode 100644
index 156ff50..0000000
--- a/src/site/confluence/exhibitor.confluence
+++ /dev/null
@@ -1,24 +0,0 @@
-h1. Exhibitor Integration
-
-Curator can be integrated with Netflix [[Exhibitor|https://github.com/Netflix/exhibitor]] to achieve a live/updating list
-of the ZooKeeper ensemble. This means that your ZooKeeper client (i.e. Curator) will automatically/dynamically adjust
-to changes in the makeup of a ZooKeeper ensemble. Further, this support is generalized so that a service other than Exhibitor could be used if available.
-
-h2. Enabling
-
-The integration is enabled via the {{CuratorFrameworkFactory}} (or when constructing the {{CuratorZookeeperClient}} if not
-using the framework). Pass an {{EnsembleProvider}} to the {{ensembleProvider()}} method of the {{CuratorFrameworkFactory}} builder.
-
-h2. EnsembleProvider
-
-For Exhibitor, construct an instance of {{ExhibitorEnsembleProvider}} to pass to the builder. It takes a number of arguments:
-* exhibitors \- a list of the exhibitor instances. This is the initial set of Exhibitor instances. This set will get automatically updated if it changes.
-* restClient \- a simple facade to access the Exhibitor instances via REST. Use the provided {{DefaultExhibitorRestClient}} or something else of your choosing.
-* restUriPath \- the REST path to use. In most cases this should be: {{/exhibitor/v1/cluster/list}}
-* pollingMs \- how often to poll the Exhibitor instances
-* retryPolicy \- the retry policy to use when polling the Exhibitor instances.
-
-h2. Details
-
-Once configured, Curator will poll the Exhibitors for changes in the ensemble. If Curator should need to re\-create the ZooKeeper
-instance (due to a SysDisconnected event, etc.) it will use the updated ensemble list to do so.
diff --git a/src/site/confluence/zk-compatibility.confluence b/src/site/confluence/zk-compatibility-34.confluence
similarity index 68%
rename from src/site/confluence/zk-compatibility.confluence
rename to src/site/confluence/zk-compatibility-34.confluence
index ed6e32e..911a642 100644
--- a/src/site/confluence/zk-compatibility.confluence
+++ b/src/site/confluence/zk-compatibility-34.confluence
@@ -1,18 +1,8 @@
-h1. ZooKeeper Version Compatibility
+h1. ZooKeeper Version 3.4.x Compatibility
 
-While ZooKeeper 3.5.x is still considered "beta" by the ZooKeeper development team, the reality is that it is
-used in production by many users. However, ZooKeeper 3.4.x is also used in production. Prior to Apache Curator
-4.0, both versions of ZooKeeper were supported via two versions of Apache Curator. Starting with Curator 4.0
-both versions of ZooKeeper are supported via the same Curator libraries.
-
-h2. ZooKeeper 3.5.x
-
-* Curator 4.0 has a hard dependency on ZooKeeper 3.5.x
-* If you are using ZooKeeper 3.5.x there's nothing additional to do \- just use Curator 4.0
-
-h2. ZooKeeper 3.4.x
-
-Curator 4.0 supports ZooKeeper 3.4.x ensembles in a soft\-compatibility mode. To use this mode
+ZooKeeper 3.4.x is now at end-of-life. Consequently, the latest versions of Curator have removed support
+for it. If you wish to use Curator with ZooKeeper 3.4.x you should pin to version 4.2.x of Curator.
+Curator 4.2.x supports ZooKeeper 3.4.x ensembles in a soft\-compatibility mode. To use this mode
 you must exclude ZooKeeper when adding Curator to your dependency management tool.
 
 _Maven_
@@ -21,7 +11,7 @@ _Maven_
 <dependency>
     <groupId>org.apache.curator</groupId>
     <artifactId>curator-recipes</artifactId>
-    <version>${curator-version}</version>
+    <version>4.2.0</version>
     <exclusions>
         <exclusion>
             <groupId>org.apache.zookeeper</groupId>
diff --git a/src/site/site.xml b/src/site/site.xml
index 3ff625f..bdfbce5 100644
--- a/src/site/site.xml
+++ b/src/site/site.xml
@@ -81,10 +81,6 @@
             <item name="Schema Support" href="curator-framework/schema.html"/>
         </menu>
 
-        <menu name="Compatibility" inherit="top">
-            <item name="ZooKeeper Versions" href="zk-compatibility.html"/>
-        </menu>
-
         <menu name="Low Level" inherit="top">
             <item name="Framework" href="curator-framework/index.html"/>
             <item name="Utilities" href="utilities.html"/>
@@ -101,7 +97,8 @@
             <item name="API Compatibility" href="compatibility.html"/>
             <item name="Javadoc" href="apidocs/index.html"/>
             <item name="Wiki" href="https://cwiki.apache.org/confluence/display/CURATOR"/>
-            <item name="Releases" href="https://cwiki.apache.org/confluence/display/CURATOR/Releases"/>
+            <item name="Releases/Compatibility" href="https://cwiki.apache.org/confluence/display/CURATOR/Releases"/>
+            <item name="ZooKeeper 3.4.x" href="zk-compatibility-34.html"/>
         </menu>
 
         <menu name="Extensions" inherit="top">


[curator] 02/02: CURATOR-558 - uses CuratorTestBase so that retries occur and a short session timeout for faster execution

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch CURATOR-558-pt1-remove-zk40-etc
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 45a66651737e937e0702ce5559c5c21a39d7d8d8
Author: randgalt <ra...@apache.org>
AuthorDate: Sun Feb 2 15:31:15 2020 -0500

    CURATOR-558 - uses CuratorTestBase so that retries occur and a short session timeout for faster execution
---
 .../recipes/leader/TestLeaderLatchCluster.java     | 22 ++++++++++++++--------
 1 file changed, 14 insertions(+), 8 deletions(-)

diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatchCluster.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatchCluster.java
index 8744bd2..0d08199 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatchCluster.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatchCluster.java
@@ -19,20 +19,20 @@
 package org.apache.curator.framework.recipes.leader;
 
 import com.google.common.collect.Lists;
-import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.InstanceSpec;
 import org.apache.curator.test.TestingCluster;
-import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.apache.curator.test.compatibility.Timing2;
+import org.apache.curator.utils.CloseableUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.Collection;
 import java.util.List;
 
-public class TestLeaderLatchCluster
+public class TestLeaderLatchCluster extends CuratorTestBase
 {
     private static final int MAX_LOOPS = 5;
 
@@ -54,9 +54,9 @@ public class TestLeaderLatchCluster
     public void testInCluster() throws Exception
     {
         final int PARTICIPANT_QTY = 3;
+        final int sessionLength = timing.session() / 4;
 
         List<ClientAndLatch>    clients = Lists.newArrayList();
-        Timing                  timing = new Timing();
         TestingCluster          cluster = new TestingCluster(PARTICIPANT_QTY);
         try
         {
@@ -65,7 +65,7 @@ public class TestLeaderLatchCluster
             List<InstanceSpec>      instances = Lists.newArrayList(cluster.getInstances());
             for ( int i = 0; i < PARTICIPANT_QTY; ++i )
             {
-                CuratorFramework        client = CuratorFrameworkFactory.newClient(instances.get(i).getConnectString(), timing.session(), timing.connection(), new ExponentialBackoffRetry(100, 3));
+                CuratorFramework        client = CuratorFrameworkFactory.newClient(instances.get(i).getConnectString(), sessionLength, sessionLength, new ExponentialBackoffRetry(100, 3));
                 LeaderLatch             latch = new LeaderLatch(client, "/latch");
 
                 clients.add(new ClientAndLatch(client, latch, i));
@@ -78,7 +78,7 @@ public class TestLeaderLatchCluster
 
             cluster.killServer(instances.get(leader.index));
 
-            Thread.sleep(timing.multiple(2).session());
+            Thread.sleep(sessionLength * 2);
 
             leader = waitForALeader(clients, timing);
             Assert.assertNotNull(leader);
@@ -96,7 +96,13 @@ public class TestLeaderLatchCluster
         }
     }
 
-    private ClientAndLatch waitForALeader(List<ClientAndLatch> latches, Timing timing) throws InterruptedException
+    @Override
+    protected void createServer()
+    {
+        // NOP
+    }
+
+    private ClientAndLatch waitForALeader(List<ClientAndLatch> latches, Timing2 timing) throws InterruptedException
     {
         for ( int i = 0; i < MAX_LOOPS; ++i )
         {