You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2020/02/12 22:10:18 UTC

[curator] 01/02: CURATOR-558

This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch CURATOR-558-pt1-remove-zk40-etc
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 5a04ba4c0861a60192e1cd3e1d10875c4b03ea72
Author: randgalt <ra...@apache.org>
AuthorDate: Sun Feb 2 11:34:34 2020 -0500

    CURATOR-558
    
    Pt1 of change
    
    * Remove the ZK 3.4 compatibility module and code
    * Remove the deprecated ListenerContainer that leaks Guava classes into our APIs
    * Remove Exhibitor support
    * Various minor changes/cleanups
---
 .../exhibitor/DefaultExhibitorRestClient.java      |  68 ---
 .../exhibitor/ExhibitorEnsembleProvider.java       | 335 -----------
 .../ensemble/exhibitor/ExhibitorRestClient.java    |  34 --
 .../curator/ensemble/exhibitor/Exhibitors.java     |  66 ---
 .../org/apache/curator/utils/Compatibility.java    | 101 ----
 .../curator/utils/InjectSessionExpiration.java     |  77 ---
 .../test/java/org/apache/curator/BasicTests.java   |   4 +-
 .../apache/curator/TestSessionFailRetryLoop.java   |  10 +-
 .../exhibitor/TestExhibitorEnsembleProvider.java   | 227 --------
 .../apache/curator/framework/CuratorFramework.java |   7 -
 .../curator/framework/CuratorFrameworkFactory.java |  22 -
 .../apache/curator/framework/SafeIsTtlMode.java    |  44 --
 .../framework/imps/CompatibleCreateCallback.java   |  26 -
 .../curator/framework/imps/CreateBuilderImpl.java  |  78 +--
 .../apache/curator/framework/imps/CreateZK35.java  |  47 --
 .../framework/imps/CuratorFrameworkImpl.java       |  83 +--
 .../imps/CuratorMultiTransactionImpl.java          |  18 +-
 .../framework/imps/WatcherRemovalManager.java      |   7 +-
 .../framework/listen/ListenerContainer.java        | 112 ----
 .../framework/listen/MappingListenerManager.java   |   2 +-
 .../framework/state/ConnectionStateManager.java    |   3 +-
 .../curator/framework/imps/TestCleanState.java     |   7 -
 .../framework/imps/TestCreateReturningStat.java    |   2 -
 .../imps/TestEnabledSessionExpiredState.java       |   3 +-
 .../curator/framework/imps/TestFramework.java      |   2 -
 .../curator/framework/imps/TestFrameworkEdges.java |  13 +-
 .../framework/imps/TestReconfiguration.java        |   2 -
 .../curator/framework/imps/TestRemoveWatches.java  |   2 -
 .../curator/framework/imps/TestTtlNodes.java       |   2 -
 .../framework/imps/TestWatcherRemovalManager.java  |   2 -
 .../curator/framework/recipes/cache/NodeCache.java |  35 +-
 .../framework/recipes/cache/PathChildrenCache.java |  39 +-
 .../curator/framework/recipes/cache/TreeCache.java |  49 +-
 .../framework/recipes/leader/LeaderLatch.java      |  25 +-
 .../framework/recipes/locks/ChildReaper.java       | 307 ----------
 .../curator/framework/recipes/locks/Reaper.java    | 380 ------------
 .../framework/recipes/nodes/PersistentNode.java    |  41 +-
 .../recipes/queue/DistributedDelayQueue.java       |   4 +-
 .../recipes/queue/DistributedIdQueue.java          |   4 +-
 .../recipes/queue/DistributedPriorityQueue.java    |   4 +-
 .../framework/recipes/queue/DistributedQueue.java  |  63 +-
 .../curator/framework/recipes/queue/QueueBase.java |   4 +-
 .../framework/recipes/shared/SharedValue.java      |  52 +-
 .../recipes/shared/SharedValueReader.java          |   4 +-
 .../framework/recipes/cache/TestNodeCache.java     |   4 +-
 .../recipes/cache/TestPathChildrenCache.java       |   4 +-
 .../framework/recipes/cache/TestTreeCache.java     |   6 +-
 .../recipes/leader/TestLeaderSelector.java         |   4 +-
 .../framework/recipes/locks/TestChildReaper.java   | 351 -----------
 .../recipes/locks/TestInterProcessMutex.java       |   4 +-
 .../recipes/locks/TestInterProcessMutexBase.java   |   4 +-
 .../recipes/locks/TestInterProcessSemaphore.java   |  47 --
 .../framework/recipes/locks/TestReaper.java        | 647 ---------------------
 .../recipes/nodes/TestPersistentEphemeralNode.java |  10 +-
 .../recipes/nodes/TestPersistentTtlNode.java       |   2 -
 .../test/compatibility/CuratorTestBase.java        |   1 -
 .../test/compatibility/Zk35MethodInterceptor.java  |  56 --
 .../x/async/modeled/details/ModeledCacheImpl.java  |  21 +-
 .../x/discovery/details/ServiceCacheImpl.java      |  79 +--
 .../x/discovery/details/TestServiceDiscovery.java  |   8 +-
 pom.xml                                            |   6 -
 src/site/confluence/exhibitor.confluence           |  24 -
 ...y.confluence => zk-compatibility-34.confluence} |  20 +-
 src/site/site.xml                                  |   7 +-
 64 files changed, 237 insertions(+), 3485 deletions(-)

diff --git a/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/DefaultExhibitorRestClient.java b/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/DefaultExhibitorRestClient.java
deleted file mode 100644
index 01b9525..0000000
--- a/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/DefaultExhibitorRestClient.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.ensemble.exhibitor;
-
-import org.apache.curator.utils.CloseableUtils;
-import java.io.BufferedInputStream;
-import java.io.InputStream;
-import java.net.HttpURLConnection;
-import java.net.URI;
-
-@SuppressWarnings("UnusedDeclaration")
-public class DefaultExhibitorRestClient implements ExhibitorRestClient
-{
-    private final boolean useSsl;
-
-    public DefaultExhibitorRestClient()
-    {
-        this(false);
-    }
-
-    public DefaultExhibitorRestClient(boolean useSsl)
-    {
-        this.useSsl = useSsl;
-    }
-
-    @Override
-    public String getRaw(String hostname, int port, String uriPath, String mimeType) throws Exception
-    {
-        URI                 uri = new URI(useSsl ? "https" : "http", null, hostname, port, uriPath, null, null);
-        HttpURLConnection   connection = (HttpURLConnection)uri.toURL().openConnection();
-        connection.addRequestProperty("Accept", mimeType);
-        StringBuilder       str = new StringBuilder();
-        InputStream         in = new BufferedInputStream(connection.getInputStream());
-        try
-        {
-            for(;;)
-            {
-                int     b = in.read();
-                if ( b < 0 )
-                {
-                    break;
-                }
-                str.append((char)(b & 0xff));
-            }
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(in);
-        }
-        return str.toString();
-    }
-}
diff --git a/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/ExhibitorEnsembleProvider.java b/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/ExhibitorEnsembleProvider.java
deleted file mode 100644
index eb4f8bd..0000000
--- a/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/ExhibitorEnsembleProvider.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.ensemble.exhibitor;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.curator.RetryLoop;
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.ensemble.EnsembleProvider;
-import org.apache.curator.utils.ThreadUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Ensemble provider that polls a cluster of Exhibitor (https://github.com/Netflix/exhibitor)
- * instances for the connection string.
- * If the set of instances should change, new ZooKeeper connections will use the new connection
- * string.
- */
-public class ExhibitorEnsembleProvider implements EnsembleProvider
-{
-    private final Logger log = LoggerFactory.getLogger(getClass());
-    private final AtomicReference<Exhibitors> exhibitors = new AtomicReference<Exhibitors>();
-    private final AtomicReference<Exhibitors> masterExhibitors = new AtomicReference<Exhibitors>();
-    private final ExhibitorRestClient restClient;
-    private final String restUriPath;
-    private final int pollingMs;
-    private final RetryPolicy retryPolicy;
-    private final ScheduledExecutorService service = ThreadUtils.newSingleThreadScheduledExecutor("ExhibitorEnsembleProvider");
-    private final Random random = new Random();
-    private final AtomicReference<String> connectionString = new AtomicReference<String>("");
-    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
-
-    private static final String MIME_TYPE = "application/x-www-form-urlencoded";
-
-    private static final String VALUE_PORT = "port";
-    private static final String VALUE_COUNT = "count";
-    private static final String VALUE_SERVER_PREFIX = "server";
-
-    private enum State
-    {
-        LATENT,
-        STARTED,
-        CLOSED
-    }
-
-    /**
-     * @param exhibitors the current set of exhibitor instances (can be changed later via {@link #setExhibitors(Exhibitors)})
-     * @param restClient the rest client to use (use {@link DefaultExhibitorRestClient} for most cases)
-     * @param restUriPath the path of the REST call used to get the server set. Usually: <code>/exhibitor/v1/cluster/list</code>
-     * @param pollingMs how ofter to poll the exhibitors for the list
-     * @param retryPolicy retry policy to use when connecting to the exhibitors
-     */
-    public ExhibitorEnsembleProvider(Exhibitors exhibitors, ExhibitorRestClient restClient, String restUriPath, int pollingMs, RetryPolicy retryPolicy)
-    {
-        this.exhibitors.set(exhibitors);
-        this.masterExhibitors.set(exhibitors);
-        this.restClient = restClient;
-        this.restUriPath = restUriPath;
-        this.pollingMs = pollingMs;
-        this.retryPolicy = retryPolicy;
-    }
-
-    /**
-     * Change the set of exhibitors to poll
-     *
-     * @param newExhibitors new set
-     */
-    public void     setExhibitors(Exhibitors newExhibitors)
-    {
-        exhibitors.set(newExhibitors);
-        masterExhibitors.set(newExhibitors);
-    }
-
-    /**
-     * Can be called prior to starting the Curator instance to set the current connection string
-     *
-     * @throws Exception errors
-     */
-    public void     pollForInitialEnsemble() throws Exception
-    {
-        Preconditions.checkState(state.get() == State.LATENT, "Cannot be called after start()");
-        poll();
-    }
-
-    @Override
-    public void start() throws Exception
-    {
-        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
-
-        service.scheduleWithFixedDelay
-            (
-                new Runnable()
-                {
-                    @Override
-                    public void run()
-                    {
-                        poll();
-                    }
-                },
-                pollingMs,
-                pollingMs,
-                TimeUnit.MILLISECONDS
-            );
-    }
-
-    @Override
-    public void close() throws IOException
-    {
-        Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
-
-        service.shutdownNow();
-    }
-
-    @Override
-    public String getConnectionString()
-    {
-        return connectionString.get();
-    }
-
-    @Override
-    public void setConnectionString(String connectionString)
-    {
-        log.info("setConnectionString received. Ignoring. " + connectionString);
-    }
-
-    @Override
-    public boolean updateServerListEnabled()
-    {
-        return false;
-    }
-
-    @VisibleForTesting
-    protected void poll()
-    {
-        Exhibitors              localExhibitors = exhibitors.get();
-        Map<String, String>     values = queryExhibitors(localExhibitors);
-
-        int                     count = getCountFromValues(values);
-        if ( count == 0 )
-        {
-            log.warn("0 count returned from Exhibitors. Using backup connection values.");
-            values = useBackup(localExhibitors);
-            count = getCountFromValues(values);
-        }
-
-        if ( count > 0 )
-        {
-            int                     port = Integer.parseInt(values.get(VALUE_PORT));
-            StringBuilder           newConnectionString = new StringBuilder();
-            List<String>            newHostnames = Lists.newArrayList();
-
-            for ( int i = 0; i < count; ++i )
-            {
-                if ( newConnectionString.length() > 0 )
-                {
-                    newConnectionString.append(",");
-                }
-                String      server = values.get(VALUE_SERVER_PREFIX + i);
-                newConnectionString.append(server).append(":").append(port);
-                newHostnames.add(server);
-            }
-
-            String newConnectionStringValue = newConnectionString.toString();
-            if ( !newConnectionStringValue.equals(connectionString.get()) )
-            {
-                log.info(String.format("Connection string has changed. Old value (%s), new value (%s)", connectionString.get(), newConnectionStringValue));
-            }
-            Exhibitors newExhibitors = new Exhibitors
-            (
-                newHostnames,
-                localExhibitors.getRestPort(),
-                new Exhibitors.BackupConnectionStringProvider()
-                {
-                    @Override
-                    public String getBackupConnectionString() throws Exception
-                    {
-                        return masterExhibitors.get().getBackupConnectionString();  // this may be overloaded by clients. Make sure there is always a method call to get the value.
-                    }
-                }
-            );
-            connectionString.set(newConnectionStringValue);
-            exhibitors.set(newExhibitors);
-        }
-    }
-
-    private int getCountFromValues(Map<String, String> values)
-    {
-        try
-        {
-            return Integer.parseInt(values.get(VALUE_COUNT));
-        }
-        catch ( NumberFormatException e )
-        {
-            // ignore
-        }
-        return 0;
-    }
-
-    private Map<String, String> useBackup(Exhibitors localExhibitors)
-    {
-        Map<String, String>     values = newValues();
-
-        try
-        {
-            String      backupConnectionString = localExhibitors.getBackupConnectionString();
-
-            int         thePort = -1;
-            int         count = 0;
-            for ( String spec : backupConnectionString.split(",") )
-            {
-                spec = spec.trim();
-                String[]        parts = spec.split(":");
-                if ( parts.length == 2 )
-                {
-                    String      hostname = parts[0];
-                    int         port = Integer.parseInt(parts[1]);
-                    if ( thePort < 0 )
-                    {
-                        thePort = port;
-                    }
-                    else if ( port != thePort )
-                    {
-                        log.warn("Inconsistent port in connection component: " + spec);
-                    }
-                    values.put(VALUE_SERVER_PREFIX + count, hostname);
-                    ++count;
-                }
-                else
-                {
-                    log.warn("Bad backup connection component: " + spec);
-                }
-            }
-            values.put(VALUE_COUNT, Integer.toString(count));
-            values.put(VALUE_PORT, Integer.toString(thePort));
-        }
-        catch ( Exception e )
-        {
-            ThreadUtils.checkInterrupted(e);
-            log.error("Couldn't get backup connection string", e);
-        }
-        return values;
-    }
-
-    private Map<String, String> newValues()
-    {
-        Map<String, String>     values = Maps.newHashMap();
-        values.put(VALUE_COUNT, "0");
-        return values;
-    }
-
-    private static Map<String, String> decodeExhibitorList(String str) throws UnsupportedEncodingException
-    {
-        Map<String, String>     values = Maps.newHashMap();
-        for ( String spec : str.split("&") )
-        {
-            String[]        parts = spec.split("=");
-            if ( parts.length == 2 )
-            {
-                values.put(parts[0], URLDecoder.decode(parts[1], "UTF-8"));
-            }
-        }
-
-        return values;
-    }
-
-    private Map<String, String> queryExhibitors(Exhibitors localExhibitors)
-    {
-        Map<String, String> values = newValues();
-
-        long                    start = System.currentTimeMillis();
-        int                     retries = 0;
-        boolean                 done = false;
-        while ( !done )
-        {
-            List<String> hostnames = Lists.newArrayList(localExhibitors.getHostnames());
-            if ( hostnames.size() == 0 )
-            {
-                done = true;
-            }
-            else
-            {
-                String          hostname = hostnames.get(random.nextInt(hostnames.size()));
-                try
-                {
-                    String                  encoded = restClient.getRaw(hostname, localExhibitors.getRestPort(), restUriPath, MIME_TYPE);
-                    values.putAll(decodeExhibitorList(encoded));
-                    done = true;
-                }
-                catch ( Throwable e )
-                {
-                    ThreadUtils.checkInterrupted(e);
-                    if ( retryPolicy.allowRetry(retries++, System.currentTimeMillis() - start, RetryLoop.getDefaultRetrySleeper()) )
-                    {
-                        log.warn("Couldn't get servers from Exhibitor. Retrying.", e);
-                    }
-                    else
-                    {
-                        log.error("Couldn't get servers from Exhibitor. Giving up.", e);
-                        done = true;
-                    }
-                }
-            }
-        }
-
-        return values;
-    }
-}
diff --git a/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/ExhibitorRestClient.java b/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/ExhibitorRestClient.java
deleted file mode 100644
index 007f652..0000000
--- a/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/ExhibitorRestClient.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.ensemble.exhibitor;
-
-public interface ExhibitorRestClient
-{
-    /**
-     * Connect to the given Exhibitor and return the raw result
-     *
-     * @param hostname host to connect to
-     * @param port connect port
-     * @param uriPath path
-     * @param mimeType Accept mime type
-     * @return raw result
-     * @throws Exception errors
-     */
-    public String getRaw(String hostname, int port, String uriPath, String mimeType) throws Exception;
-}
diff --git a/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/Exhibitors.java b/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/Exhibitors.java
deleted file mode 100644
index 24e0c14..0000000
--- a/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/Exhibitors.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.ensemble.exhibitor;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import java.util.Collection;
-
-/**
- * POJO for specifying the cluster of Exhibitor instances
- */
-public class Exhibitors
-{
-    private final Collection<String> hostnames;
-    private final int restPort;
-    private final BackupConnectionStringProvider backupConnectionStringProvider;
-
-    public interface BackupConnectionStringProvider
-    {
-        public String getBackupConnectionString() throws Exception;
-    }
-
-    /**
-     * @param hostnames set of Exhibitor instance host names
-     * @param restPort the REST port used to connect to Exhibitor
-     * @param backupConnectionStringProvider in case an Exhibitor instance can't be contacted, returns the fixed
-     *                               connection string to use as a backup
-     */
-    public Exhibitors(Collection<String> hostnames, int restPort, BackupConnectionStringProvider backupConnectionStringProvider)
-    {
-        this.backupConnectionStringProvider = Preconditions.checkNotNull(backupConnectionStringProvider, "backupConnectionStringProvider cannot be null");
-        this.hostnames = ImmutableList.copyOf(hostnames);
-        this.restPort = restPort;
-    }
-
-    public Collection<String> getHostnames()
-    {
-        return hostnames;
-    }
-
-    public int getRestPort()
-    {
-        return restPort;
-    }
-
-    public String getBackupConnectionString() throws Exception
-    {
-        return backupConnectionStringProvider.getBackupConnectionString();
-    }
-}
diff --git a/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java b/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java
deleted file mode 100644
index 1ee2301..0000000
--- a/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.utils;
-
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.lang.reflect.Method;
-
-/**
- * Utils to help with ZK 3.4.x compatibility
- */
-public class Compatibility
-{
-    private static final boolean hasZooKeeperAdmin;
-    private static final Method queueEventMethod;
-    private static final Logger logger = LoggerFactory.getLogger(Compatibility.class);
-
-    static
-    {
-        boolean localHasZooKeeperAdmin;
-        try
-        {
-            Class.forName("org.apache.zookeeper.admin.ZooKeeperAdmin");
-            localHasZooKeeperAdmin = true;
-        }
-        catch ( ClassNotFoundException e )
-        {
-            localHasZooKeeperAdmin = false;
-            logger.info("Running in ZooKeeper 3.4.x compatibility mode");
-        }
-        hasZooKeeperAdmin = localHasZooKeeperAdmin;
-
-        Method localQueueEventMethod;
-        try
-        {
-            Class<?> testableClass = Class.forName("org.apache.zookeeper.Testable");
-            localQueueEventMethod = testableClass.getMethod("queueEvent", WatchedEvent.class);
-        }
-        catch ( ReflectiveOperationException ignore )
-        {
-            localQueueEventMethod = null;
-            LoggerFactory.getLogger(Compatibility.class).info("Using emulated InjectSessionExpiration");
-        }
-        queueEventMethod = localQueueEventMethod;
-    }
-
-    /**
-     * Return true if the classpath ZooKeeper library is 3.4.x
-     *
-     * @return true/false
-     */
-    public static boolean isZK34()
-    {
-        return !hasZooKeeperAdmin;
-    }
-
-    /**
-     * For ZooKeeper 3.5.x, use the supported <code>zooKeeper.getTestable().injectSessionExpiration()</code>.
-     * For ZooKeeper 3.4.x do the equivalent via reflection
-     *
-     * @param zooKeeper client
-     */
-    public static void injectSessionExpiration(ZooKeeper zooKeeper)
-    {
-        if ( isZK34() || (queueEventMethod == null) )
-        {
-            InjectSessionExpiration.injectSessionExpiration(zooKeeper);
-        }
-        else
-        {
-            try
-            {
-                WatchedEvent event = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null);
-                queueEventMethod.invoke(zooKeeper.getTestable(), event);
-            }
-            catch ( Exception e )
-            {
-                logger.error("Could not call Testable.queueEvent()", e);
-            }
-        }
-    }
-}
diff --git a/curator-client/src/main/java/org/apache/curator/utils/InjectSessionExpiration.java b/curator-client/src/main/java/org/apache/curator/utils/InjectSessionExpiration.java
deleted file mode 100644
index 8ad2b5d..0000000
--- a/curator-client/src/main/java/org/apache/curator/utils/InjectSessionExpiration.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.utils;
-
-import org.apache.zookeeper.ClientCnxn;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-
-// reflective version of zooKeeper.getTestable().injectSessionExpiration();
-@SuppressWarnings("JavaReflectionMemberAccess")
-public class InjectSessionExpiration
-{
-    private static final Field cnxnField;
-    private static final Field eventThreadField;
-    private static final Method queueEventMethod;
-    static
-    {
-        Field localCnxnField;
-        Field localEventThreadField;
-        Method localQueueEventMethod;
-        try
-        {
-            Class<?> eventThreadClass = Class.forName("org.apache.zookeeper.ClientCnxn$EventThread");
-
-            localCnxnField = ZooKeeper.class.getDeclaredField("cnxn");
-            localCnxnField.setAccessible(true);
-            localEventThreadField = ClientCnxn.class.getDeclaredField("eventThread");
-            localEventThreadField.setAccessible(true);
-            localQueueEventMethod = eventThreadClass.getDeclaredMethod("queueEvent", WatchedEvent.class);
-            localQueueEventMethod.setAccessible(true);
-        }
-        catch ( ReflectiveOperationException e )
-        {
-            throw new RuntimeException("Could not access internal ZooKeeper fields", e);
-        }
-        cnxnField = localCnxnField;
-        eventThreadField = localEventThreadField;
-        queueEventMethod = localQueueEventMethod;
-    }
-
-    public static void injectSessionExpiration(ZooKeeper zooKeeper)
-    {
-        try
-        {
-            WatchedEvent event = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null);
-
-            ClientCnxn clientCnxn = (ClientCnxn)cnxnField.get(zooKeeper);
-            Object eventThread = eventThreadField.get(clientCnxn);
-            queueEventMethod.invoke(eventThread, event);
-
-            // we used to set the state field to CLOSED here and a few other things but this resulted in CURATOR-498
-        }
-        catch ( ReflectiveOperationException e )
-        {
-            throw new RuntimeException("Could not inject session expiration using reflection", e);
-        }
-    }
-}
diff --git a/curator-client/src/test/java/org/apache/curator/BasicTests.java b/curator-client/src/test/java/org/apache/curator/BasicTests.java
index f951fb5..96b1ad0 100644
--- a/curator-client/src/test/java/org/apache/curator/BasicTests.java
+++ b/curator-client/src/test/java/org/apache/curator/BasicTests.java
@@ -21,8 +21,8 @@ package org.apache.curator;
 import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Compatibility;
 import org.apache.curator.test.Timing;
