You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/10/14 13:21:27 UTC

[curator] branch persistent-watcher updated (f3d2258 -> bdddaf9)

This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a change to branch persistent-watcher
in repository https://gitbox.apache.org/repos/asf/curator.git.


 discard f3d2258  Support ZK 3.6 and add support for upcoming Persistent Recursive Watch APIs.
     add 551e866  CURATOR-546
     new bdddaf9  Support ZK 3.6 and add support for upcoming Persistent Recursive Watch APIs.

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (f3d2258)
            \
             N -- N -- N   refs/heads/persistent-watcher (bdddaf9)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/curator/utils/Compatibility.java    | 25 +++++++++
 .../apache/curator/framework/CuratorFramework.java |  8 +--
 .../api/{Statable.java => WatchesBuilder.java}     | 64 +++++++++++-----------
 .../framework/imps/CuratorFrameworkImpl.java       | 11 +---
 .../framework/imps/RemoveWatchesBuilderImpl.java   |  9 ++-
 .../curator/framework/imps/WatchesBuilderImpl.java | 47 ++++++++++++++++
 .../curator/framework/imps/TestFramework.java      |  4 +-
 .../x/async/modeled/details/ModeledCacheImpl.java  |  2 +-
 .../async/modeled/TestCachedModeledFramework.java  | 32 ++++++++---
 9 files changed, 141 insertions(+), 61 deletions(-)
 copy curator-framework/src/main/java/org/apache/curator/framework/api/{Statable.java => WatchesBuilder.java} (79%)
 create mode 100644 curator-framework/src/main/java/org/apache/curator/framework/imps/WatchesBuilderImpl.java


[curator] 01/01: Support ZK 3.6 and add support for upcoming Persistent Recursive Watch APIs.

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch persistent-watcher
in repository https://gitbox.apache.org/repos/asf/curator.git

commit bdddaf9ea96e5cd1732ea00f51ff092b1dcc2d60
Author: randgalt <ra...@apache.org>
AuthorDate: Wed Oct 2 20:09:32 2019 -0500

    Support ZK 3.6 and add support for upcoming Persistent Recursive Watch APIs.
---
 .../org/apache/curator/utils/Compatibility.java    |  25 +++
 .../curator/utils/DefaultZookeeperFactory.java     |   3 +-
 .../apache/curator/framework/CuratorFramework.java |   2 +-
 .../curator/framework/api/AddWatchBuilder.java     |  21 +--
 .../curator/framework/api/AddWatchBuilder2.java    |  15 +-
 .../apache/curator/framework/api/AddWatchable.java |  27 ++--
 .../curator/framework/api/CuratorEventType.java    |   7 +-
 .../curator/framework/api/WatchesBuilder.java      |  63 ++++----
 .../framework/imps/AddWatchBuilderImpl.java        | 171 +++++++++++++++++++++
 .../framework/imps/CuratorFrameworkImpl.java       |   4 +-
 .../imps/CuratorMultiTransactionRecord.java        |  34 ++--
 .../framework/imps/ReconfigBuilderImpl.java        |  18 ++-
 .../framework/imps/RemoveWatchesBuilderImpl.java   |   9 +-
 .../curator/framework/imps/WatchesBuilderImpl.java |  47 ++++++
 .../apache/curator/framework/imps/Watching.java    |  10 +-
 .../curator/framework/imps/TestFramework.java      |  51 ++++++
 .../recipes/leader/ChaosMonkeyCnxnFactory.java     |   7 +-
 curator-test/pom.xml                               |  10 ++
 .../apache/curator/test/TestingQuorumPeerMain.java |   3 +-
 .../apache/curator/test/TestingZooKeeperMain.java  |   5 +-
 .../org/apache/curator/test/WatchersDebug.java     |   9 ++
 .../x/async/api/AsyncCuratorFrameworkDsl.java      |   7 +
 .../curator/x/async/api/AsyncWatchBuilder.java     |  52 +++++++
 .../x/async/details/AsyncCuratorFrameworkImpl.java |   6 +
 .../x/async/details/AsyncWatchBuilderImpl.java     |  94 +++++++++++
 .../curator/framework/imps/TestFramework.java      |  59 +++++++
 pom.xml                                            |  23 ++-
 27 files changed, 686 insertions(+), 96 deletions(-)

diff --git a/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java b/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java
index 1ee2301..618b302 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java
@@ -31,6 +31,7 @@ import java.lang.reflect.Method;
 public class Compatibility
 {
     private static final boolean hasZooKeeperAdmin;
+    private static final boolean hasPersistentWatches;
     private static final Method queueEventMethod;
     private static final Logger logger = LoggerFactory.getLogger(Compatibility.class);
 
@@ -61,6 +62,19 @@ public class Compatibility
             LoggerFactory.getLogger(Compatibility.class).info("Using emulated InjectSessionExpiration");
         }
         queueEventMethod = localQueueEventMethod;
