You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/11/02 23:40:04 UTC

[curator] 01/01: wip - new module that will be used to support features in ZK 3.6+ while maintaining background compatability with previous versions of ZK in the other modules. At some point in the future, this can be merged into the main modules and then removed

This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch curator-v2
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 98efd1c34650e478306ce43ad45d9375aae7484e
Author: randgalt <ra...@apache.org>
AuthorDate: Sat Nov 2 11:40:44 2019 -0500

    wip - new module that will be used to support features in ZK 3.6+ while maintaining background compatability with previous versions of ZK in the other modules. At some point in the future, this can be merged into the main modules and then removed
---
 .../org/apache/curator/utils/Compatibility.java    |  11 +
 .../curator/utils/DefaultZookeeperFactory.java     |  17 ++
 .../curator/framework/api/CuratorEventType.java    |   7 +-
 .../framework/imps/CuratorFrameworkImpl.java       |   9 +
 .../framework/imps/ReconfigBuilderImpl.java        |   5 +-
 .../framework/imps/RemoveWatchesBuilderImpl.java   |   9 +-
 .../apache/curator/framework/imps/Watching.java    |  10 +-
 .../curator/framework/imps/TestFramework.java      |   5 +-
 .../curator/framework/imps/TestFrameworkEdges.java |   2 +
 .../framework/imps/TestReconfiguration.java        |   2 +-
 .../curator/framework/imps/TestWithCluster.java    |   2 +
 .../state/TestConnectionStateManager.java          |   2 +
 .../framework/recipes/cache/BaseTestTreeCache.java |   4 +-
 .../framework/recipes/cache/TestNodeCache.java     |   2 +
 .../recipes/cache/TestPathChildrenCache.java       |   2 +
 .../cache/TestPathChildrenCacheInCluster.java      |   2 +
 .../framework/recipes/cache/TestTreeCache.java     |   2 +
 .../recipes/leader/ChaosMonkeyCnxnFactory.java     |   8 +-
 .../framework/recipes/leader/TestLeaderLatch.java  |   2 +
 .../locks/TestInterProcessSemaphoreCluster.java    |   2 +
 curator-test/pom.xml                               |  10 +
 .../org/apache/curator/test/BaseClassForTests.java |   4 +-
 .../org/apache/curator/test/Compatibility.java     |  93 +++++++
 .../org/apache/curator/test/TestingCluster.java    |   2 +-
 .../apache/curator/test/TestingQuorumPeerMain.java |   2 +-
 .../apache/curator/test/TestingZooKeeperMain.java  |   6 +-
 .../test/compatibility/Zk35MethodInterceptor.java  |   1 +
 curator-v2/pom.xml                                 | 146 +++++++++++
 .../curator/framework/api/AddWatchBuilder.java     |  19 +-
 .../curator/framework/api/AddWatchBuilder2.java    |  13 +-
 .../apache/curator/framework/api/AddWatchable.java |  28 +-
 .../curator/framework/api/WatchesBuilder.java      |  20 +-
 .../framework/imps/AddWatchBuilderImpl.java        | 172 ++++++++++++
 .../framework/imps/CuratorFrameworkV2Impl.java     | 290 +++++++++++++++++++++
 .../imps/WatcherRemoveCuratorFrameworkV2Impl.java  | 284 ++++++++++++++++++++
 .../curator/framework/imps/WatchesBuilderImpl.java |  26 +-
 .../apache/curator/v2/AsyncCuratorFrameworkV2.java |  69 +++++
 .../org/apache/curator/v2/CuratorFrameworkV2.java  |  33 ++-
 .../v2/WatcherRemoveCuratorFrameworkV2.java        |  10 +-
 .../x/async/api/AsyncCuratorFrameworkDslV2.java    |  14 +-
 .../curator/x/async/api/AsyncWatchBuilder.java     |  24 +-
 .../async/details/AsyncCuratorFrameworkV2Impl.java | 189 ++++++++++++++
 .../x/async/details/AsyncWatchBuilderImpl.java     |  77 ++++++
 .../framework/v2/TestAsyncCuratorFrameworkV2.java  |  57 ++++
 .../curator/framework/v2/TestFrameworkV2.java      |  57 ++++
 curator-v2/src/test/resources/log4j.properties     |  27 ++
 .../x/async/details/AsyncCuratorFrameworkImpl.java |  10 +
 pom.xml                                            |  45 +++-
 src/site/confluence/zk-compatibility.confluence    |  40 ++-
 49 files changed, 1766 insertions(+), 107 deletions(-)

diff --git a/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java b/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java
index 1ee2301..f9810fe 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java
@@ -32,6 +32,7 @@ public class Compatibility
 {
     private static final boolean hasZooKeeperAdmin;
     private static final Method queueEventMethod;
+
     private static final Logger logger = LoggerFactory.getLogger(Compatibility.class);
 
     static
@@ -74,6 +75,16 @@ public class Compatibility
     }
 
     /**
+     * Return true if the ZooKeeperAdmin class is available
+     *
+     * @return true/false
+     */
+    public static boolean hasZooKeeperAdmin()
+    {
+        return hasZooKeeperAdmin;
+    }
+
+    /**
      * For ZooKeeper 3.5.x, use the supported <code>zooKeeper.getTestable().injectSessionExpiration()</code>.
      * For ZooKeeper 3.4.x do the equivalent via reflection
      *
diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
index 42279d0..f936518 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
@@ -20,12 +20,29 @@ package org.apache.curator.utils;
 
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.admin.ZooKeeperAdmin;
 
 public class DefaultZookeeperFactory implements ZookeeperFactory
 {
+    // hide org.apache.zookeeper.admin.ZooKeeperAdmin in a nested class so that Curator continues to work with ZK 3.4.x
+    private static class ZooKeeperAdminMaker implements ZookeeperFactory
+    {
+        static final ZooKeeperAdminMaker instance = new ZooKeeperAdminMaker();
+
+        @Override
+        public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
+        {
+            return new ZooKeeperAdmin(connectString, sessionTimeout, watcher, canBeReadOnly);
+        }
+    }
+
     @Override
     public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
     {
+        if ( Compatibility.hasZooKeeperAdmin() )
+        {
+            return ZooKeeperAdminMaker.instance.newZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
+        }
         return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
     }
 }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
index 5dea211..d19f49b 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
@@ -96,5 +96,10 @@ public enum CuratorEventType
     /**
      * Event sent when client is being closed
      */
-    CLOSING
+    CLOSING,
+
+    /**
+     * Corresponds to {@code CuratorFrameworkV2.watches().add()}
+     */
+    ADD_WATCH
 }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index e003bf0..bc59e77 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -620,6 +620,15 @@ public class CuratorFrameworkImpl implements CuratorFramework
         return client.getZooKeeper();
     }
 
+    Object getZooKeeperAdmin() throws Exception
+    {
+        if ( isZk34CompatibilityMode() )
+        {
+            Preconditions.checkState(!isZk34CompatibilityMode(), "getZooKeeperAdmin() is not supported when running in ZooKeeper 3.4 compatibility mode");
+        }
+        return client.getZooKeeper();
+    }
+
     CompressionProvider getCompressionProvider()
     {
         return compressionProvider;
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
index 97be59a..9f129ca 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
@@ -24,6 +24,7 @@ import org.apache.curator.RetryLoop;
 import org.apache.curator.TimeTrace;
 import org.apache.curator.framework.api.*;
 import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.admin.ZooKeeperAdmin;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.DataTree;
 import java.util.Arrays;
@@ -268,7 +269,7 @@ public class ReconfigBuilderImpl implements ReconfigBuilder, BackgroundOperation
                     client.processBackgroundOperation(data, event);
                 }
             };
-            client.getZooKeeper().reconfig(joining, leaving, newMembers, fromConfig, callback, backgrounding.getContext());
+            ((ZooKeeperAdmin)client.getZooKeeperAdmin()).reconfigure(joining, leaving, newMembers, fromConfig, callback, backgrounding.getContext());
         }
         catch ( Throwable e )
         {
@@ -287,7 +288,7 @@ public class ReconfigBuilderImpl implements ReconfigBuilder, BackgroundOperation
                     @Override
                     public byte[] call() throws Exception
                     {
-                        return client.getZooKeeper().reconfig(joining, leaving, newMembers, fromConfig, responseStat);
+                        return ((ZooKeeperAdmin)client.getZooKeeperAdmin()).reconfigure(joining, leaving, newMembers, fromConfig, responseStat);
                     }
                 }
             );
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
index e14deff..961d5f0 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
@@ -201,8 +201,13 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat
         }        
         
         return null;