-import org.apache.curator.utils.Compatibility;
 import org.apache.curator.utils.ZookeeperFactory;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -100,7 +100,7 @@ public class BasicTests extends BaseClassForTests
                                 // ignore
                             }
 
-                            Compatibility.injectSessionExpiration(client.getZooKeeper());
+                            client.getZooKeeper().getTestable().injectSessionExpiration();
 
                             Assert.assertTrue(timing.awaitLatch(latch));
                         }
diff --git a/curator-client/src/test/java/org/apache/curator/TestSessionFailRetryLoop.java b/curator-client/src/test/java/org/apache/curator/TestSessionFailRetryLoop.java
index 7c9c963..e661930 100644
--- a/curator-client/src/test/java/org/apache/curator/TestSessionFailRetryLoop.java
+++ b/curator-client/src/test/java/org/apache/curator/TestSessionFailRetryLoop.java
@@ -20,9 +20,9 @@ package org.apache.curator;
 
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Compatibility;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.test.Timing;
-import org.apache.curator.utils.Compatibility;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.concurrent.Callable;
@@ -57,7 +57,7 @@ public class TestSessionFailRetryLoop extends BaseClassForTests
                                 if ( firstTime.compareAndSet(true, false) )
                                 {
                                     Assert.assertNull(client.getZooKeeper().exists("/foo/bar", false));
-                                    Compatibility.injectSessionExpiration(client.getZooKeeper());
+                                    client.getZooKeeper().getTestable().injectSessionExpiration();
                                     client.getZooKeeper();
                                     client.blockUntilConnectedOrTimedOut();
                                 }
@@ -131,7 +131,7 @@ public class TestSessionFailRetryLoop extends BaseClassForTests
                                     if ( firstTime.compareAndSet(true, false) )
                                     {
                                         Assert.assertNull(client.getZooKeeper().exists("/foo/bar", false));
-                                        Compatibility.injectSessionExpiration(client.getZooKeeper());
+                                        client.getZooKeeper().getTestable().injectSessionExpiration();
                                         client.getZooKeeper();
                                         client.blockUntilConnectedOrTimedOut();
                                     }