+
+        boolean localHasPersistentWatches;
+        try
+        {
+            Class.forName("org.apache.zookeeper.AddWatchMode");
+            localHasPersistentWatches = true;
+        }
+        catch ( ClassNotFoundException e )
+        {
+            localHasPersistentWatches = false;
+            logger.info("Running without persistent watches");
+        }
+        hasPersistentWatches = localHasPersistentWatches;
     }
 
     /**
@@ -74,6 +88,17 @@ public class Compatibility
     }
 
     /**
+     * Return true of persistent watches are available in the
+     * ZooKeeper library being used.
+     *
+     * @return true/false
+     */
+    public static boolean hasPersistentWatches()
+    {
+        return hasPersistentWatches;
+    }
+
+    /**
      * For ZooKeeper 3.5.x, use the supported <code>zooKeeper.getTestable().injectSessionExpiration()</code>.
      * For ZooKeeper 3.4.x do the equivalent via reflection
      *
diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
index 42279d0..acd32e7 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
@@ -20,12 +20,13 @@ package org.apache.curator.utils;
 
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.admin.ZooKeeperAdmin;
 
 public class DefaultZookeeperFactory implements ZookeeperFactory
 {
     @Override
     public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
     {
-        return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
+        return new ZooKeeperAdmin(connectString, sessionTimeout, watcher, canBeReadOnly);
     }
 }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 3737faa..a803e63 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -196,7 +196,7 @@ public interface CuratorFramework extends Closeable
      * Start a remove watches builder.
      * @return builder object
      */
-    public RemoveWatchesBuilder watches();
+    public WatchesBuilder watches();
 
     /**
      * Returns the listenable interface for the Connect State
diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder.java
similarity index 65%
copy from curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
copy to curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder.java
index 42279d0..ad6d434 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder.java
@@ -16,16 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.utils;
+package org.apache.curator.framework.api;
 
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.AddWatchMode;
 
-public class DefaultZookeeperFactory implements ZookeeperFactory
+public interface AddWatchBuilder extends AddWatchBuilder2
 {
-    @Override
-    public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
-    {
-        return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
-    }
-}
+    /**
+     * The mode to use. By default, {@link org.apache.zookeeper.AddWatchMode#PERSISTENT_RECURSIVE} is used
+     *
+     * @param mode mode to use
+     * @return this
+     */
+    AddWatchBuilder2 withMode(AddWatchMode mode);
+}
\ No newline at end of file
diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder2.java
similarity index 65%
copy from curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
copy to curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder2.java
index 42279d0..b784fc5 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder2.java
@@ -16,16 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.utils;
+package org.apache.curator.framework.api;
 
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-
-public class DefaultZookeeperFactory implements ZookeeperFactory
+public interface AddWatchBuilder2 extends
+    Backgroundable<AddWatchable<Pathable<Void>>>, AddWatchable<Pathable<Void>>
 {
-    @Override
-    public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
-    {
-        return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
-    }
-}
+}
\ No newline at end of file
diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchable.java
similarity index 68%
copy from curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
copy to curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchable.java
index 42279d0..dbd2666 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchable.java
@@ -16,16 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.utils;
+package org.apache.curator.framework.api;
 
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
 
-public class DefaultZookeeperFactory implements ZookeeperFactory
+public interface AddWatchable<T>
 {
-    @Override
-    public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
-    {
-        return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
-    }
-}
+    /**
+     * Set a watcher for the operation
+     *
+     * @param watcher the watcher
+     * @return this
+     */
+    T usingWatcher(Watcher watcher);
+
+    /**
+     * Set a watcher for the operation
+     *
+     * @param watcher the watcher
+     * @return this
+     */
+    T usingWatcher(CuratorWatcher watcher);
+}
\ No newline at end of file
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
index 5dea211..22cc181 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
@@ -96,5 +96,10 @@ public enum CuratorEventType
     /**
      * Event sent when client is being closed
      */
-    CLOSING
+    CLOSING,
+
+    /**
+     * Corresponds to {@link CuratorFramework#addWatch()}
+     */
+    ADD_WATCH
 }
diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/api/WatchesBuilder.java
similarity index 65%
copy from curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
copy to curator-framework/src/main/java/org/apache/curator/framework/api/WatchesBuilder.java
index 42279d0..821f1a3 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/WatchesBuilder.java
@@ -1,31 +1,32 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.utils;
-
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-
-public class DefaultZookeeperFactory implements ZookeeperFactory
-{
-    @Override
-    public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
-    {
-        return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+/**
+ * Builder to allow watches to be removed 
+ */
+public interface WatchesBuilder extends RemoveWatchesBuilder
+{
+    /**
+     * Start an add watch operation
+     *
+     * @return builder
+     */
+    AddWatchBuilder add();
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddWatchBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddWatchBuilderImpl.java
new file mode 100644
index 0000000..64e1de5
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddWatchBuilderImpl.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.RetryLoop;
+import org.apache.curator.drivers.OperationTrace;
+import org.apache.curator.framework.api.AddWatchBuilder;
+import org.apache.curator.framework.api.AddWatchBuilder2;
+import org.apache.curator.framework.api.AddWatchable;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.api.Pathable;
+import org.apache.zookeeper.AddWatchMode;
+import org.apache.zookeeper.Watcher;
+import java.util.concurrent.Executor;
+
+public class AddWatchBuilderImpl implements AddWatchBuilder, Pathable<Void>, BackgroundOperation<String>
+{
+    private final CuratorFrameworkImpl client;
+    private Watching watching = null;
+    private Backgrounding backgrounding = new Backgrounding();
+    private AddWatchMode mode = AddWatchMode.PERSISTENT_RECURSIVE;
+
+    AddWatchBuilderImpl(CuratorFrameworkImpl client)
+    {
+        this.client = client;
+    }
+
+    public AddWatchBuilderImpl(CuratorFrameworkImpl client, Watching watching, Backgrounding backgrounding, AddWatchMode mode)
+    {
+        this.client = client;
+        this.watching = watching;
+        this.backgrounding = backgrounding;
+        this.mode = mode;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground()
+    {
+        backgrounding = new Backgrounding();
+        return this;
+    }
+
+    @Override
+    public AddWatchBuilder2 withMode(AddWatchMode mode)
+    {
+        this.mode = mode;
+        return this;
+    }
+
+    @Override
+    public Pathable<Void> usingWatcher(Watcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    @Override
+    public Pathable<Void> usingWatcher(CuratorWatcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground(Object context)
+    {
+        backgrounding = new Backgrounding(context);
+        return this;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback)
+    {
+        backgrounding = new Backgrounding(callback);
+        return this;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context)
+    {
+        backgrounding = new Backgrounding(callback, context);
+        return this;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Executor executor)
+    {
+        backgrounding = new Backgrounding(callback, executor);
+        return this;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context, Executor executor)
+    {
+        backgrounding = new Backgrounding(client, callback, context, executor);
+        return this;
+    }
+
+    @Override
+    public Void forPath(String path) throws Exception
+    {
+        if ( backgrounding.inBackground() )
+        {
+            client.processBackgroundOperation(new OperationAndData<>(this, path, backgrounding.getCallback(), null, backgrounding.getContext(), watching), null);
+        }
+        else
+        {
+            pathInForeground(path);
+        }
+        return null;
+    }
+
+    @Override
+    public void performBackgroundOperation(final OperationAndData<String> data) throws Exception
+    {
+        String path = data.getData();
+        String fixedPath = client.fixForNamespace(path);
+        try
+        {
+            final OperationTrace   trace = client.getZookeeperClient().startAdvancedTracer("AddWatchBuilderImpl-Background");
+            client.getZooKeeper().addWatch
+                (
+                    fixedPath,
+                    watching.getWatcher(path),
+                    mode,
+                    (rc, path1, ctx) -> {
+                        trace.setReturnCode(rc).setWithWatcher(true).setPath(path1).commit();
+                        CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.ADD_WATCH, rc, path1, null, ctx, null, null, null, null, null, null);
+                        client.processBackgroundOperation(data, event);
+                    },
+                    backgrounding.getContext()
+                );
+        }
+        catch ( Throwable e )
+        {
+            backgrounding.checkError(e, watching);
+        }
+    }
+
+    private void pathInForeground(final String path) throws Exception
+    {
+        final String fixedPath = client.fixForNamespace(path);
+        OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("AddWatchBuilderImpl-Foreground");
+        RetryLoop.callWithRetry
+        (
+            client.getZookeeperClient(), () -> {
+                client.getZooKeeper().addWatch(fixedPath, watching.getWatcher(path), mode);
+                return null;
+            });
+        trace.setPath(fixedPath).setWithWatcher(true).commit();
+    }
+}
\ No newline at end of file
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index e003bf0..c8ebbb6 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -565,10 +565,10 @@ public class CuratorFrameworkImpl implements CuratorFramework
     }
 
     @Override
-    public RemoveWatchesBuilder watches()
+    public WatchesBuilder watches()
     {
         Preconditions.checkState(!isZk34CompatibilityMode(), "Remove watches APIs are not support when running in ZooKeeper 3.4 compatibility mode");
-        return new RemoveWatchesBuilderImpl(this);
+        return new WatchesBuilderImpl(this);
     }
 
     protected void internalSync(CuratorFrameworkImpl impl, String path, Object context)
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java
index 3e72609..fbac6e6 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java
@@ -16,49 +16,57 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.imps;
 
 import com.google.common.collect.Lists;
 import org.apache.curator.framework.api.transaction.OperationType;
 import org.apache.curator.framework.api.transaction.TypeAndPath;
-import org.apache.zookeeper.MultiTransactionRecord;
 import org.apache.zookeeper.Op;
 import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
-class CuratorMultiTransactionRecord extends MultiTransactionRecord
+class CuratorMultiTransactionRecord implements Iterable<Op>
 {
-    private final List<TypeAndPath>     metadata = Lists.newArrayList();
-
-    @Override
-    public final void add(Op op)
-    {
-        throw new UnsupportedOperationException();
-    }
+    private final List<TypeAndPath> metadata = Lists.newArrayList();
+    private List<Op> ops = new ArrayList<>();
 
     void add(Op op, OperationType type, String forPath)
     {
-        super.add(op);
+        ops.add(op);
         metadata.add(new TypeAndPath(type, forPath));
     }
 
-    TypeAndPath     getMetadata(int index)
+    TypeAndPath getMetadata(int index)
     {
         return metadata.get(index);
     }
 
-    int             metadataSize()
+    int metadataSize()
     {
         return metadata.size();
     }
 
     void addToDigest(MessageDigest digest)
     {
-        for ( Op op : this )
+        for ( Op op : ops )
         {
             digest.update(op.getPath().getBytes());
             digest.update(Integer.toString(op.getType()).getBytes());
             digest.update(op.toRequestRecord().toString().getBytes());
         }
     }
+
+    @Override
+    public Iterator<Op> iterator()
+    {
+        return ops.iterator();
+    }
+
+    int size()
+    {
+        return ops.size();
+    }
 }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
index 97be59a..f8b78da 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
@@ -24,6 +24,8 @@ import org.apache.curator.RetryLoop;
 import org.apache.curator.TimeTrace;
 import org.apache.curator.framework.api.*;
 import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.admin.ZooKeeperAdmin;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.DataTree;
 import java.util.Arrays;
@@ -268,7 +270,7 @@ public class ReconfigBuilderImpl implements ReconfigBuilder, BackgroundOperation
                     client.processBackgroundOperation(data, event);
                 }
             };
-            client.getZooKeeper().reconfig(joining, leaving, newMembers, fromConfig, callback, backgrounding.getContext());
+            admin().reconfigure(joining, leaving, newMembers, fromConfig, callback, backgrounding.getContext());
         }
         catch ( Throwable e )
         {
@@ -276,6 +278,18 @@ public class ReconfigBuilderImpl implements ReconfigBuilder, BackgroundOperation
         }
     }
 
