You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/11/03 18:25:40 UTC

[curator] 01/02: Support persistent watchers in ZK 3.6+ while maintaining background compatability with previous versions of ZK. Added a new module to make sure we maintain comaptibility with ZK 3.5.x

This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch zk36-recipes
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 931ec045b775a8c9c0614896a131aefb173b0eb9
Author: randgalt <ra...@apache.org>
AuthorDate: Sat Nov 2 11:40:44 2019 -0500

    Support persistent watchers in ZK 3.6+ while maintaining background compatability with previous versions of ZK. Added a new module to make sure we maintain comaptibility with ZK 3.5.x
---
 .../org/apache/curator/utils/Compatibility.java    |  35 ++++
 .../curator/utils/DefaultZookeeperFactory.java     |  17 ++
 .../apache/curator/framework/CuratorFramework.java |  16 ++
 .../curator/framework/api/AddWatchBuilder.java     |  18 +-
 .../curator/framework/api/AddWatchBuilder2.java    |  14 +-
 .../apache/curator/framework/api/AddWatchable.java |  28 ++-
 .../curator/framework/api/CuratorEventType.java    |   7 +-
 .../curator/framework/api/WatchesBuilder.java      |  20 ++-
 .../framework/imps/AddWatchBuilderImpl.java        | 197 +++++++++++++++++++++
 .../framework/imps/CuratorFrameworkImpl.java       |  23 +++
 .../imps/CuratorMultiTransactionRecord.java        |  34 ++--
 .../framework/imps/ReconfigBuilderImpl.java        |   5 +-
 .../framework/imps/RemoveWatchesBuilderImpl.java   |   9 +-
 .../curator/framework/imps/WatchesBuilderImpl.java |  26 ++-
 .../apache/curator/framework/imps/Watching.java    |  10 +-
 .../framework/imps/TestCreateReturningStat.java    |   3 +-
 .../curator/framework/imps/TestFramework.java      |   9 +-
 .../curator/framework/imps/TestFrameworkEdges.java |   4 +-
 .../framework/imps/TestReconfiguration.java        |   3 +-
 .../curator/framework/imps/TestTtlNodes.java       |   3 +-
 .../framework/imps/TestWatcherRemovalManager.java  |   3 +-
 ...tRemoveWatches.java => TestWatchesBuilder.java} |  64 ++++++-
 .../curator/framework/imps/TestWithCluster.java    |   2 +
 .../state/TestConnectionStateManager.java          |   2 +
 .../framework/recipes/cache/BaseTestTreeCache.java |   4 +-
 .../framework/recipes/cache/TestNodeCache.java     |   2 +
 .../recipes/cache/TestPathChildrenCache.java       |   2 +
 .../cache/TestPathChildrenCacheInCluster.java      |   2 +
 .../framework/recipes/cache/TestTreeCache.java     |   2 +
 .../recipes/leader/ChaosMonkeyCnxnFactory.java     |   8 +-
 .../framework/recipes/leader/TestLeaderLatch.java  |   2 +
 .../locks/TestInterProcessSemaphoreCluster.java    |   2 +
 .../recipes/nodes/TestPersistentTtlNode.java       |   3 +-
 curator-test-zk34/pom.xml                          |   2 +-
 {curator-test-zk34 => curator-test-zk35}/pom.xml   | 112 +++++-------
 .../curator/framework/TestCompatibility.java       |  49 +++++
 .../src/test/resources/log4j.properties            |  27 +++
 curator-test/pom.xml                               |  10 ++
 .../org/apache/curator/test/BaseClassForTests.java |   4 +-
 .../org/apache/curator/test/Compatibility.java     |  93 ++++++++++
 .../org/apache/curator/test/TestingCluster.java    |   2 +-
 .../apache/curator/test/TestingQuorumPeerMain.java |   2 +-
 .../apache/curator/test/TestingZooKeeperMain.java  |   6 +-
 .../test/compatibility/CuratorTestBase.java        |   5 +-
 .../test/compatibility/Zk35MethodInterceptor.java  |  56 ------
 .../x/async/api/AsyncCuratorFrameworkDsl.java      |   7 +
 .../curator/x/async/api/AsyncWatchBuilder.java     |  26 +--
 .../curator/x/async/api/AsyncWatchBuilder2.java    |  16 +-
 .../x/async/details/AsyncCuratorFrameworkImpl.java |  23 ++-
 .../x/async/details/AsyncWatchBuilderImpl.java     |  79 +++++++++
 .../curator/framework/imps/TestAddWatch.java       |  87 +++++++++
 pom.xml                                            |  45 +++--
 src/site/confluence/zk-compatibility.confluence    |  25 +--
 53 files changed, 991 insertions(+), 264 deletions(-)

diff --git a/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java b/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java
index 1ee2301..fd98d88 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java
@@ -31,7 +31,9 @@ import java.lang.reflect.Method;
 public class Compatibility
 {
     private static final boolean hasZooKeeperAdmin;
+    private static final boolean hasPersistentWatchers;
     private static final Method queueEventMethod;
+
     private static final Logger logger = LoggerFactory.getLogger(Compatibility.class);
 
     static
@@ -49,6 +51,19 @@ public class Compatibility
         }
         hasZooKeeperAdmin = localHasZooKeeperAdmin;
 
+        boolean localHasPersistentWatchers;
+        try
+        {
+            Class.forName("org.apache.zookeeper.AddWatchMode");
+            localHasPersistentWatchers = true;
+        }
+        catch ( ClassNotFoundException e )
+        {
+            localHasPersistentWatchers = false;
+            logger.info("Persistent Watchers are not available in the version of the ZooKeeper library being used");
+        }
+        hasPersistentWatchers = localHasPersistentWatchers;
+
         Method localQueueEventMethod;
         try
         {
@@ -74,6 +89,26 @@ public class Compatibility
     }
 
     /**
+     * Return true if the ZooKeeperAdmin class is available
+     *
+     * @return true/false
+     */
+    public static boolean hasZooKeeperAdmin()
+    {
+        return hasZooKeeperAdmin;
+    }
+
+    /**
+     * Return true if persistent watchers are available
+     *
+     * @return true/false
+     */
+    public static boolean hasPersistentWatchers()
+    {
+        return hasPersistentWatchers;
+    }
+
+    /**
      * For ZooKeeper 3.5.x, use the supported <code>zooKeeper.getTestable().injectSessionExpiration()</code>.
      * For ZooKeeper 3.4.x do the equivalent via reflection
      *
diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
index 42279d0..f936518 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
@@ -20,12 +20,29 @@ package org.apache.curator.utils;
 
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.admin.ZooKeeperAdmin;
 
 public class DefaultZookeeperFactory implements ZookeeperFactory
 {
+    // hide org.apache.zookeeper.admin.ZooKeeperAdmin in a nested class so that Curator continues to work with ZK 3.4.x
+    private static class ZooKeeperAdminMaker implements ZookeeperFactory
+    {
+        static final ZooKeeperAdminMaker instance = new ZooKeeperAdminMaker();
+
+        @Override
+        public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
+        {
+            return new ZooKeeperAdmin(connectString, sessionTimeout, watcher, canBeReadOnly);
+        }
+    }
+
     @Override
     public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
     {
+        if ( Compatibility.hasZooKeeperAdmin() )
+        {
+            return ZooKeeperAdminMaker.instance.newZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
+        }
         return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
     }
 }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 3737faa..d279373 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -194,11 +194,20 @@ public interface CuratorFramework extends Closeable
 
     /**
      * Start a remove watches builder.
+     *
      * @return builder object
+     * @deprecated use {@link #watchers()} in ZooKeeper 3.6+
      */
     public RemoveWatchesBuilder watches();
 
     /**
+     * Start a watches builder.
+     *
+     * @return builder object
+     */
+    public WatchesBuilder watchers();
+
+    /**
      * Returns the listenable interface for the Connect State
      *
      * @return listenable
@@ -332,6 +341,13 @@ public interface CuratorFramework extends Closeable
     boolean isZk34CompatibilityMode();
 
     /**
+     * Return true if this instance is running in ZK 3.5.x compatibility mode
+     *
+     * @return true/false
+     */
+    boolean isZk35CompatibilityMode();
+
+    /**
      * Calls {@link #notifyAll()} on the given object after first synchronizing on it. This is
      * done from the {@link #runSafe(Runnable)} thread.
      *
diff --git a/curator-test/src/main/java/org/apache/curator/test/compatibility/CuratorTestBase.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder.java
similarity index 68%
copy from curator-test/src/main/java/org/apache/curator/test/compatibility/CuratorTestBase.java
copy to curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder.java
index a3c2a29..ad6d434 100644
--- a/curator-test/src/main/java/org/apache/curator/test/compatibility/CuratorTestBase.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder.java
@@ -16,13 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.test.compatibility;
+package org.apache.curator.framework.api;
 
-import org.apache.curator.test.BaseClassForTests;
-import org.testng.annotations.Listeners;
+import org.apache.zookeeper.AddWatchMode;
 
-@Listeners(Zk35MethodInterceptor.class)
-public class CuratorTestBase extends BaseClassForTests
+public interface AddWatchBuilder extends AddWatchBuilder2
 {
-    protected final Timing2 timing = new Timing2();
-}
+    /**
+     * The mode to use. By default, {@link org.apache.zookeeper.AddWatchMode#PERSISTENT_RECURSIVE} is used
+     *
+     * @param mode mode to use
+     * @return this
+     */
+    AddWatchBuilder2 withMode(AddWatchMode mode);
+}
\ No newline at end of file
diff --git a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder2.java
similarity index 81%
copy from curator-test/src/main/java/org/apache/curator/test/Compatibility.java
copy to curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder2.java
index 5b4b53f..9114c00 100644
--- a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder2.java
@@ -16,12 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.test;
 