@@ -196,7 +196,7 @@ public class TestSessionFailRetryLoop extends BaseClassForTests
                                 public Void call() throws Exception
                                 {
                                     Assert.assertNull(client.getZooKeeper().exists("/foo/bar", false));
-                                    Compatibility.injectSessionExpiration(client.getZooKeeper());
+                                    client.getZooKeeper().getTestable().injectSessionExpiration();
 
                                     timing.sleepABit();
 
@@ -258,7 +258,7 @@ public class TestSessionFailRetryLoop extends BaseClassForTests
                                     public Void call() throws Exception
                                     {
                                         Assert.assertNull(client.getZooKeeper().exists("/foo/bar", false));
-                                        Compatibility.injectSessionExpiration(client.getZooKeeper());
+                                        client.getZooKeeper().getTestable().injectSessionExpiration();
 
                                         client.getZooKeeper();
                                         client.blockUntilConnectedOrTimedOut();
diff --git a/curator-client/src/test/java/org/apache/curator/ensemble/exhibitor/TestExhibitorEnsembleProvider.java b/curator-client/src/test/java/org/apache/curator/ensemble/exhibitor/TestExhibitorEnsembleProvider.java
deleted file mode 100644
index 60c817a..0000000
--- a/curator-client/src/test/java/org/apache/curator/ensemble/exhibitor/TestExhibitorEnsembleProvider.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.ensemble.exhibitor;
-
-import com.google.common.collect.Lists;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.CuratorZookeeperClient;
-import org.apache.curator.RetryLoop;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.curator.retry.RetryOneTime;
-import org.apache.curator.test.TestingServer;
-import org.apache.curator.test.Timing;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.Stat;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-import java.io.IOException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicReference;
-
-public class TestExhibitorEnsembleProvider extends BaseClassForTests
-{
-    private static final Exhibitors.BackupConnectionStringProvider dummyConnectionStringProvider = new Exhibitors.BackupConnectionStringProvider()
-    {
-        @Override
-        public String getBackupConnectionString() throws Exception
-        {
-            return null;
-        }
-    };
-
-    @Test
-    public void     testExhibitorFailures() throws Exception
-    {
-        final AtomicReference<String>   backupConnectionString = new AtomicReference<String>("backup1:1");
-        final AtomicReference<String>   connectionString = new AtomicReference<String>("count=1&port=2&server0=localhost");
-        Exhibitors                      exhibitors = new Exhibitors
-        (
-            Lists.newArrayList("foo", "bar"),
-            1000,
-            new Exhibitors.BackupConnectionStringProvider()
-            {
-                @Override
-                public String getBackupConnectionString()
-                {
-                    return backupConnectionString.get();
-                }
-            }
-        );
-        ExhibitorRestClient             mockRestClient = new ExhibitorRestClient()
-        {
-            @Override
-            public String getRaw(String hostname, int port, String uriPath, String mimeType) throws Exception
-            {
-                String localConnectionString = connectionString.get();
-                if ( localConnectionString == null )
-                {
-                    throw new IOException();
-                }
-                return localConnectionString;
-            }
-        };
-
-        final Semaphore             semaphore = new Semaphore(0);
-        ExhibitorEnsembleProvider   provider = new ExhibitorEnsembleProvider(exhibitors, mockRestClient, "/foo", 10, new RetryOneTime(1))
-        {
-            @Override
-            protected void poll()
-            {
-                super.poll();
-                semaphore.release();
-            }
-        };
-        provider.pollForInitialEnsemble();
-        try
-        {
-            provider.start();
-
-            Assert.assertEquals(provider.getConnectionString(), "localhost:2");
-
-            connectionString.set(null);
-            semaphore.drainPermits();
-            semaphore.acquire();    // wait for next poll
-            Assert.assertEquals(provider.getConnectionString(), "backup1:1");
-
-            backupConnectionString.set("backup2:2");
-            semaphore.drainPermits();
-            semaphore.acquire();    // wait for next poll
-            Assert.assertEquals(provider.getConnectionString(), "backup2:2");
-
-            connectionString.set("count=1&port=3&server0=localhost3");
-            semaphore.drainPermits();
-            semaphore.acquire();    // wait for next poll
-            Assert.assertEquals(provider.getConnectionString(), "localhost3:3");
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(provider);
-        }
-    }
-
-    @Test
-    public void     testChanging() throws Exception
-    {
-        TestingServer               secondServer = new TestingServer();
-        try
-        {
-            String                          mainConnectionString = "count=1&port=" + server.getPort() + "&server0=localhost";
-            String                          secondConnectionString = "count=1&port=" + secondServer.getPort() + "&server0=localhost";
-
-            final Semaphore                 semaphore = new Semaphore(0);
-            final AtomicReference<String>   connectionString = new AtomicReference<String>(mainConnectionString);
-            Exhibitors                      exhibitors = new Exhibitors(Lists.newArrayList("foo", "bar"), 1000, dummyConnectionStringProvider);
-            ExhibitorRestClient             mockRestClient = new ExhibitorRestClient()
-            {
-                @Override
-                public String getRaw(String hostname, int port, String uriPath, String mimeType) throws Exception
-                {
-                    semaphore.release();
-                    return connectionString.get();
-                }
-            };
-            ExhibitorEnsembleProvider   provider = new ExhibitorEnsembleProvider(exhibitors, mockRestClient, "/foo", 10, new RetryOneTime(1));
-            provider.pollForInitialEnsemble();
-
-            Timing                          timing = new Timing().multiple(4);
-            final CuratorZookeeperClient    client = new CuratorZookeeperClient(provider, timing.session(), timing.connection(), null, new RetryOneTime(2));
-            client.start();
-            try
-            {
-                RetryLoop.callWithRetry
-                (
-                    client,
-                    new Callable<Object>()
-                    {
-                        @Override
-                        public Object call() throws Exception
-                        {
-                            client.getZooKeeper().create("/test", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-                            return null;
-                        }
-                    }
-                );
-
-                connectionString.set(secondConnectionString);
-                semaphore.drainPermits();
-                semaphore.acquire();
-
-                server.stop();  // create situation where the current zookeeper gets a sys-disconnected
-
-                Stat        stat = RetryLoop.callWithRetry
-                (
-                    client,
-                    new Callable<Stat>()
-                    {
-                        @Override
-                        public Stat call() throws Exception
-                        {
-                            return client.getZooKeeper().exists("/test", false);
-                        }
-                    }
-                );
-                Assert.assertNull(stat);    // it's a different server so should be null
-            }
-            finally
-            {
-                client.close();
-            }
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(secondServer);
-        }
-    }
-
-    @Test
-    public void     testSimple() throws Exception
-    {
-        Exhibitors                  exhibitors = new Exhibitors(Lists.newArrayList("foo", "bar"), 1000, dummyConnectionStringProvider);
-        ExhibitorRestClient         mockRestClient = new ExhibitorRestClient()
-        {
-            @Override
-            public String getRaw(String hostname, int port, String uriPath, String mimeType) throws Exception
-            {
-                return "count=1&port=" + server.getPort() + "&server0=localhost";
-            }
-        };
-        ExhibitorEnsembleProvider   provider = new ExhibitorEnsembleProvider(exhibitors, mockRestClient, "/foo", 10, new RetryOneTime(1));
-        provider.pollForInitialEnsemble();
-
-        Timing                      timing = new Timing();
-        CuratorZookeeperClient      client = new CuratorZookeeperClient(provider, timing.session(), timing.connection(), null, new ExponentialBackoffRetry(timing.milliseconds(), 3));
-        client.start();
-        try
-        {
-            client.blockUntilConnectedOrTimedOut();
-            client.getZooKeeper().exists("/", false);
-        }
-        catch ( Exception e )
-        {
-            Assert.fail("provider.getConnectionString(): " + provider.getConnectionString() + " server.getPort(): " + server.getPort(), e);
-        }
-        finally
-        {
-            client.close();
-        }
-    }
-}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 3737faa..8b39ebd 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -325,13 +325,6 @@ public interface CuratorFramework extends Closeable
     SchemaSet getSchemaSet();
 
     /**
-     * Return true if this instance is running in ZK 3.4.x compatibility mode
-     *
-     * @return true/false
-     */
-    boolean isZk34CompatibilityMode();
-
-    /**
      * Calls {@link #notifyAll()} on the given object after first synchronizing on it. This is
      * done from the {@link #runSafe(Runnable)} thread.
      *
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
index 887a2aa..9c075bb 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
@@ -54,8 +54,6 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import org.apache.curator.CuratorZookeeperClient;
 
-import static org.apache.curator.utils.Compatibility.isZK34;
-
 /**
  * Factory methods for creating framework-style clients
  */
@@ -150,7 +148,6 @@ public class CuratorFrameworkFactory
         private ConnectionStateErrorPolicy connectionStateErrorPolicy = new StandardConnectionStateErrorPolicy();
         private ConnectionHandlingPolicy connectionHandlingPolicy = new StandardConnectionHandlingPolicy();
         private SchemaSet schemaSet = SchemaSet.getDefaultSchemaSet();
-        private boolean zk34CompatibilityMode = isZK34();
         private int waitForShutdownTimeoutMs = 0;
         private Executor runSafeService = null;
         private ConnectionStateListenerManagerFactory connectionStateListenerManagerFactory = ConnectionStateListenerManagerFactory.standard;
@@ -395,20 +392,6 @@ public class CuratorFrameworkFactory
         }
 
         /**
-         * If mode is true, create a ZooKeeper 3.4.x compatible client. IMPORTANT: If the client
-         * library used is ZooKeeper 3.4.x <code>zk34CompatibilityMode</code> is enabled by default.
-         *
-         * @since 3.5.0
-         * @param mode true/false
-         * @return this
-         */
-        public Builder zk34CompatibilityMode(boolean mode)
-        {
-            this.zk34CompatibilityMode = mode;
-            return this;
-        }
-
-        /**
          * Set a timeout for {@link CuratorZookeeperClient#close(int)}  }.
          * The default is 0, which means that this feature is disabled.
          *
@@ -591,11 +574,6 @@ public class CuratorFrameworkFactory
             return schemaSet;
         }
 
-        public boolean isZk34CompatibilityMode()
-        {
-            return zk34CompatibilityMode;
-        }
-
         @Deprecated
         public String getAuthScheme()
         {
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/SafeIsTtlMode.java b/curator-framework/src/main/java/org/apache/curator/framework/SafeIsTtlMode.java
deleted file mode 100644
index e499a7b..0000000
--- a/curator-framework/src/main/java/org/apache/curator/framework/SafeIsTtlMode.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework;
-
-import org.apache.curator.utils.Compatibility;
-import org.apache.zookeeper.CreateMode;
-
-public class SafeIsTtlMode
-{
-    private static class Internal
-    {
-        private static final Internal instance = new Internal();
-
-        public boolean isTtl(CreateMode mode)
-        {
-            return mode.isTTL();
-        }
-    }
-
-    public static boolean isTtl(CreateMode mode)
-    {
-        return !Compatibility.isZK34() && Internal.instance.isTtl(mode);
-    }
-
-    private SafeIsTtlMode()
-    {
-    }
-}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CompatibleCreateCallback.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CompatibleCreateCallback.java
deleted file mode 100644
index 30ca391..0000000
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CompatibleCreateCallback.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.imps;
-
-import org.apache.zookeeper.data.Stat;
-
-interface CompatibleCreateCallback
-{
-    void processResult(int rc, String path, Object ctx, String name, Stat stat);
-}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
index a02a6be..ee5c541 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
@@ -124,7 +124,6 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro
     @Override
     public CreateBuilderMain withTtl(long ttl)
     {
-        Preconditions.checkState(!client.isZk34CompatibilityMode(), "TTLs are not support when running in ZooKeeper 3.4 compatibility mode");
         this.ttl = ttl;
         return this;
     }
@@ -182,14 +181,7 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro
                 }
 
                 String fixedPath = client.fixForNamespace(path);
-                if ( client.isZk34CompatibilityMode() )
-                {
-                    transaction.add(Op.create(fixedPath, data, acling.getAclList(path), createMode), OperationType.CREATE, path);
-                }
-                else
-                {
-                    transaction.add(Op.create(fixedPath, data, acling.getAclList(path), createMode, ttl), OperationType.CREATE, path);
-                }
+                transaction.add(Op.create(fixedPath, data, acling.getAclList(path), createMode, ttl), OperationType.CREATE, path);
                 return context;
             }
         };
@@ -636,10 +628,11 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro
             final OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("CreateBuilderImpl-Background");
             final byte[] data = operationAndData.getData().getData();
 
-            final CompatibleCreateCallback mainCallback = new CompatibleCreateCallback()
+            AsyncCallback.Create2Callback callback = new AsyncCallback.Create2Callback()
             {
                 @Override
-                public void processResult(int rc, String path, Object ctx, String name, Stat stat) {
+                public void processResult(int rc, String path, Object ctx, String name, Stat stat)
+                {
                     trace.setReturnCode(rc).setRequestBytesLength(data).setPath(path).commit();
 
                     if ( (stat != null) && (storingStat != null) )
@@ -661,41 +654,16 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro
                     }
                 }
             };
-
-            if ( client.isZk34CompatibilityMode() )
-            {
-                AsyncCallback.StringCallback stringCallback = new AsyncCallback.StringCallback()
-                {
-                    @Override
-                    public void processResult(int rc, String path, Object ctx, String name)
-                    {
-                        mainCallback.processResult(rc, path, ctx, name, null);
-                    }
-                };
-                client.getZooKeeper().create
-                    (
-                        operationAndData.getData().getPath(),
-                        data,
-                        acling.getAclList(operationAndData.getData().getPath()),
-                        createMode,
-                        stringCallback,
-                        backgrounding.getContext()
-                    );
-            }
-            else
-            {
-                CreateZK35.create
-                    (
-                        client.getZooKeeper(),
-                        operationAndData.getData().getPath(),
-                        data,
-                        acling.getAclList(operationAndData.getData().getPath()),
-                        createMode,
-                        mainCallback,
-                        backgrounding.getContext(),
-                        ttl
-                    );
-            }
+            client.getZooKeeper().create
+                (
+                    operationAndData.getData().getPath(),
+                    data,
+                    acling.getAclList(operationAndData.getData().getPath()),
+                    createMode,
+                    callback,
+                    backgrounding.getContext(),
+                    ttl
+                );
         }
         catch ( Throwable e )
         {
@@ -1171,28 +1139,14 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro
                         {
                             try
                             {
-                                if ( client.isZk34CompatibilityMode() )
-                                {
-                                    createdPath = client.getZooKeeper().create(path, data, aclList, createMode);
-                                }
-                                else
-                                {
-                                    createdPath = client.getZooKeeper().create(path, data, aclList, createMode, storingStat, ttl);
-                                }
+                                createdPath = client.getZooKeeper().create(path, data, aclList, createMode, storingStat, ttl);
                             }
                             catch ( KeeperException.NoNodeException e )
                             {
                                 if ( createParentsIfNeeded )
                                 {
                                     ZKPaths.mkdirs(client.getZooKeeper(), path, false, acling.getACLProviderForParents(), createParentsAsContainers);
-                                    if ( client.isZk34CompatibilityMode() )
-                                    {
-                                        createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode);
-                                    }
-                                    else
-                                    {
-                                        createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode, storingStat, ttl);
-                                    }
+                                    createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode, storingStat, ttl);
                                 }
                                 else
                                 {
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateZK35.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateZK35.java
deleted file mode 100644
index a32e614..0000000
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateZK35.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.imps;
-
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
-import java.util.List;
-
-// keep reference to AsyncCallback.Create2Callback in separate class for ZK 3.4 compatibility
-class CreateZK35
-{
-    static void create(ZooKeeper zooKeeper, String path, byte data[], List<ACL> acl, CreateMode createMode, final CompatibleCreateCallback compatibleCallback, Object ctx, long ttl)
-    {
-        AsyncCallback.Create2Callback callback = new AsyncCallback.Create2Callback()
-        {
-            @Override
-            public void processResult(int rc, String path, Object ctx, String name, Stat stat)
-            {
-                compatibleCallback.processResult(rc, path, ctx, name, stat);
-            }
-        };
-        zooKeeper.create(path, data, acl, createMode, callback, ctx, ttl);
-    }
-
-    private CreateZK35()
-    {
-    }
-}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index e003bf0..90f740d 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -20,7 +20,6 @@
 package org.apache.curator.framework.imps;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import org.apache.curator.CuratorConnectionLossException;
@@ -36,7 +35,7 @@ import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
 import org.apache.curator.framework.api.transaction.CuratorTransaction;
 import org.apache.curator.framework.api.transaction.TransactionOp;
 import org.apache.curator.framework.listen.Listenable;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.StandardListenerManager;
 import org.apache.curator.framework.schema.SchemaSet;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
@@ -67,8 +66,8 @@ public class CuratorFrameworkImpl implements CuratorFramework
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final CuratorZookeeperClient client;
-    private final ListenerContainer<CuratorListener> listeners;
-    private final ListenerContainer<UnhandledErrorListener> unhandledErrorListeners;
+    private final StandardListenerManager<CuratorListener> listeners;
+    private final StandardListenerManager<UnhandledErrorListener> unhandledErrorListeners;
     private final ThreadFactory threadFactory;
     private final int maxCloseWaitMs;
     private final BlockingQueue<OperationAndData<?>> backgroundOperations;
@@ -88,7 +87,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
     private final InternalConnectionHandler internalConnectionHandler;
     private final EnsembleTracker ensembleTracker;
     private final SchemaSet schemaSet;
-    private final boolean zk34CompatibilityMode;
     private final Executor runSafeService;
 
     private volatile ExecutorService executorService;
@@ -132,8 +130,8 @@ public class CuratorFrameworkImpl implements CuratorFramework
             );
 
         internalConnectionHandler = new StandardInternalConnectionHandler();
-        listeners = new ListenerContainer<CuratorListener>();
-        unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
+        listeners = StandardListenerManager.standard();
+        unhandledErrorListeners = StandardListenerManager.standard();
         backgroundOperations = new DelayQueue<OperationAndData<?>>();
         forcedSleepOperations = new LinkedBlockingQueue<>();
         namespace = new NamespaceImpl(this, builder.getNamespace());
@@ -146,7 +144,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
         useContainerParentsIfAvailable = builder.useContainerParentsIfAvailable();
         connectionStateErrorPolicy = Preconditions.checkNotNull(builder.getConnectionStateErrorPolicy(), "errorPolicy cannot be null");
         schemaSet = Preconditions.checkNotNull(builder.getSchemaSet(), "schemaSet cannot be null");
-        zk34CompatibilityMode = builder.isZk34CompatibilityMode();
 
         byte[] builderDefaultData = builder.getDefaultData();
         defaultData = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0];
@@ -156,7 +153,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         failedRemoveWatcherManager = new FailedRemoveWatchManager(this);
         namespaceFacadeCache = new NamespaceFacadeCache(this);
 
-        ensembleTracker = zk34CompatibilityMode ? null : new EnsembleTracker(this, builder.getEnsembleProvider());
+        ensembleTracker = new EnsembleTracker(this, builder.getEnsembleProvider());
 
         runSafeService = makeRunSafeService(builder);
     }
@@ -254,7 +251,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
         connectionStateErrorPolicy = parent.connectionStateErrorPolicy;
         internalConnectionHandler = parent.internalConnectionHandler;
         schemaSet = parent.schemaSet;
-        zk34CompatibilityMode = parent.zk34CompatibilityMode;
         ensembleTracker = null;
         runSafeService = parent.runSafeService;
     }
@@ -368,22 +364,17 @@ public class CuratorFrameworkImpl implements CuratorFramework
         log.debug("Closing");
         if ( state.compareAndSet(CuratorFrameworkState.STARTED, CuratorFrameworkState.STOPPED) )
         {
-            listeners.forEach(new Function<CuratorListener, Void>()
+            listeners.forEach(listener ->
             {
-                @Override
-                public Void apply(CuratorListener listener)
+                CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.CLOSING, 0, null, null, null, null, null, null, null, null, null);
+                try
                 {
-                    CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.CLOSING, 0, null, null, null, null, null, null, null, null, null);
-                    try
-                    {
-                        listener.eventReceived(CuratorFrameworkImpl.this, event);
-                    }
-                    catch ( Exception e )
-                    {
-                        ThreadUtils.checkInterrupted(e);
-                        log.error("Exception while sending Closing event", e);
-                    }
-                    return null;
+                    listener.eventReceived(CuratorFrameworkImpl.this, event);
+                }
+                catch ( Exception e )
+                {
+                    ThreadUtils.checkInterrupted(e);
+                    log.error("Exception while sending Closing event", e);
                 }
             });
 
@@ -498,14 +489,12 @@ public class CuratorFrameworkImpl implements CuratorFramework
     @Override
     public ReconfigBuilder reconfig()
     {
-        Preconditions.checkState(!isZk34CompatibilityMode(), "reconfig/config APIs are not support when running in ZooKeeper 3.4 compatibility mode");
         return new ReconfigBuilderImpl(this);
     }
 
     @Override
     public GetConfigBuilder getConfig()
     {
-        Preconditions.checkState(!isZk34CompatibilityMode(), "reconfig/config APIs are not support when running in ZooKeeper 3.4 compatibility mode");
         return new GetConfigBuilderImpl(this);
     }
 
@@ -567,7 +556,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
     @Override
     public RemoveWatchesBuilder watches()
     {
-        Preconditions.checkState(!isZk34CompatibilityMode(), "Remove watches APIs are not support when running in ZooKeeper 3.4 compatibility mode");
         return new RemoveWatchesBuilderImpl(this);
     }
 
@@ -705,15 +693,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         }
 
         final String localReason = reason;
-        unhandledErrorListeners.forEach(new Function<UnhandledErrorListener, Void>()
-        {
-            @Override
-            public Void apply(UnhandledErrorListener listener)
-            {
-                listener.unhandledError(localReason, e);
-                return null;
-            }
-        });
+        unhandledErrorListeners.forEach(l -> l.unhandledError(localReason, e));
 
         if ( debugUnhandledErrorListener != null )
         {
@@ -824,12 +804,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
         connectionStateManager.addStateChange(newConnectionState);
     }
 
-    @Override
-    public boolean isZk34CompatibilityMode()
-    {
-        return zk34CompatibilityMode;
-    }
-
     EnsembleTracker getEnsembleTracker()
     {
         return ensembleTracker;
@@ -1037,23 +1011,18 @@ public class CuratorFrameworkImpl implements CuratorFramework
             validateConnection(curatorEvent.getWatchedEvent().getState());
         }
 
-        listeners.forEach(new Function<CuratorListener, Void>()
+        listeners.forEach(listener ->
         {
-            @Override
-            public Void apply(CuratorListener listener)
+            try
             {
-                try
-                {
-                    OperationTrace trace = client.startAdvancedTracer("EventListener");
-                    listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
-                    trace.commit();
-                }
-                catch ( Exception e )
-                {
-                    ThreadUtils.checkInterrupted(e);
-                    logError("Event listener threw exception", e);
-                }
-                return null;
+                OperationTrace trace = client.startAdvancedTracer("EventListener");
+                listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
+                trace.commit();
+            }
+            catch ( Exception e )
+            {
+                ThreadUtils.checkInterrupted(e);
+                logError("Event listener threw exception", e);
             }
         });
     }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionImpl.java
index 9057934..bdab158 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionImpl.java
@@ -35,7 +35,6 @@ import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
 import org.apache.curator.framework.schema.Schema;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.OpResult;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.proto.CreateRequest;
@@ -136,22 +135,7 @@ public class CuratorMultiTransactionImpl implements
             if ( (curatorOp.get().getType() == ZooDefs.OpCode.create) || (curatorOp.get().getType() == ZooDefs.OpCode.createContainer) )
             {
                 CreateRequest createRequest = (CreateRequest)curatorOp.get().toRequestRecord();
-                CreateMode createMode;
-                if ( client.isZk34CompatibilityMode() )
-                {
-                    try
-                    {
-                        createMode = CreateMode.fromFlag(createRequest.getFlags());
-                    }
-                    catch ( KeeperException.BadArgumentsException dummy )
-                    {
-                        createMode = CreateMode.PERSISTENT;
-                    }
-                }
-                else
-                {
-                    createMode = CreateMode.fromFlag(createRequest.getFlags(), CreateMode.PERSISTENT);
-                }
+                CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags(), CreateMode.PERSISTENT);
                 schema.validateCreate(createMode, createRequest.getPath(), createRequest.getData(), createRequest.getAcl());
             }
             else if ( (curatorOp.get().getType() == ZooDefs.OpCode.delete) || (curatorOp.get().getType() == ZooDefs.OpCode.deleteContainer) )
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java
index bdb5428..b9c9044 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java
@@ -53,16 +53,11 @@ public class WatcherRemovalManager
 
     void removeWatchers()
     {
-        if ( client.isZk34CompatibilityMode() )
-        {
-            return;
-        }
-
         List<NamespaceWatcher> localEntries = Lists.newArrayList(entries);
         while ( localEntries.size() > 0 )
         {
             NamespaceWatcher watcher = localEntries.remove(0);
-            if ( entries.remove(watcher) && !client.isZk34CompatibilityMode() )
+            if ( entries.remove(watcher) )
             {
                 try
                 {
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerContainer.java b/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerContainer.java
deleted file mode 100644
index 9139439..0000000
--- a/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerContainer.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.listen;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.MoreExecutors;
-import org.apache.curator.utils.ThreadUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.Map;
-import java.util.concurrent.Executor;
-
-/**
- * Abstracts an object that has listeners
- *
- * @deprecated Prefer {@link MappingListenerManager} and
- * {@link StandardListenerManager}
- */
-@Deprecated
-public class ListenerContainer<T> implements Listenable<T>
-{
-    private final Logger                        log = LoggerFactory.getLogger(getClass());
-    private final Map<T, ListenerEntry<T>>      listeners = Maps.newConcurrentMap();
-
-    @Override
-    public void addListener(T listener)
-    {
-        addListener(listener, MoreExecutors.directExecutor());
-    }
-
-    @Override
-    public void addListener(T listener, Executor executor)
-    {
-        listeners.put(listener, new ListenerEntry<T>(listener, executor));
-    }
-
-    @Override
-    public void removeListener(T listener)
-    {
-        if ( listener != null )
-        {
-            listeners.remove(listener);
-        }
-    }
-
-    /**
-     * Remove all listeners
-     */
-    public void     clear()
-    {
-        listeners.clear();
-    }
-
-    /**
-     * Return the number of listeners
-     *
-     * @return number
-     */
-    public int      size()
-    {
-        return listeners.size();
-    }
-
-    /**
-     * Utility - apply the given function to each listener. The function receives
-     * the listener as an argument.
-     *
-     * @param function function to call for each listener
-     */
-    public void     forEach(final Function<T, Void> function)
-    {
-        for ( final ListenerEntry<T> entry : listeners.values() )
-        {
-            entry.executor.execute
-            (
-                new Runnable()
-                {
-                    @Override
-                    public void run()
-                    {
-                        try
-                        {
-                            function.apply(entry.listener);
-                        }
-                        catch ( Throwable e )
-                        {
-                            ThreadUtils.checkInterrupted(e);
-                            log.error(String.format("Listener (%s) threw an exception", entry.listener), e);
-                        }
-                    }
-                }
-            );
-        }
-    }
-}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerManager.java b/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerManager.java
index bd9f51a..5a6c9c7 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerManager.java
@@ -28,7 +28,7 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 
 /**
- * Upgraded version of {@link org.apache.curator.framework.listen.ListenerContainer} that
+ * Upgraded version of ListenerContainer that
  * doesn't leak Guava's internals and also supports mapping/wrapping of listeners
  */
 public class MappingListenerManager<K, V> implements ListenerManager<K, V>
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 46325d2..7285431 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -23,7 +23,6 @@ import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.listen.UnaryListenerManager;
-import org.apache.curator.utils.Compatibility;
 import org.apache.curator.utils.ThreadUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -308,7 +307,7 @@ public class ConnectionStateManager implements Closeable
                 log.warn(String.format("Session timeout has elapsed while SUSPENDED. Injecting a session expiration. Elapsed ms: %d. Adjusted session timeout ms: %d", elapsedMs, useSessionTimeoutMs));
                 try
                 {
-                    Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+                    client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
                 }
                 catch ( Exception e )
                 {
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCleanState.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
index d80e053..b4791ff 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
@@ -22,7 +22,6 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.test.WatchersDebug;
 import org.apache.curator.test.compatibility.Timing2;
 import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.utils.Compatibility;
 import org.apache.zookeeper.ZooKeeper;
 import java.util.concurrent.Callable;
 
@@ -35,12 +34,6 @@ public class TestCleanState
             return;
         }
 
-        if ( Compatibility.isZK34() )
-        {
-            CloseableUtils.closeQuietly(client);
-            return;
-        }
-
         try
         {
             Timing2 timing = new Timing2();
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCreateReturningStat.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCreateReturningStat.java
index 034791d..10b237f 100755
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCreateReturningStat.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCreateReturningStat.java
@@ -26,7 +26,6 @@ import org.apache.curator.framework.api.CuratorEventType;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.test.Timing;
-import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.zookeeper.data.Stat;
 import org.testng.Assert;
@@ -34,7 +33,6 @@ import org.testng.annotations.Test;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 
-@Test(groups = Zk35MethodInterceptor.zk35Group)
 public class TestCreateReturningStat extends CuratorTestBase
 {
     private CuratorFramework createClient()
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
index 773d9c9..1656351 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
@@ -27,7 +27,6 @@ import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.compatibility.Timing2;
 import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.utils.Compatibility;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -127,7 +126,7 @@ public class TestEnabledSessionExpiredState extends BaseClassForTests
     {
         Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED);
 
-        Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+        client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
 
         Assert.assertEquals(states.poll(timing.forSessionSleep().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.LOST);
         Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED);
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
index fe49ad7..8b22812 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
@@ -32,7 +32,6 @@ import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
 import org.apache.curator.test.compatibility.Timing2;
-import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.EnsurePath;
 import org.apache.curator.utils.ZKPaths;
@@ -77,7 +76,6 @@ public class TestFramework extends BaseClassForTests
         super.teardown();
     }
 
-    @Test(groups = Zk35MethodInterceptor.zk35Group)
     public void testWaitForShutdownTimeoutMs() throws Exception
     {
         final BlockingQueue<Integer> timeoutQueue = new ArrayBlockingQueue<>(1);
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
index 7c6d156..c091fad 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
@@ -24,7 +24,6 @@ import org.apache.curator.RetryPolicy;
 import org.apache.curator.RetrySleeper;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.SafeIsTtlMode;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CreateBuilder;
 import org.apache.curator.framework.api.CuratorEvent;
@@ -37,16 +36,15 @@ import org.apache.curator.retry.RetryForever;
 import org.apache.curator.retry.RetryNTimes;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Compatibility;
 import org.apache.curator.test.InstanceSpec;
 import org.apache.curator.test.TestingCluster;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.compatibility.Timing2;
 import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.utils.Compatibility;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
@@ -63,7 +61,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class TestFrameworkEdges extends BaseClassForTests
@@ -92,7 +89,7 @@ public class TestFrameworkEdges extends BaseClassForTests
                 }
             };
             client.checkExists().usingWatcher(watcher).forPath("/foobar");
-            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+            client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
             Assert.assertTrue(timing.awaitLatch(expiredLatch));
         }
     }