-    }    
-    
+    }
+
+    protected CuratorFrameworkImpl getClient()
+    {
+        return client;
+    }
+
     private void pathInBackground(final String path)
     {
         OperationAndData.ErrorCallback<String>  errorCallback = null;
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
index daa5dd3..5bad7e7 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
@@ -23,7 +23,7 @@ import org.apache.curator.framework.api.CuratorWatcher;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
 
-class Watching
+public class Watching
 {
     private final Watcher watcher;
     private final CuratorWatcher curatorWatcher;
@@ -31,7 +31,7 @@ class Watching
     private final CuratorFrameworkImpl client;
     private NamespaceWatcher namespaceWatcher;
 
-    Watching(CuratorFrameworkImpl client, boolean watched)
+    public Watching(CuratorFrameworkImpl client, boolean watched)
     {
         this.client = client;
         this.watcher = null;
@@ -39,7 +39,7 @@ class Watching
         this.watched = watched;
     }
 
-    Watching(CuratorFrameworkImpl client, Watcher watcher)
+    public Watching(CuratorFrameworkImpl client, Watcher watcher)
     {
         this.client = client;
         this.watcher = watcher;
@@ -47,7 +47,7 @@ class Watching
         this.watched = false;
     }
 
-    Watching(CuratorFrameworkImpl client, CuratorWatcher watcher)
+    public Watching(CuratorFrameworkImpl client, CuratorWatcher watcher)
     {
         this.client = client;
         this.watcher = null;
@@ -55,7 +55,7 @@ class Watching
         this.watched = false;
     }
 
-    Watching(CuratorFrameworkImpl client)
+    public Watching(CuratorFrameworkImpl client)
     {
         this.client = client;
         watcher = null;
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
index fe49ad7..0431829 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
@@ -59,9 +59,10 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 @SuppressWarnings("deprecation")
+@Test(groups = Zk35MethodInterceptor.curatorV2Group)
 public class TestFramework extends BaseClassForTests
 {
-    @BeforeMethod
+    @BeforeMethod(alwaysRun = true)
     @Override
     public void setup() throws Exception
     {
@@ -69,7 +70,7 @@ public class TestFramework extends BaseClassForTests
         super.setup();
     }
 
-    @AfterMethod
+    @AfterMethod(alwaysRun = true)
     @Override
     public void teardown() throws Exception
     {
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
index 7c6d156..7af039b 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
@@ -41,6 +41,7 @@ import org.apache.curator.test.InstanceSpec;
 import org.apache.curator.test.TestingCluster;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.compatibility.Timing2;
+import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.Compatibility;
 import org.apache.curator.utils.ZKPaths;
@@ -66,6 +67,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+@Test(groups = Zk35MethodInterceptor.curatorV2Group)
 public class TestFrameworkEdges extends BaseClassForTests
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
index c6ff2bb..a6399de 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
@@ -57,7 +57,7 @@ import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 
-@Test(groups = Zk35MethodInterceptor.zk35Group)
+@Test(groups = {Zk35MethodInterceptor.zk35Group, Zk35MethodInterceptor.curatorV2Group})
 public class TestReconfiguration extends CuratorTestBase
 {
     private final Timing2 timing = new Timing2();
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWithCluster.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWithCluster.java
index 7e8ffbb..079c186 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWithCluster.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWithCluster.java
@@ -18,6 +18,7 @@
  */
 package org.apache.curator.framework.imps;
 
+import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -33,6 +34,7 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.concurrent.CountDownLatch;
 
+@Test(groups = Zk35MethodInterceptor.curatorV2Group)
 public class TestWithCluster
 {
     @Test
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/state/TestConnectionStateManager.java b/curator-framework/src/test/java/org/apache/curator/framework/state/TestConnectionStateManager.java
index c929b41..bb0d569 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/state/TestConnectionStateManager.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/state/TestConnectionStateManager.java
@@ -24,12 +24,14 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.compatibility.Timing2;
+import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.curator.utils.CloseableUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+@Test(groups = Zk35MethodInterceptor.curatorV2Group)
 public class TestConnectionStateManager extends BaseClassForTests {
 
     @Test
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
index 175ccdf..246704f 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
@@ -96,7 +96,7 @@ public class BaseTestTreeCache extends BaseClassForTests
     }
 
     @Override
-    @BeforeMethod
+    @BeforeMethod(alwaysRun = true)
     public void setup() throws Exception
     {
         super.setup();
@@ -111,7 +111,7 @@ public class BaseTestTreeCache extends BaseClassForTests
     }
 
     @Override
-    @AfterMethod
+    @AfterMethod(alwaysRun = true)
     public void teardown() throws Exception
     {
         try
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java
index ff416d5..f9b8cce 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java
@@ -21,6 +21,7 @@ package org.apache.curator.framework.recipes.cache;
 import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.compatibility.Timing2;
+import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -40,6 +41,7 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+@Test(groups = Zk35MethodInterceptor.curatorV2Group)
 public class TestNodeCache extends BaseClassForTests
 {
     @Test
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
index 78fabd5..7ec86b9 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
@@ -32,6 +32,7 @@ import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.ExecuteCalledWatchingExecutorService;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.Compatibility;
 import org.apache.zookeeper.CreateMode;
@@ -45,6 +46,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import static org.testng.AssertJUnit.assertNotNull;
 
+@Test(groups = Zk35MethodInterceptor.curatorV2Group)
 public class TestPathChildrenCache extends BaseClassForTests
 {
     @Test
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java
index cd87125..083d158 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java
@@ -20,6 +20,7 @@ package org.apache.curator.framework.recipes.cache;
 
 import com.google.common.collect.Queues;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -34,6 +35,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+@Test(groups = Zk35MethodInterceptor.curatorV2Group)
 public class TestPathChildrenCacheInCluster extends BaseClassForTests
 {
     @Test(enabled = false)  // this test is very flakey - it needs to be re-written at some point
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
index 1e97ce2..3e8a3a5 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableSet;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
+import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.Compatibility;
 import org.apache.zookeeper.CreateMode;
@@ -31,6 +32,7 @@ import org.testng.annotations.Test;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+@Test(groups = Zk35MethodInterceptor.curatorV2Group)
 public class TestTreeCache extends BaseTestTreeCache
 {
     @Test
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
index 4cb342c..07e9a17 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
@@ -19,11 +19,11 @@
 
 package org.apache.curator.framework.recipes.leader;
 
+import org.apache.curator.test.Compatibility;
 import org.apache.curator.test.TestingZooKeeperMain;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.proto.CreateRequest;
 import org.apache.zookeeper.server.ByteBufferInputStream;
-import org.apache.zookeeper.server.NIOServerCnxn;
 import org.apache.zookeeper.server.NIOServerCnxnFactory;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.ZooKeeperServer;
@@ -92,7 +92,7 @@ public class ChaosMonkeyCnxnFactory extends NIOServerCnxnFactory
                 log.debug("Rejected : " + si.toString());
                 // Still reject request
                 log.debug("Still not ready for " + remaining + "ms");
-                ((NIOServerCnxn)si.cnxn).close();
+                Compatibility.serverCnxnClose(si.cnxn);
                 return;
             }
             // Submit the request to the legacy Zookeeper server
@@ -113,13 +113,13 @@ public class ChaosMonkeyCnxnFactory extends NIOServerCnxnFactory
                         firstError = System.currentTimeMillis();
                         // The znode has been created, close the connection and don't tell it to client
                         log.warn("Closing connection right after " + createRequest.getPath() + " creation");
-                        ((NIOServerCnxn)si.cnxn).close();
+                        Compatibility.serverCnxnClose(si.cnxn);
                     }
                 }
                 catch ( Exception e )
                 {
                     // Should not happen
-                    ((NIOServerCnxn)si.cnxn).close();
+                    Compatibility.serverCnxnClose(si.cnxn);
                 }
             }
         }
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index 3d9e9b7..ddb7f53 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -38,6 +38,7 @@ import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.Timing;
 import org.apache.curator.test.compatibility.Timing2;
+import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.curator.utils.CloseableUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -60,6 +61,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+@Test(groups = Zk35MethodInterceptor.curatorV2Group)
 public class TestLeaderLatch extends BaseClassForTests
 {
     private static final String PATH_NAME = "/one/two/me";
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
index ed56f15..72ecf59 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
@@ -30,6 +30,7 @@ import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.InstanceSpec;
 import org.apache.curator.test.TestingCluster;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.curator.utils.CloseableUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -46,6 +47,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+@Test(groups = Zk35MethodInterceptor.curatorV2Group)
 public class TestInterProcessSemaphoreCluster extends BaseClassForTests
 {
     @Test
diff --git a/curator-test/pom.xml b/curator-test/pom.xml
index 3683b7d..4f0c5a2 100644
--- a/curator-test/pom.xml
+++ b/curator-test/pom.xml
@@ -41,6 +41,16 @@
         </dependency>
 
         <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.xerial.snappy</groupId>
+            <artifactId>snappy-java</artifactId>
+        </dependency>
+
+        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
diff --git a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
index 51af821..79c8b9a 100644
--- a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
+++ b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
@@ -113,7 +113,7 @@ public class BaseClassForTests
         context.getSuite().addListener(methodListener2);
     }
 
-    @BeforeMethod
+    @BeforeMethod(alwaysRun = true)
     public void setup() throws Exception
     {
         if ( INTERNAL_PROPERTY_DONT_LOG_CONNECTION_ISSUES != null )
@@ -142,7 +142,7 @@ public class BaseClassForTests
         }
     }
 
-    @AfterMethod
+    @AfterMethod(alwaysRun = true)
     public void teardown() throws Exception
     {
         System.clearProperty(INTERNAL_PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY);
diff --git a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java b/curator-test/src/main/java/org/apache/curator/test/Compatibility.java
index 5b4b53f..1f8d181 100644
--- a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java
+++ b/curator-test/src/main/java/org/apache/curator/test/Compatibility.java
@@ -18,10 +18,103 @@
  */
 package org.apache.curator.test;
 
+import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import java.lang.reflect.Method;
+
+@SuppressWarnings("unchecked")
 public class Compatibility
 {
+    private static final Method closeAllWithReasonMethod;
+    private static final Method closeAllMethod;
+    private static final Method closeWithReasonMethod;
+    private static final Method closeMethod;
+    private static final Object disconnectReasonObj;
+
+    static
+    {
+        Object localDisconnectReasonObj;
+        Method localCloseAllWithReasonMethod;
+        Method localCloseAllMethod;
+        Method localCloseWithReasonMethod;
+        Method localCloseMethod;
+        try
+        {
+            Class disconnectReasonClass = Class.forName("org.apache.zookeeper.server.ServerCnxn$DisconnectReason");
+            localDisconnectReasonObj = Enum.valueOf(disconnectReasonClass, "UNKNOWN");
+            localCloseAllWithReasonMethod = ServerCnxnFactory.class.getDeclaredMethod("closeAll", disconnectReasonClass);
+            localCloseWithReasonMethod = ServerCnxn.class.getDeclaredMethod("close", disconnectReasonClass);
+            localCloseAllMethod = null;
+            localCloseMethod = null;
+
+            localCloseAllWithReasonMethod.setAccessible(true);
+            localCloseWithReasonMethod.setAccessible(true);
+        }
+        catch ( Throwable e )
+        {
+            localDisconnectReasonObj = null;
+            localCloseAllWithReasonMethod = null;
+            localCloseWithReasonMethod = null;
+            try
+            {
+                localCloseAllMethod = ServerCnxnFactory.class.getDeclaredMethod("closeAll");
+                localCloseMethod = ServerCnxn.class.getDeclaredMethod("close");
+
+                localCloseAllMethod.setAccessible(true);
+                localCloseMethod.setAccessible(true);
+            }
+            catch ( Throwable ex )
+            {
+                throw new IllegalStateException("Could not reflectively find ServerCnxnFactory/ServerCnxn close methods");
+            }
+        }
+        disconnectReasonObj = localDisconnectReasonObj;
+        closeAllWithReasonMethod = localCloseAllWithReasonMethod;
+        closeAllMethod = localCloseAllMethod;
+        closeMethod = localCloseMethod;
+        closeWithReasonMethod = localCloseWithReasonMethod;
+    }
+
     public static boolean isZK34()
     {
         return false;
     }
+
+    public static void serverCnxnFactoryCloseAll(ServerCnxnFactory factory)
+    {
+        try
+        {
+            if ( closeAllMethod != null )
+            {
+                closeAllMethod.invoke(factory);
+            }
+            else
+            {
+                closeAllWithReasonMethod.invoke(factory, disconnectReasonObj);
+            }
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException("Could not close factory", e);
+        }
+    }
+
+    public static void serverCnxnClose(ServerCnxn cnxn)
+    {
+        try
+        {
+            if ( closeMethod != null )
+            {
+                closeMethod.invoke(cnxn);
+            }
+            else
+            {
+                closeWithReasonMethod.invoke(cnxn, disconnectReasonObj);
+            }
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException("Could not close connection", e);
+        }
+    }
 }
diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java b/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java
index 3d38fe1..58da2c0 100644
--- a/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java
+++ b/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java
@@ -225,7 +225,7 @@ public class TestingCluster implements Closeable
      */
     public InstanceSpec findConnectionInstance(ZooKeeper client) throws Exception
     {
-        Method              m = client.getClass().getDeclaredMethod("testableRemoteSocketAddress");
+        Method              m = ZooKeeper.class.getDeclaredMethod("testableRemoteSocketAddress");
         m.setAccessible(true);
         InetSocketAddress   address = (InetSocketAddress)m.invoke(client);
         if ( address != null )
diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java b/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java
index 3b3ab26..7489527 100644
--- a/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java
+++ b/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java
@@ -39,7 +39,7 @@ class TestingQuorumPeerMain extends QuorumPeerMain implements ZooKeeperMainFace
                 Field               cnxnFactoryField = QuorumPeer.class.getDeclaredField("cnxnFactory");
                 cnxnFactoryField.setAccessible(true);
                 ServerCnxnFactory   cnxnFactory = (ServerCnxnFactory)cnxnFactoryField.get(quorumPeer);
-                cnxnFactory.closeAll();
+                Compatibility.serverCnxnFactoryCloseAll(cnxnFactory);
 
                 Field               ssField = cnxnFactory.getClass().getDeclaredField("ss");
                 ssField.setAccessible(true);
diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java b/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java
index 841df77..91f185f 100644
--- a/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java
+++ b/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java
@@ -81,7 +81,7 @@ public class TestingZooKeeperMain implements ZooKeeperMainFace
         {
             if ( cnxnFactory != null )
             {
-                cnxnFactory.closeAll();
+                Compatibility.serverCnxnFactoryCloseAll(cnxnFactory);
 
                 Field ssField = cnxnFactory.getClass().getDeclaredField("ss");
                 ssField.setAccessible(true);
@@ -262,7 +262,9 @@ public class TestingZooKeeperMain implements ZooKeeperMainFace
     {
         public TestZooKeeperServer(FileTxnSnapLog txnLog, ServerConfig config)
         {
-            super(txnLog, config.getTickTime(), config.getMinSessionTimeout(), config.getMaxSessionTimeout(), null);
+            this.setTxnLogFactory(txnLog);
+            this.setMinSessionTimeout(config.getMinSessionTimeout());
+            this.setMaxSessionTimeout(config.getMaxSessionTimeout());
         }
 
         private final AtomicBoolean isRunning = new AtomicBoolean(false);
diff --git a/curator-test/src/main/java/org/apache/curator/test/compatibility/Zk35MethodInterceptor.java b/curator-test/src/main/java/org/apache/curator/test/compatibility/Zk35MethodInterceptor.java
index 8072b68..10fafde 100644
--- a/curator-test/src/main/java/org/apache/curator/test/compatibility/Zk35MethodInterceptor.java
+++ b/curator-test/src/main/java/org/apache/curator/test/compatibility/Zk35MethodInterceptor.java
@@ -29,6 +29,7 @@ import java.util.List;
 public class Zk35MethodInterceptor implements IMethodInterceptor
 {
     public static final String zk35Group = "zk35";
+    public static final String curatorV2Group = "curatorV2";
 
     @Override
     public List<IMethodInstance> intercept(List<IMethodInstance> methods, ITestContext context)
diff --git a/curator-v2/pom.xml b/curator-v2/pom.xml
new file mode 100644
index 0000000..0ff537a
--- /dev/null
+++ b/curator-v2/pom.xml
@@ -0,0 +1,146 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.curator</groupId>
+        <artifactId>apache-curator</artifactId>
+        <version>4.2.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>curator-v2</artifactId>
+
+    <properties>
+        <zookeeper-new-version>3.6.0-SNAPSHOT</zookeeper-new-version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-x-async</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.zookeeper</groupId>
+            <artifactId>zookeeper</artifactId>
+            <version>${zookeeper-new-version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.sun.jmx</groupId>
+                    <artifactId>jmxri</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jdmk</groupId>
+                    <artifactId>jmxtools</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.jms</groupId>
+                    <artifactId>jms</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>junit</groupId>
+                    <artifactId>junit</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-recipes</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-test</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-recipes</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <dependenciesToScan>
+                        <dependency>org.apache.curator:curator-framework</dependency>
+                        <dependency>org.apache.curator:curator-recipes</dependency>
+                    </dependenciesToScan>
+                    <groups>curatorV2</groups>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java b/curator-v2/src/main/java/org/apache/curator/framework/api/AddWatchBuilder.java
similarity index 68%
copy from curator-test/src/main/java/org/apache/curator/test/Compatibility.java
copy to curator-v2/src/main/java/org/apache/curator/framework/api/AddWatchBuilder.java
index 5b4b53f..ad6d434 100644
--- a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java
+++ b/curator-v2/src/main/java/org/apache/curator/framework/api/AddWatchBuilder.java
@@ -16,12 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.test;
+package org.apache.curator.framework.api;
 
-public class Compatibility
+import org.apache.zookeeper.AddWatchMode;
+
+public interface AddWatchBuilder extends AddWatchBuilder2
 {
-    public static boolean isZK34()
-    {
-        return false;
-    }
-}
+    /**
+     * The mode to use. By default, {@link org.apache.zookeeper.AddWatchMode#PERSISTENT_RECURSIVE} is used
+     *
+     * @param mode mode to use
+     * @return this
+     */
+    AddWatchBuilder2 withMode(AddWatchMode mode);
+}
\ No newline at end of file
diff --git a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java b/curator-v2/src/main/java/org/apache/curator/framework/api/AddWatchBuilder2.java
similarity index 82%
copy from curator-test/src/main/java/org/apache/curator/test/Compatibility.java
copy to curator-v2/src/main/java/org/apache/curator/framework/api/AddWatchBuilder2.java
index 5b4b53f..d002c23 100644
--- a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java
+++ b/curator-v2/src/main/java/org/apache/curator/framework/api/AddWatchBuilder2.java
@@ -16,12 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.test;
 
-public class Compatibility
+package org.apache.curator.framework.api;
+
+public interface AddWatchBuilder2 extends
+    Backgroundable<AddWatchable<Pathable<Void>>>,
+    AddWatchable<Pathable<Void>>
 {
-    public static boolean isZK34()
-    {
-        return false;
-    }
-}
+}
\ No newline at end of file
diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-v2/src/main/java/org/apache/curator/framework/api/AddWatchable.java
similarity index 68%
copy from curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
copy to curator-v2/src/main/java/org/apache/curator/framework/api/AddWatchable.java
index 42279d0..1f0646c 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
+++ b/curator-v2/src/main/java/org/apache/curator/framework/api/AddWatchable.java
@@ -16,16 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.utils;
+
+package org.apache.curator.framework.api;
 
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
 
-public class DefaultZookeeperFactory implements ZookeeperFactory
+public interface AddWatchable<T>
 {
-    @Override
-    public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
-    {
-        return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
-    }
-}
+    /**
+     * Set a watcher for the operation
+     *
+     * @param watcher the watcher
+     * @return this
+     */
+    T usingWatcher(Watcher watcher);
+
+    /**
+     * Set a watcher for the operation
+     *
+     * @param watcher the watcher
+     * @return this
+     */
+    T usingWatcher(CuratorWatcher watcher);
+}
\ No newline at end of file
diff --git a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java b/curator-v2/src/main/java/org/apache/curator/framework/api/WatchesBuilder.java
similarity index 75%
copy from curator-test/src/main/java/org/apache/curator/test/Compatibility.java
copy to curator-v2/src/main/java/org/apache/curator/framework/api/WatchesBuilder.java
index 5b4b53f..3cd5528 100644
--- a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java
+++ b/curator-v2/src/main/java/org/apache/curator/framework/api/WatchesBuilder.java
@@ -16,12 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.test;
 
-public class Compatibility
+package org.apache.curator.framework.api;
+
+/**
+ * Builder to allow watches to be removed 
+ */
+public interface WatchesBuilder extends RemoveWatchesBuilder
 {
-    public static boolean isZK34()
-    {
-        return false;
-    }
-}
+    /**
+     * Start an add watch operation
+     *
+     * @return builder
+     */
+    AddWatchBuilder add();
+}
\ No newline at end of file
diff --git a/curator-v2/src/main/java/org/apache/curator/framework/imps/AddWatchBuilderImpl.java b/curator-v2/src/main/java/org/apache/curator/framework/imps/AddWatchBuilderImpl.java
new file mode 100644
index 0000000..3be559b
--- /dev/null
+++ b/curator-v2/src/main/java/org/apache/curator/framework/imps/AddWatchBuilderImpl.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.RetryLoop;
+import org.apache.curator.drivers.OperationTrace;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.api.Pathable;
+import org.apache.curator.framework.api.AddWatchBuilder;
+import org.apache.curator.framework.api.AddWatchBuilder2;
+import org.apache.curator.framework.api.AddWatchable;
+import org.apache.zookeeper.AddWatchMode;
+import org.apache.zookeeper.Watcher;
+import java.util.concurrent.Executor;
+
+public class AddWatchBuilderImpl implements AddWatchBuilder, Pathable<Void>, BackgroundOperation<String>
+{
+    private final CuratorFrameworkImpl client;
+    private Watching watching = null;
+    private Backgrounding backgrounding = new Backgrounding();
+    private AddWatchMode mode = AddWatchMode.PERSISTENT_RECURSIVE;
+
+    AddWatchBuilderImpl(CuratorFrameworkImpl client)
+    {
+        this.client = client;
+    }
+
+    public AddWatchBuilderImpl(CuratorFrameworkImpl client, Watching watching, Backgrounding backgrounding, AddWatchMode mode)
+    {
+        this.client = client;
+        this.watching = watching;
+        this.backgrounding = backgrounding;
+        this.mode = mode;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground()
+    {
+        backgrounding = new Backgrounding();
+        return this;
+    }
+
+    @Override
+    public AddWatchBuilder2 withMode(AddWatchMode mode)
+    {
+        this.mode = mode;
+        return this;
+    }
+
+    @Override
+    public Pathable<Void> usingWatcher(Watcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    @Override
+    public Pathable<Void> usingWatcher(CuratorWatcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground(Object context)
+    {
+        backgrounding = new Backgrounding(context);
+        return this;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback)
+    {
+        backgrounding = new Backgrounding(callback);
+        return this;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context)
+    {
+        backgrounding = new Backgrounding(callback, context);
+        return this;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Executor executor)
+    {
+        backgrounding = new Backgrounding(callback, executor);
+        return this;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context, Executor executor)
+    {
+        backgrounding = new Backgrounding(client, callback, context, executor);
+        return this;
+    }
+
+    @Override
+    public Void forPath(String path) throws Exception
+    {
+        if ( backgrounding.inBackground() )
+        {
+            client.processBackgroundOperation(new OperationAndData<>(this, path, backgrounding.getCallback(), null, backgrounding.getContext(), watching), null);
+        }
+        else
+        {
+            pathInForeground(path);
+        }
+        return null;
+    }
+
+    @Override
+    public void performBackgroundOperation(final OperationAndData<String> data) throws Exception
+    {
+        String path = data.getData();
+        String fixedPath = client.fixForNamespace(path);
+        try
+        {
+            final OperationTrace   trace = client.getZookeeperClient().startAdvancedTracer("AddWatchBuilderImpl-Background");
+            client.getZooKeeper().addWatch
+                (
+                    fixedPath,
+                    watching.getWatcher(path),
+                    mode,
+                    (rc, path1, ctx) -> {
+                        trace.setReturnCode(rc).setWithWatcher(true).setPath(path1).commit();
+                        CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.ADD_WATCH, rc, path1, null, ctx, null, null, null, null, null, null);
+                        client.processBackgroundOperation(data, event);
+                    },
+                    backgrounding.getContext()
+                );
+        }
+        catch ( Throwable e )
+        {
+            backgrounding.checkError(e, watching);
+        }
+    }
+
+    private void pathInForeground(final String path) throws Exception
+    {
+        final String fixedPath = client.fixForNamespace(path);
+        OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("AddWatchBuilderImpl-Foreground");
+        RetryLoop.callWithRetry
+        (
+            client.getZookeeperClient(), () -> {
+                client.getZooKeeper().addWatch(fixedPath, watching.getWatcher(path), mode);
+                return null;
+            });
+        trace.setPath(fixedPath).setWithWatcher(true).commit();
+    }
+}
\ No newline at end of file
diff --git a/curator-v2/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkV2Impl.java b/curator-v2/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkV2Impl.java
new file mode 100644
index 0000000..338ca6f
--- /dev/null
+++ b/curator-v2/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkV2Impl.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.CuratorZookeeperClient;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.*;
+import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
+import org.apache.curator.framework.api.transaction.CuratorTransaction;
+import org.apache.curator.framework.api.transaction.TransactionOp;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.schema.SchemaSet;
+import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.v2.CuratorFrameworkV2;
+import org.apache.curator.v2.WatcherRemoveCuratorFrameworkV2;
+import org.apache.curator.utils.EnsurePath;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+public class CuratorFrameworkV2Impl implements CuratorFrameworkV2
+{
+    private final CuratorFrameworkImpl client;
+
+    public CuratorFrameworkV2Impl(CuratorFramework client)
+    {
+        this.client = reveal(client);
+    }
+
+    private static CuratorFrameworkImpl reveal(CuratorFramework client)
+    {
+        try
+        {
+            return (CuratorFrameworkImpl)Objects.requireNonNull(client, "client cannot be null");
+        }
+        catch ( Exception e )
+        {
+            throw new IllegalArgumentException("Only Curator clients created through CuratorFrameworkFactory are supported: " + client.getClass().getName());
+        }
+    }
+
+    @Override
+    public void start()
+    {
+        client.start();
+    }
+
+    @Override
+    public void close()
+    {
+        client.close();
+    }
+
+    @Override
+    public CuratorFrameworkState getState()
+    {
+        return client.getState();
+    }
+
+    @Override
+    public boolean isStarted()
+    {
+        throw new UnsupportedOperationException("deprecated");
+    }
+
+    @Override
+    public CreateBuilder create()
+    {
+        return client.create();
+    }
+
+    @Override
+    public DeleteBuilder delete()
+    {
+        return client.delete();
+    }
+
+    @Override
+    public ExistsBuilder checkExists()
+    {
+        return client.checkExists();
+    }
+
+    @Override
+    public GetDataBuilder getData()
+    {
+        return client.getData();
+    }
+
+    @Override
+    public SetDataBuilder setData()
+    {
+        return client.setData();
+    }
+
+    @Override
+    public GetChildrenBuilder getChildren()
+    {
+        return client.getChildren();
+    }
+
+    @Override
+    public GetACLBuilder getACL()
+    {
+        return client.getACL();
+    }
+
+    @Override
+    public SetACLBuilder setACL()
+    {
+        return client.setACL();
+    }
+
+    @Override
+    public ReconfigBuilder reconfig()
+    {
+        return client.reconfig();
+    }
+
+    @Override
+    public GetConfigBuilder getConfig()
+    {
+        return client.getConfig();
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public CuratorTransaction inTransaction()
+    {
+        throw new UnsupportedOperationException("deprecated");
+    }
+
+    @Override
+    public CuratorMultiTransaction transaction()
+    {
+        return client.transaction();
+    }
+
+    @Override
+    public TransactionOp transactionOp()
+    {
+        return client.transactionOp();
+    }
+
+    @Override
+    public void sync(String path, Object backgroundContextObject)
+    {
+        throw new UnsupportedOperationException("deprecated");
+    }
+
+    @Override
+    public void createContainers(String path) throws Exception
+    {
+        client.createContainers(path);
+    }
+
+    @Override
+    public SyncBuilder sync()
+    {
+        return client.sync();
+    }
+
+    @Override
+    public WatchesBuilder watches()
+    {
+        return new WatchesBuilderImpl(client);
+    }
+
+    @Override
+    public Listenable<ConnectionStateListener> getConnectionStateListenable()
+    {
+        return client.getConnectionStateListenable();
+    }
+
+    @Override
+    public Listenable<CuratorListener> getCuratorListenable()
+    {
+        return client.getCuratorListenable();
+    }
+
+    @Override
+    public Listenable<UnhandledErrorListener> getUnhandledErrorListenable()
+    {
+        return client.getUnhandledErrorListenable();
+    }
+
+    @Override
+    public CuratorFramework nonNamespaceView()
+    {
+        throw new UnsupportedOperationException("deprecated");
+    }
+
+    @Override
+    public CuratorFrameworkV2 usingNamespace(String newNamespace)
+    {
+        return CuratorFrameworkV2.wrap(client.usingNamespace(newNamespace));
+    }
+
+    @Override
+    public String getNamespace()
+    {
+        return client.getNamespace();
+    }
+
+    @Override
+    public CuratorZookeeperClient getZookeeperClient()
+    {
+        return client.getZookeeperClient();
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public EnsurePath newNamespaceAwareEnsurePath(String path)
+    {
+        throw new UnsupportedOperationException("deprecated");
+    }
+
+    @Override
+    public void clearWatcherReferences(Watcher watcher)
+    {
+        throw new UnsupportedOperationException("deprecated");
+    }
+
+    @Override
+    public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException
+    {
+        return client.blockUntilConnected(maxWaitTime, units);
+    }
+
+    @Override
+    public void blockUntilConnected() throws InterruptedException
+    {
+        client.blockUntilConnected();
+    }
+
+    @Override
+    public WatcherRemoveCuratorFrameworkV2 newWatcherRemoveCuratorFramework()
+    {
+        return new WatcherRemoveCuratorFrameworkV2Impl(client, client.newWatcherRemoveCuratorFramework());
+    }
+
+    @Override
+    public ConnectionStateErrorPolicy getConnectionStateErrorPolicy()
+    {
+        return client.getConnectionStateErrorPolicy();
+    }
+
+    @Override
+    public QuorumVerifier getCurrentConfig()
+    {
+        return client.getCurrentConfig();
+    }
+
+    @Override
+    public SchemaSet getSchemaSet()
+    {
+        return client.getSchemaSet();
+    }
+
+    @Override
+    public boolean isZk34CompatibilityMode()
+    {
+        return client.isZk34CompatibilityMode();
+    }
+
+    @Override
+    public CompletableFuture<Void> runSafe(Runnable runnable)
+    {
+        return client.runSafe(runnable);
+    }
+}
diff --git a/curator-v2/src/main/java/org/apache/curator/framework/imps/WatcherRemoveCuratorFrameworkV2Impl.java b/curator-v2/src/main/java/org/apache/curator/framework/imps/WatcherRemoveCuratorFrameworkV2Impl.java
new file mode 100644
index 0000000..67daf3e
--- /dev/null
+++ b/curator-v2/src/main/java/org/apache/curator/framework/imps/WatcherRemoveCuratorFrameworkV2Impl.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.CuratorZookeeperClient;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
+import org.apache.curator.framework.api.*;
+import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
+import org.apache.curator.framework.api.transaction.CuratorTransaction;
+import org.apache.curator.framework.api.transaction.TransactionOp;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.schema.SchemaSet;
+import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.v2.CuratorFrameworkV2;
+import org.apache.curator.v2.WatcherRemoveCuratorFrameworkV2;
+import org.apache.curator.utils.EnsurePath;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+class WatcherRemoveCuratorFrameworkV2Impl implements WatcherRemoveCuratorFrameworkV2
+{
+    private final CuratorFrameworkImpl client;
+    private final WatcherRemoveCuratorFramework facade;
+
+    WatcherRemoveCuratorFrameworkV2Impl(CuratorFrameworkImpl client, WatcherRemoveCuratorFramework facade)
+    {
+        this.client = client;
+        this.facade = facade;
+    }
+
+    @Override
+    public void removeWatchers()
+    {
+        facade.removeWatchers();
+    }
+
+    @Override
+    public void start()
+    {
+        facade.start();
+    }
+
+    @Override
+    public void close()
+    {
+        facade.close();
+    }
+
+    @Override
+    public CuratorFrameworkState getState()
+    {
+        return facade.getState();
+    }
+
+    @Override
+    public boolean isStarted()
+    {
+        throw new UnsupportedOperationException("deprecated");
+    }
+
+    @Override
+    public CreateBuilder create()
+    {
+        return facade.create();
+    }
+
+    @Override
+    public DeleteBuilder delete()
+    {
+        return facade.delete();
+    }
+
+    @Override
+    public ExistsBuilder checkExists()
+    {
+        return facade.checkExists();
+    }
+
+    @Override
+    public GetDataBuilder getData()
+    {
+        return facade.getData();
+    }
+
+    @Override
+    public SetDataBuilder setData()
+    {
+        return facade.setData();
+    }
+
+    @Override
+    public GetChildrenBuilder getChildren()
+    {
+        return facade.getChildren();
+    }
+
+    @Override
+    public GetACLBuilder getACL()
+    {
+        return facade.getACL();
+    }
+
+    @Override
+    public SetACLBuilder setACL()
+    {
+        return facade.setACL();
+    }
+
+    @Override
+    public ReconfigBuilder reconfig()
+    {
+        return facade.reconfig();
+    }
+
+    @Override
+    public GetConfigBuilder getConfig()
+    {
+        return facade.getConfig();
+    }
+
+    @Override
+    public CuratorTransaction inTransaction()
+    {
+        throw new UnsupportedOperationException("deprecated");
+    }
+
+    @Override
+    public CuratorMultiTransaction transaction()
+    {
+        return facade.transaction();
+    }
+
+    @Override
+    public TransactionOp transactionOp()
+    {
+        return facade.transactionOp();
+    }
+
+    @Override
+    public void sync(String path, Object backgroundContextObject)
+    {
+        throw new UnsupportedOperationException("deprecated");
+    }
+
+    @Override
+    public void createContainers(String path) throws Exception
+    {
+        facade.createContainers(path);
+    }
+
+    @Override
+    public SyncBuilder sync()
+    {
+        return facade.sync();
+    }
+
+    @Override
+    public WatchesBuilder watches()
+    {
+        return new WatchesBuilderImpl(client);
+    }
+
+    @Override
+    public Listenable<ConnectionStateListener> getConnectionStateListenable()
+    {
+        return facade.getConnectionStateListenable();
+    }
+
+    @Override
+    public Listenable<CuratorListener> getCuratorListenable()
+    {
+        return facade.getCuratorListenable();
+    }
+
+    @Override
+    public Listenable<UnhandledErrorListener> getUnhandledErrorListenable()
+    {
+        return facade.getUnhandledErrorListenable();
+    }
+
+    @Override
+    public CuratorFramework nonNamespaceView()
+    {
+        throw new UnsupportedOperationException("deprecated");
+    }
+
+    @Override
+    public CuratorFrameworkV2 usingNamespace(String newNamespace)
+    {
+        return CuratorFrameworkV2.wrap(facade.usingNamespace(newNamespace));
+    }
+
+    @Override
+    public String getNamespace()
+    {
+        return facade.getNamespace();
+    }
+
+    @Override
+    public CuratorZookeeperClient getZookeeperClient()
+    {
+        return facade.getZookeeperClient();
+    }
+
+    @Override
+    public EnsurePath newNamespaceAwareEnsurePath(String path)
+    {
+        throw new UnsupportedOperationException("deprecated");
+    }
+
+    @Override
+    public void clearWatcherReferences(Watcher watcher)
+    {
+        throw new UnsupportedOperationException("deprecated");
+    }
+
+    @Override
+    public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException
+    {
+        return facade.blockUntilConnected(maxWaitTime, units);
+    }
+
+    @Override
+    public void blockUntilConnected() throws InterruptedException
+    {
+        facade.blockUntilConnected();
+    }
+
+    @Override
+    public WatcherRemoveCuratorFrameworkV2 newWatcherRemoveCuratorFramework()
+    {
+        return new WatcherRemoveCuratorFrameworkV2Impl(client, facade.newWatcherRemoveCuratorFramework());
+    }
+
+    @Override
+    public ConnectionStateErrorPolicy getConnectionStateErrorPolicy()
+    {
+        return facade.getConnectionStateErrorPolicy();
+    }
+
+    @Override
+    public QuorumVerifier getCurrentConfig()
+    {
+        return facade.getCurrentConfig();
+    }
+
+    @Override
+    public SchemaSet getSchemaSet()
+    {
+        return facade.getSchemaSet();
+    }
+
+    @Override
+    public boolean isZk34CompatibilityMode()
+    {
+        return facade.isZk34CompatibilityMode();
+    }
+
+    @Override
+    public CompletableFuture<Void> runSafe(Runnable runnable)
+    {
+        return facade.runSafe(runnable);
+    }
+}
diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-v2/src/main/java/org/apache/curator/framework/imps/WatchesBuilderImpl.java
similarity index 50%
copy from curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
copy to curator-v2/src/main/java/org/apache/curator/framework/imps/WatchesBuilderImpl.java
index 42279d0..a840ffe 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
+++ b/curator-v2/src/main/java/org/apache/curator/framework/imps/WatchesBuilderImpl.java
@@ -16,16 +16,30 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.utils;
 
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.api.AddWatchBuilder;
+import org.apache.curator.framework.api.WatchesBuilder;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher.WatcherType;
 
-public class DefaultZookeeperFactory implements ZookeeperFactory
+public class WatchesBuilderImpl extends RemoveWatchesBuilderImpl implements WatchesBuilder
 {
+    public WatchesBuilderImpl(CuratorFrameworkImpl client)
+    {
+        super(client);
+    }
+
+    public WatchesBuilderImpl(CuratorFrameworkImpl client, Watcher watcher, CuratorWatcher curatorWatcher, WatcherType watcherType, boolean guaranteed, boolean local, boolean quietly, Backgrounding backgrounding)
+    {
+        super(client, watcher, curatorWatcher, watcherType, guaranteed, local, quietly, backgrounding);
+    }
+
     @Override
-    public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
+    public AddWatchBuilder add()
     {
-        return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
+        return new AddWatchBuilderImpl(getClient());
     }
-}
+}
\ No newline at end of file
diff --git a/curator-v2/src/main/java/org/apache/curator/v2/AsyncCuratorFrameworkV2.java b/curator-v2/src/main/java/org/apache/curator/v2/AsyncCuratorFrameworkV2.java
new file mode 100644
index 0000000..745322c
--- /dev/null
+++ b/curator-v2/src/main/java/org/apache/curator/v2/AsyncCuratorFrameworkV2.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.v2;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.curator.x.async.WatchMode;
+import org.apache.curator.x.async.api.AsyncCuratorFrameworkDslV2;
+import org.apache.curator.x.async.details.AsyncCuratorFrameworkV2Impl;
+import org.apache.zookeeper.WatchedEvent;
+import java.util.function.UnaryOperator;
+
+public interface AsyncCuratorFrameworkV2 extends AsyncCuratorFramework, AsyncCuratorFrameworkDslV2
+{
+    /**
+     * Wrap a CuratorFramework instance to gain access to newer ZooKeeper features
+     *
+     * @param client client instance
+     * @return wrapped client
+     */
+    static AsyncCuratorFrameworkV2 wrap(CuratorFramework client)
+    {
+        return new AsyncCuratorFrameworkV2Impl(AsyncCuratorFramework.wrap(client));
+    }
+
+    /**
+     * Wrap a AsyncCuratorFramework instance to gain access to newer ZooKeeper features
+     *
+     * @param client client instance
+     * @return wrapped client
+     */
+    static AsyncCuratorFrameworkV2 wrap(AsyncCuratorFramework client)
+    {
+        return new AsyncCuratorFrameworkV2Impl(client);
+    }
+
+    @Override
+    AsyncCuratorFrameworkDslV2 with(WatchMode mode);
+
+    @Override
+    AsyncCuratorFrameworkDslV2 with(UnhandledErrorListener listener);
+
+    @Override
+    AsyncCuratorFrameworkDslV2 with(UnaryOperator<CuratorEvent> resultFilter, UnaryOperator<WatchedEvent> watcherFilter);
+
+    @Override
+    AsyncCuratorFrameworkDslV2 with(UnhandledErrorListener listener, UnaryOperator<CuratorEvent> resultFilter, UnaryOperator<WatchedEvent> watcherFilter);
+
+    @Override
+    AsyncCuratorFrameworkDslV2 with(WatchMode mode, UnhandledErrorListener listener, UnaryOperator<CuratorEvent> resultFilter, UnaryOperator<WatchedEvent> watcherFilter);
+}
diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-v2/src/main/java/org/apache/curator/v2/CuratorFrameworkV2.java
similarity index 51%
copy from curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
copy to curator-v2/src/main/java/org/apache/curator/v2/CuratorFrameworkV2.java
index 42279d0..a83bb46 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
+++ b/curator-v2/src/main/java/org/apache/curator/v2/CuratorFrameworkV2.java
@@ -16,16 +16,35 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.utils;
+package org.apache.curator.v2;
 
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.WatchesBuilder;
+import org.apache.curator.framework.imps.CuratorFrameworkV2Impl;
 
-public class DefaultZookeeperFactory implements ZookeeperFactory
+public interface CuratorFrameworkV2 extends CuratorFramework
 {
-    @Override
-    public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
+    /**
+     * Wrap a CuratorFramework instance to gain access to newer ZooKeeper features
+     *
+     * @param client client instance
+     * @return wrapped client
+     */
+    static CuratorFrameworkV2 wrap(CuratorFramework client)
     {
-        return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
+        return new CuratorFrameworkV2Impl(client);
     }
+
+    /**
+     * Start a watches builder
+     *
+     * @return builder
+     */
+    WatchesBuilder watches();
+
+    @Override
+    CuratorFrameworkV2 usingNamespace(String newNamespace);
+
+    @Override
+    WatcherRemoveCuratorFrameworkV2 newWatcherRemoveCuratorFramework();
 }
diff --git a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java b/curator-v2/src/main/java/org/apache/curator/v2/WatcherRemoveCuratorFrameworkV2.java
similarity index 79%
copy from curator-test/src/main/java/org/apache/curator/test/Compatibility.java
copy to curator-v2/src/main/java/org/apache/curator/v2/WatcherRemoveCuratorFrameworkV2.java
index 5b4b53f..ba0b61c 100644
--- a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java
+++ b/curator-v2/src/main/java/org/apache/curator/v2/WatcherRemoveCuratorFrameworkV2.java
@@ -16,12 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.test;
+package org.apache.curator.v2;
 
-public class Compatibility
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
+
+public interface WatcherRemoveCuratorFrameworkV2 extends WatcherRemoveCuratorFramework, CuratorFrameworkV2
 {
-    public static boolean isZK34()
-    {
-        return false;
-    }
 }
diff --git a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java b/curator-v2/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDslV2.java
similarity index 77%
copy from curator-test/src/main/java/org/apache/curator/test/Compatibility.java
copy to curator-v2/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDslV2.java
index 5b4b53f..ae851a8 100644
--- a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java
+++ b/curator-v2/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDslV2.java
@@ -16,12 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.test;
+package org.apache.curator.x.async.api;
 
-public class Compatibility
+public interface AsyncCuratorFrameworkDslV2 extends AsyncCuratorFrameworkDsl
 {
-    public static boolean isZK34()
-    {
-        return false;
-    }
+    /**
+     * Start an add watch builder
+     *
+     * @return builder object
+     */
+    AsyncWatchBuilder addWatch();
 }
diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-v2/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder.java
similarity index 60%
copy from curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
copy to curator-v2/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder.java
index 42279d0..73172c3 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
+++ b/curator-v2/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder.java
@@ -16,16 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.utils;
 
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
+package org.apache.curator.x.async.api;
 
-public class DefaultZookeeperFactory implements ZookeeperFactory
+import org.apache.curator.framework.api.AddWatchable;
+import org.apache.curator.x.async.AsyncStage;
+import org.apache.zookeeper.AddWatchMode;
+
+public interface AsyncWatchBuilder extends AddWatchable<AsyncPathable<AsyncStage<Void>>>
 {
-    @Override
-    public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
-    {
-        return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
-    }
-}
+    /**
+     * The mode to use. By default, {@link org.apache.zookeeper.AddWatchMode#PERSISTENT_RECURSIVE} is used
+     *
+     * @param mode mode
+     * @return this
+     */
+    AddWatchable<AsyncPathable<AsyncStage<Void>>> withMode(AddWatchMode mode);
+}
\ No newline at end of file
diff --git a/curator-v2/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkV2Impl.java b/curator-v2/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkV2Impl.java
new file mode 100644
index 0000000..210a79f
--- /dev/null
+++ b/curator-v2/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkV2Impl.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.details;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.curator.v2.AsyncCuratorFrameworkV2;
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.curator.x.async.WatchMode;
+import org.apache.curator.x.async.api.*;
+import org.apache.zookeeper.WatchedEvent;
+import java.util.Objects;
+import java.util.function.UnaryOperator;
+
+public class AsyncCuratorFrameworkV2Impl implements AsyncCuratorFrameworkV2
+{
+    private final AsyncCuratorFrameworkImpl client;
+
+    public AsyncCuratorFrameworkV2Impl(AsyncCuratorFramework client)
+    {
+        this(reveal(client));
+    }
+
+    private AsyncCuratorFrameworkV2Impl(AsyncCuratorFrameworkImpl client)
+    {
+        this.client = client;
+    }
+
+    private static AsyncCuratorFrameworkImpl reveal(Object client)
+    {
+        try
+        {
+            return (AsyncCuratorFrameworkImpl)Objects.requireNonNull(client, "client cannot be null");
+        }
+        catch ( Exception e )
+        {
+            throw new IllegalArgumentException("Only AsyncCuratorFramework clients wrapped via AsyncCuratorFramework.wrap(): " + client.getClass().getName());
+        }
+    }
+
+    @Override
+    public CuratorFramework unwrap()
+    {
+        return client.unwrap();
+    }
+
+    @Override
+    public AsyncCuratorFrameworkDslV2 with(WatchMode mode)
+    {
+        return new AsyncCuratorFrameworkV2Impl(reveal(client.with(mode)));
+    }
+
+    @Override
+    public AsyncCuratorFrameworkDslV2 with(UnhandledErrorListener listener)
+    {
+        return new AsyncCuratorFrameworkV2Impl(reveal(client.with(listener)));
+    }
+
+    @Override
+    public AsyncCuratorFrameworkDslV2 with(UnaryOperator<CuratorEvent> resultFilter, UnaryOperator<WatchedEvent> watcherFilter)
+    {
+        return new AsyncCuratorFrameworkV2Impl(reveal(client.with(resultFilter, watcherFilter)));
+    }
+
+    @Override
+    public AsyncCuratorFrameworkDslV2 with(UnhandledErrorListener listener, UnaryOperator<CuratorEvent> resultFilter, UnaryOperator<WatchedEvent> watcherFilter)
+    {
+        return new AsyncCuratorFrameworkV2Impl(reveal(client.with(listener, resultFilter, watcherFilter)));
+    }
+
+    @Override
+    public AsyncCuratorFrameworkDslV2 with(WatchMode mode, UnhandledErrorListener listener, UnaryOperator<CuratorEvent> resultFilter, UnaryOperator<WatchedEvent> watcherFilter)
+    {
+        return new AsyncCuratorFrameworkV2Impl(reveal(client.with(mode, listener, resultFilter, watcherFilter)));
+    }
+
+    @Override
+    public AsyncWatchBuilder addWatch()
+    {
+        return new AsyncWatchBuilderImpl(client.getClient(), client.getFilters());
+    }
+
+    @Override
+    public WatchableAsyncCuratorFramework watched()
+    {
+        return client.watched();
+    }
+
+    @Override
+    public AsyncCreateBuilder create()
+    {
+        return client.create();
+    }
+
+    @Override
+    public AsyncDeleteBuilder delete()
+    {
+        return client.delete();
+    }
+
+    @Override
+    public AsyncSetDataBuilder setData()
+    {
+        return client.setData();
+    }
+
+    @Override
+    public AsyncGetACLBuilder getACL()
+    {
+        return client.getACL();
+    }
+
+    @Override
+    public AsyncSetACLBuilder setACL()
+    {
+        return client.setACL();
+    }
+
+    @Override
+    public AsyncReconfigBuilder reconfig()
+    {
+        return client.reconfig();
+    }
+
+    @Override
+    public AsyncMultiTransaction transaction()
+    {
+        return client.transaction();
+    }
+
+    @Override
+    public AsyncTransactionOp transactionOp()
+    {
+        return client.transactionOp();
+    }
+
+    @Override
+    public AsyncSyncBuilder sync()
+    {
+        return client.sync();
+    }
+
+    @Override
+    public AsyncRemoveWatchesBuilder removeWatches()
+    {
+        return client.removeWatches();
+    }
+
+    @Override
+    public AsyncExistsBuilder checkExists()
+    {
+        return client.checkExists();
+    }
+
+    @Override
+    public AsyncGetDataBuilder getData()
+    {
+        return client.getData();
+    }
+
+    @Override
+    public AsyncGetChildrenBuilder getChildren()
+    {
+        return client.getChildren();
+    }
+
+    @Override
+    public AsyncGetConfigBuilder getConfig()
+    {
+        return client.getConfig();
+    }
+}
diff --git a/curator-v2/src/main/java/org/apache/curator/x/async/details/AsyncWatchBuilderImpl.java b/curator-v2/src/main/java/org/apache/curator/x/async/details/AsyncWatchBuilderImpl.java
new file mode 100644
index 0000000..f3c17ad
--- /dev/null
+++ b/curator-v2/src/main/java/org/apache/curator/x/async/details/AsyncWatchBuilderImpl.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.async.details;
+
+import org.apache.curator.framework.api.AddWatchable;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.imps.AddWatchBuilderImpl;
+import org.apache.curator.framework.imps.CuratorFrameworkImpl;
+import org.apache.curator.framework.imps.Watching;
+import org.apache.curator.x.async.AsyncStage;
+import org.apache.curator.x.async.api.AsyncPathable;
+import org.apache.curator.x.async.api.AsyncWatchBuilder;
+import org.apache.zookeeper.AddWatchMode;
+import org.apache.zookeeper.Watcher;
+
+import static org.apache.curator.x.async.details.BackgroundProcs.ignoredProc;
+import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
+
+class AsyncWatchBuilderImpl implements AsyncWatchBuilder, AddWatchable<AsyncPathable<AsyncStage<Void>>>, AsyncPathable<AsyncStage<Void>>
+{
+    private final CuratorFrameworkImpl client;
+    private final Filters filters;
+    private Watching watching = null;
+    private AddWatchMode mode = AddWatchMode.PERSISTENT_RECURSIVE;
+
+    AsyncWatchBuilderImpl(CuratorFrameworkImpl client, Filters filters)
+    {
+        this.client = client;
+        this.filters = filters;
+    }
+
+    @Override
+    public AddWatchable<AsyncPathable<AsyncStage<Void>>> withMode(AddWatchMode mode)
+    {
+        this.mode = mode;
+        return this;
+    }
+
+    @Override
+    public AsyncPathable<AsyncStage<Void>> usingWatcher(Watcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    @Override
+    public AsyncPathable<AsyncStage<Void>> usingWatcher(CuratorWatcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    @Override
+    public AsyncStage<Void> forPath(String path)
+    {
+        BuilderCommon<Void> common = new BuilderCommon<>(filters, ignoredProc);
+        AddWatchBuilderImpl builder = new AddWatchBuilderImpl(client, watching, common.backgrounding, mode);
+        return safeCall(common.internalCallback, () -> builder.forPath(path));
+    }
+}
\ No newline at end of file
diff --git a/curator-v2/src/test/java/org/apache/curator/framework/v2/TestAsyncCuratorFrameworkV2.java b/curator-v2/src/test/java/org/apache/curator/framework/v2/TestAsyncCuratorFrameworkV2.java
new file mode 100644
index 0000000..4785787
--- /dev/null
+++ b/curator-v2/src/test/java/org/apache/curator/framework/v2/TestAsyncCuratorFrameworkV2.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.v2;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.apache.curator.v2.AsyncCuratorFrameworkV2;
+import org.apache.zookeeper.AddWatchMode;
+import org.apache.zookeeper.Watcher;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.CountDownLatch;
+
+public class TestAsyncCuratorFrameworkV2 extends CuratorTestBase
+{
+    @Test
+    public void testPersistentRecursiveWatch() throws Exception
+    {
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+        {
+            AsyncCuratorFrameworkV2 async = AsyncCuratorFrameworkV2.wrap(client);
+
+            client.start();
+            client.blockUntilConnected();
+
+            CountDownLatch latch = new CountDownLatch(5);
+            Watcher watcher = event -> latch.countDown();
+            async.addWatch().withMode(AddWatchMode.PERSISTENT_RECURSIVE).usingWatcher(watcher).forPath("/test").toCompletableFuture().get();
+
+            client.create().forPath("/test");
+            client.create().forPath("/test/a");
+            client.create().forPath("/test/a/b");
+            client.create().forPath("/test/a/b/c");
+            client.create().forPath("/test/a/b/c/d");
+
+            Assert.assertTrue(timing.awaitLatch(latch));
+        }
+    }
+}
diff --git a/curator-v2/src/test/java/org/apache/curator/framework/v2/TestFrameworkV2.java b/curator-v2/src/test/java/org/apache/curator/framework/v2/TestFrameworkV2.java
new file mode 100644
index 0000000..f00dab3
--- /dev/null
+++ b/curator-v2/src/test/java/org/apache/curator/framework/v2/TestFrameworkV2.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.v2;
+
+import org.apache.curator.v2.CuratorFrameworkV2;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.apache.zookeeper.AddWatchMode;
+import org.apache.zookeeper.Watcher;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.CountDownLatch;
+
+import static org.apache.curator.v2.CuratorFrameworkV2.wrap;
+
+public class TestFrameworkV2 extends CuratorTestBase
+{
+    @Test
+    public void testPersistentRecursiveWatch() throws Exception
+    {
+        try ( CuratorFrameworkV2 client = wrap(CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1))) )
+        {
+            client.start();
+            client.blockUntilConnected();
+
+            CountDownLatch latch = new CountDownLatch(5);
+            Watcher watcher = event -> latch.countDown();
+            client.watches().add().withMode(AddWatchMode.PERSISTENT_RECURSIVE).usingWatcher(watcher).forPath("/test");
+
+            client.create().forPath("/test");
+            client.create().forPath("/test/a");
+            client.create().forPath("/test/a/b");
+            client.create().forPath("/test/a/b/c");
+            client.create().forPath("/test/a/b/c/d");
+
+            Assert.assertTrue(timing.awaitLatch(latch));
+        }
+    }
+}
diff --git a/curator-v2/src/test/resources/log4j.properties b/curator-v2/src/test/resources/log4j.properties
new file mode 100644
index 0000000..2a85e0d
--- /dev/null
+++ b/curator-v2/src/test/resources/log4j.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=ERROR, console
+
+log4j.logger.org.apache.curator=DEBUG, console
+log4j.additivity.org.apache.curator=false
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%-5p %c %x %m [%t]%n
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
index 167cf50..f13d311 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
@@ -221,6 +221,16 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework
         return new AsyncGetConfigBuilderImpl(client, filters, getBuilderWatchMode());
     }
 
+    Filters getFilters()
+    {
+        return filters;
+    }
+
+    CuratorFrameworkImpl getClient()
+    {
+        return client;
+    }
+
     private WatchMode getBuilderWatchMode()
     {
         return watched ? watchMode : null;
diff --git a/pom.xml b/pom.xml
index 3a5e152..731cc84 100644
--- a/pom.xml
+++ b/pom.xml
@@ -60,7 +60,7 @@
         <jdk-version>1.${short-jdk-version}</jdk-version>
 
         <!-- versions -->
-        <zookeeper-version>3.5.5</zookeeper-version>
+        <zookeeper-version>3.5.6</zookeeper-version>
         <maven-bundle-plugin-version>4.1.0</maven-bundle-plugin-version>
         <maven-javadoc-plugin-version>3.0.1</maven-javadoc-plugin-version>
         <doxia-module-confluence-version>1.8</doxia-module-confluence-version>
@@ -85,25 +85,27 @@
         <guava-failureaccess-version>1.0.1</guava-failureaccess-version>
         <testng-version>6.14.3</testng-version>
         <swift-version>0.23.1</swift-version>
-        <dropwizard-version>1.3.7</dropwizard-version>
         <maven-shade-plugin-version>3.2.1</maven-shade-plugin-version>
         <slf4j-version>1.7.25</slf4j-version>
         <clirr-maven-plugin-version>2.8</clirr-maven-plugin-version>
+        <dropwizard-version>3.2.5</dropwizard-version>
+        <snappy-version>1.1.7</snappy-version>
 
         <!-- OSGi Properties -->
-        <osgi.export.package />
-        <osgi.import.package />
-        <osgi.private.package />
-        <osgi.dynamic.import />
-        <osgi.require.bundle />
-        <osgi.export.service />
-        <osgi.activator />
+        <osgi.export.package/>
+        <osgi.import.package/>
+        <osgi.private.package/>
+        <osgi.dynamic.import/>
+        <osgi.require.bundle/>
+        <osgi.export.service/>
+        <osgi.activator/>
     </properties>
 
     <scm>
         <url>https://github.com/apache/curator.git</url>
         <connection>scm:git:https://gitbox.apache.org/repos/asf/curator.git</connection>
-        <developerConnection>scm:git:https://gitbox.apache.org/repos/asf/curator.git</developerConnection>
+        <developerConnection>scm:git:https://gitbox.apache.org/repos/asf/curator.git
+        </developerConnection>
         <tag>apache-curator-3.2.0</tag>
     </scm>
 
@@ -318,6 +320,7 @@
         <module>curator-x-discovery-server</module>
         <module>curator-x-async</module>
         <module>curator-test-zk34</module>
+        <module>curator-v2</module>
     </modules>
 
     <dependencyManagement>
@@ -567,6 +570,24 @@
                 <artifactId>dropwizard-logging</artifactId>
                 <version>${dropwizard-version}</version>
             </dependency>
+
+            <dependency>
+                <groupId>io.dropwizard.metrics</groupId>
+                <artifactId>metrics-core</artifactId>
+                <version>${dropwizard-version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.slf4j</groupId>
+                        <artifactId>slf4j-api</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
+            <dependency>
+                <groupId>org.xerial.snappy</groupId>
+                <artifactId>snappy-java</artifactId>
+                <version>${snappy-version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
@@ -582,7 +603,6 @@
                 <artifactId>maven-javadoc-plugin</artifactId>
                 <version>${maven-javadoc-plugin-version}</version>
                 <configuration>
-                    <aggregate>true</aggregate>
                     <additionalJOptions>
                         <additionalJOption>-J-Xmx1g</additionalJOption>
                     </additionalJOptions>
@@ -872,7 +892,8 @@
                             <relocations>
                                 <relocation>
                                     <pattern>com.google</pattern>
-                                    <shadedPattern>org.apache.curator.shaded.com.google</shadedPattern>
+                                    <shadedPattern>org.apache.curator.shaded.com.google
+                                    </shadedPattern>
                                     <excludes>
                                         <exclude>com.google.common.base.Function</exclude>
                                         <exclude>com.google.common.base.Predicate</exclude>
diff --git a/src/site/confluence/zk-compatibility.confluence b/src/site/confluence/zk-compatibility.confluence
index ed6e32e..7ede4a4 100644
--- a/src/site/confluence/zk-compatibility.confluence
+++ b/src/site/confluence/zk-compatibility.confluence
@@ -1,15 +1,45 @@
 h1. ZooKeeper Version Compatibility
 
-While ZooKeeper 3.5.x is still considered "beta" by the ZooKeeper development team, the reality is that it is
-used in production by many users. However, ZooKeeper 3.4.x is also used in production. Prior to Apache Curator
-4.0, both versions of ZooKeeper were supported via two versions of Apache Curator. Starting with Curator 4.0
-both versions of ZooKeeper are supported via the same Curator libraries.
+There are multiple active version lines of ZooKeeper used in production by users. Curator can act
+as a client for any of these versions.
 
 h2. ZooKeeper 3.5.x
 
-* Curator 4.0 has a hard dependency on ZooKeeper 3.5.x
+* Curator 4.x has a hard dependency on ZooKeeper 3.5.x
 * If you are using ZooKeeper 3.5.x there's nothing additional to do \- just use Curator 4.0
 
+h2. ZooKeeper 3.6.x
+
+ZooKeeper 3.6.x is the newest version of ZooKeeper which adds new features such as
+Persistent/Recursive watchers. To use Curator X.X with ZooKeeper 3.6.x use the dependency
+{{curator-v2}} instead of {{curator-recipes}}. Create a {{CuratorFramework}} instance in the
+normal manner. To get access to new features, wrap the CuratorFramework in a CuratorFrameworkV2
+instances. E.g.
+
+{code}
+CuratorFramework client = CuratorFrameworkFactory...
+CuratorFrameworkV2 clientV2 = CuratorFrameworkV2.wrap(client);
+
+client.watches().add()...
+{code}
+
+_Maven_
+
+{code}
+<dependency>
+    <groupId>org.apache.curator</groupId>
+    <artifactId>curator-v2</artifactId>
+    <version>${curator-version}</version>
+</dependency>
+{code}
+
+_Gradle_
+
+{code}
+compile 'org.apache.curator:curator-v2:$curatorVersion'
+{code}
+
+
 h2. ZooKeeper 3.4.x
 
 Curator 4.0 supports ZooKeeper 3.4.x ensembles in a soft\-compatibility mode. To use this mode