-public class Compatibility
+package org.apache.curator.framework.api;
+
+public interface AddWatchBuilder2 extends
+    Backgroundable<AddWatchable<Pathable<Void>>>,
+    AddWatchable<Pathable<Void>>,
+    Pathable<Void>
 {
-    public static boolean isZK34()
-    {
-        return false;
-    }
-}
+}
\ No newline at end of file
diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchable.java
similarity index 68%
copy from curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
copy to curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchable.java
index 42279d0..1f0646c 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchable.java
@@ -16,16 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.utils;
+
+package org.apache.curator.framework.api;
 
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
 
-public class DefaultZookeeperFactory implements ZookeeperFactory
+public interface AddWatchable<T>
 {
-    @Override
-    public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
-    {
-        return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
-    }
-}
+    /**
+     * Set a watcher for the operation
+     *
+     * @param watcher the watcher
+     * @return this
+     */
+    T usingWatcher(Watcher watcher);
+
+    /**
+     * Set a watcher for the operation
+     *
+     * @param watcher the watcher
+     * @return this
+     */
+    T usingWatcher(CuratorWatcher watcher);
+}
\ No newline at end of file
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
index 5dea211..89e1490 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
@@ -96,5 +96,10 @@ public enum CuratorEventType
     /**
      * Event sent when client is being closed
      */
-    CLOSING
+    CLOSING,
+
+    /**
+     * Corresponds to {@link org.apache.curator.framework.CuratorFramework#watchers()}
+     */
+    ADD_WATCH
 }
diff --git a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java b/curator-framework/src/main/java/org/apache/curator/framework/api/WatchesBuilder.java
similarity index 75%
copy from curator-test/src/main/java/org/apache/curator/test/Compatibility.java
copy to curator-framework/src/main/java/org/apache/curator/framework/api/WatchesBuilder.java
index 5b4b53f..3cd5528 100644
--- a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/WatchesBuilder.java
@@ -16,12 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.test;
 
-public class Compatibility
+package org.apache.curator.framework.api;
+
+/**
+ * Builder to allow watches to be removed 
+ */
+public interface WatchesBuilder extends RemoveWatchesBuilder
 {
-    public static boolean isZK34()
-    {
-        return false;
-    }
-}
+    /**
+     * Start an add watch operation
+     *
+     * @return builder
+     */
+    AddWatchBuilder add();
+}
\ No newline at end of file
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddWatchBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddWatchBuilderImpl.java
new file mode 100644
index 0000000..9f119a3
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddWatchBuilderImpl.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.RetryLoop;
+import org.apache.curator.drivers.OperationTrace;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.api.Pathable;
+import org.apache.curator.framework.api.AddWatchBuilder;
+import org.apache.curator.framework.api.AddWatchBuilder2;
+import org.apache.curator.framework.api.AddWatchable;
+import org.apache.zookeeper.AddWatchMode;
+import org.apache.zookeeper.Watcher;
+import java.util.concurrent.Executor;
+
+public class AddWatchBuilderImpl implements AddWatchBuilder, Pathable<Void>, BackgroundOperation<String>
+{
+    private final CuratorFrameworkImpl client;
+    private Watching watching;
+    private Backgrounding backgrounding = new Backgrounding();
+    private AddWatchMode mode = AddWatchMode.PERSISTENT_RECURSIVE;
+
+    AddWatchBuilderImpl(CuratorFrameworkImpl client)
+    {
+        this.client = client;
+        watching = new Watching(client, true);
+    }
+
+    public AddWatchBuilderImpl(CuratorFrameworkImpl client, Watching watching, Backgrounding backgrounding, AddWatchMode mode)
+    {
+        this.client = client;
+        this.watching = watching;
+        this.backgrounding = backgrounding;
+        this.mode = mode;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground()
+    {
+        backgrounding = new Backgrounding();
+        return this;
+    }
+
+    @Override
+    public AddWatchBuilder2 withMode(AddWatchMode mode)
+    {
+        this.mode = mode;
+        return this;
+    }
+
+    @Override
+    public Pathable<Void> usingWatcher(Watcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    @Override
+    public Pathable<Void> usingWatcher(CuratorWatcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground(Object context)
+    {
+        backgrounding = new Backgrounding(context);
+        return this;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback)
+    {
+        backgrounding = new Backgrounding(callback);
+        return this;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context)
+    {
+        backgrounding = new Backgrounding(callback, context);
+        return this;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Executor executor)
+    {
+        backgrounding = new Backgrounding(callback, executor);
+        return this;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context, Executor executor)
+    {
+        backgrounding = new Backgrounding(client, callback, context, executor);
+        return this;
+    }
+
+    @Override
+    public Void forPath(String path) throws Exception
+    {
+        if ( backgrounding.inBackground() )
+        {
+            client.processBackgroundOperation(new OperationAndData<>(this, path, backgrounding.getCallback(), null, backgrounding.getContext(), watching), null);
+        }
+        else
+        {
+            pathInForeground(path);
+        }
+        return null;
+    }
+
+    @Override
+    public void performBackgroundOperation(final OperationAndData<String> data) throws Exception
+    {
+        String path = data.getData();
+        String fixedPath = client.fixForNamespace(path);
+        try
+        {
+            final OperationTrace   trace = client.getZookeeperClient().startAdvancedTracer("AddWatchBuilderImpl-Background");
+            if ( watching.isWatched() )
+            {
+                client.getZooKeeper().addWatch
+                    (
+                        fixedPath,
+                        mode,
+                        (rc, path1, ctx) -> {
+                            trace.setReturnCode(rc).setWithWatcher(true).setPath(path1).commit();
+                            CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.ADD_WATCH, rc, path1, null, ctx, null, null, null, null, null, null);
+                            client.processBackgroundOperation(data, event);
+                        },
+                        backgrounding.getContext()
+                    );
+            }
+            else
+            {
+                client.getZooKeeper().addWatch
+                    (
+                        fixedPath,
+                        watching.getWatcher(path),
+                        mode,
+                        (rc, path1, ctx) -> {
+                            trace.setReturnCode(rc).setWithWatcher(true).setPath(path1).commit();
+                            CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.ADD_WATCH, rc, path1, null, ctx, null, null, null, null, null, null);
+                            client.processBackgroundOperation(data, event);
+                        },
+                        backgrounding.getContext()
+                    );
+            }
+        }
+        catch ( Throwable e )
+        {
+            backgrounding.checkError(e, watching);
+        }
+    }
+
+    private void pathInForeground(final String path) throws Exception
+    {
+        final String fixedPath = client.fixForNamespace(path);
+        OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("AddWatchBuilderImpl-Foreground");
+        RetryLoop.callWithRetry
+        (
+            client.getZookeeperClient(), () -> {
+                if ( watching.isWatched() )
+                {
+                    client.getZooKeeper().addWatch(fixedPath, mode);
+                }
+                else
+                {
+                    client.getZooKeeper().addWatch(fixedPath, watching.getWatcher(path), mode);
+                }
+                return null;
+            });
+        trace.setPath(fixedPath).setWithWatcher(true).commit();
+    }
+}
\ No newline at end of file
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index e003bf0..5afc5ce 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -42,6 +42,7 @@ import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.framework.state.ConnectionStateManager;
+import org.apache.curator.utils.Compatibility;
 import org.apache.curator.utils.DebugUtils;
 import org.apache.curator.utils.EnsurePath;
 import org.apache.curator.utils.ThreadUtils;
@@ -571,6 +572,13 @@ public class CuratorFrameworkImpl implements CuratorFramework
         return new RemoveWatchesBuilderImpl(this);
     }
 