@@ -327,7 +324,7 @@ public class TestFrameworkEdges extends BaseClassForTests
             final String TEST_PATH = "/a/b/c/test-";
             long ttl = timing.forWaiting().milliseconds() * 1000;
             CreateBuilder firstCreateBuilder = client.create();
-            if ( SafeIsTtlMode.isTtl(mode) )
+            if ( mode.isTTL() )
             {
                 firstCreateBuilder.withTtl(ttl);
             }
@@ -343,7 +340,7 @@ public class TestFrameworkEdges extends BaseClassForTests
 
             CreateBuilderImpl createBuilder = (CreateBuilderImpl)client.create();
             createBuilder.withProtection();
-            if ( SafeIsTtlMode.isTtl(mode) )
+            if ( mode.isTTL() )
             {
                 createBuilder.withTtl(ttl);
             }
@@ -532,7 +529,7 @@ public class TestFrameworkEdges extends BaseClassForTests
             };
 
             client.checkExists().usingWatcher(watcher).forPath("/sessionTest");
-            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+            client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
             Assert.assertTrue(timing.awaitLatch(sessionDiedLatch));
             Assert.assertNotNull(client.checkExists().forPath("/sessionTest"));
         }
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
index c6ff2bb..1ff2805 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
@@ -32,7 +32,6 @@ import org.apache.curator.test.TestingCluster;
 import org.apache.curator.test.TestingZooKeeperServer;
 import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.test.compatibility.Timing2;