+    private ZooKeeperAdmin admin() throws Exception
+    {
+        try
+        {
+            return (ZooKeeperAdmin)client.getZooKeeper();
+        }
+        catch ( ClassCastException e )
+        {
+            throw new Exception("ZooKeeper instance is not an instance of ZooKeeperAdmin");
+        }
+    }
+
     private byte[] ensembleInForeground() throws Exception
     {
         TimeTrace trace = client.getZookeeperClient().startTracer("ReconfigBuilderImpl-Foreground");
@@ -287,7 +301,7 @@ public class ReconfigBuilderImpl implements ReconfigBuilder, BackgroundOperation
                     @Override
                     public byte[] call() throws Exception
                     {
-                        return client.getZooKeeper().reconfig(joining, leaving, newMembers, fromConfig, responseStat);
+                        return admin().reconfigure(joining, leaving, newMembers, fromConfig, responseStat);
                     }
                 }
             );
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
index e14deff..b268e71 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
@@ -201,8 +201,13 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat
         }        
         
         return null;
-    }    
-    
+    }
+
+    CuratorFrameworkImpl getClient()
+    {
+        return client;
+    }
+
     private void pathInBackground(final String path)
     {
         OperationAndData.ErrorCallback<String>  errorCallback = null;
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatchesBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatchesBuilderImpl.java
new file mode 100644
index 0000000..f3c3857
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatchesBuilderImpl.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import com.google.common.base.Preconditions;
+import org.apache.curator.framework.api.AddWatchBuilder;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.api.WatchesBuilder;
+import org.apache.curator.utils.Compatibility;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.WatcherType;
+
+public class WatchesBuilderImpl extends RemoveWatchesBuilderImpl implements WatchesBuilder
+{
+    public WatchesBuilderImpl(CuratorFrameworkImpl client)
+    {
+        super(client);
+    }
+
+    public WatchesBuilderImpl(CuratorFrameworkImpl client, Watcher watcher, CuratorWatcher curatorWatcher, WatcherType watcherType, boolean guaranteed, boolean local, boolean quietly, Backgrounding backgrounding)
+    {
+        super(client, watcher, curatorWatcher, watcherType, guaranteed, local, quietly, backgrounding);
+    }
+
+    @Override
+    public AddWatchBuilder add()
+    {
+        Preconditions.checkState(Compatibility.hasPersistentWatches(), "Persistent watches APIs are not support when running the ZooKeeper library being used");
+        return new AddWatchBuilderImpl(getClient());
+    }
+}
\ No newline at end of file
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
index daa5dd3..5bad7e7 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
@@ -23,7 +23,7 @@ import org.apache.curator.framework.api.CuratorWatcher;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
 
-class Watching
+public class Watching
 {
     private final Watcher watcher;
     private final CuratorWatcher curatorWatcher;
@@ -31,7 +31,7 @@ class Watching
     private final CuratorFrameworkImpl client;
     private NamespaceWatcher namespaceWatcher;
 
-    Watching(CuratorFrameworkImpl client, boolean watched)
+    public Watching(CuratorFrameworkImpl client, boolean watched)
     {
         this.client = client;
         this.watcher = null;
@@ -39,7 +39,7 @@ class Watching
         this.watched = watched;
     }
 
-    Watching(CuratorFrameworkImpl client, Watcher watcher)
+    public Watching(CuratorFrameworkImpl client, Watcher watcher)
     {
         this.client = client;
         this.watcher = watcher;
@@ -47,7 +47,7 @@ class Watching
         this.watched = false;
     }
 
-    Watching(CuratorFrameworkImpl client, CuratorWatcher watcher)
+    public Watching(CuratorFrameworkImpl client, CuratorWatcher watcher)
     {
         this.client = client;
         this.watcher = null;
@@ -55,7 +55,7 @@ class Watching
         this.watched = false;
     }
 
-    Watching(CuratorFrameworkImpl client)
+    public Watching(CuratorFrameworkImpl client)
     {
         this.client = client;
         watcher = null;
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
index fe49ad7..c448043 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
@@ -37,6 +37,7 @@ import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.EnsurePath;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.curator.utils.ZookeeperFactory;
+import org.apache.zookeeper.AddWatchMode;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
@@ -1265,4 +1266,54 @@ public class TestFramework extends BaseClassForTests
             CloseableUtils.closeQuietly(client);
         }
     }
+
+    @Test(groups = Zk35MethodInterceptor.zk35Group)
+    public void testPersistentRecursiveWatch() throws Exception
+    {
+        Timing2 timing = new Timing2();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        client.start();
+        try
+        {
+            BlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>();
+            Watcher watcher = events::add;
+            client.watches().add().usingWatcher(watcher).forPath("/top/main");
+
+            client.create().creatingParentsIfNeeded().forPath("/top/main/a");
+            Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main");
+            Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main/a");
+            client.setData().forPath("/top/main/a", "foo".getBytes());
+            Assert.assertEquals(timing.takeFromQueue(events).getType(), Watcher.Event.EventType.NodeDataChanged);
+            client.setData().forPath("/top/main", "bar".getBytes());
+            Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main");
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test(groups = Zk35MethodInterceptor.zk35Group)
+    public void testPersistentWatch() throws Exception
+    {
+        Timing2 timing = new Timing2();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        client.start();
+        try
+        {
+            BlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>();
+            Watcher watcher = events::add;
+            client.watches().add().withMode(AddWatchMode.PERSISTENT).usingWatcher(watcher).forPath("/top/main");
+
+            client.create().creatingParentsIfNeeded().forPath("/top/main/a");
+            Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main");
+            client.setData().forPath("/top/main/a", "foo".getBytes());
+            client.setData().forPath("/top/main", "bar".getBytes());
+            Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main");
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
 }
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
index 4cb342c..f3dda40 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
@@ -26,6 +26,7 @@ import org.apache.zookeeper.server.ByteBufferInputStream;
 import org.apache.zookeeper.server.NIOServerCnxn;
 import org.apache.zookeeper.server.NIOServerCnxnFactory;
 import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.ServerCnxn;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -92,7 +93,7 @@ public class ChaosMonkeyCnxnFactory extends NIOServerCnxnFactory
                 log.debug("Rejected : " + si.toString());
                 // Still reject request
                 log.debug("Still not ready for " + remaining + "ms");
-                ((NIOServerCnxn)si.cnxn).close();
+                ((NIOServerCnxn)si.cnxn).close(ServerCnxn.DisconnectReason.UNKNOWN);
                 return;
             }
             // Submit the request to the legacy Zookeeper server
@@ -113,13 +114,13 @@ public class ChaosMonkeyCnxnFactory extends NIOServerCnxnFactory
                         firstError = System.currentTimeMillis();
                         // The znode has been created, close the connection and don't tell it to client
                         log.warn("Closing connection right after " + createRequest.getPath() + " creation");
-                        ((NIOServerCnxn)si.cnxn).close();
+                        ((NIOServerCnxn)si.cnxn).close(ServerCnxn.DisconnectReason.UNKNOWN);
                     }
                 }
                 catch ( Exception e )
                 {
                     // Should not happen
-                    ((NIOServerCnxn)si.cnxn).close();
+                    ((NIOServerCnxn)si.cnxn).close(ServerCnxn.DisconnectReason.UNKNOWN);
                 }
             }
         }