+    @Override
+    public WatchesBuilder watchers()
+    {
+        Preconditions.checkState(Compatibility.hasPersistentWatchers(), "addWatch() is not supported in the ZooKeeper library being used. Use watches() instead.");
+        return new WatchesBuilderImpl(this);
+    }
+
     protected void internalSync(CuratorFrameworkImpl impl, String path, Object context)
     {
         BackgroundOperation<String> operation = new BackgroundSyncImpl(impl, context);
@@ -620,6 +628,15 @@ public class CuratorFrameworkImpl implements CuratorFramework
         return client.getZooKeeper();
     }
 
+    Object getZooKeeperAdmin() throws Exception
+    {
+        if ( isZk34CompatibilityMode() )
+        {
+            Preconditions.checkState(!isZk34CompatibilityMode(), "getZooKeeperAdmin() is not supported when running in ZooKeeper 3.4 compatibility mode");
+        }
+        return client.getZooKeeper();
+    }
+
     CompressionProvider getCompressionProvider()
     {
         return compressionProvider;
@@ -830,6 +847,12 @@ public class CuratorFrameworkImpl implements CuratorFramework
         return zk34CompatibilityMode;
     }
 
+    @Override
+    public boolean isZk35CompatibilityMode()
+    {
+        return !zk34CompatibilityMode && !Compatibility.hasPersistentWatchers();
+    }
+
     EnsembleTracker getEnsembleTracker()
     {
         return ensembleTracker;
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java
index 3e72609..fbac6e6 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java
@@ -16,49 +16,57 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.imps;
 
 import com.google.common.collect.Lists;
 import org.apache.curator.framework.api.transaction.OperationType;
 import org.apache.curator.framework.api.transaction.TypeAndPath;
-import org.apache.zookeeper.MultiTransactionRecord;
 import org.apache.zookeeper.Op;
 import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
-class CuratorMultiTransactionRecord extends MultiTransactionRecord
+class CuratorMultiTransactionRecord implements Iterable<Op>
 {
-    private final List<TypeAndPath>     metadata = Lists.newArrayList();
-
-    @Override
-    public final void add(Op op)
-    {
-        throw new UnsupportedOperationException();
-    }
+    private final List<TypeAndPath> metadata = Lists.newArrayList();
+    private List<Op> ops = new ArrayList<>();
 
     void add(Op op, OperationType type, String forPath)
     {
-        super.add(op);
+        ops.add(op);
         metadata.add(new TypeAndPath(type, forPath));
     }
 
-    TypeAndPath     getMetadata(int index)
+    TypeAndPath getMetadata(int index)
     {
         return metadata.get(index);
     }
 
-    int             metadataSize()
+    int metadataSize()
     {
         return metadata.size();
     }
 
     void addToDigest(MessageDigest digest)
     {
-        for ( Op op : this )
+        for ( Op op : ops )
         {
             digest.update(op.getPath().getBytes());
             digest.update(Integer.toString(op.getType()).getBytes());
             digest.update(op.toRequestRecord().toString().getBytes());
         }
     }
+
+    @Override
+    public Iterator<Op> iterator()
+    {
+        return ops.iterator();
+    }
+
+    int size()
+    {
+        return ops.size();
+    }
 }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
index 97be59a..9f129ca 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
@@ -24,6 +24,7 @@ import org.apache.curator.RetryLoop;
 import org.apache.curator.TimeTrace;
 import org.apache.curator.framework.api.*;
 import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.admin.ZooKeeperAdmin;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.DataTree;
 import java.util.Arrays;
@@ -268,7 +269,7 @@ public class ReconfigBuilderImpl implements ReconfigBuilder, BackgroundOperation
                     client.processBackgroundOperation(data, event);
                 }
             };
-            client.getZooKeeper().reconfig(joining, leaving, newMembers, fromConfig, callback, backgrounding.getContext());
+            ((ZooKeeperAdmin)client.getZooKeeperAdmin()).reconfigure(joining, leaving, newMembers, fromConfig, callback, backgrounding.getContext());
         }
         catch ( Throwable e )
         {
@@ -287,7 +288,7 @@ public class ReconfigBuilderImpl implements ReconfigBuilder, BackgroundOperation
                     @Override
                     public byte[] call() throws Exception
                     {
-                        return client.getZooKeeper().reconfig(joining, leaving, newMembers, fromConfig, responseStat);
+                        return ((ZooKeeperAdmin)client.getZooKeeperAdmin()).reconfigure(joining, leaving, newMembers, fromConfig, responseStat);
                     }
                 }
             );
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
index e14deff..961d5f0 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
@@ -201,8 +201,13 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat
         }        
         
         return null;
-    }    
-    
+    }
+
+    protected CuratorFrameworkImpl getClient()
+    {
+        return client;
+    }
+
     private void pathInBackground(final String path)
     {
         OperationAndData.ErrorCallback<String>  errorCallback = null;
diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatchesBuilderImpl.java
similarity index 50%
copy from curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
copy to curator-framework/src/main/java/org/apache/curator/framework/imps/WatchesBuilderImpl.java
index 42279d0..4a273c6 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatchesBuilderImpl.java
@@ -16,16 +16,30 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.utils;
 
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.framework.api.AddWatchBuilder;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.api.WatchesBuilder;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher.WatcherType;
 
-public class DefaultZookeeperFactory implements ZookeeperFactory
+public class WatchesBuilderImpl extends RemoveWatchesBuilderImpl implements WatchesBuilder
 {
+    public WatchesBuilderImpl(CuratorFrameworkImpl client)
+    {
+        super(client);
+    }
+
+    public WatchesBuilderImpl(CuratorFrameworkImpl client, Watcher watcher, CuratorWatcher curatorWatcher, WatcherType watcherType, boolean guaranteed, boolean local, boolean quietly, Backgrounding backgrounding)
+    {
+        super(client, watcher, curatorWatcher, watcherType, guaranteed, local, quietly, backgrounding);
+    }
+
     @Override
-    public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
+    public AddWatchBuilder add()
     {
-        return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
+        return new AddWatchBuilderImpl(getClient());
     }
-}
+}
\ No newline at end of file
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
index daa5dd3..5bad7e7 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
@@ -23,7 +23,7 @@ import org.apache.curator.framework.api.CuratorWatcher;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
 