-import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -57,7 +56,6 @@ import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 
-@Test(groups = Zk35MethodInterceptor.zk35Group)
 public class TestReconfiguration extends CuratorTestBase
 {
     private final Timing2 timing = new Timing2();
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
index 63d8931..82f2cf4 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
@@ -32,7 +32,6 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.test.Timing;
-import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
@@ -45,7 +44,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-@Test(groups = Zk35MethodInterceptor.zk35Group)
 public class TestRemoveWatches extends CuratorTestBase
 {
     private AtomicReference<ConnectionState> registerConnectionStateListener(CuratorFramework client)
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestTtlNodes.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestTtlNodes.java
index e2156df..fc37e41 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestTtlNodes.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestTtlNodes.java
@@ -25,7 +25,6 @@ import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.test.Timing;
-import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.zookeeper.CreateMode;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -34,7 +33,6 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 import java.util.concurrent.CountDownLatch;
 
-@Test(groups = Zk35MethodInterceptor.zk35Group)
 public class TestTtlNodes extends CuratorTestBase
 {
     @BeforeClass
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java
index 74aac1d..8b17576 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java
@@ -27,7 +27,6 @@ import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.test.Timing;
 import org.apache.curator.test.WatchersDebug;
-import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -37,7 +36,6 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 
-@Test(groups = Zk35MethodInterceptor.zk35Group)
 public class TestWatcherRemovalManager extends CuratorTestBase
 {
     @Test
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
index e7778ff..c654f4f 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
@@ -19,7 +19,6 @@
 package org.apache.curator.framework.recipes.cache;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 
@@ -27,7 +26,8 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.WatcherRemoveCuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.PathUtils;
@@ -62,7 +62,7 @@ public class NodeCache implements Closeable
     private final boolean dataIsCompressed;
     private final AtomicReference<ChildData> data = new AtomicReference<ChildData>(null);
     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
-    private final ListenerContainer<NodeCacheListener> listeners = new ListenerContainer<NodeCacheListener>();
+    private final StandardListenerManager<NodeCacheListener> listeners = StandardListenerManager.standard();
     private final AtomicBoolean isConnected = new AtomicBoolean(true);
     private ConnectionStateListener connectionStateListener = new ConnectionStateListener()
     {
@@ -204,7 +204,7 @@ public class NodeCache implements Closeable
      *
      * @return listenable
      */
-    public ListenerContainer<NodeCacheListener> getListenable()
+    public Listenable<NodeCacheListener> getListenable()
     {
         Preconditions.checkState(state.get() != State.CLOSED, "Closed");
 
@@ -314,26 +314,17 @@ public class NodeCache implements Closeable
         ChildData   previousData = data.getAndSet(newData);
         if ( !Objects.equal(previousData, newData) )
         {
-            listeners.forEach
-            (
-                new Function<NodeCacheListener, Void>()
+            listeners.forEach(listener -> {
+                try
                 {
-                    @Override
-                    public Void apply(NodeCacheListener listener)
-                    {
-                        try
-                        {
-                            listener.nodeChanged();
-                        }
-                        catch ( Exception e )
-                        {
-                            ThreadUtils.checkInterrupted(e);
-                            log.error("Calling listener", e);
-                        }
-                        return null;
-                    }
+                    listener.nodeChanged();
+                }
+                catch ( Exception e )
+                {
+                    ThreadUtils.checkInterrupted(e);
+                    log.error("Calling listener", e);
                 }
-            );
+            });
 
             if ( rebuildTestExchanger != null )
             {
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index bdc73cc..14c889a 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -20,7 +20,6 @@
 package org.apache.curator.framework.recipes.cache;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
@@ -31,7 +30,8 @@ import org.apache.curator.framework.EnsureContainers;
 import org.apache.curator.framework.WatcherRemoveCuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.CloseableExecutorService;
@@ -74,7 +74,7 @@ public class PathChildrenCache implements Closeable
     private final CloseableExecutorService executorService;
     private final boolean cacheData;
     private final boolean dataIsCompressed;
-    private final ListenerContainer<PathChildrenCacheListener> listeners = new ListenerContainer<PathChildrenCacheListener>();
+    private final StandardListenerManager<PathChildrenCacheListener> listeners = StandardListenerManager.standard();
     private final ConcurrentMap<String, ChildData> currentData = Maps.newConcurrentMap();
     private final AtomicReference<Map<String, ChildData>> initialSet = new AtomicReference<Map<String, ChildData>>();
     private final Set<Operation> operationsQuantizer = Sets.newSetFromMap(Maps.<Operation, Boolean>newConcurrentMap());
@@ -394,7 +394,7 @@ public class PathChildrenCache implements Closeable
      *
      * @return listenable
      */
-    public ListenerContainer<PathChildrenCacheListener> getListenable()
+    public Listenable<PathChildrenCacheListener> getListenable()
     {
         return listeners;
     }
@@ -526,26 +526,17 @@ public class PathChildrenCache implements Closeable
 
     void callListeners(final PathChildrenCacheEvent event)
     {
-        listeners.forEach
-            (
-                new Function<PathChildrenCacheListener, Void>()
-                {
-                    @Override
-                    public Void apply(PathChildrenCacheListener listener)
-                    {
-                        try
-                        {
-                            listener.childEvent(client, event);
-                        }
-                        catch ( Exception e )
-                        {
-                            ThreadUtils.checkInterrupted(e);
-                            handleException(e);
-                        }
-                        return null;
-                    }
-                }
-            );
+        listeners.forEach(listener -> {
+            try
+            {
+                listener.childEvent(client, event);
+            }
+            catch ( Exception e )
+            {
+                ThreadUtils.checkInterrupted(e);
+                handleException(e);
+            }
+        });
     }
 
     void getDataAndStat(final String fullPath) throws Exception
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
index f42c1d5..ad14dab 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -20,7 +20,6 @@
 package org.apache.curator.framework.recipes.cache;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
@@ -33,7 +32,7 @@ import org.apache.curator.framework.api.Pathable;
 import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.api.Watchable;
 import org.apache.curator.framework.listen.Listenable;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.StandardListenerManager;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.PathUtils;
@@ -531,8 +530,8 @@ public class TreeCache implements Closeable
     private final boolean cacheData;
     private final boolean dataIsCompressed;
     private final int maxDepth;
-    private final ListenerContainer<TreeCacheListener> listeners = new ListenerContainer<TreeCacheListener>();
-    private final ListenerContainer<UnhandledErrorListener> errorListeners = new ListenerContainer<UnhandledErrorListener>();
+    private final StandardListenerManager<TreeCacheListener> listeners = StandardListenerManager.standard();
+    private final StandardListenerManager<UnhandledErrorListener> errorListeners = StandardListenerManager.standard();
     private final AtomicReference<TreeState> treeState = new AtomicReference<TreeState>(TreeState.LATENT);
 
     private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
@@ -750,21 +749,16 @@ public class TreeCache implements Closeable
 
     private void callListeners(final TreeCacheEvent event)
     {
-        listeners.forEach(new Function<TreeCacheListener, Void>()
+        listeners.forEach(listener ->
         {
-            @Override
-            public Void apply(TreeCacheListener listener)
+            try
             {
-                try
-                {
-                    listener.childEvent(client, event);
-                }
-                catch ( Exception e )
-                {
-                    ThreadUtils.checkInterrupted(e);
-                    handleException(e);
-                }
-                return null;
+                listener.childEvent(client, event);
+            }
+            catch ( Exception e )
+            {
+                ThreadUtils.checkInterrupted(e);
+                handleException(e);
             }
         });
     }
@@ -780,21 +774,16 @@ public class TreeCache implements Closeable
         }
         else
         {
-            errorListeners.forEach(new Function<UnhandledErrorListener, Void>()
+            errorListeners.forEach(listener ->
             {
-                @Override
-                public Void apply(UnhandledErrorListener listener)
+                try
                 {
-                    try
-                    {
-                        listener.unhandledError("", e);
-                    }
-                    catch ( Exception e )
-                    {
-                        ThreadUtils.checkInterrupted(e);
-                        LOG.error("Exception handling exception", e);
-                    }
-                    return null;
+                    listener.unhandledError("", e);
+                }
+                catch ( Exception e2 )
+                {
+                    ThreadUtils.checkInterrupted(e2);
+                    LOG.error("Exception handling exception", e2);
                 }
             });
         }
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 79b2601..7d9ca3c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -20,13 +20,12 @@
 package org.apache.curator.framework.recipes.leader;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.WatcherRemoveCuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.StandardListenerManager;
 import org.apache.curator.framework.recipes.AfterConnectionEstablished;
 import org.apache.curator.framework.recipes.locks.LockInternals;
 import org.apache.curator.framework.recipes.locks.LockInternalsSorter;
@@ -71,7 +70,7 @@ public class LeaderLatch implements Closeable
     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
     private final AtomicBoolean hasLeadership = new AtomicBoolean(false);
     private final AtomicReference<String> ourPath = new AtomicReference<String>();
-    private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();
+    private final StandardListenerManager<LeaderLatchListener> listeners = StandardListenerManager.standard();
     private final CloseMode closeMode;
     private final AtomicReference<Future<?>> startTask = new AtomicReference<Future<?>>();
 
@@ -682,27 +681,11 @@ public class LeaderLatch implements Closeable
 
         if ( oldValue && !newValue )
         { // Lost leadership, was true, now false
-            listeners.forEach(new Function<LeaderLatchListener, Void>()
-                {
-                    @Override
-                    public Void apply(LeaderLatchListener listener)
-                    {
-                        listener.notLeader();
-                        return null;
-                    }
-                });
+            listeners.forEach(LeaderLatchListener::notLeader);
         }
         else if ( !oldValue && newValue )
         { // Gained leadership, was false, now true
-            listeners.forEach(new Function<LeaderLatchListener, Void>()
-                {
-                    @Override
-                    public Void apply(LeaderLatchListener input)
-                    {
-                        input.isLeader();
-                        return null;
-                    }
-                });
+            listeners.forEach(LeaderLatchListener::isLeader);
         }
 
         notifyAll();
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
deleted file mode 100644
index 3c8b56a..0000000
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
+++ /dev/null
@@ -1,307 +0,0 @@
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.recipes.locks;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.leader.LeaderLatch;
-import org.apache.curator.utils.CloseableScheduledExecutorService;
-import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.utils.PathUtils;
-import org.apache.curator.utils.ThreadUtils;
-import org.apache.curator.utils.ZKPaths;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Utility to reap empty child nodes of a parent node. Periodically calls getChildren on
- * the node and adds empty nodes to an internally managed {@link Reaper}
- *
- * @deprecated Since 2.9.0 - Reaper/ChildReaper are no longer needed. Use {@link CreateMode#CONTAINER}.
- * Also, all Curator recipes create container parents.
- */
-@Deprecated
-public class ChildReaper implements Closeable
-{
-    private final Logger log = LoggerFactory.getLogger(getClass());
-    private final Reaper reaper;
-    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
-    private final CuratorFramework client;
-    private final Collection<String> paths = Sets.newConcurrentHashSet();
-    private volatile Iterator<String> pathIterator = null;
-    private final Reaper.Mode mode;
-    private final CloseableScheduledExecutorService executor;
-    private final int reapingThresholdMs;
-    private final LeaderLatch leaderLatch;
-    private final Set<String> lockSchema;
-    private final AtomicInteger maxChildren = new AtomicInteger(-1);
-
-    private volatile Future<?> task;
-
-    private enum State
-    {
-        LATENT,
-        STARTED,
-        CLOSED
-    }
-
-    /**
-     * @param client the client
-     * @param path path to reap children from
-     * @param mode reaping mode
-     */
-    public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode)
-    {
-        this(client, path, mode, newExecutorService(), Reaper.DEFAULT_REAPING_THRESHOLD_MS, null);
-    }
-
-    /**
-     * @param client the client
-     * @param path path to reap children from
-     * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
-     * @param mode reaping mode
-     */
-    public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, int reapingThresholdMs)
-    {
-        this(client, path, mode, newExecutorService(), reapingThresholdMs, null);
-    }
-
-    /**
-     * @param client the client
-     * @param path path to reap children from
-     * @param executor executor to use for background tasks
-     * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
-     * @param mode reaping mode
-     */
-    public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, ScheduledExecutorService executor, int reapingThresholdMs)
-    {
-        this(client, path, mode, executor, reapingThresholdMs, null);
-    }
-
-    /**
-     * @param client the client
-     * @param path path to reap children from
-     * @param executor executor to use for background tasks
-     * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
-     * @param mode reaping mode
-     * @param leaderPath if not null, uses a leader selection so that only 1 reaper is active in the cluster
-     */
-    public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, ScheduledExecutorService executor, int reapingThresholdMs, String leaderPath)
-    {
-        this(client, path, mode, executor, reapingThresholdMs, leaderPath, Collections.<String>emptySet());
-    }
-
-
-    /**
-     * @param client the client
-     * @param path path to reap children from
-     * @param executor executor to use for background tasks
-     * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
-     * @param mode reaping mode
-     * @param leaderPath if not null, uses a leader selection so that only 1 reaper is active in the cluster
-     * @param lockSchema a set of the possible subnodes of the children of path that must be reaped in addition to the child nodes
-     */
-    public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, ScheduledExecutorService executor, int reapingThresholdMs, String leaderPath, Set<String> lockSchema)
-    {
-        this.client = client;
-        this.mode = mode;
-        this.executor = new CloseableScheduledExecutorService(executor);
-        this.reapingThresholdMs = reapingThresholdMs;
-        if (leaderPath != null)
-        {
-            leaderLatch = new LeaderLatch(client, leaderPath);
-        }
-        else
-        {
-            leaderLatch = null;
-        }
-        this.reaper = new Reaper(client, executor, reapingThresholdMs, leaderLatch);
-        this.lockSchema = lockSchema;
-        addPath(path);
-    }
-
-    /**
-     * The reaper must be started
-     *
-     * @throws Exception errors
-     */
-    public void start() throws Exception
-    {
-        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
-
-        task = executor.scheduleWithFixedDelay
-        (
-            new Runnable()
-            {
-                @Override
-                public void run()
-                {
-                    doWork();
-                }
-            },
-            reapingThresholdMs,
-            reapingThresholdMs,
-            TimeUnit.MILLISECONDS
-        );
-        if (leaderLatch != null)
-        {
-            leaderLatch.start();
-        }
-        reaper.start();
-    }
-
-    @Override
-    public void close() throws IOException
-    {
-        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
-        {
-            CloseableUtils.closeQuietly(reaper);
-            if (leaderLatch != null)
-            {
-                CloseableUtils.closeQuietly(leaderLatch);
-            }
-            task.cancel(true);
-        }
-    }
-
-    /**
-     * Add a path to reap children from
-     *
-     * @param path the path
-     * @return this for chaining
-     */
-    public ChildReaper addPath(String path)
-    {
-        paths.add(PathUtils.validatePath(path));
-        return this;
-    }
-
-    /**
-     * Remove a path from reaping
-     *
-     * @param path the path
-     * @return true if the path existed and was removed
-     */
-    public boolean removePath(String path)
-    {
-        return paths.remove(PathUtils.validatePath(path));
-    }
-
-    /**
-     * If a node has so many children that {@link CuratorFramework#getChildren()} will fail
-     * (due to jute.maxbuffer) it can cause connection instability. Set the max number of
-     * children here to prevent the path from being queried in these cases. The number should usually
-     * be: average-node-name-length/1000000
-     *
-     * @param maxChildren max children
-     */
-    public void setMaxChildren(int maxChildren)
-    {
-        this.maxChildren.set(maxChildren);
-    }
-
-    public static ScheduledExecutorService newExecutorService()
-    {
-        return ThreadUtils.newFixedThreadScheduledPool(2, "ChildReaper");
-    }
-
-    @VisibleForTesting
-    protected void warnMaxChildren(String path, Stat stat)
-    {
-        log.warn(String.format("Skipping %s as it has too many children: %d", path, stat.getNumChildren()));
-    }
-
-    private void doWork()
-    {
-        if ( shouldDoWork() )
-        {
-            if ( (pathIterator == null) || !pathIterator.hasNext() )
-            {
-                pathIterator = paths.iterator();
-            }
-            while ( pathIterator.hasNext() )
-            {
-                String path = pathIterator.next();
-                try
-                {
-                    int maxChildren = this.maxChildren.get();
-                    if ( maxChildren > 0 )
-                    {
-                        Stat stat = client.checkExists().forPath(path);
-                        if ( (stat != null) && (stat.getNumChildren() > maxChildren) )
-                        {
-                            warnMaxChildren(path, stat);
-                            continue;
-                        }
-                    }
-
-                    List<String> children = client.getChildren().forPath(path);
-                    log.info(String.format("Found %d children for %s", children.size(), path));
-                    for ( String name : children )
-                    {
-                        String childPath = ZKPaths.makePath(path, name);
-                        addPathToReaperIfEmpty(childPath);
-                        for ( String subNode : lockSchema )
-                        {
-                            addPathToReaperIfEmpty(ZKPaths.makePath(childPath, subNode));
-                        }
-
-                    }
-                }
-                catch ( Exception e )
-                {
-                    ThreadUtils.checkInterrupted(e);
-                    log.error("Could not get children for path: " + path, e);
-                }
-            }
-        }
-    }
-
-    private void addPathToReaperIfEmpty(String path) throws Exception
-    {
-        Stat stat = client.checkExists().forPath(path);
-        if ( (stat != null) && (stat.getNumChildren() == 0) )
-        {
-            log.info("Adding " + path);
-            reaper.addPath(path, mode);
-        }
-    }
-
-    private boolean shouldDoWork()
-    {
-        return this.leaderLatch == null || this.leaderLatch.hasLeadership();
-    }
-}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
deleted file mode 100644
index 2522154..0000000
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.recipes.locks;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.leader.LeaderLatch;
-import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
-import org.apache.curator.utils.CloseableScheduledExecutorService;
-import org.apache.curator.utils.ThreadUtils;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Utility to clean up parent lock nodes so that they don't stay around as garbage
- *
- * @deprecated Since 2.9.0 - Reaper/ChildReaper are no longer needed. Use {@link CreateMode#CONTAINER}.
- * Also, all Curator recipes create container parents.
- */
-@Deprecated
-public class Reaper implements Closeable
-{
-    private final Logger log = LoggerFactory.getLogger(getClass());
-    private final CuratorFramework client;
-    private final CloseableScheduledExecutorService executor;
-    private final int reapingThresholdMs;
-    private final Map<String, PathHolder> activePaths = Maps.newConcurrentMap();
-    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
-    private final LeaderLatch leaderLatch;
-    private final AtomicBoolean reapingIsActive = new AtomicBoolean(true);
-    private final boolean ownsLeaderLatch;
-
-    private enum State
-    {
-        LATENT,
-        STARTED,
-        CLOSED
-    }
-
-    static final int DEFAULT_REAPING_THRESHOLD_MS = (int)TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES);
-
-    @VisibleForTesting
-    static final int EMPTY_COUNT_THRESHOLD = 3;
-
-    @VisibleForTesting
-    class PathHolder implements Runnable
-    {
-        final String path;
-        final Mode mode;
-        final int emptyCount;
-
-        @Override
-        public void run()
-        {
-            reap(this);
-        }
-
-        private PathHolder(String path, Mode mode, int emptyCount)
-        {
-            this.path = path;
-            this.mode = mode;
-            this.emptyCount = emptyCount;
-        }
-    }
-
-    public enum Mode
-    {
-        /**
-         * Reap forever, or until removePath is called for the path
-         */
-        REAP_INDEFINITELY,
-
-        /**
-         * Reap until the Reaper succeeds in deleting the path
-         */
-        REAP_UNTIL_DELETE,
-
-        /**
-         * Reap until the path no longer exists
-         */
-        REAP_UNTIL_GONE
-    }
-
-    /**
-     * Uses the default reaping threshold of 5 minutes and creates an internal thread pool
-     *
-     * @param client client
-     */
-    public Reaper(CuratorFramework client)
-    {
-        this(client, newExecutorService(), DEFAULT_REAPING_THRESHOLD_MS, (String) null);
-    }
-
-    /**
-     * Uses the given reaping threshold and creates an internal thread pool
-     *
-     * @param client             client
-     * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
-     */
-    public Reaper(CuratorFramework client, int reapingThresholdMs)
-    {
-        this(client, newExecutorService(), reapingThresholdMs, (String) null);
-    }
-
-    /**
-     * @param client             client
-     * @param executor           thread pool
-     * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
-     */
-    public Reaper(CuratorFramework client, ScheduledExecutorService executor, int reapingThresholdMs)
-    {
-        this(client, executor, reapingThresholdMs, (String) null);
-    }
-
-    /**
-     * @param client             client
-     * @param executor           thread pool
-     * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
-     * @param leaderPath         if not null, uses a leader selection so that only 1 reaper is active in the cluster
-     */
-    public Reaper(CuratorFramework client, ScheduledExecutorService executor, int reapingThresholdMs, String leaderPath)
-    {
-        this(client, executor, reapingThresholdMs, makeLeaderLatchIfPathNotNull(client, leaderPath), true);
-    }
-
-    /**
-     * @param client             client
-     * @param executor           thread pool
-     * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
-     * @param leaderLatch        a pre-created leader latch to ensure only 1 reaper is active in the cluster
-     */
-    public Reaper(CuratorFramework client, ScheduledExecutorService executor, int reapingThresholdMs, LeaderLatch leaderLatch)
-    {
-        this(client, executor, reapingThresholdMs, leaderLatch, false);
-    }
-
-    /**
-     * @param client             client
-     * @param executor           thread pool
-     * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
-     * @param leaderLatch        a pre-created leader latch to ensure only 1 reaper is active in the cluster
-     * @param ownsLeaderLatch    indicates whether or not the reaper owns the leader latch (if it exists) and thus should start/stop it
-     * */
-    private Reaper(CuratorFramework client, ScheduledExecutorService executor, int reapingThresholdMs, LeaderLatch leaderLatch, boolean ownsLeaderLatch)
-    {
-        this.client = client;
-        this.executor = new CloseableScheduledExecutorService(executor);
-        this.reapingThresholdMs = reapingThresholdMs / EMPTY_COUNT_THRESHOLD;
-        this.leaderLatch = leaderLatch;
-        if (leaderLatch != null)
-        {
-            addListenerToLeaderLatch(leaderLatch);
-        }
-        this.ownsLeaderLatch = ownsLeaderLatch;
-    }
-
-
-    /**
-     * Add a path (using Mode.REAP_INDEFINITELY) to be checked by the reaper. The path will be checked periodically
-     * until the reaper is closed.
-     *
-     * @param path path to check
-     */
-    public void addPath(String path)
-    {
-        addPath(path, Mode.REAP_INDEFINITELY);
-    }
-
-    /**
-     * Add a path to be checked by the reaper. The path will be checked periodically
-     * until the reaper is closed, or until the point specified by the Mode
-     *
-     * @param path path to check
-     * @param mode reaping mode
-     */
-    public void addPath(String path, Mode mode)
-    {
-        PathHolder pathHolder = new PathHolder(path, mode, 0);
-        activePaths.put(path, pathHolder);
-        schedule(pathHolder, reapingThresholdMs);
-    }
-
-    /**
-     * Stop reaping the given path
-     *
-     * @param path path to remove
-     * @return true if the path was removed
-     */
-    public boolean removePath(String path)
-    {
-        return activePaths.remove(path) != null;
-    }
-
-    /**
-     * The reaper must be started
-     *
-     * @throws Exception errors
-     */
-    public void start() throws Exception
-    {
-        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
-
-        if ( leaderLatch != null && ownsLeaderLatch)
-        {
-            leaderLatch.start();
-        }
-    }
-
-    @Override
-    public void close() throws IOException
-    {
-        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
-        {
-            executor.close();
-            if ( leaderLatch != null && ownsLeaderLatch )
-            {
-                leaderLatch.close();
-            }
-        }
-    }
-
-    @VisibleForTesting
-    protected Future<?> schedule(PathHolder pathHolder, int reapingThresholdMs)
-    {
-        if ( reapingIsActive.get() )
-        {
-            return executor.schedule(pathHolder, reapingThresholdMs, TimeUnit.MILLISECONDS);
-        }
-        return null;
-    }
-
-    @VisibleForTesting
-    protected void reap(PathHolder holder)
-    {
-        if ( !activePaths.containsKey(holder.path) )
-        {
-            return;
-        }
-
-        boolean addBack = true;
-        int newEmptyCount = 0;
-        try
-        {
-            Stat stat = client.checkExists().forPath(holder.path);
-            if ( stat != null ) // otherwise already deleted
-            {
-                if ( stat.getNumChildren() == 0 )
-                {
-                    if ( (holder.emptyCount + 1) >= EMPTY_COUNT_THRESHOLD )
-                    {
-                        try
-                        {
-                            client.delete().forPath(holder.path);
-                            log.info("Reaping path: " + holder.path);
-                            if ( holder.mode == Mode.REAP_UNTIL_DELETE || holder.mode == Mode.REAP_UNTIL_GONE )
-                            {
-                                addBack = false;
-                            }
-                        }
-                        catch ( KeeperException.NoNodeException ignore )
-                        {
-                            // Node must have been deleted by another process/thread
-                            if ( holder.mode == Mode.REAP_UNTIL_GONE )
-                            {
-                                addBack = false;
-                            }
-                        }
-                        catch ( KeeperException.NotEmptyException ignore )
-                        {
-                            // ignore - it must have been re-used
-                        }
-                    }
-                    else
-                    {
-                        newEmptyCount = holder.emptyCount + 1;
-                    }
-                }
-            }
-            else
-            {
-                if ( holder.mode == Mode.REAP_UNTIL_GONE )
-                {
-                    addBack = false;
-                }
-            }
-        }
-        catch ( Exception e )
-        {
-            ThreadUtils.checkInterrupted(e);
-            log.error("Trying to reap: " + holder.path, e);
-        }
-
-        if ( !addBack )
-        {
-            activePaths.remove(holder.path);
-        }
-        else if ( !Thread.currentThread().isInterrupted() && (state.get() == State.STARTED) && activePaths.containsKey(holder.path) )
-        {
-            activePaths.put(holder.path, holder);
-            schedule(new PathHolder(holder.path, holder.mode, newEmptyCount), reapingThresholdMs);
-        }
-    }
-
-    /**
-     * Allocate an executor service for the reaper
-     *
-     * @return service
-     */
-    public static ScheduledExecutorService newExecutorService()
-    {
-        return ThreadUtils.newSingleThreadScheduledExecutor("Reaper");
-    }
-
-    private void addListenerToLeaderLatch(LeaderLatch leaderLatch)
-    {
-
-        LeaderLatchListener listener = new LeaderLatchListener()
-        {
-            @Override
-            public void isLeader()
-            {
-                reapingIsActive.set(true);
-                for ( PathHolder holder : activePaths.values() )
-                {
-                    schedule(holder, reapingThresholdMs);
-                }
-            }
-
-            @Override
-            public void notLeader()
-            {
-                reapingIsActive.set(false);
-            }
-        };
-        leaderLatch.addListener(listener);
-
-        reapingIsActive.set(leaderLatch.hasLeadership());
-    }
-
-    private static LeaderLatch makeLeaderLatchIfPathNotNull(CuratorFramework client, String leaderPath)
-    {
-        if (leaderPath == null)
-        {
-            return null;
-        }
-        else
-        {
-            return new LeaderLatch(client, leaderPath);
-        }
-    }
-}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
index 81e8dd9..f7b4653 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
@@ -19,11 +19,9 @@
 
 package org.apache.curator.framework.recipes.nodes;
 
-import com.google.common.base.Function;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.SafeIsTtlMode;
 import org.apache.curator.framework.WatcherRemoveCuratorFramework;
 import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
 import org.apache.curator.framework.api.BackgroundCallback;
@@ -32,7 +30,8 @@ import org.apache.curator.framework.api.CreateBuilderMain;
 import org.apache.curator.framework.api.CreateModable;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.CuratorWatcher;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.PathUtils;
@@ -77,7 +76,7 @@ public class PersistentNode implements Closeable
     private final BackgroundCallback backgroundCallback;
     private final boolean useProtection;
     private final AtomicReference<CreateModable<ACLBackgroundPathAndBytesable<String>>> createMethod = new AtomicReference<CreateModable<ACLBackgroundPathAndBytesable<String>>>(null);
-    private final ListenerContainer<PersistentNodeListener> listeners = new ListenerContainer<PersistentNodeListener>();
+    private final StandardListenerManager<PersistentNodeListener> listeners = StandardListenerManager.standard();
     private final CuratorWatcher watcher = new CuratorWatcher()
     {
         @Override
@@ -362,7 +361,7 @@ public class PersistentNode implements Closeable
      *
      * @return listenable
      */
-    public ListenerContainer<PersistentNodeListener> getListenable()
+    public Listenable<PersistentNodeListener> getListenable()
     {
         return listeners;
     }
@@ -462,7 +461,7 @@ public class PersistentNode implements Closeable
             CreateModable<ACLBackgroundPathAndBytesable<String>> localCreateMethod = createMethod.get();
             if ( localCreateMethod == null )
             {
-                CreateBuilderMain createBuilder = SafeIsTtlMode.isTtl(mode) ? client.create().withTtl(ttl) : client.create();
+                CreateBuilderMain createBuilder = mode.isTTL() ? client.create().withTtl(ttl) : client.create();
                 CreateModable<ACLBackgroundPathAndBytesable<String>> tempCreateMethod = useProtection ? createBuilder.creatingParentContainersIfNeeded().withProtection() : createBuilder.creatingParentContainersIfNeeded();
                 createMethod.compareAndSet(null, tempCreateMethod);
                 localCreateMethod = createMethod.get();
@@ -523,25 +522,17 @@ public class PersistentNode implements Closeable
     private void notifyListeners()
     {
         final String path = getActualPath();
-        listeners.forEach(
-             new Function<PersistentNodeListener, Void>()
-             {
-                 @Override
-                 public Void apply(PersistentNodeListener listener)
-                 {
-                     try
-                    {
-                        listener.nodeCreated(path);
-                    }
-                    catch ( Exception e )
-                    {
-                        ThreadUtils.checkInterrupted(e);
-                        log.error("From PersistentNode listener", e);
-                    }
-                    return null;
-                }
-             }
-        );
+        listeners.forEach(listener -> {
+            try
+            {
+                listener.nodeCreated(path);
+            }
+            catch ( Exception e )
+            {
+                ThreadUtils.checkInterrupted(e);
+                log.error("From PersistentNode listener", e);
+            }
+        });
     }
 
     private boolean isActive()
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java
index 8f321b3..5327a09 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java
@@ -21,7 +21,7 @@ package org.apache.curator.framework.recipes.queue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.Listenable;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collections;
@@ -215,7 +215,7 @@ public class DistributedDelayQueue<T> implements Closeable, QueueBase<T>
      * @return put listener container
      */
     @Override
-    public ListenerContainer<QueuePutListener<T>> getPutListenerContainer()
+    public Listenable<QueuePutListener<T>> getPutListenerContainer()
     {
         return queue.getPutListenerContainer();
     }
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedIdQueue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedIdQueue.java
index 15045aa..45b84c4 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedIdQueue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedIdQueue.java
@@ -21,7 +21,7 @@ package org.apache.curator.framework.recipes.queue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.Listenable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.io.IOException;
@@ -105,7 +105,7 @@ public class DistributedIdQueue<T> implements QueueBase<T>
     }
 
     @Override
-    public ListenerContainer<QueuePutListener<T>> getPutListenerContainer()
+    public Listenable<QueuePutListener<T>> getPutListenerContainer()
     {
         return queue.getPutListenerContainer();
     }
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedPriorityQueue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedPriorityQueue.java
index 4b70ec5..be61de2 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedPriorityQueue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedPriorityQueue.java
@@ -21,7 +21,7 @@ package org.apache.curator.framework.recipes.queue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.Listenable;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.Executor;
@@ -176,7 +176,7 @@ public class DistributedPriorityQueue<T> implements Closeable, QueueBase<T>
      * @return put listener container
      */
     @Override
-    public ListenerContainer<QueuePutListener<T>> getPutListenerContainer()
+    public Listenable<QueuePutListener<T>> getPutListenerContainer()
     {
         return queue.getPutListenerContainer();
     }
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java
index 14d1266..0df2930 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java
@@ -19,18 +19,18 @@
 package org.apache.curator.framework.recipes.queue;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-
-import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.CuratorEventType;
 import org.apache.curator.framework.imps.CuratorFrameworkState;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
 import org.apache.curator.framework.recipes.leader.LeaderSelector;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.PathUtils;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
@@ -50,7 +50,6 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-import org.apache.curator.utils.PathUtils;
 
 /**
  * <p>An implementation of the Distributed Queue ZK recipe. Items put into the queue
@@ -81,7 +80,7 @@ public class DistributedQueue<T> implements QueueBase<T>
     private final boolean isProducerOnly;
     private final String lockPath;
     private final AtomicReference<ErrorMode> errorMode = new AtomicReference<ErrorMode>(ErrorMode.REQUEUE);
-    private final ListenerContainer<QueuePutListener<T>> putListenerContainer = new ListenerContainer<QueuePutListener<T>>();
+    private final StandardListenerManager<QueuePutListener<T>> putListenerContainer = StandardListenerManager.standard();
     private final AtomicInteger lastChildCount = new AtomicInteger(0);
     private final int maxItems;
     private final int finalFlushMs;
@@ -233,7 +232,7 @@ public class DistributedQueue<T> implements QueueBase<T>
      * @return put listener container
      */
     @Override
-    public ListenerContainer<QueuePutListener<T>> getPutListenerContainer()
+    public Listenable<QueuePutListener<T>> getPutListenerContainer()
     {
         return putListenerContainer;
     }
@@ -407,25 +406,16 @@ public class DistributedQueue<T> implements QueueBase<T>
             putCount.decrementAndGet();
             putCount.notifyAll();
         }
-        putListenerContainer.forEach
-        (
-            new Function<QueuePutListener<T>, Void>()
+        putListenerContainer.forEach(listener -> {
+            if ( item != null )
             {
-                @Override
-                public Void apply(QueuePutListener<T> listener)
-                {
-                    if ( item != null )
-                    {
-                        listener.putCompleted(item);
-                    }
-                    else
-                    {
-                        listener.putMultiCompleted(givenMultiItem);
-                    }
-                    return null;
-                }
+                listener.putCompleted(item);
             }
-        );
+            else
+            {
+                listener.putMultiCompleted(givenMultiItem);
+            }
+        });
     }
 
     private void doPutInBackground(final T item, String path, final MultiItem<T> givenMultiItem, byte[] bytes) throws Exception
@@ -449,25 +439,16 @@ public class DistributedQueue<T> implements QueueBase<T>
                     }
                 }
 
-                putListenerContainer.forEach
-                (
-                    new Function<QueuePutListener<T>, Void>()
+                putListenerContainer.forEach(listener -> {
+                    if ( item != null )
                     {
-                        @Override
-                        public Void apply(QueuePutListener<T> listener)
-                        {
-                            if ( item != null )
-                            {
-                                listener.putCompleted(item);
-                            }
-                            else
-                            {
-                                listener.putMultiCompleted(givenMultiItem);
-                            }
-                            return null;
-                        }
+                        listener.putCompleted(item);
                     }
-                );
+                    else
+                    {
+                        listener.putMultiCompleted(givenMultiItem);
+                    }
+                });
             }
         };
         internalCreateNode(path, bytes, callback);
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/QueueBase.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/QueueBase.java
index 0a21ce8..f9fee13 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/QueueBase.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/QueueBase.java
@@ -18,7 +18,7 @@
  */
 package org.apache.curator.framework.recipes.queue;
 
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.Listenable;
 import java.io.Closeable;
 import java.util.concurrent.TimeUnit;
 