diff --git a/curator-test/pom.xml b/curator-test/pom.xml
index 3683b7d..4f0c5a2 100644
--- a/curator-test/pom.xml
+++ b/curator-test/pom.xml
@@ -41,6 +41,16 @@
         </dependency>
 
         <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.xerial.snappy</groupId>
+            <artifactId>snappy-java</artifactId>
+        </dependency>
+
+        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java b/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java
index 3b3ab26..4baaff9 100644
--- a/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java
+++ b/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java
@@ -18,6 +18,7 @@
  */
 package org.apache.curator.test;
 
+import org.apache.zookeeper.server.ServerCnxn;
 import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
 import org.apache.zookeeper.server.quorum.QuorumPeerMain;
@@ -39,7 +40,7 @@ class TestingQuorumPeerMain extends QuorumPeerMain implements ZooKeeperMainFace
                 Field               cnxnFactoryField = QuorumPeer.class.getDeclaredField("cnxnFactory");
                 cnxnFactoryField.setAccessible(true);
                 ServerCnxnFactory   cnxnFactory = (ServerCnxnFactory)cnxnFactoryField.get(quorumPeer);
-                cnxnFactory.closeAll();
+                cnxnFactory.closeAll(ServerCnxn.DisconnectReason.UNKNOWN);
 
                 Field               ssField = cnxnFactory.getClass().getDeclaredField("ss");
                 ssField.setAccessible(true);
diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java b/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java
index 841df77..8e5ffee 100644
--- a/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java
+++ b/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java
@@ -23,6 +23,7 @@ import org.apache.zookeeper.jmx.MBeanRegistry;
 import org.apache.zookeeper.jmx.ZKMBeanInfo;
 import org.apache.zookeeper.server.ContainerManager;
 import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.ServerCnxn;
 import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.ServerConfig;
 import org.apache.zookeeper.server.ZKDatabase;
@@ -81,7 +82,7 @@ public class TestingZooKeeperMain implements ZooKeeperMainFace
         {
             if ( cnxnFactory != null )
             {
-                cnxnFactory.closeAll();
+                cnxnFactory.closeAll(ServerCnxn.DisconnectReason.UNKNOWN);
 
                 Field ssField = cnxnFactory.getClass().getDeclaredField("ss");
                 ssField.setAccessible(true);
@@ -262,7 +263,7 @@ public class TestingZooKeeperMain implements ZooKeeperMainFace
     {
         public TestZooKeeperServer(FileTxnSnapLog txnLog, ServerConfig config)
         {
-            super(txnLog, config.getTickTime(), config.getMinSessionTimeout(), config.getMaxSessionTimeout(), null);
+            super(txnLog, config.getTickTime(), config.getMinSessionTimeout(), config.getMaxSessionTimeout(), config.getClientPortListenBacklog(), new ZKDatabase(txnLog), "");
         }
 
         private final AtomicBoolean isRunning = new AtomicBoolean(false);
diff --git a/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java b/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java
index e4c3b7e..e884b8c 100644
--- a/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java
+++ b/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java
@@ -27,16 +27,19 @@ public class WatchersDebug
     private static final Method getDataWatches;
     private static final Method getExistWatches;
     private static final Method getChildWatches;
+    private static final Method getPersistentWatches;
     static
     {
         Method localGetDataWatches = null;
         Method localGetExistWatches = null;
         Method localGetChildWatches = null;
+        Method localGetPersistentWatches = null;
         try
         {
             localGetDataWatches = getMethod("getDataWatches");
             localGetExistWatches = getMethod("getExistWatches");
             localGetChildWatches = getMethod("getChildWatches");
+            localGetPersistentWatches = getMethod("getPersistentWatches");
         }
         catch ( NoSuchMethodException e )
         {
@@ -45,6 +48,7 @@ public class WatchersDebug
         getDataWatches = localGetDataWatches;
         getExistWatches = localGetExistWatches;
         getChildWatches = localGetChildWatches;
+        getPersistentWatches = localGetPersistentWatches;
     }
 
     public static List<String> getDataWatches(ZooKeeper zooKeeper)
@@ -62,6 +66,11 @@ public class WatchersDebug
         return callMethod(zooKeeper, getChildWatches);
     }
 
+    public static List<String> getPersistentWatches(ZooKeeper zooKeeper)
+    {
+        return callMethod(zooKeeper, getPersistentWatches);
+    }
+
     private WatchersDebug()
     {
     }
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
index bc66bb6..d70c02c 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
@@ -84,6 +84,13 @@ public interface AsyncCuratorFrameworkDsl extends WatchableAsyncCuratorFramework
     AsyncReconfigBuilder reconfig();
 
     /**
+     * Start an add watch builder
+     *
+     * @return builder object
+     */
+    AsyncWatchBuilder addWatch();
+
+    /**
      * Start a transaction builder
      *
      * @return builder object
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder.java
new file mode 100644
index 0000000..f782a4c
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.curator.x.async.api;
+
+ import org.apache.curator.framework.api.AddWatchable;
+ import org.apache.curator.x.async.AsyncStage;
+ import org.apache.zookeeper.AddWatchMode;
+
+public interface AsyncWatchBuilder extends AddWatchable<AsyncPathable<AsyncStage<Void>>>
+ {
+     /**
+      * The mode to use. By default, {@link org.apache.zookeeper.AddWatchMode#PERSISTENT_RECURSIVE} is used
+      *
+      * @param mode mode
+      * @return this
+      */
+     AddWatchable<AsyncPathable<AsyncStage<Void>>> withMode(AddWatchMode mode);
+ }
\ No newline at end of file
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
index 167cf50..41ddee2 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
@@ -124,6 +124,12 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework
     }
 
     @Override
+    public AsyncWatchBuilder addWatch()
+    {
+        return new AsyncWatchBuilderImpl(client, filters);
+    }
+
+    @Override
     public AsyncMultiTransaction transaction()
     {
         return operations -> {
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncWatchBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncWatchBuilderImpl.java
new file mode 100644
index 0000000..3092f51
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncWatchBuilderImpl.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.curator.x.async.details;
+
+ import org.apache.curator.framework.api.AddWatchable;
+ import org.apache.curator.framework.api.CuratorWatcher;
+ import org.apache.curator.framework.imps.AddWatchBuilderImpl;
+ import org.apache.curator.framework.imps.CuratorFrameworkImpl;
+ import org.apache.curator.framework.imps.Watching;
+ import org.apache.curator.x.async.AsyncStage;
+ import org.apache.curator.x.async.api.AsyncPathable;
+ import org.apache.curator.x.async.api.AsyncWatchBuilder;
+ import org.apache.zookeeper.AddWatchMode;
+ import org.apache.zookeeper.Watcher;
+
+ import static org.apache.curator.x.async.details.BackgroundProcs.ignoredProc;
+ import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
+
+ class AsyncWatchBuilderImpl implements AsyncWatchBuilder, AddWatchable<AsyncPathable<AsyncStage<Void>>>, AsyncPathable<AsyncStage<Void>>
+ {
+     private final CuratorFrameworkImpl client;
+     private final Filters filters;
+     private Watching watching = null;
+     private AddWatchMode mode = AddWatchMode.PERSISTENT_RECURSIVE;
+
+     AsyncWatchBuilderImpl(CuratorFrameworkImpl client, Filters filters)
+     {
+         this.client = client;
+         this.filters = filters;
+     }
+
+     @Override
+     public AddWatchable<AsyncPathable<AsyncStage<Void>>> withMode(AddWatchMode mode)
+     {
+         this.mode = mode;
+         return this;
+     }
+
+     @Override
+     public AsyncPathable<AsyncStage<Void>> usingWatcher(Watcher watcher)
+     {
+         watching = new Watching(client, watcher);
+         return this;
+     }
+
+     @Override
+     public AsyncPathable<AsyncStage<Void>> usingWatcher(CuratorWatcher watcher)
+     {
+         watching = new Watching(client, watcher);
+         return this;
+     }
+
+     @Override
+     public AsyncStage<Void> forPath(String path)
+     {
+         BuilderCommon<Void> common = new BuilderCommon<>(filters, ignoredProc);
+         AddWatchBuilderImpl builder = new AddWatchBuilderImpl(client, watching, common.backgrounding, mode);
+         return safeCall(common.internalCallback, () -> builder.forPath(path));
+     }
+ }
\ No newline at end of file
diff --git a/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFramework.java b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFramework.java
index 27a84d0..f2b51fe 100644
--- a/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFramework.java
+++ b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFramework.java
@@ -30,6 +30,8 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.Timing2;
+import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.curator.x.async.AsyncCuratorFramework;
@@ -37,8 +39,11 @@ import org.apache.curator.x.async.AsyncStage;
 import org.apache.curator.x.async.api.CreateOption;
 import org.apache.curator.x.async.api.DeleteOption;
 import org.apache.curator.x.async.api.ExistsOption;
+import org.apache.zookeeper.AddWatchMode;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.testng.Assert;
@@ -657,4 +662,58 @@ public class TestFramework extends BaseClassForTests
             CloseableUtils.closeQuietly(client);
         }
     }
+
+    @Test(groups = Zk35MethodInterceptor.zk35Group)
+    public void testPersistentRecursiveWatch() throws Exception
+    {
+        Timing2 timing = new Timing2();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+            AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
+
+            BlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>();
+            Watcher watcher = events::add;
+            async.addWatch().usingWatcher(watcher).forPath("/top/main").toCompletableFuture().get();
+
+            client.create().creatingParentsIfNeeded().forPath("/top/main/a");
+            Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main");
+            Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main/a");
+            client.setData().forPath("/top/main/a", "foo".getBytes());
+            Assert.assertEquals(timing.takeFromQueue(events).getType(), Watcher.Event.EventType.NodeDataChanged);
+            client.setData().forPath("/top/main", "bar".getBytes());
+            Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main");
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test(groups = Zk35MethodInterceptor.zk35Group)
+    public void testPersistentWatch() throws Exception
+    {
+        Timing2 timing = new Timing2();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+            AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
+
+            BlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>();
+            Watcher watcher = events::add;
+            async.addWatch().withMode(AddWatchMode.PERSISTENT).usingWatcher(watcher).forPath("/top/main").toCompletableFuture().get();
+
+            client.create().creatingParentsIfNeeded().forPath("/top/main/a");
+            Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main");
+            client.setData().forPath("/top/main/a", "foo".getBytes());
+            client.setData().forPath("/top/main", "bar".getBytes());
+            Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main");
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
 }
diff --git a/pom.xml b/pom.xml
index 3a5e152..27b34cd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -60,7 +60,7 @@
         <jdk-version>1.${short-jdk-version}</jdk-version>
 
         <!-- versions -->
-        <zookeeper-version>3.5.5</zookeeper-version>
+        <zookeeper-version>3.6.0-SNAPSHOT</zookeeper-version>
         <maven-bundle-plugin-version>4.1.0</maven-bundle-plugin-version>
         <maven-javadoc-plugin-version>3.0.1</maven-javadoc-plugin-version>
         <doxia-module-confluence-version>1.8</doxia-module-confluence-version>
@@ -85,10 +85,11 @@
         <guava-failureaccess-version>1.0.1</guava-failureaccess-version>
         <testng-version>6.14.3</testng-version>
         <swift-version>0.23.1</swift-version>
-        <dropwizard-version>1.3.7</dropwizard-version>
         <maven-shade-plugin-version>3.2.1</maven-shade-plugin-version>
         <slf4j-version>1.7.25</slf4j-version>
         <clirr-maven-plugin-version>2.8</clirr-maven-plugin-version>
+        <dropwizard-version>3.2.5</dropwizard-version>
+        <snappy-version>1.1.7</snappy-version>
 
         <!-- OSGi Properties -->
         <osgi.export.package />
@@ -567,6 +568,24 @@
                 <artifactId>dropwizard-logging</artifactId>
                 <version>${dropwizard-version}</version>
             </dependency>
+
+            <dependency>
+                <groupId>io.dropwizard.metrics</groupId>
+                <artifactId>metrics-core</artifactId>
+                <version>${dropwizard-version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.slf4j</groupId>
+                        <artifactId>slf4j-api</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
+            <dependency>
+                <groupId>org.xerial.snappy</groupId>
+                <artifactId>snappy-java</artifactId>
+                <version>${snappy-version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>