-class Watching
+public class Watching
 {
     private final Watcher watcher;
     private final CuratorWatcher curatorWatcher;
@@ -31,7 +31,7 @@ class Watching
     private final CuratorFrameworkImpl client;
     private NamespaceWatcher namespaceWatcher;
 
-    Watching(CuratorFrameworkImpl client, boolean watched)
+    public Watching(CuratorFrameworkImpl client, boolean watched)
     {
         this.client = client;
         this.watcher = null;
@@ -39,7 +39,7 @@ class Watching
         this.watched = watched;
     }
 
-    Watching(CuratorFrameworkImpl client, Watcher watcher)
+    public Watching(CuratorFrameworkImpl client, Watcher watcher)
     {
         this.client = client;
         this.watcher = watcher;
@@ -47,7 +47,7 @@ class Watching
         this.watched = false;
     }
 
-    Watching(CuratorFrameworkImpl client, CuratorWatcher watcher)
+    public Watching(CuratorFrameworkImpl client, CuratorWatcher watcher)
     {
         this.client = client;
         this.watcher = null;
@@ -55,7 +55,7 @@ class Watching
         this.watched = false;
     }
 
-    Watching(CuratorFrameworkImpl client)
+    public Watching(CuratorFrameworkImpl client)
     {
         this.client = client;
         watcher = null;
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCreateReturningStat.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCreateReturningStat.java
index 034791d..5a52cf0 100755
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCreateReturningStat.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCreateReturningStat.java
@@ -26,7 +26,6 @@ import org.apache.curator.framework.api.CuratorEventType;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.test.Timing;
-import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.zookeeper.data.Stat;
 import org.testng.Assert;
@@ -34,7 +33,7 @@ import org.testng.annotations.Test;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 
-@Test(groups = Zk35MethodInterceptor.zk35Group)
+@Test(groups = CuratorTestBase.zk35Group)
 public class TestCreateReturningStat extends CuratorTestBase
 {
     private CuratorFramework createClient()
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
index fe49ad7..97c8a33 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
@@ -31,8 +31,8 @@ import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.test.compatibility.Timing2;
-import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.EnsurePath;
 import org.apache.curator.utils.ZKPaths;
@@ -59,9 +59,10 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 @SuppressWarnings("deprecation")
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestFramework extends BaseClassForTests
 {
-    @BeforeMethod
+    @BeforeMethod(alwaysRun = true)
     @Override
     public void setup() throws Exception
     {
@@ -69,7 +70,7 @@ public class TestFramework extends BaseClassForTests
         super.setup();
     }
 
-    @AfterMethod
+    @AfterMethod(alwaysRun = true)
     @Override
     public void teardown() throws Exception
     {
@@ -77,7 +78,7 @@ public class TestFramework extends BaseClassForTests
         super.teardown();
     }
 
-    @Test(groups = Zk35MethodInterceptor.zk35Group)
+    @Test(groups = CuratorTestBase.zk35Group)
     public void testWaitForShutdownTimeoutMs() throws Exception
     {
         final BlockingQueue<Integer> timeoutQueue = new ArrayBlockingQueue<>(1);
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
index 7c6d156..f89fff9 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
@@ -40,13 +40,13 @@ import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.InstanceSpec;
 import org.apache.curator.test.TestingCluster;
 import org.apache.curator.test.TestingServer;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.test.compatibility.Timing2;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.Compatibility;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
@@ -63,9 +63,9 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestFrameworkEdges extends BaseClassForTests
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
index c6ff2bb..74b52fa 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
@@ -32,7 +32,6 @@ import org.apache.curator.test.TestingCluster;
 import org.apache.curator.test.TestingZooKeeperServer;
 import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.test.compatibility.Timing2;
-import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -57,7 +56,7 @@ import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 
-@Test(groups = Zk35MethodInterceptor.zk35Group)
+@Test(groups = {CuratorTestBase.zk35Group, CuratorTestBase.zk35CompatibilityGroup})
 public class TestReconfiguration extends CuratorTestBase
 {
     private final Timing2 timing = new Timing2();
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestTtlNodes.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestTtlNodes.java
index e2156df..6c5026a 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestTtlNodes.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestTtlNodes.java
@@ -25,7 +25,6 @@ import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.test.Timing;
-import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.zookeeper.CreateMode;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -34,7 +33,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 import java.util.concurrent.CountDownLatch;
 
-@Test(groups = Zk35MethodInterceptor.zk35Group)
+@Test(groups = CuratorTestBase.zk35Group)
 public class TestTtlNodes extends CuratorTestBase
 {
     @BeforeClass
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java
index 74aac1d..8968e3e 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java
@@ -27,7 +27,6 @@ import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.test.Timing;
 import org.apache.curator.test.WatchersDebug;
-import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -37,7 +36,7 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 
-@Test(groups = Zk35MethodInterceptor.zk35Group)
+@Test(groups = CuratorTestBase.zk35Group)
 public class TestWatcherRemovalManager extends CuratorTestBase
 {
     @Test
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatchesBuilder.java
similarity index 88%
rename from curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
rename to curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatchesBuilder.java
index 63d8931..26c41f1 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatchesBuilder.java
@@ -30,23 +30,25 @@ import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.retry.RetryOneTime;
-import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.test.Timing;
-import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.ZookeeperFactory;
+import org.apache.zookeeper.AddWatchMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.WatcherType;
+import org.apache.zookeeper.ZooKeeper;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-@Test(groups = Zk35MethodInterceptor.zk35Group)
-public class TestRemoveWatches extends CuratorTestBase
+@Test(groups = CuratorTestBase.zk35Group)
+public class TestWatchesBuilder extends CuratorTestBase
 {
     private AtomicReference<ConnectionState> registerConnectionStateListener(CuratorFramework client)
     {
@@ -610,8 +612,58 @@ public class TestRemoveWatches extends CuratorTestBase
         {
             CloseableUtils.closeQuietly(client);
         }
-    }    
-    
+    }
+
+    @Test(groups = CuratorTestBase.zk36Group)
+    public void testPersistentRecursiveWatch() throws Exception
+    {
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+        {
+            client.start();
+            client.blockUntilConnected();
+
+            CountDownLatch latch = new CountDownLatch(5);
+            Watcher watcher = event -> latch.countDown();
+            client.watchers().add().withMode(AddWatchMode.PERSISTENT_RECURSIVE).usingWatcher(watcher).forPath("/test");
+
+            client.create().forPath("/test");
+            client.create().forPath("/test/a");
+            client.create().forPath("/test/a/b");
+            client.create().forPath("/test/a/b/c");
+            client.create().forPath("/test/a/b/c/d");
+
+            Assert.assertTrue(timing.awaitLatch(latch));
+        }
+    }
+
+    @Test(groups = CuratorTestBase.zk36Group)
+    public void testPersistentRecursiveDefaultWatch() throws Exception
+    {
+        CountDownLatch latch = new CountDownLatch(6);   // 5 creates plus the initial sync
+        ZookeeperFactory zookeeperFactory = (connectString, sessionTimeout, watcher, canBeReadOnly) -> {
+            Watcher actualWatcher = event -> {
+                watcher.process(event);
+                latch.countDown();
+            };
+            return new ZooKeeper(connectString, sessionTimeout, actualWatcher);
+        };
+        try (CuratorFramework client = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).zookeeperFactory(zookeeperFactory).build() )
+        {
+            client.start();
+            client.blockUntilConnected();
+
+            client.watchers().add().withMode(AddWatchMode.PERSISTENT_RECURSIVE).forPath("/test");
+
+            client.create().forPath("/test");
+            client.create().forPath("/test/a");
+            client.create().forPath("/test/a/b");
+            client.create().forPath("/test/a/b/c");
+            client.create().forPath("/test/a/b/c/d");
+
+            Assert.assertTrue(timing.awaitLatch(latch));
+        }
+    }
+
     private static class CountDownWatcher implements Watcher {
         private String path;
         private EventType eventType;
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWithCluster.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWithCluster.java
index 7e8ffbb..fbd6e2a 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWithCluster.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWithCluster.java
@@ -18,6 +18,7 @@
  */
 package org.apache.curator.framework.imps;
 
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -33,6 +34,7 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.concurrent.CountDownLatch;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestWithCluster
 {
     @Test
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/state/TestConnectionStateManager.java b/curator-framework/src/test/java/org/apache/curator/framework/state/TestConnectionStateManager.java
index c929b41..63ccefe 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/state/TestConnectionStateManager.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/state/TestConnectionStateManager.java
@@ -23,6 +23,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.test.compatibility.Timing2;
 import org.apache.curator.utils.CloseableUtils;
 import org.testng.Assert;
@@ -30,6 +31,7 @@ import org.testng.annotations.Test;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestConnectionStateManager extends BaseClassForTests {
 
     @Test
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
index 175ccdf..246704f 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
@@ -96,7 +96,7 @@ public class BaseTestTreeCache extends BaseClassForTests
     }
 
     @Override
-    @BeforeMethod
+    @BeforeMethod(alwaysRun = true)
     public void setup() throws Exception
     {
         super.setup();
@@ -111,7 +111,7 @@ public class BaseTestTreeCache extends BaseClassForTests
     }
 
     @Override
-    @AfterMethod
+    @AfterMethod(alwaysRun = true)
     public void teardown() throws Exception
     {
         try
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java
index ff416d5..872eee2 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java
@@ -20,6 +20,7 @@ package org.apache.curator.framework.recipes.cache;
 
 import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.test.compatibility.Timing2;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
@@ -40,6 +41,7 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestNodeCache extends BaseClassForTests
 {
     @Test
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
index 78fabd5..ac02090 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
@@ -32,6 +32,7 @@ import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.ExecuteCalledWatchingExecutorService;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.Compatibility;
 import org.apache.zookeeper.CreateMode;
@@ -45,6 +46,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import static org.testng.AssertJUnit.assertNotNull;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestPathChildrenCache extends BaseClassForTests
 {
     @Test
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java
index cd87125..3895625 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java
@@ -20,6 +20,7 @@ package org.apache.curator.framework.recipes.cache;
 
 import com.google.common.collect.Queues;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -34,6 +35,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestPathChildrenCacheInCluster extends BaseClassForTests
 {
     @Test(enabled = false)  // this test is very flakey - it needs to be re-written at some point
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
index 1e97ce2..4c7052d 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableSet;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.Compatibility;
 import org.apache.zookeeper.CreateMode;
@@ -31,6 +32,7 @@ import org.testng.annotations.Test;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestTreeCache extends BaseTestTreeCache
 {
     @Test
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
index 4cb342c..07e9a17 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
@@ -19,11 +19,11 @@
 
 package org.apache.curator.framework.recipes.leader;
 
+import org.apache.curator.test.Compatibility;
 import org.apache.curator.test.TestingZooKeeperMain;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.proto.CreateRequest;
 import org.apache.zookeeper.server.ByteBufferInputStream;
-import org.apache.zookeeper.server.NIOServerCnxn;
 import org.apache.zookeeper.server.NIOServerCnxnFactory;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.ZooKeeperServer;
@@ -92,7 +92,7 @@ public class ChaosMonkeyCnxnFactory extends NIOServerCnxnFactory
                 log.debug("Rejected : " + si.toString());
                 // Still reject request
                 log.debug("Still not ready for " + remaining + "ms");
-                ((NIOServerCnxn)si.cnxn).close();
+                Compatibility.serverCnxnClose(si.cnxn);
                 return;
             }
             // Submit the request to the legacy Zookeeper server
@@ -113,13 +113,13 @@ public class ChaosMonkeyCnxnFactory extends NIOServerCnxnFactory
                         firstError = System.currentTimeMillis();
                         // The znode has been created, close the connection and don't tell it to client
                         log.warn("Closing connection right after " + createRequest.getPath() + " creation");
-                        ((NIOServerCnxn)si.cnxn).close();
+                        Compatibility.serverCnxnClose(si.cnxn);
                     }
                 }
                 catch ( Exception e )
                 {
                     // Should not happen
-                    ((NIOServerCnxn)si.cnxn).close();
+                    Compatibility.serverCnxnClose(si.cnxn);
                 }
             }
         }
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index 3d9e9b7..8f4e23f 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -37,6 +37,7 @@ import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.test.compatibility.Timing2;
 import org.apache.curator.utils.CloseableUtils;
 import org.testng.Assert;
@@ -60,6 +61,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestLeaderLatch extends BaseClassForTests
 {
     private static final String PATH_NAME = "/one/two/me";
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
index ed56f15..add547e 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
@@ -30,6 +30,7 @@ import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.InstanceSpec;
 import org.apache.curator.test.TestingCluster;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -46,6 +47,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestInterProcessSemaphoreCluster extends BaseClassForTests
 {
     @Test
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java
index 360c876..049e1cd 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java
@@ -26,7 +26,6 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.Timing;
 import org.apache.curator.test.compatibility.CuratorTestBase;
-import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.curator.utils.ZKPaths;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -38,7 +37,7 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode.BUILD_INITIAL_CACHE;
 
-@Test(groups = Zk35MethodInterceptor.zk35Group)
+@Test(groups = CuratorTestBase.zk35Group)
 public class TestPersistentTtlNode extends CuratorTestBase
 {
     private final Timing timing = new Timing();
diff --git a/curator-test-zk34/pom.xml b/curator-test-zk34/pom.xml
index 3d06cb0..606aba2 100644
--- a/curator-test-zk34/pom.xml
+++ b/curator-test-zk34/pom.xml
@@ -160,7 +160,7 @@
                         <dependency>org.apache.curator:curator-framework</dependency>
                         <dependency>org.apache.curator:curator-recipes</dependency>
                     </dependenciesToScan>
-                    <excludedGroups>zk35</excludedGroups>
+                    <excludedGroups>zk35,zk36</excludedGroups>
                 </configuration>
             </plugin>
 
diff --git a/curator-test-zk34/pom.xml b/curator-test-zk35/pom.xml
similarity index 72%
copy from curator-test-zk34/pom.xml
copy to curator-test-zk35/pom.xml
index 3d06cb0..5803893 100644
--- a/curator-test-zk34/pom.xml
+++ b/curator-test-zk35/pom.xml
@@ -1,27 +1,56 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>apache-curator</artifactId>
         <groupId>org.apache.curator</groupId>
+        <artifactId>apache-curator</artifactId>
         <version>4.2.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>curator-test-zk34</artifactId>
-
-    <name>Curator ZooKeeper 3.4 Testing</name>
-    <description>Tests for ZoKeeper 3.4 compatibility</description>
-    <inceptionYear>2017</inceptionYear>
+    <artifactId>curator-test-zk35</artifactId>
 
     <properties>
-        <zookeeper-34-version>3.4.8</zookeeper-34-version>
+        <zookeeper-35-version>3.5.6</zookeeper-35-version>
     </properties>
 
     <dependencies>
         <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-x-async</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-recipes</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.zookeeper</groupId>
             <artifactId>zookeeper</artifactId>
-            <version>${zookeeper-34-version}</version>
+            <version>${zookeeper-35-version}</version>
             <exclusions>
                 <exclusion>
                     <groupId>com.sun.jmx</groupId>
@@ -49,7 +78,6 @@
         <dependency>
             <groupId>org.apache.curator</groupId>
             <artifactId>curator-test</artifactId>
-            <version>2.12.0</version>
             <exclusions>
                 <exclusion>
                     <groupId>org.apache.zookeeper</groupId>
@@ -61,31 +89,6 @@
 
         <dependency>
             <groupId>org.apache.curator</groupId>
-            <artifactId>curator-framework</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.zookeeper</groupId>
-                    <artifactId>zookeeper</artifactId>
-                </exclusion>
-            </exclusions>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-framework</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.zookeeper</groupId>
-                    <artifactId>zookeeper</artifactId>
-                </exclusion>
-            </exclusions>
-            <type>test-jar</type>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.curator</groupId>
             <artifactId>curator-recipes</artifactId>
             <exclusions>
                 <exclusion>
@@ -93,12 +96,13 @@
                     <artifactId>zookeeper</artifactId>
                 </exclusion>
             </exclusions>
+            <type>test-jar</type>
             <scope>test</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.curator</groupId>
-            <artifactId>curator-recipes</artifactId>
+            <artifactId>curator-framework</artifactId>
             <exclusions>
                 <exclusion>
                     <groupId>org.apache.zookeeper</groupId>
@@ -120,18 +124,6 @@
             <artifactId>slf4j-log4j12</artifactId>
             <scope>test</scope>
         </dependency>
-
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-core</artifactId>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-math</artifactId>
-            <scope>test</scope>
-        </dependency>
     </dependencies>
 
     <build>
@@ -160,32 +152,10 @@
                         <dependency>org.apache.curator:curator-framework</dependency>
                         <dependency>org.apache.curator:curator-recipes</dependency>
                     </dependenciesToScan>
-                    <excludedGroups>zk35</excludedGroups>
+                    <groups>zk35,zk35Compatibility</groups>
+                    <excludedGroups>zk36</excludedGroups>
                 </configuration>
             </plugin>
-
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-resources-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <id>copy-curator-test-classes</id>
-                        <phase>validate</phase>
-                        <goals>
-                            <goal>copy-resources</goal>
-                        </goals>
-                        <configuration>
-                            <outputDirectory>${basedir}/target/generated-test-sources/test-annotations/org/apache/curator/test/compatibility</outputDirectory>
-                            <resources>
-                                <resource>
-                                    <directory>../curator-test/src/main/java/org/apache/curator/test/compatibility</directory>
-                                    <filtering>false</filtering>
-                                </resource>
-                            </resources>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
         </plugins>
     </build>
 </project>
diff --git a/curator-test-zk35/src/test/java/org/apache/curator/framework/TestCompatibility.java b/curator-test-zk35/src/test/java/org/apache/curator/framework/TestCompatibility.java
new file mode 100644
index 0000000..5112d41
--- /dev/null
+++ b/curator-test-zk35/src/test/java/org/apache/curator/framework/TestCompatibility.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework;
+
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.testng.annotations.Test;
+
+public class TestCompatibility extends CuratorTestBase
+{
+    @Test(expectedExceptions = IllegalStateException.class)
+    public void testPersistentWatchesNotAvailable() throws Exception
+    {
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+        {
+            client.start();
+            client.watchers().add().forPath("/foo");
+        }
+    }
+
+    @Test(expectedExceptions = IllegalStateException.class)
+    public void testPersistentWatchesNotAvailableAsync()
+    {
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+        {
+            client.start();
+
+            AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
+            async.addWatch().forPath("/foo");
+        }
+    }
+}
diff --git a/curator-test-zk35/src/test/resources/log4j.properties b/curator-test-zk35/src/test/resources/log4j.properties
new file mode 100644
index 0000000..2a85e0d
--- /dev/null
+++ b/curator-test-zk35/src/test/resources/log4j.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=ERROR, console
+
+log4j.logger.org.apache.curator=DEBUG, console
+log4j.additivity.org.apache.curator=false
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%-5p %c %x %m [%t]%n
diff --git a/curator-test/pom.xml b/curator-test/pom.xml
index 3683b7d..4f0c5a2 100644
--- a/curator-test/pom.xml
+++ b/curator-test/pom.xml
@@ -41,6 +41,16 @@
         </dependency>
 
         <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.xerial.snappy</groupId>
+            <artifactId>snappy-java</artifactId>
+        </dependency>
+
+        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
diff --git a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
index 51af821..79c8b9a 100644
--- a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
+++ b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
@@ -113,7 +113,7 @@ public class BaseClassForTests
         context.getSuite().addListener(methodListener2);
     }
 
-    @BeforeMethod
+    @BeforeMethod(alwaysRun = true)
     public void setup() throws Exception
     {
         if ( INTERNAL_PROPERTY_DONT_LOG_CONNECTION_ISSUES != null )
@@ -142,7 +142,7 @@ public class BaseClassForTests
         }
     }
 
-    @AfterMethod
+    @AfterMethod(alwaysRun = true)
     public void teardown() throws Exception
     {
         System.clearProperty(INTERNAL_PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY);
diff --git a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java b/curator-test/src/main/java/org/apache/curator/test/Compatibility.java
index 5b4b53f..1f8d181 100644
--- a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java
+++ b/curator-test/src/main/java/org/apache/curator/test/Compatibility.java
@@ -18,10 +18,103 @@
  */
 package org.apache.curator.test;
 
+import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import java.lang.reflect.Method;
+
+@SuppressWarnings("unchecked")
 public class Compatibility
 {
+    private static final Method closeAllWithReasonMethod;
+    private static final Method closeAllMethod;
+    private static final Method closeWithReasonMethod;
+    private static final Method closeMethod;
+    private static final Object disconnectReasonObj;
+
+    static
+    {
+        Object localDisconnectReasonObj;
+        Method localCloseAllWithReasonMethod;
+        Method localCloseAllMethod;
+        Method localCloseWithReasonMethod;
+        Method localCloseMethod;
+        try
+        {
+            Class disconnectReasonClass = Class.forName("org.apache.zookeeper.server.ServerCnxn$DisconnectReason");
+            localDisconnectReasonObj = Enum.valueOf(disconnectReasonClass, "UNKNOWN");
+            localCloseAllWithReasonMethod = ServerCnxnFactory.class.getDeclaredMethod("closeAll", disconnectReasonClass);
+            localCloseWithReasonMethod = ServerCnxn.class.getDeclaredMethod("close", disconnectReasonClass);
+            localCloseAllMethod = null;
+            localCloseMethod = null;
+
+            localCloseAllWithReasonMethod.setAccessible(true);
+            localCloseWithReasonMethod.setAccessible(true);
+        }
+        catch ( Throwable e )
+        {
+            localDisconnectReasonObj = null;
+            localCloseAllWithReasonMethod = null;
+            localCloseWithReasonMethod = null;
+            try
+            {
+                localCloseAllMethod = ServerCnxnFactory.class.getDeclaredMethod("closeAll");
+                localCloseMethod = ServerCnxn.class.getDeclaredMethod("close");
+
+                localCloseAllMethod.setAccessible(true);
+                localCloseMethod.setAccessible(true);
+            }
+            catch ( Throwable ex )
+            {
+                throw new IllegalStateException("Could not reflectively find ServerCnxnFactory/ServerCnxn close methods");
+            }
+        }
+        disconnectReasonObj = localDisconnectReasonObj;
+        closeAllWithReasonMethod = localCloseAllWithReasonMethod;
+        closeAllMethod = localCloseAllMethod;
+        closeMethod = localCloseMethod;
+        closeWithReasonMethod = localCloseWithReasonMethod;
+    }
+
     public static boolean isZK34()
     {
         return false;
     }
+
+    public static void serverCnxnFactoryCloseAll(ServerCnxnFactory factory)
+    {
+        try
+        {
+            if ( closeAllMethod != null )
+            {
+                closeAllMethod.invoke(factory);
+            }
+            else
+            {
+                closeAllWithReasonMethod.invoke(factory, disconnectReasonObj);
+            }
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException("Could not close factory", e);
+        }
+    }
+
+    public static void serverCnxnClose(ServerCnxn cnxn)
+    {
+        try
+        {
+            if ( closeMethod != null )
+            {
+                closeMethod.invoke(cnxn);
+            }
+            else
+            {
+                closeWithReasonMethod.invoke(cnxn, disconnectReasonObj);
+            }
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException("Could not close connection", e);
+        }
+    }
 }
diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java b/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java
index 3d38fe1..58da2c0 100644
--- a/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java
+++ b/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java
@@ -225,7 +225,7 @@ public class TestingCluster implements Closeable
      */
     public InstanceSpec findConnectionInstance(ZooKeeper client) throws Exception
     {
-        Method              m = client.getClass().getDeclaredMethod("testableRemoteSocketAddress");
+        Method              m = ZooKeeper.class.getDeclaredMethod("testableRemoteSocketAddress");
         m.setAccessible(true);
         InetSocketAddress   address = (InetSocketAddress)m.invoke(client);
         if ( address != null )
diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java b/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java
index 3b3ab26..7489527 100644
--- a/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java
+++ b/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java
@@ -39,7 +39,7 @@ class TestingQuorumPeerMain extends QuorumPeerMain implements ZooKeeperMainFace
                 Field               cnxnFactoryField = QuorumPeer.class.getDeclaredField("cnxnFactory");
                 cnxnFactoryField.setAccessible(true);
                 ServerCnxnFactory   cnxnFactory = (ServerCnxnFactory)cnxnFactoryField.get(quorumPeer);
-                cnxnFactory.closeAll();
+                Compatibility.serverCnxnFactoryCloseAll(cnxnFactory);
 
                 Field               ssField = cnxnFactory.getClass().getDeclaredField("ss");
                 ssField.setAccessible(true);
diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java b/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java
index 841df77..91f185f 100644
--- a/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java
+++ b/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java
@@ -81,7 +81,7 @@ public class TestingZooKeeperMain implements ZooKeeperMainFace
         {
             if ( cnxnFactory != null )
             {
-                cnxnFactory.closeAll();
+                Compatibility.serverCnxnFactoryCloseAll(cnxnFactory);
 
                 Field ssField = cnxnFactory.getClass().getDeclaredField("ss");
                 ssField.setAccessible(true);
@@ -262,7 +262,9 @@ public class TestingZooKeeperMain implements ZooKeeperMainFace
     {
         public TestZooKeeperServer(FileTxnSnapLog txnLog, ServerConfig config)
         {
-            super(txnLog, config.getTickTime(), config.getMinSessionTimeout(), config.getMaxSessionTimeout(), null);
+            this.setTxnLogFactory(txnLog);
+            this.setMinSessionTimeout(config.getMinSessionTimeout());
+            this.setMaxSessionTimeout(config.getMaxSessionTimeout());
         }
 
         private final AtomicBoolean isRunning = new AtomicBoolean(false);
diff --git a/curator-test/src/main/java/org/apache/curator/test/compatibility/CuratorTestBase.java b/curator-test/src/main/java/org/apache/curator/test/compatibility/CuratorTestBase.java
index a3c2a29..c5d4018 100644
--- a/curator-test/src/main/java/org/apache/curator/test/compatibility/CuratorTestBase.java
+++ b/curator-test/src/main/java/org/apache/curator/test/compatibility/CuratorTestBase.java
@@ -21,8 +21,11 @@ package org.apache.curator.test.compatibility;
 import org.apache.curator.test.BaseClassForTests;
 import org.testng.annotations.Listeners;
 
-@Listeners(Zk35MethodInterceptor.class)
 public class CuratorTestBase extends BaseClassForTests
 {
+    public static final String zk35Group = "zk35";
+    public static final String zk36Group = "zk36";
+    public static final String zk35CompatibilityGroup = "zk35Compatibility";
+
     protected final Timing2 timing = new Timing2();
 }
diff --git a/curator-test/src/main/java/org/apache/curator/test/compatibility/Zk35MethodInterceptor.java b/curator-test/src/main/java/org/apache/curator/test/compatibility/Zk35MethodInterceptor.java
deleted file mode 100644
index 8072b68..0000000
--- a/curator-test/src/main/java/org/apache/curator/test/compatibility/Zk35MethodInterceptor.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.test.compatibility;
-
-import org.apache.curator.test.Compatibility;
-import org.testng.IMethodInstance;
-import org.testng.IMethodInterceptor;
-import org.testng.ITestContext;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-public class Zk35MethodInterceptor implements IMethodInterceptor
-{
-    public static final String zk35Group = "zk35";
-
-    @Override
-    public List<IMethodInstance> intercept(List<IMethodInstance> methods, ITestContext context)
-    {
-        if ( !Compatibility.isZK34() )
-        {
-            return methods;
-        }
-
-        List<IMethodInstance> filteredMethods = new ArrayList<>();
-        for ( IMethodInstance method : methods )
-        {
-            if ( !isInGroup(method.getMethod().getGroups()) )
-            {
-                filteredMethods.add(method);
-            }
-        }
-        return filteredMethods;
-    }
-
-    private boolean isInGroup(String[] groups)
-    {
-        return (groups != null) && Arrays.asList(groups).contains(zk35Group);
-    }
-}
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
index bc66bb6..e54b148 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
@@ -112,4 +112,11 @@ public interface AsyncCuratorFrameworkDsl extends WatchableAsyncCuratorFramework
      * @return builder object
      */
     AsyncRemoveWatchesBuilder removeWatches();
+
+    /**
+     * Start an add watch builder
+     *
+     * @return builder object
+     */
+    AsyncWatchBuilder addWatch();
 }
diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder.java
similarity index 60%
copy from curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
copy to curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder.java
index 42279d0..a5e86ec 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder.java
@@ -16,16 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.utils;
 
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
+package org.apache.curator.x.async.api;
 
-public class DefaultZookeeperFactory implements ZookeeperFactory
+import org.apache.curator.framework.api.AddWatchable;
+import org.apache.curator.x.async.AsyncStage;
+import org.apache.zookeeper.AddWatchMode;
+
+public interface AsyncWatchBuilder extends
+    AddWatchable<AsyncPathable<AsyncStage<Void>>>,
+    AsyncPathable<AsyncStage<Void>>
 {
-    @Override
-    public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
-    {
-        return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
-    }
-}
+    /**
+     * The mode to use. By default, {@link org.apache.zookeeper.AddWatchMode#PERSISTENT_RECURSIVE} is used
+     *
+     * @param mode mode
+     * @return this
+     */
+    AsyncWatchBuilder2 withMode(AddWatchMode mode);
+}
\ No newline at end of file
diff --git a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder2.java
similarity index 74%
copy from curator-test/src/main/java/org/apache/curator/test/Compatibility.java
copy to curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder2.java
index 5b4b53f..a97cd4b 100644
--- a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder2.java
@@ -16,12 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.test;
 
-public class Compatibility
+package org.apache.curator.x.async.api;
+
+import org.apache.curator.framework.api.AddWatchable;
+import org.apache.curator.x.async.AsyncStage;
+
+public interface AsyncWatchBuilder2 extends
+    AddWatchable<AsyncPathable<AsyncStage<Void>>>,
+    AsyncPathable<AsyncStage<Void>>
 {
-    public static boolean isZK34()
-    {
-        return false;
-    }
-}
+}
\ No newline at end of file
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
index 167cf50..a248c0e 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.curator.x.async.details;
 
+import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.UnhandledErrorListener;
@@ -26,7 +27,10 @@ import org.apache.curator.framework.imps.CuratorFrameworkImpl;
 import org.apache.curator.framework.imps.CuratorMultiTransactionImpl;
 import org.apache.curator.framework.imps.GetACLBuilderImpl;
 import org.apache.curator.framework.imps.SyncBuilderImpl;
-import org.apache.curator.x.async.*;
+import org.apache.curator.utils.Compatibility;
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.curator.x.async.AsyncStage;
+import org.apache.curator.x.async.WatchMode;
 import org.apache.curator.x.async.api.*;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.data.ACL;
@@ -150,6 +154,13 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework
     }
 
     @Override
+    public AsyncWatchBuilder addWatch()
+    {
+        Preconditions.checkState(Compatibility.hasPersistentWatchers(), "addWatch() is not supported in the ZooKeeper library being used.");
+        return new AsyncWatchBuilderImpl(client, filters);
+    }
+
+    @Override
     public CuratorFramework unwrap()
     {
         return client;
@@ -221,6 +232,16 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework
         return new AsyncGetConfigBuilderImpl(client, filters, getBuilderWatchMode());
     }
 
+    Filters getFilters()
+    {
+        return filters;
+    }
+
+    CuratorFrameworkImpl getClient()
+    {
+        return client;
+    }
+
     private WatchMode getBuilderWatchMode()
     {
         return watched ? watchMode : null;
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncWatchBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncWatchBuilderImpl.java
new file mode 100644
index 0000000..a10540e
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncWatchBuilderImpl.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.async.details;
+
+import org.apache.curator.framework.api.AddWatchable;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.imps.AddWatchBuilderImpl;
+import org.apache.curator.framework.imps.CuratorFrameworkImpl;
+import org.apache.curator.framework.imps.Watching;
+import org.apache.curator.x.async.AsyncStage;
+import org.apache.curator.x.async.api.AsyncPathable;
+import org.apache.curator.x.async.api.AsyncWatchBuilder;
+import org.apache.curator.x.async.api.AsyncWatchBuilder2;
+import org.apache.zookeeper.AddWatchMode;
+import org.apache.zookeeper.Watcher;
+
+import static org.apache.curator.x.async.details.BackgroundProcs.ignoredProc;
+import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
+
+class AsyncWatchBuilderImpl implements AsyncWatchBuilder, AsyncWatchBuilder2, AddWatchable<AsyncPathable<AsyncStage<Void>>>, AsyncPathable<AsyncStage<Void>>
+{
+    private final CuratorFrameworkImpl client;
+    private final Filters filters;
+    private Watching watching;
+    private AddWatchMode mode = AddWatchMode.PERSISTENT_RECURSIVE;
+
+    AsyncWatchBuilderImpl(CuratorFrameworkImpl client, Filters filters)
+    {
+        this.client = client;
+        this.filters = filters;
+        watching = new Watching(client, true);
+    }
+
+    @Override
+    public AsyncWatchBuilder2 withMode(AddWatchMode mode)
+    {
+        this.mode = mode;
+        return this;
+    }
+
+    @Override
+    public AsyncPathable<AsyncStage<Void>> usingWatcher(Watcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    @Override
+    public AsyncPathable<AsyncStage<Void>> usingWatcher(CuratorWatcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    @Override
+    public AsyncStage<Void> forPath(String path)
+    {
+        BuilderCommon<Void> common = new BuilderCommon<>(filters, ignoredProc);
+        AddWatchBuilderImpl builder = new AddWatchBuilderImpl(client, watching, common.backgrounding, mode);
+        return safeCall(common.internalCallback, () -> builder.forPath(path));
+    }
+}
\ No newline at end of file
diff --git a/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestAddWatch.java b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestAddWatch.java
new file mode 100644
index 0000000..bdf006a
--- /dev/null
+++ b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestAddWatch.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.apache.curator.utils.ZookeeperFactory;
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.zookeeper.AddWatchMode;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.CountDownLatch;
+
+public class TestAddWatch extends CuratorTestBase
+{
+    @Test
+    public void testPersistentRecursiveWatch() throws Exception
+    {
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+        {
+            client.start();
+            client.blockUntilConnected();
+
+            CountDownLatch latch = new CountDownLatch(5);
+            Watcher watcher = event -> latch.countDown();
+            AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
+            async.addWatch().withMode(AddWatchMode.PERSISTENT_RECURSIVE).usingWatcher(watcher).forPath("/test").toCompletableFuture().get();
+
+            client.create().forPath("/test");
+            client.create().forPath("/test/a");
+            client.create().forPath("/test/a/b");
+            client.create().forPath("/test/a/b/c");
+            client.create().forPath("/test/a/b/c/d");
+
+            Assert.assertTrue(timing.awaitLatch(latch));
+        }
+    }
+
+    @Test
+    public void testPersistentRecursiveDefaultWatch() throws Exception
+    {
+        CountDownLatch latch = new CountDownLatch(6);   // 5 creates plus the initial sync
+        ZookeeperFactory zookeeperFactory = (connectString, sessionTimeout, watcher, canBeReadOnly) -> {
+            Watcher actualWatcher = event -> {
+                watcher.process(event);
+                latch.countDown();
+            };
+            return new ZooKeeper(connectString, sessionTimeout, actualWatcher);
+        };
+        try (CuratorFramework client = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).zookeeperFactory(zookeeperFactory).build() )
+        {
+            client.start();
+            client.blockUntilConnected();
+
+            AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
+            async.addWatch().withMode(AddWatchMode.PERSISTENT_RECURSIVE).forPath("/test");
+
+            client.create().forPath("/test");
+            client.create().forPath("/test/a");
+            client.create().forPath("/test/a/b");
+            client.create().forPath("/test/a/b/c");
+            client.create().forPath("/test/a/b/c/d");
+
+            Assert.assertTrue(timing.awaitLatch(latch));
+        }
+    }
+}
diff --git a/pom.xml b/pom.xml
index 3a5e152..e223de6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -60,7 +60,7 @@
         <jdk-version>1.${short-jdk-version}</jdk-version>
 
         <!-- versions -->
-        <zookeeper-version>3.5.5</zookeeper-version>
+        <zookeeper-version>3.6.0-SNAPSHOT</zookeeper-version>
         <maven-bundle-plugin-version>4.1.0</maven-bundle-plugin-version>
         <maven-javadoc-plugin-version>3.0.1</maven-javadoc-plugin-version>
         <doxia-module-confluence-version>1.8</doxia-module-confluence-version>
@@ -85,25 +85,27 @@
         <guava-failureaccess-version>1.0.1</guava-failureaccess-version>
         <testng-version>6.14.3</testng-version>
         <swift-version>0.23.1</swift-version>
-        <dropwizard-version>1.3.7</dropwizard-version>
         <maven-shade-plugin-version>3.2.1</maven-shade-plugin-version>
         <slf4j-version>1.7.25</slf4j-version>
         <clirr-maven-plugin-version>2.8</clirr-maven-plugin-version>
+        <dropwizard-version>3.2.5</dropwizard-version>
+        <snappy-version>1.1.7</snappy-version>
 
         <!-- OSGi Properties -->
-        <osgi.export.package />
-        <osgi.import.package />
-        <osgi.private.package />
-        <osgi.dynamic.import />
-        <osgi.require.bundle />
-        <osgi.export.service />
-        <osgi.activator />
+        <osgi.export.package/>
+        <osgi.import.package/>
+        <osgi.private.package/>
+        <osgi.dynamic.import/>
+        <osgi.require.bundle/>
+        <osgi.export.service/>
+        <osgi.activator/>
     </properties>
 
     <scm>
         <url>https://github.com/apache/curator.git</url>
         <connection>scm:git:https://gitbox.apache.org/repos/asf/curator.git</connection>
-        <developerConnection>scm:git:https://gitbox.apache.org/repos/asf/curator.git</developerConnection>
+        <developerConnection>scm:git:https://gitbox.apache.org/repos/asf/curator.git
+        </developerConnection>
         <tag>apache-curator-3.2.0</tag>
     </scm>
 
@@ -318,6 +320,7 @@
         <module>curator-x-discovery-server</module>
         <module>curator-x-async</module>
         <module>curator-test-zk34</module>
+        <module>curator-test-zk35</module>
     </modules>
 
     <dependencyManagement>
@@ -567,6 +570,24 @@
                 <artifactId>dropwizard-logging</artifactId>
                 <version>${dropwizard-version}</version>
             </dependency>
+
+            <dependency>
+                <groupId>io.dropwizard.metrics</groupId>
+                <artifactId>metrics-core</artifactId>
+                <version>${dropwizard-version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.slf4j</groupId>
+                        <artifactId>slf4j-api</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
+            <dependency>
+                <groupId>org.xerial.snappy</groupId>
+                <artifactId>snappy-java</artifactId>
+                <version>${snappy-version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
@@ -582,7 +603,6 @@
                 <artifactId>maven-javadoc-plugin</artifactId>
                 <version>${maven-javadoc-plugin-version}</version>
                 <configuration>
-                    <aggregate>true</aggregate>
                     <additionalJOptions>
                         <additionalJOption>-J-Xmx1g</additionalJOption>
                     </additionalJOptions>
@@ -872,7 +892,8 @@
                             <relocations>
                                 <relocation>
                                     <pattern>com.google</pattern>
-                                    <shadedPattern>org.apache.curator.shaded.com.google</shadedPattern>
+                                    <shadedPattern>org.apache.curator.shaded.com.google
+                                    </shadedPattern>
                                     <excludes>
                                         <exclude>com.google.common.base.Function</exclude>
                                         <exclude>com.google.common.base.Predicate</exclude>
diff --git a/src/site/confluence/zk-compatibility.confluence b/src/site/confluence/zk-compatibility.confluence
index ed6e32e..240734a 100644
--- a/src/site/confluence/zk-compatibility.confluence
+++ b/src/site/confluence/zk-compatibility.confluence
@@ -1,18 +1,19 @@
 h1. ZooKeeper Version Compatibility
 
-While ZooKeeper 3.5.x is still considered "beta" by the ZooKeeper development team, the reality is that it is
-used in production by many users. However, ZooKeeper 3.4.x is also used in production. Prior to Apache Curator
-4.0, both versions of ZooKeeper were supported via two versions of Apache Curator. Starting with Curator 4.0
-both versions of ZooKeeper are supported via the same Curator libraries.
+There are multiple active version lines of ZooKeeper used in production by users. Curator can act
+as a client for any of these versions.
 
-h2. ZooKeeper 3.5.x
+h2. ZooKeeper 3.6.x
 
-* Curator 4.0 has a hard dependency on ZooKeeper 3.5.x
-* If you are using ZooKeeper 3.5.x there's nothing additional to do \- just use Curator 4.0
+ZooKeeper 3.6.x is the newest version of ZooKeeper which adds new features such as
+Persistent/Recursive watchers.
 
-h2. ZooKeeper 3.4.x
+* Curator 4.3+ has a hard dependency on ZooKeeper 3.6.x
+* If you are using ZooKeeper 3.6.x there's nothing additional to do
 
-Curator 4.0 supports ZooKeeper 3.4.x ensembles in a soft\-compatibility mode. To use this mode
+h2. ZooKeeper 3.5.x and ZooKeeper 3.4.x
+
+Curator 4.0 supports earlier versions of ZooKeeper in a soft\-compatibility mode. To use this mode
 you must exclude ZooKeeper when adding Curator to your dependency management tool.
 
 _Maven_
@@ -39,11 +40,11 @@ compile('org.apache.curator:curator-recipes:$curatorVersion') {
 }
 {code}
 
-You must add a dependency on ZooKeeper 3.4.x also.
+You must also add a dependency to the version of ZooKeeper you want to use.
 
 Curator will detect which ZooKeeper library is in use and automatically set ZooKeeper 3.4 compatibility
-mode as needed. In this mode, all features not supported by 3.4 are disabled. It is up to your
-application code to "do the right thing" and not use these features. Use the {{isZk34CompatibilityMode()}}
+or ZooKeeper 3.5 compatibility mode as needed. In this mode, all features not supported are disabled/disallowed. It is up to your
+application code to "do the right thing" and not use these features. Use the {{isZk34CompatibilityMode()}} or {{isZk35CompatibilityMode()}}
 method to determine which mode Curator is using at runtime.
 
 h2. Testing With ZooKeeper 3.4.x