@@ -36,7 +36,7 @@ public interface QueueBase<T> extends Closeable
      *
      * @return put listener container
      */
-    ListenerContainer<QueuePutListener<T>> getPutListenerContainer();
+    Listenable<QueuePutListener<T>> getPutListenerContainer();
 
     /**
      * Used when the queue is created with a {@link QueueBuilder#lockPath(String)}. Determines
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
index 5d7abce..d605234 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
@@ -20,14 +20,14 @@
 package org.apache.curator.framework.recipes.shared;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.WatcherRemoveCuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.CuratorWatcher;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.PathUtils;
@@ -52,7 +52,7 @@ public class SharedValue implements Closeable, SharedValueReader
 	private static final int UNINITIALIZED_VERSION = -1;
 
     private final Logger log = LoggerFactory.getLogger(getClass());
-    private final ListenerContainer<SharedValueListener> listeners = new ListenerContainer<SharedValueListener>();
+    private final StandardListenerManager<SharedValueListener> listeners = StandardListenerManager.standard();
     private final WatcherRemoveCuratorFramework client;
     private final String path;
     private final byte[] seedValue;
@@ -235,7 +235,7 @@ public class SharedValue implements Closeable, SharedValueReader
      *
      * @return listenable
      */
-    public ListenerContainer<SharedValueListener> getListenable()
+    public Listenable<SharedValueListener> getListenable()
     {
         return listeners;
     }
@@ -297,41 +297,21 @@ public class SharedValue implements Closeable, SharedValueReader
     private void notifyListeners()
     {
         final byte[] localValue = getValue();
-        listeners.forEach
-            (
-                new Function<SharedValueListener, Void>()
-                {
-                    @Override
-                    public Void apply(SharedValueListener listener)
-                    {
-                        try
-                        {
-                            listener.valueHasChanged(SharedValue.this, localValue);
-                        }
-                        catch ( Exception e )
-                        {
-                            ThreadUtils.checkInterrupted(e);
-                            log.error("From SharedValue listener", e);
-                        }
-                        return null;
-                    }
-                }
-            );
+        listeners.forEach(listener -> {
+            try
+            {
+                listener.valueHasChanged(SharedValue.this, localValue);
+            }
+            catch ( Exception e )
+            {
+                ThreadUtils.checkInterrupted(e);
+                log.error("From SharedValue listener", e);
+            }
+        });
     }
 
     private void notifyListenerOfStateChanged(final ConnectionState newState)
     {
-        listeners.forEach
-            (
-                new Function<SharedValueListener, Void>()
-                {
-                    @Override
-                    public Void apply(SharedValueListener listener)
-                    {
-                        listener.stateChanged(client, newState);
-                        return null;
-                    }
-                }
-            );
+        listeners.forEach(listener -> listener.stateChanged(client, newState));
     }
 }
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValueReader.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValueReader.java
index e298cca..a846c54 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValueReader.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValueReader.java
@@ -18,7 +18,7 @@
  */
 package org.apache.curator.framework.recipes.shared;
 
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.Listenable;
 
 /**
  * Abstracts a shared value and allows listening for changes to the value
@@ -44,5 +44,5 @@ public interface SharedValueReader
      *
      * @return listenable
      */
-    public ListenerContainer<SharedValueListener> getListenable();
+    public Listenable<SharedValueListener> getListenable();
 }
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java
index ff416d5..705cad5 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java
@@ -20,6 +20,7 @@ package org.apache.curator.framework.recipes.cache;
 
 import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Compatibility;
 import org.apache.curator.test.compatibility.Timing2;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
@@ -27,7 +28,6 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.Timing;
-import org.apache.curator.utils.Compatibility;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.concurrent.Callable;
@@ -196,7 +196,7 @@ public class TestNodeCache extends BaseClassForTests
                 }
             );
 
-            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+            client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
             Thread.sleep(timing.multiple(1.5).session());
 
             Assert.assertEquals(cache.getCurrentData().getData(), "start".getBytes());
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
index 78fabd5..3c65994 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
@@ -29,11 +29,11 @@ import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Compatibility;
 import org.apache.curator.test.ExecuteCalledWatchingExecutorService;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.utils.Compatibility;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.testng.Assert;
@@ -814,7 +814,7 @@ public class TestPathChildrenCache extends BaseClassForTests
             client.create().withMode(CreateMode.EPHEMERAL).forPath("/test/me", "data".getBytes());
             Assert.assertTrue(timing.awaitLatch(childAddedLatch));
 
-            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+            client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
             Assert.assertTrue(timing.awaitLatch(lostLatch));
             Assert.assertTrue(timing.awaitLatch(reconnectedLatch));
             Assert.assertTrue(timing.awaitLatch(removedLatch));
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
index 1e97ce2..7da3ecb 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
@@ -23,8 +23,8 @@ import com.google.common.collect.ImmutableSet;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
+import org.apache.curator.test.Compatibility;
 import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.utils.Compatibility;
 import org.apache.zookeeper.CreateMode;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -423,9 +423,9 @@ public class TestTreeCache extends BaseTestTreeCache
         client.create().withMode(CreateMode.EPHEMERAL).forPath("/test/me", "data".getBytes());
         assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/me");
 
-        Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
-        assertEvent(TreeCacheEvent.Type.NODE_REMOVED, "/test/me", "data".getBytes(), true);
+        client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
         assertEvent(TreeCacheEvent.Type.INITIALIZED, null, null, true);
+        assertEvent(TreeCacheEvent.Type.NODE_REMOVED, "/test/me", "data".getBytes(), true);
 
         assertNoMoreEvents();
     }
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
index b92c3a2..97a326e 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
@@ -30,11 +30,11 @@ import org.apache.curator.framework.state.SessionConnectionStateErrorPolicy;
 import org.apache.curator.framework.state.StandardConnectionStateErrorPolicy;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Compatibility;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.Timing;
 import org.apache.curator.test.compatibility.Timing2;
 import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.utils.Compatibility;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.Arrays;
@@ -531,7 +531,7 @@ public class TestLeaderSelector extends BaseClassForTests
 
             Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
 
-            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+            client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
 
             Assert.assertTrue(timing.awaitLatch(interruptedLatch));
             timing.sleepABit();
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java
deleted file mode 100644
index cafb9a3..0000000
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java
+++ /dev/null
@@ -1,351 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.recipes.locks;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.leader.LeaderLatch;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.curator.retry.RetryOneTime;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.test.TestingServer;
-import org.apache.curator.test.Timing;
-import org.apache.curator.utils.CloseableUtils;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-
-public class TestChildReaper extends BaseClassForTests
-{
-    @Test
-    public void testMaxChildren() throws Exception
-    {
-        server.close();
-
-        final int LARGE_QTY = 10000;
-
-        System.setProperty("jute.maxbuffer", "" + LARGE_QTY);
-        server = new TestingServer();
-        try
-        {
-            Timing timing = new Timing();
-            ChildReaper reaper = null;
-            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new ExponentialBackoffRetry(100, 3));
-            try
-            {
-                client.start();
-
-                for ( int i = 0; i < LARGE_QTY; ++i )
-                {
-                    if ( (i % 1000) == 0 )
-                    {
-                        System.out.println(i);
-                    }
-                    client.create().creatingParentsIfNeeded().forPath("/big/node-" + i);
-                }
-
-                try
-                {
-                    client.getChildren().forPath("/big");
-                    Assert.fail("Should have been a connection loss");
-                }
-                catch ( KeeperException.ConnectionLossException e )
-                {
-                    // expected
-                }
-
-                final CountDownLatch latch = new CountDownLatch(1);
-                reaper = new ChildReaper(client, "/big", Reaper.Mode.REAP_UNTIL_DELETE, 1)
-                {
-                    @Override
-                    protected void warnMaxChildren(String path, Stat stat)
-                    {
-                        latch.countDown();
-                        super.warnMaxChildren(path, stat);
-                    }
-                };
-                reaper.setMaxChildren(100);
-                reaper.start();
-                Assert.assertTrue(timing.awaitLatch(latch));
-            }
-            finally
-            {
-                CloseableUtils.closeQuietly(reaper);
-                CloseableUtils.closeQuietly(client);
-            }
-        }
-        finally
-        {
-            System.clearProperty("jute.maxbuffer");
-        }
-    }
-
-    @Test
-    public void testLargeNodes() throws Exception
-    {
-        server.close();
-
-        final int LARGE_QTY = 10000;
-        final int SMALL_QTY = 100;
-
-        System.setProperty("jute.maxbuffer", "" + LARGE_QTY);
-        server = new TestingServer();
-        try
-        {
-            Timing timing = new Timing();
-            ChildReaper reaper = null;
-            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new ExponentialBackoffRetry(100, 3));
-            try
-            {
-                client.start();
-
-                for ( int i = 0; i < LARGE_QTY; ++i )
-                {
-                    if ( (i % 1000) == 0 )
-                    {
-                        System.out.println(i);
-                    }
-                    client.create().creatingParentsIfNeeded().forPath("/big/node-" + i);
-
-                    if ( i < SMALL_QTY )
-                    {
-                        client.create().creatingParentsIfNeeded().forPath("/small/node-" + i);
-                    }
-                }
-
-                reaper = new ChildReaper(client, "/foo", Reaper.Mode.REAP_UNTIL_DELETE, 1);
-                reaper.start();
-
-                reaper.addPath("/big");
-                reaper.addPath("/small");
-
-                int count = -1;
-                for ( int i = 0; (i < 10) && (count != 0); ++i )
-                {
-                    timing.sleepABit();
-                    count = client.checkExists().forPath("/small").getNumChildren();
-                }
-                Assert.assertEquals(count, 0);
-            }
-            finally
-            {
-                CloseableUtils.closeQuietly(reaper);
-                CloseableUtils.closeQuietly(client);
-            }
-        }
-        finally
-        {
-            System.clearProperty("jute.maxbuffer");
-        }
-    }
-
-    @Test
-    public void testSomeNodes() throws Exception
-    {
-        Timing timing = new Timing();
-        ChildReaper reaper = null;
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        try
-        {
-            client.start();
-
-            Random r = new Random();
-            int nonEmptyNodes = 0;
-            for ( int i = 0; i < 10; ++i )
-            {
-                client.create().creatingParentsIfNeeded().forPath("/test/" + Integer.toString(i));
-                if ( r.nextBoolean() )
-                {
-                    client.create().forPath("/test/" + Integer.toString(i) + "/foo");
-                    ++nonEmptyNodes;
-                }
-            }
-
-            reaper = new ChildReaper(client, "/test", Reaper.Mode.REAP_UNTIL_DELETE, 1);
-            reaper.start();
-
-            timing.forWaiting().sleepABit();
-
-            Stat stat = client.checkExists().forPath("/test");
-            Assert.assertEquals(stat.getNumChildren(), nonEmptyNodes);
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(reaper);
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-
-    @Test
-    public void testSimple() throws Exception
-    {
-        Timing timing = new Timing();
-        ChildReaper reaper = null;
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        try
-        {
-            client.start();
-
-            for ( int i = 0; i < 10; ++i )
-            {
-                client.create().creatingParentsIfNeeded().forPath("/test/" + Integer.toString(i));
-            }
-
-            reaper = new ChildReaper(client, "/test", Reaper.Mode.REAP_UNTIL_DELETE, 1);
-            reaper.start();
-
-            timing.forWaiting().sleepABit();
-
-            Stat stat = client.checkExists().forPath("/test");
-            Assert.assertEquals(stat.getNumChildren(), 0);
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(reaper);
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-
-    @Test
-    public void testLeaderElection() throws Exception
-    {
-        Timing timing = new Timing();
-        ChildReaper reaper = null;
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        LeaderLatch otherLeader = null;
-        try
-        {
-            client.start();
-
-            for ( int i = 0; i < 10; ++i )
-            {
-                client.create().creatingParentsIfNeeded().forPath("/test/" + Integer.toString(i));
-            }
-
-            otherLeader = new LeaderLatch(client, "/test-leader");
-            otherLeader.start();
-            otherLeader.await();
-
-            reaper = new ChildReaper(client, "/test", Reaper.Mode.REAP_UNTIL_DELETE, ChildReaper.newExecutorService(), 1, "/test-leader");
-            reaper.start();
-
-            timing.forWaiting().sleepABit();
-
-            //Should not have reaped anything at this point since otherLeader is still leader
-            Stat stat = client.checkExists().forPath("/test");
-            Assert.assertEquals(stat.getNumChildren(), 10);
-
-            CloseableUtils.closeQuietly(otherLeader);
-
-            timing.forWaiting().sleepABit();
-
-            stat = client.checkExists().forPath("/test");
-            Assert.assertEquals(stat.getNumChildren(), 0);
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(reaper);
-            if ( otherLeader != null && otherLeader.getState() == LeaderLatch.State.STARTED )
-            {
-                CloseableUtils.closeQuietly(otherLeader);
-            }
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-
-    @Test
-    public void testMultiPath() throws Exception
-    {
-        Timing timing = new Timing();
-        ChildReaper reaper = null;
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        try
-        {
-            client.start();
-
-            for ( int i = 0; i < 10; ++i )
-            {
-                client.create().creatingParentsIfNeeded().forPath("/test1/" + Integer.toString(i));
-                client.create().creatingParentsIfNeeded().forPath("/test2/" + Integer.toString(i));
-                client.create().creatingParentsIfNeeded().forPath("/test3/" + Integer.toString(i));
-            }
-
-            reaper = new ChildReaper(client, "/test2", Reaper.Mode.REAP_UNTIL_DELETE, 1);
-            reaper.start();
-            reaper.addPath("/test1");
-
-            timing.forWaiting().sleepABit();
-
-            Stat stat = client.checkExists().forPath("/test1");
-            Assert.assertEquals(stat.getNumChildren(), 0);
-            stat = client.checkExists().forPath("/test2");
-            Assert.assertEquals(stat.getNumChildren(), 0);
-            stat = client.checkExists().forPath("/test3");
-            Assert.assertEquals(stat.getNumChildren(), 10);
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(reaper);
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-
-    @Test
-    public void testNamespace() throws Exception
-    {
-        Timing timing = new Timing();
-        ChildReaper reaper = null;
-        CuratorFramework client = CuratorFrameworkFactory.builder()
-            .connectString(server.getConnectString())
-            .sessionTimeoutMs(timing.session())
-            .connectionTimeoutMs(timing.connection())
-            .retryPolicy(new RetryOneTime(1))
-            .namespace("foo")
-            .build();
-        try
-        {
-            client.start();
-
-            for ( int i = 0; i < 10; ++i )
-            {
-                client.create().creatingParentsIfNeeded().forPath("/test/" + Integer.toString(i));
-            }
-
-            reaper = new ChildReaper(client, "/test", Reaper.Mode.REAP_UNTIL_DELETE, 1);
-            reaper.start();
-
-            timing.forWaiting().sleepABit();
-
-            Stat stat = client.checkExists().forPath("/test");
-            Assert.assertEquals(stat.getNumChildren(), 0);
-
-            stat = client.usingNamespace(null).checkExists().forPath("/foo/test");
-            Assert.assertNotNull(stat);
-            Assert.assertEquals(stat.getNumChildren(), 0);
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(reaper);
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java
index e9645e8..95b139d 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java
@@ -26,8 +26,8 @@ import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.framework.schema.Schema;
 import org.apache.curator.framework.schema.SchemaSet;
 import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.Compatibility;
 import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.utils.Compatibility;
 import org.apache.zookeeper.CreateMode;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -172,7 +172,7 @@ public class TestInterProcessMutex extends TestInterProcessMutexBase
             Assert.assertTrue(lock.isAcquiredInThisProcess());
 
             // Kill the session, check that lock node still exists
-            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+            client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
             Assert.assertNotNull(client.checkExists().forPath(LOCK_PATH));
 
             // Release the lock and verify that the actual lock node created no longer exists
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
index 0c62650..afebf8a 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
@@ -27,11 +27,11 @@ import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Compatibility;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.Timing;
 import org.apache.curator.test.compatibility.Timing2;
 import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.utils.Compatibility;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.KeeperException;
 import org.testng.Assert;
@@ -200,7 +200,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
                 );
 
             Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
-            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+            client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
             Assert.assertTrue(timing.forSessionSleep().acquireSemaphore(semaphore, 1));
         }
         finally
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
index 50f6bce..ce9f8fa 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
@@ -664,53 +664,6 @@ public class TestInterProcessSemaphore extends BaseClassForTests
     }
 
     @Test
-    public void testChildReaperCleansUpLockNodes() throws Exception
-    {
-        Timing timing = new Timing();
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        client.start();
-
-        ChildReaper childReaper = null;
-        try
-        {
-            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test/lock", 1);
-            semaphore.returnLease(semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS));
-
-            Assert.assertTrue(client.getChildren().forPath("/test").size() > 0);
-
-            childReaper = new ChildReaper(
-                client,
-                "/test",
-                Reaper.Mode.REAP_UNTIL_GONE,
-                ChildReaper.newExecutorService(),
-                1,
-                "/test-leader",
-                InterProcessSemaphoreV2.LOCK_SCHEMA
-            );
-            childReaper.start();
-
-            timing.forWaiting().sleepABit();
-
-            try
-            {
-                List<String> children = client.getChildren().forPath("/test");
-
-                Assert.assertEquals(children.size(), 0, "All children of /test should have been reaped");
-            }
-            catch ( KeeperException.NoNodeException ok )
-            {
-                // this is OK - if Container Nodes are used the "/test" path will go away - no point in updating the test for deprecated code
-            }
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(childReaper);
-            CloseableUtils.closeQuietly(client);
-        }
-
-    }
-
-    @Test
     public void testNoOrphanedNodes() throws Exception
     {
         final Timing timing = new Timing();
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java
deleted file mode 100644
index c47808f..0000000
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java
+++ /dev/null
@@ -1,647 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.curator.framework.recipes.locks;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.leader.LeaderLatch;
-import org.apache.curator.framework.recipes.leader.LeaderSelector;
-import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.retry.RetryOneTime;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.test.Timing;
-import org.apache.curator.utils.CloseableUtils;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.data.Stat;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-import java.io.IOException;
-import java.util.Queue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class TestReaper extends BaseClassForTests
-{
-    @Test
-    public void testUsingLeaderPath() throws Exception
-    {
-        final Timing timing = new Timing();
-        CuratorFramework client = makeClient(timing, null);
-        Reaper reaper1 = null;
-        Reaper reaper2 = null;
-        try
-        {
-            final AtomicInteger reaper1Count = new AtomicInteger();
-            reaper1 = new Reaper(client, Reaper.newExecutorService(), 1, "/reaper/leader")
-            {
-                @Override
-                protected void reap(PathHolder holder)
-                {
-                    reaper1Count.incrementAndGet();
-                    super.reap(holder);
-                }
-            };
-
-            final AtomicInteger reaper2Count = new AtomicInteger();
-            reaper2 = new Reaper(client, Reaper.newExecutorService(), 1, "/reaper/leader")
-            {
-                @Override
-                protected void reap(PathHolder holder)
-                {
-                    reaper2Count.incrementAndGet();
-                    super.reap(holder);
-                }
-            };
-
-            client.start();
-            client.create().creatingParentsIfNeeded().forPath("/one/two/three");
-
-            reaper1.start();
-            reaper2.start();
-
-            reaper1.addPath("/one/two/three");
-            reaper2.addPath("/one/two/three");
-
-            timing.sleepABit();
-
-            Assert.assertTrue((reaper1Count.get() == 0) || (reaper2Count.get() == 0));
-            Assert.assertTrue((reaper1Count.get() > 0) || (reaper2Count.get() > 0));
-
-            Reaper activeReaper;
-            AtomicInteger inActiveReaperCount;
-            if ( reaper1Count.get() > 0 )
-            {
-                activeReaper = reaper1;
-                inActiveReaperCount = reaper2Count;
-            }
-            else
-            {
-                activeReaper = reaper2;
-                inActiveReaperCount = reaper1Count;
-            }
-            Assert.assertEquals(inActiveReaperCount.get(), 0);
-            activeReaper.close();
-            timing.sleepABit();
-            Assert.assertTrue(inActiveReaperCount.get() > 0);
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(reaper1);
-            CloseableUtils.closeQuietly(reaper2);
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-
-    @Test
-    public void testUsingLeaderLatch() throws Exception
-    {
-        final Timing timing = new Timing();
-        CuratorFramework client = makeClient(timing, null);
-        Reaper reaper1 = null;
-        Reaper reaper2 = null;
-        LeaderLatch leaderLatch1 = null;
-        LeaderLatch leaderLatch2 = null;
-        try
-        {
-            final AtomicInteger reaper1Count = new AtomicInteger();
-            leaderLatch1 = new LeaderLatch(client, "/reaper/leader");
-            reaper1 = new Reaper(client, Reaper.newExecutorService(), 1, leaderLatch1)
-            {
-                @Override
-                protected void reap(PathHolder holder)
-                {
-                    reaper1Count.incrementAndGet();
-                    super.reap(holder);
-                }
-            };
-
-            final AtomicInteger reaper2Count = new AtomicInteger();
-            leaderLatch2 = new LeaderLatch(client, "/reaper/leader");
-            reaper2 = new Reaper(client, Reaper.newExecutorService(), 1, leaderLatch2)
-            {
-                @Override
-                protected void reap(PathHolder holder)
-                {
-                    reaper2Count.incrementAndGet();
-                    super.reap(holder);
-                }
-            };
-
-            client.start();
-            client.create().creatingParentsIfNeeded().forPath("/one/two/three");
-
-            leaderLatch1.start();
-            leaderLatch2.start();
-
-            reaper1.start();
-            reaper2.start();
-
-            reaper1.addPath("/one/two/three");
-            reaper2.addPath("/one/two/three");
-
-            timing.sleepABit();
-
-            Assert.assertTrue((reaper1Count.get() == 0) || (reaper2Count.get() == 0));
-            Assert.assertTrue((reaper1Count.get() > 0) || (reaper2Count.get() > 0));
-
-            Reaper activeReaper;
-            LeaderLatch activeLeaderLeatch;
-            AtomicInteger inActiveReaperCount;
-            if ( reaper1Count.get() > 0 )
-            {
-                activeReaper = reaper1;
-                activeLeaderLeatch = leaderLatch1;
-                inActiveReaperCount = reaper2Count;
-            }
-            else
-            {
-                activeReaper = reaper2;
-                activeLeaderLeatch = leaderLatch2;
-                inActiveReaperCount = reaper1Count;
-            }
-            Assert.assertEquals(inActiveReaperCount.get(), 0);
-            activeReaper.close();
-            activeLeaderLeatch.close();
-            timing.sleepABit();
-            Assert.assertTrue(inActiveReaperCount.get() > 0);
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(reaper1);
-            CloseableUtils.closeQuietly(reaper2);
-            if (leaderLatch1 != null && LeaderLatch.State.STARTED == leaderLatch1.getState()) {
-                CloseableUtils.closeQuietly(leaderLatch1);
-            }
-            if (leaderLatch2 != null && LeaderLatch.State.STARTED == leaderLatch2.getState()) {
-                CloseableUtils.closeQuietly(leaderLatch2);
-            }
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-
-    @Test
-    public void testUsingManualLeader() throws Exception
-    {
-        final Timing timing = new Timing();
-        final CuratorFramework client = makeClient(timing, null);
-        final CountDownLatch latch = new CountDownLatch(1);
-        LeaderSelectorListener listener = new LeaderSelectorListener()
-        {
-            @Override
-            public void takeLeadership(CuratorFramework client) throws Exception
-            {
-                Reaper reaper = new Reaper(client, 1);
-                try
-                {
-                    reaper.addPath("/one/two/three", Reaper.Mode.REAP_UNTIL_DELETE);
-                    reaper.start();
-
-                    timing.sleepABit();
-                    latch.countDown();
-                }
-                finally
-                {
-                    CloseableUtils.closeQuietly(reaper);
-                }
-            }
-
-            @Override
-            public void stateChanged(CuratorFramework client, ConnectionState newState)
-            {
-            }
-        };
-        LeaderSelector selector = new LeaderSelector(client, "/leader", listener);
-        try
-        {
-            client.start();
-            client.create().creatingParentsIfNeeded().forPath("/one/two/three");
-
-            Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
-
-            selector.start();
-            timing.awaitLatch(latch);
-
-            Assert.assertNull(client.checkExists().forPath("/one/two/three"));
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(selector);
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-
-    @Test
-    public void testSparseUseNoReap() throws Exception
-    {
-        final int THRESHOLD = 3000;
-
-        Timing timing = new Timing();
-        Reaper reaper = null;
-        CuratorFramework client = makeClient(timing, null);
-        try
-        {
-            client.start();
-            client.create().creatingParentsIfNeeded().forPath("/one/two/three");
-
-            Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
-
-            final Queue<Reaper.PathHolder> holders = new ConcurrentLinkedQueue<Reaper.PathHolder>();
-            final ExecutorService pool = Executors.newCachedThreadPool();
-            ScheduledExecutorService service = new ScheduledThreadPoolExecutor(1);
-
-            reaper = new Reaper(client, service, THRESHOLD)
-            {
-                @Override
-                protected Future<Void> schedule(final PathHolder pathHolder, int reapingThresholdMs)
-                {
-                    holders.add(pathHolder);
-                    final Future<?> f = super.schedule(pathHolder, reapingThresholdMs);
-                    pool.submit(new Callable<Void>()
-                        {
-                            @Override
-                            public Void call() throws Exception
-                            {
-                                f.get();
-                                holders.remove(pathHolder);
-                                return null;
-                            }
-                        }
-                               );
-                    return null;
-                }
-            };
-            reaper.start();
-            reaper.addPath("/one/two/three");
-
-            long start = System.currentTimeMillis();
-            boolean emptyCountIsCorrect = false;
-            while ( ((System.currentTimeMillis() - start) < timing.forWaiting().milliseconds()) && !emptyCountIsCorrect )   // need to loop as the Holder can go in/out of the Reaper's DelayQueue
-            {
-                for ( Reaper.PathHolder holder : holders )
-                {
-                    if ( holder.path.endsWith("/one/two/three") )
-                    {
-                        emptyCountIsCorrect = (holder.emptyCount > 0);
-                        break;
-                    }
-                }
-                Thread.sleep(1);
-            }
-            Assert.assertTrue(emptyCountIsCorrect);
-
-            client.create().forPath("/one/two/three/foo");
-
-            Thread.sleep(2 * (THRESHOLD / Reaper.EMPTY_COUNT_THRESHOLD));
-            Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
-            client.delete().forPath("/one/two/three/foo");
-
-            Thread.sleep(THRESHOLD);
-            timing.sleepABit();
-
-            Assert.assertNull(client.checkExists().forPath("/one/two/three"));
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(reaper);
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-
-    @Test
-    public void testReapUntilDelete() throws Exception
-    {
-        testReapUntilDelete(null);
-    }
-
-    @Test
-    public void testReapUntilDeleteNamespace() throws Exception
-    {
-        testReapUntilDelete("test");
-    }
-
-    @Test
-    public void testReapUntilGone() throws Exception
-    {
-        testReapUntilGone(null);
-    }
-
-    @Test
-    public void testReapUntilGoneNamespace() throws Exception
-    {
-        testReapUntilGone("test");
-    }
-
-    @Test
-    public void testRemove() throws Exception
-    {
-        testRemove(null);
-    }
-
-    @Test
-    public void testRemoveNamespace() throws Exception
-    {
-        testRemove("test");
-    }
-
-    @Test
-    public void testSimulationWithLocks() throws Exception
-    {
-        testSimulationWithLocks(null);
-    }
-
-    @Test
-    public void testSimulationWithLocksNamespace() throws Exception
-    {
-        testSimulationWithLocks("test");
-    }
-
-    @Test
-    public void testWithEphemerals() throws Exception
-    {
-        testWithEphemerals(null);
-    }
-
-    @Test
-    public void testWithEphemeralsNamespace() throws Exception
-    {
-        testWithEphemerals("test");
-    }
-
-    @Test
-    public void testBasic() throws Exception
-    {
-        testBasic(null);
-    }
-
-    @Test
-    public void testBasicNamespace() throws Exception
-    {
-        testBasic("test");
-    }
-
-    private void testReapUntilDelete(String namespace) throws Exception
-    {
-        Timing timing = new Timing();
-        Reaper reaper = null;
-        CuratorFramework client = makeClient(timing, namespace);
-        try
-        {
-            client.start();
-            client.create().creatingParentsIfNeeded().forPath("/one/two/three");
-
-            Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
-
-            reaper = new Reaper(client, 100);
-            reaper.start();
-
-            reaper.addPath("/one/two/three", Reaper.Mode.REAP_UNTIL_DELETE);
-            timing.sleepABit();
-
-            Assert.assertNull(client.checkExists().forPath("/one/two/three"));
-
-            client.create().forPath("/one/two/three");
-            timing.sleepABit();
-            Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(reaper);
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-
-    private void testReapUntilGone(String namespace) throws Exception
-    {
-        Timing timing = new Timing();
-        Reaper reaper = null;
-        CuratorFramework client = makeClient(timing, namespace);
-        try
-        {
-            client.start();
-
-            reaper = new Reaper(client, 100);
-            reaper.start();
-
-            reaper.addPath("/one/two/three", Reaper.Mode.REAP_UNTIL_GONE);
-            timing.sleepABit();
-
-            client.create().creatingParentsIfNeeded().forPath("/one/two/three");
-            Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
-
-            reaper.addPath("/one/two/three", Reaper.Mode.REAP_UNTIL_GONE);
-            timing.sleepABit();
-
-            Assert.assertNull(client.checkExists().forPath("/one/two/three"));
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(reaper);
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-
-    private CuratorFramework makeClient(Timing timing, String namespace) throws IOException
-    {
-        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().connectionTimeoutMs(timing.connection()).sessionTimeoutMs(timing.session()).connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1));
-        if ( namespace != null )
-        {
-            builder = builder.namespace(namespace);
-        }
-        return builder.build();
-    }
-
-    private void testRemove(String namespace) throws Exception
-    {
-        Timing timing = new Timing();
-        Reaper reaper = null;
-        CuratorFramework client = makeClient(timing, namespace);
-        try
-        {
-            client.start();
-            client.create().creatingParentsIfNeeded().forPath("/one/two/three");
-
-            Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
-
-            reaper = new Reaper(client, 100);
-            reaper.start();
-
-            reaper.addPath("/one/two/three");
-            timing.sleepABit();
-
-            Assert.assertNull(client.checkExists().forPath("/one/two/three"));
-
-            Assert.assertTrue(reaper.removePath("/one/two/three"));
-
-            client.create().forPath("/one/two/three");
-            timing.sleepABit();
-            Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(reaper);
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-
-    private void testSimulationWithLocks(String namespace) throws Exception
-    {
-        final int LOCK_CLIENTS = 10;
-        final int ITERATIONS = 250;
-        final int MAX_WAIT_MS = 10;
-
-        ExecutorService service = Executors.newFixedThreadPool(LOCK_CLIENTS);
-        ExecutorCompletionService<Object> completionService = new ExecutorCompletionService<Object>(service);
-
-        Timing timing = new Timing();
-        Reaper reaper = null;
-        final CuratorFramework client = makeClient(timing, namespace);
-        try
-        {
-            client.start();
-
-            reaper = new Reaper(client, MAX_WAIT_MS / 2);
-            reaper.start();
-            reaper.addPath("/a/b");
-
-            for ( int i = 0; i < LOCK_CLIENTS; ++i )
-            {
-                completionService.submit(new Callable<Object>()
-                    {
-                        @Override
-                        public Object call() throws Exception
-                        {
-                            final InterProcessMutex lock = new InterProcessMutex(client, "/a/b");
-                            for ( int i = 0; i < ITERATIONS; ++i )
-                            {
-                                lock.acquire();
-                                try
-                                {
-                                    Thread.sleep((int)(Math.random() * MAX_WAIT_MS));
-                                }
-                                finally
-                                {
-                                    lock.release();
-                                }
-                            }
-                            return null;
-                        }
-                    }
-                                        );
-            }
-
-            for ( int i = 0; i < LOCK_CLIENTS; ++i )
-            {
-                completionService.take().get();
-            }
-
-            Thread.sleep(timing.session());
-            timing.sleepABit();
-
-            Stat stat = client.checkExists().forPath("/a/b");
-            Assert.assertNull(stat, "Child qty: " + ((stat != null) ? stat.getNumChildren() : 0));
-        }
-        finally
-        {
-            service.shutdownNow();
-            CloseableUtils.closeQuietly(reaper);
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-
-    private void testWithEphemerals(String namespace) throws Exception
-    {
-        Timing timing = new Timing();
-        Reaper reaper = null;
-        CuratorFramework client2 = null;
-        CuratorFramework client = makeClient(timing, namespace);
-        try
-        {
-            client.start();
-            client.create().creatingParentsIfNeeded().forPath("/one/two/three");
-
-            Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
-
-            client2 = makeClient(timing, namespace);
-            client2.start();
-            for ( int i = 0; i < 10; ++i )
-            {
-                client2.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/one/two/three/foo-");
-            }
-
-            reaper = new Reaper(client, 100);
-            reaper.start();
-
-            reaper.addPath("/one/two/three");
-            timing.sleepABit();
-
-            Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
-
-            client2.close();    // should clear ephemerals
-            client2 = null;
-
-            Thread.sleep(timing.session());
-            timing.sleepABit();
-
-            Assert.assertNull(client.checkExists().forPath("/one/two/three"));
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(reaper);
-            CloseableUtils.closeQuietly(client2);
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-
-    private void testBasic(String namespace) throws Exception
-    {
-        Timing timing = new Timing();
-        Reaper reaper = null;
-        CuratorFramework client = makeClient(timing, namespace);
-        try
-        {
-            client.start();
-            client.create().creatingParentsIfNeeded().forPath("/one/two/three");
-
-            Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
-
-            reaper = new Reaper(client, 100);
-            reaper.start();
-
-            reaper.addPath("/one/two/three");
-            timing.sleepABit();
-
-            Assert.assertNull(client.checkExists().forPath("/one/two/three"));
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(reaper);
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
index 87585af..906282f 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
@@ -31,9 +31,9 @@ import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Compatibility;
 import org.apache.curator.test.compatibility.Timing2;
 import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.utils.Compatibility;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.WatchedEvent;
@@ -329,7 +329,7 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
             observer.checkExists().usingWatcher(deletedTrigger).forPath(node.getActualPath());
 
             node.debugCreateNodeLatch = new CountDownLatch(1);
-            Compatibility.injectSessionExpiration(curator.getZookeeperClient().getZooKeeper());
+            curator.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
 
             // Make sure the node got deleted
             assertTrue(deletedTrigger.firedWithin(timing.forSessionSleep().seconds(), TimeUnit.SECONDS));
@@ -359,7 +359,7 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
             observer.checkExists().usingWatcher(deletedTrigger).forPath(node.getActualPath());
 
             node.debugCreateNodeLatch = new CountDownLatch(1);
-            Compatibility.injectSessionExpiration(curator.getZookeeperClient().getZooKeeper());
+            curator.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
 
             // Make sure the node got deleted...
             assertTrue(deletedTrigger.firedWithin(timing.forSessionSleep().seconds(), TimeUnit.SECONDS));
@@ -400,7 +400,7 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
 
                 node.debugCreateNodeLatch = new CountDownLatch(1);
                 // Kill the session, thus cleaning up the node...
-                Compatibility.injectSessionExpiration(curator.getZookeeperClient().getZooKeeper());
+                curator.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
 
                 // Make sure the node ended up getting deleted...
                 assertTrue(deletionTrigger.firedWithin(timing.multiple(1.5).forSessionSleep().seconds(), TimeUnit.SECONDS));
@@ -443,7 +443,7 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
             Trigger deletedTrigger = Trigger.deletedOrSetData();
             observer.checkExists().usingWatcher(deletedTrigger).forPath(node.getActualPath());
 
-            Compatibility.injectSessionExpiration(nodeCreator.getZookeeperClient().getZooKeeper());
+            nodeCreator.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
 
             // Make sure the node got deleted...
             assertTrue(deletedTrigger.firedWithin(timing.forWaiting().seconds(), TimeUnit.SECONDS));
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java
index 360c876..b95df11 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java
@@ -26,7 +26,6 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.Timing;
 import org.apache.curator.test.compatibility.CuratorTestBase;
-import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.curator.utils.ZKPaths;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -38,7 +37,6 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode.BUILD_INITIAL_CACHE;
 
-@Test(groups = Zk35MethodInterceptor.zk35Group)
 public class TestPersistentTtlNode extends CuratorTestBase
 {
     private final Timing timing = new Timing();
diff --git a/curator-test/src/main/java/org/apache/curator/test/compatibility/CuratorTestBase.java b/curator-test/src/main/java/org/apache/curator/test/compatibility/CuratorTestBase.java
index a3c2a29..830db1f 100644
--- a/curator-test/src/main/java/org/apache/curator/test/compatibility/CuratorTestBase.java
+++ b/curator-test/src/main/java/org/apache/curator/test/compatibility/CuratorTestBase.java
@@ -21,7 +21,6 @@ package org.apache.curator.test.compatibility;
 import org.apache.curator.test.BaseClassForTests;
 import org.testng.annotations.Listeners;
 
-@Listeners(Zk35MethodInterceptor.class)
 public class CuratorTestBase extends BaseClassForTests
 {
     protected final Timing2 timing = new Timing2();
diff --git a/curator-test/src/main/java/org/apache/curator/test/compatibility/Zk35MethodInterceptor.java b/curator-test/src/main/java/org/apache/curator/test/compatibility/Zk35MethodInterceptor.java
deleted file mode 100644
index 8072b68..0000000
--- a/curator-test/src/main/java/org/apache/curator/test/compatibility/Zk35MethodInterceptor.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.test.compatibility;
-
-import org.apache.curator.test.Compatibility;
-import org.testng.IMethodInstance;
-import org.testng.IMethodInterceptor;
-import org.testng.ITestContext;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-public class Zk35MethodInterceptor implements IMethodInterceptor
-{
-    public static final String zk35Group = "zk35";
-
-    @Override
-    public List<IMethodInstance> intercept(List<IMethodInstance> methods, ITestContext context)
-    {
-        if ( !Compatibility.isZK34() )
-        {
-            return methods;
-        }
-
-        List<IMethodInstance> filteredMethods = new ArrayList<>();
-        for ( IMethodInstance method : methods )
-        {
-            if ( !isInGroup(method.getMethod().getGroups()) )
-            {
-                filteredMethods.add(method);
-            }
-        }
-        return filteredMethods;
-    }
-
-    private boolean isInGroup(String[] groups)
-    {
-        return (groups != null) && Arrays.asList(groups).contains(zk35Group);
-    }
-}
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
index b95e92d..5c015f4 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
@@ -20,7 +20,7 @@ package org.apache.curator.x.async.modeled.details;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.listen.Listenable;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.StandardListenerManager;
 import org.apache.curator.framework.recipes.cache.TreeCache;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
 import org.apache.curator.framework.recipes.cache.TreeCacheListener;
@@ -28,10 +28,10 @@ import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.x.async.api.CreateOption;
 import org.apache.curator.x.async.modeled.ModelSerializer;
 import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ZNode;
 import org.apache.curator.x.async.modeled.ZPath;
 import org.apache.curator.x.async.modeled.cached.ModeledCache;
 import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
-import org.apache.curator.x.async.modeled.ZNode;
 import org.apache.zookeeper.data.Stat;
 import java.util.AbstractMap;
 import java.util.Map;
@@ -45,7 +45,7 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
     private final TreeCache cache;
     private final Map<ZPath, Entry<T>> entries = new ConcurrentHashMap<>();
     private final ModelSerializer<T> serializer;
-    private final ListenerContainer<ModeledCacheListener<T>> listenerContainer = new ListenerContainer<>();
+    private final StandardListenerManager<ModeledCacheListener<T>> listenerContainer = StandardListenerManager.standard();
     private final ZPath basePath;
 
     private static final class Entry<T>
@@ -144,10 +144,7 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
         {
             ThreadUtils.checkInterrupted(e);
 
-            listenerContainer.forEach(l -> {
-                l.handleException(e);
-                return null;
-            });
+            listenerContainer.forEach(l -> l.handleException(e));
         }
     }
 
@@ -188,10 +185,7 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
 
         case INITIALIZED:
         {
-            listenerContainer.forEach(l -> {
-                l.initialized();
-                return null;
-            });
+            listenerContainer.forEach(ModeledCacheListener::initialized);
             break;
         }
 
@@ -203,9 +197,6 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
 
     private void accept(ModeledCacheListener.Type type, ZPath path, Stat stat, T model)
     {
-        listenerContainer.forEach(l -> {
-            l.accept(type, path, stat, model);
-            return null;
-        });
+        listenerContainer.forEach(l -> l.accept(type, path, stat, model));
     }
 }
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
index 05df301..c154836 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.x.discovery.details;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -23,14 +24,14 @@ import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.curator.utils.CloseableExecutorService;
-import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.StandardListenerManager;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.utils.CloseableExecutorService;
+import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.curator.x.discovery.ServiceCache;
 import org.apache.curator.x.discovery.ServiceInstance;
@@ -45,17 +46,15 @@ import java.util.concurrent.atomic.AtomicReference;
 
 public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheListener
 {
-    private final ListenerContainer<ServiceCacheListener>           listenerContainer = new ListenerContainer<ServiceCacheListener>();
-    private final ServiceDiscoveryImpl<T>                           discovery;
-    private final AtomicReference<State>                            state = new AtomicReference<State>(State.LATENT);
-    private final PathChildrenCache                                 cache;
-    private final ConcurrentMap<String, ServiceInstance<T>>         instances = Maps.newConcurrentMap();
+    private final StandardListenerManager<ServiceCacheListener> listenerContainer = StandardListenerManager.standard();
+    private final ServiceDiscoveryImpl<T> discovery;
+    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+    private final PathChildrenCache cache;
+    private final ConcurrentMap<String, ServiceInstance<T>> instances = Maps.newConcurrentMap();
 
     private enum State
     {
-        LATENT,
-        STARTED,
-        STOPPED
+        LATENT, STARTED, STOPPED
     }
 
     private static CloseableExecutorService convertThreadFactory(ThreadFactory threadFactory)
@@ -123,18 +122,7 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
     {
         Preconditions.checkState(state.compareAndSet(State.STARTED, State.STOPPED), "Already closed or has not been started");
 
-        listenerContainer.forEach
-            (
-                new Function<ServiceCacheListener, Void>()
-                {
-                    @Override
-                    public Void apply(ServiceCacheListener listener)
-                    {
-                        discovery.getClient().getConnectionStateListenable().removeListener(listener);
-                        return null;
-                    }
-                }
-            );
+        listenerContainer.forEach(l -> discovery.getClient().getConnectionStateListenable().removeListener(l));
         listenerContainer.clear();
 
         CloseableUtils.closeQuietly(cache);
@@ -166,39 +154,28 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
     @Override
     public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
     {
-        boolean         notifyListeners = false;
+        boolean notifyListeners = false;
         switch ( event.getType() )
         {
-            case CHILD_ADDED:
-            case CHILD_UPDATED:
-            {
-                addInstance(event.getData(), false);
-                notifyListeners = true;
-                break;
-            }
+        case CHILD_ADDED:
+        case CHILD_UPDATED:
+        {
+            addInstance(event.getData(), false);
+            notifyListeners = true;
+            break;
+        }
 
-            case CHILD_REMOVED:
-            {
-                instances.remove(instanceIdFromData(event.getData()));
-                notifyListeners = true;
-                break;
-            }
+        case CHILD_REMOVED:
+        {
+            instances.remove(instanceIdFromData(event.getData()));
+            notifyListeners = true;
+            break;
+        }
         }
 
         if ( notifyListeners )
         {
-            listenerContainer.forEach
-            (
-                new Function<ServiceCacheListener, Void>()
-                {
-                    @Override
-                    public Void apply(ServiceCacheListener listener)
-                    {
-                        listener.cacheChanged();
-                        return null;
-                    }
-                }
-            );
+            listenerContainer.forEach(ServiceCacheListener::cacheChanged);
         }
     }
 
@@ -209,8 +186,8 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
 
     private void addInstance(ChildData childData, boolean onlyIfAbsent) throws Exception
     {
-        String                  instanceId = instanceIdFromData(childData);
-        ServiceInstance<T>      serviceInstance = discovery.getSerializer().deserialize(childData.getData());
+        String instanceId = instanceIdFromData(childData);
+        ServiceInstance<T> serviceInstance = discovery.getSerializer().deserialize(childData.getData());
         if ( onlyIfAbsent )
         {
             instances.putIfAbsent(instanceId, serviceInstance);
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
index 54719a5..7eecf3c 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
@@ -25,9 +25,9 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Compatibility;
 import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.utils.Compatibility;
 import org.apache.curator.x.discovery.ServiceDiscovery;
 import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
 import org.apache.curator.x.discovery.ServiceInstance;
@@ -79,7 +79,7 @@ public class TestServiceDiscovery extends BaseClassForTests
             timing.acquireSemaphore(semaphore, 2);
             Assert.assertEquals(discovery.queryForInstances("test").size(), 2);
 
-            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+            client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
             server.stop();
 
             server.restart();
@@ -121,7 +121,7 @@ public class TestServiceDiscovery extends BaseClassForTests
             timing.acquireSemaphore(semaphore);
             Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
 
-            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+            client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
             server.stop();
 
             server.restart();
@@ -154,7 +154,7 @@ public class TestServiceDiscovery extends BaseClassForTests
 
             Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
 
-            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+            client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
             Thread.sleep(timing.multiple(1.5).session());
 
             Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
diff --git a/pom.xml b/pom.xml
index 43f2d561..b95a2bc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -317,7 +317,6 @@
         <module>curator-x-discovery</module>
         <module>curator-x-discovery-server</module>
         <module>curator-x-async</module>
-        <module>curator-test-zk34</module>
     </modules>
 
     <dependencyManagement>
@@ -873,11 +872,6 @@
                                 <relocation>
                                     <pattern>com.google</pattern>
                                     <shadedPattern>org.apache.curator.shaded.com.google</shadedPattern>
-                                    <excludes>
-                                        <exclude>com.google.common.base.Function</exclude>
-                                        <exclude>com.google.common.base.Predicate</exclude>
-                                        <exclude>com.google.common.reflect.TypeToken</exclude>
-                                    </excludes>
                                 </relocation>
                             </relocations>
                             <artifactSet>
diff --git a/src/site/confluence/exhibitor.confluence b/src/site/confluence/exhibitor.confluence
deleted file mode 100644
index 156ff50..0000000
--- a/src/site/confluence/exhibitor.confluence
+++ /dev/null
@@ -1,24 +0,0 @@
-h1. Exhibitor Integration
-
-Curator can be integrated with Netflix [[Exhibitor|https://github.com/Netflix/exhibitor]] to achieve a live/updating list
-of the ZooKeeper ensemble. This means that your ZooKeeper client (i.e. Curator) will automatically/dynamically adjust
-to changes in the makeup of a ZooKeeper ensemble. Further, this support is generalized so that a service other than Exhibitor could be used if available.
-
-h2. Enabling
-
-The integration is enabled via the {{CuratorFrameworkFactory}} (or when constructing the {{CuratorZookeeperClient}} if not
-using the framework). Pass an {{EnsembleProvider}} to the {{ensembleProvider()}} method of the {{CuratorFrameworkFactory}} builder.
-
-h2. EnsembleProvider
-
-For Exhibitor, construct an instance of {{ExhibitorEnsembleProvider}} to pass to the builder. It takes a number of arguments:
-* exhibitors \- a list of the exhibitor instances. This is the initial set of Exhibitor instances. This set will get automatically updated if it changes.
-* restClient \- a simple facade to access the Exhibitor instances via REST. Use the provided {{DefaultExhibitorRestClient}} or something else of your choosing.
-* restUriPath \- the REST path to use. In most cases this should be: {{/exhibitor/v1/cluster/list}}
-* pollingMs \- how often to poll the Exhibitor instances
-* retryPolicy \- the retry policy to use when polling the Exhibitor instances.
-
-h2. Details
-
-Once configured, Curator will poll the Exhibitors for changes in the ensemble. If Curator should need to re\-create the ZooKeeper
-instance (due to a SysDisconnected event, etc.) it will use the updated ensemble list to do so.
diff --git a/src/site/confluence/zk-compatibility.confluence b/src/site/confluence/zk-compatibility-34.confluence
similarity index 68%
rename from src/site/confluence/zk-compatibility.confluence
rename to src/site/confluence/zk-compatibility-34.confluence
index ed6e32e..911a642 100644
--- a/src/site/confluence/zk-compatibility.confluence
+++ b/src/site/confluence/zk-compatibility-34.confluence
@@ -1,18 +1,8 @@
-h1. ZooKeeper Version Compatibility
+h1. ZooKeeper Version 3.4.x Compatibility
 
-While ZooKeeper 3.5.x is still considered "beta" by the ZooKeeper development team, the reality is that it is
-used in production by many users. However, ZooKeeper 3.4.x is also used in production. Prior to Apache Curator
-4.0, both versions of ZooKeeper were supported via two versions of Apache Curator. Starting with Curator 4.0
-both versions of ZooKeeper are supported via the same Curator libraries.
-
-h2. ZooKeeper 3.5.x
-
-* Curator 4.0 has a hard dependency on ZooKeeper 3.5.x
-* If you are using ZooKeeper 3.5.x there's nothing additional to do \- just use Curator 4.0
-
-h2. ZooKeeper 3.4.x
-
-Curator 4.0 supports ZooKeeper 3.4.x ensembles in a soft\-compatibility mode. To use this mode
+ZooKeeper 3.4.x is now at end\-of\-life. Consequently, the latest versions of Curator have removed support
+for it. If you wish to use Curator with ZooKeeper 3.4.x, you should pin to version 4.2.x of Curator.
+Curator 4.2.x supports ZooKeeper 3.4.x ensembles in a soft\-compatibility mode. To use this mode
 you must exclude ZooKeeper when adding Curator to your dependency management tool.
 
 _Maven_
@@ -21,7 +11,7 @@ _Maven_
 <dependency>
     <groupId>org.apache.curator</groupId>
     <artifactId>curator-recipes</artifactId>
-    <version>${curator-version}</version>
+    <version>4.2.0</version>
     <exclusions>
         <exclusion>
             <groupId>org.apache.zookeeper</groupId>
diff --git a/src/site/site.xml b/src/site/site.xml
index 3ff625f..bdfbce5 100644
--- a/src/site/site.xml
+++ b/src/site/site.xml
@@ -81,10 +81,6 @@
             <item name="Schema Support" href="curator-framework/schema.html"/>
         </menu>
 
-        <menu name="Compatibility" inherit="top">
-            <item name="ZooKeeper Versions" href="zk-compatibility.html"/>
-        </menu>
-
         <menu name="Low Level" inherit="top">
             <item name="Framework" href="curator-framework/index.html"/>
             <item name="Utilities" href="utilities.html"/>
@@ -101,7 +97,8 @@
             <item name="API Compatibility" href="compatibility.html"/>
             <item name="Javadoc" href="apidocs/index.html"/>
             <item name="Wiki" href="https://cwiki.apache.org/confluence/display/CURATOR"/>
-            <item name="Releases" href="https://cwiki.apache.org/confluence/display/CURATOR/Releases"/>
+            <item name="Releases/Compatibility" href="https://cwiki.apache.org/confluence/display/CURATOR/Releases"/>
+            <item name="ZooKeeper 3.4.x" href="zk-compatibility-34.html"/>
         </menu>
 
         <menu name="Extensions" inherit="top">