You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2020/03/16 00:32:21 UTC

[curator] 01/03: CURATOR-549

This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch CURATOR-549-zk36-persistent-watcher-recipes
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 6fde5dff9f144165318e44591afcb4694fbe2048
Author: randgalt <ra...@apache.org>
AuthorDate: Sat Nov 2 11:40:44 2019 -0500

    CURATOR-549
    
    Support persistent watchers in ZK 3.6+ while maintaining background compatibility with previous versions of ZK. Adds Curator Framework DSL calls to ZooKeeper's addWatch() method. Subsequent PRs will add recipes. Both the older Framework and the async Framework now have methods to add persistent watchers.
---
 .../org/apache/curator/utils/Compatibility.java    |  23 ++-
 .../src/test/java/org/apache/curator/TestIs36.java |   1 +
 .../apache/curator/framework/CuratorFramework.java |  10 ++
 .../curator/framework/api/AddWatchBuilder.java     |  24 ++-
 .../curator/framework/api/AddWatchBuilder2.java    |  19 +-
 .../apache/curator/framework/api/AddWatchable.java |  31 ++--
 .../curator/framework/api/WatchesBuilder.java      |  25 ++-
 .../framework/imps/AddWatchBuilderImpl.java        | 197 +++++++++++++++++++++
 .../framework/imps/CuratorFrameworkImpl.java       |   8 +
 .../curator/framework/imps/WatchesBuilderImpl.java |  45 +++++
 .../framework/imps/TestReconfiguration.java        |   1 -
 .../curator/framework/imps/TestWatchesBuilder.java |  53 ++++++
 .../curator/framework/imps/TestWithCluster.java    |   1 -
 .../recipes/leader/TestLeaderSelectorEdges.java    |   1 +
 .../curator/framework/TestCompatibility.java       |  49 +++++
 .../java/org/apache/curator/zk35/TestIs35.java     |   1 +
 .../x/async/api/AsyncCuratorFrameworkDsl.java      |   9 +
 .../curator/x/async/api/AsyncWatchBuilder.java     |  29 +--
 .../curator/x/async/api/AsyncWatchBuilder2.java    |  21 +--
 .../x/async/details/AsyncCuratorFrameworkImpl.java |   9 +
 .../x/async/details/AsyncWatchBuilderImpl.java     |  79 +++++++++
 .../curator/framework/imps/TestAddWatch.java       |  87 +++++++++
 22 files changed, 642 insertions(+), 81 deletions(-)

diff --git a/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java b/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java
index fa43d35..6a28ebf 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java
@@ -31,10 +31,11 @@ import java.net.InetSocketAddress;
  */
 public class Compatibility
 {
+    private static final Logger log = LoggerFactory.getLogger(Compatibility.class);
+
     private static final Method getReachableOrOneMethod;
     private static final Field addrField;
-
-    private static final Logger log = LoggerFactory.getLogger(Compatibility.class);
+    private static final boolean hasPersistentWatchers;
 
     static
     {
@@ -62,6 +63,19 @@ public class Compatibility
             LoggerFactory.getLogger(Compatibility.class).error("Could not get addr field! Reconfiguration fail!");
         }
         addrField = localAddrField;
+
+        boolean localHasPersistentWatchers;
+        try
+        {
+            Class.forName("org.apache.zookeeper.AddWatchMode");
+            localHasPersistentWatchers = true;
+        }
+        catch ( ClassNotFoundException e )
+        {
+            localHasPersistentWatchers = false;
+            log.info("Persistent Watchers are not available in the version of the ZooKeeper library being used");
+        }
+        hasPersistentWatchers = localHasPersistentWatchers;
     }
 
     public static boolean hasGetReachableOrOneMethod()
@@ -101,4 +115,9 @@ public class Compatibility
         }
         return (address != null) ? address.getAddress().getHostAddress() : "unknown";
     }
+
+    public static boolean hasPersistentWatchers()
+    {
+        return hasPersistentWatchers;
+    }
 }
diff --git a/curator-client/src/test/java/org/apache/curator/TestIs36.java b/curator-client/src/test/java/org/apache/curator/TestIs36.java
index 2bad9b9..6480b6d 100644
--- a/curator-client/src/test/java/org/apache/curator/TestIs36.java
+++ b/curator-client/src/test/java/org/apache/curator/TestIs36.java
@@ -30,6 +30,7 @@ public class TestIs36 extends CuratorTestBase
     {
         Assert.assertTrue(Compatibility.hasGetReachableOrOneMethod());
         Assert.assertTrue(Compatibility.hasAddrField());
+        Assert.assertTrue(Compatibility.hasPersistentWatchers());
     }
 
     @Override
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index a686b94..492a461 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -196,10 +196,20 @@ public interface CuratorFramework extends Closeable
      * Start a remove watches builder.
      *
      * @return builder object
+     * @deprecated use {@link #watchers()} in ZooKeeper 3.6+
      */
     public RemoveWatchesBuilder watches();
 
     /**
+     * Start a watch builder. Supported only when ZooKeeper JAR of version 3.6 or
+     * above is used, throws {@code IllegalStateException} for ZooKeeper JAR 3.5 or below
+     *
+     * @return builder object
+     * @throws IllegalStateException ZooKeeper JAR is 3.5 or below
+     */
+    public WatchesBuilder watchers();
+
+    /**
      * Returns the listenable interface for the Connect State
      *
      * @return listenable
diff --git a/curator-test-zk35/src/test/java/org/apache/curator/zk35/TestIs35.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder.java
similarity index 68%
copy from curator-test-zk35/src/test/java/org/apache/curator/zk35/TestIs35.java
copy to curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder.java
index c1a18e2..ad6d434 100644
--- a/curator-test-zk35/src/test/java/org/apache/curator/zk35/TestIs35.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder.java
@@ -16,19 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.zk35;
+package org.apache.curator.framework.api;
 
-import org.apache.curator.utils.Compatibility;
-import org.testng.Assert;
-import org.testng.annotations.Test;
+import org.apache.zookeeper.AddWatchMode;
 
-public class TestIs35
+public interface AddWatchBuilder extends AddWatchBuilder2
 {
-    @Test
-    public void testIsZk35()
-    {
-        Assert.assertFalse(Compatibility.hasGetReachableOrOneMethod());
-        Assert.assertTrue(Compatibility.hasAddrField());
-    }
-}
-
+    /**
+     * The mode to use. By default, {@link org.apache.zookeeper.AddWatchMode#PERSISTENT_RECURSIVE} is used
+     *
+     * @param mode mode to use
+     * @return this
+     */
+    AddWatchBuilder2 withMode(AddWatchMode mode);
+}
\ No newline at end of file
diff --git a/curator-test-zk35/src/test/java/org/apache/curator/zk35/TestIs35.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder2.java
similarity index 70%
copy from curator-test-zk35/src/test/java/org/apache/curator/zk35/TestIs35.java
copy to curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder2.java
index c1a18e2..9114c00 100644
--- a/curator-test-zk35/src/test/java/org/apache/curator/zk35/TestIs35.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder2.java
@@ -16,19 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.zk35;
 
-import org.apache.curator.utils.Compatibility;
-import org.testng.Assert;
-import org.testng.annotations.Test;
+package org.apache.curator.framework.api;
 
-public class TestIs35
+public interface AddWatchBuilder2 extends
+    Backgroundable<AddWatchable<Pathable<Void>>>,
+    AddWatchable<Pathable<Void>>,
+    Pathable<Void>
 {
-    @Test
-    public void testIsZk35()
-    {
-        Assert.assertFalse(Compatibility.hasGetReachableOrOneMethod());
-        Assert.assertTrue(Compatibility.hasAddrField());
-    }
-}
-
+}
\ No newline at end of file
diff --git a/curator-test-zk35/src/test/java/org/apache/curator/zk35/TestIs35.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchable.java
similarity index 65%
copy from curator-test-zk35/src/test/java/org/apache/curator/zk35/TestIs35.java
copy to curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchable.java
index c1a18e2..1f0646c 100644
--- a/curator-test-zk35/src/test/java/org/apache/curator/zk35/TestIs35.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchable.java
@@ -16,19 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.zk35;
 
-import org.apache.curator.utils.Compatibility;
-import org.testng.Assert;
-import org.testng.annotations.Test;
+package org.apache.curator.framework.api;
 
-public class TestIs35
+import org.apache.zookeeper.Watcher;
+
+public interface AddWatchable<T>
 {
-    @Test
-    public void testIsZk35()
-    {
-        Assert.assertFalse(Compatibility.hasGetReachableOrOneMethod());
-        Assert.assertTrue(Compatibility.hasAddrField());
-    }
-}
+    /**
+     * Set a watcher for the operation
+     *
+     * @param watcher the watcher
+     * @return this
+     */
+    T usingWatcher(Watcher watcher);
 
+    /**
+     * Set a watcher for the operation
+     *
+     * @param watcher the watcher
+     * @return this
+     */
+    T usingWatcher(CuratorWatcher watcher);
+}
\ No newline at end of file
diff --git a/curator-test-zk35/src/test/java/org/apache/curator/zk35/TestIs35.java b/curator-framework/src/main/java/org/apache/curator/framework/api/WatchesBuilder.java
similarity index 70%
copy from curator-test-zk35/src/test/java/org/apache/curator/zk35/TestIs35.java
copy to curator-framework/src/main/java/org/apache/curator/framework/api/WatchesBuilder.java
index c1a18e2..3cd5528 100644
--- a/curator-test-zk35/src/test/java/org/apache/curator/zk35/TestIs35.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/WatchesBuilder.java
@@ -16,19 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.zk35;
 
-import org.apache.curator.utils.Compatibility;
-import org.testng.Assert;
-import org.testng.annotations.Test;
+package org.apache.curator.framework.api;
 
-public class TestIs35
+/**
+ * Builder to allow watches to be removed 
+ */
+public interface WatchesBuilder extends RemoveWatchesBuilder
 {
-    @Test
-    public void testIsZk35()
-    {
-        Assert.assertFalse(Compatibility.hasGetReachableOrOneMethod());
-        Assert.assertTrue(Compatibility.hasAddrField());
-    }
-}
-
+    /**
+     * Start an add watch operation
+     *
+     * @return builder
+     */
+    AddWatchBuilder add();
+}
\ No newline at end of file
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddWatchBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddWatchBuilderImpl.java
new file mode 100644
index 0000000..9f119a3
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddWatchBuilderImpl.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.RetryLoop;
+import org.apache.curator.drivers.OperationTrace;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.api.Pathable;
+import org.apache.curator.framework.api.AddWatchBuilder;
+import org.apache.curator.framework.api.AddWatchBuilder2;
+import org.apache.curator.framework.api.AddWatchable;
+import org.apache.zookeeper.AddWatchMode;
+import org.apache.zookeeper.Watcher;
+import java.util.concurrent.Executor;
+
+public class AddWatchBuilderImpl implements AddWatchBuilder, Pathable<Void>, BackgroundOperation<String>
+{
+    private final CuratorFrameworkImpl client;
+    private Watching watching;
+    private Backgrounding backgrounding = new Backgrounding();
+    private AddWatchMode mode = AddWatchMode.PERSISTENT_RECURSIVE;
+
+    AddWatchBuilderImpl(CuratorFrameworkImpl client)
+    {
+        this.client = client;
+        watching = new Watching(client, true);
+    }
+
+    public AddWatchBuilderImpl(CuratorFrameworkImpl client, Watching watching, Backgrounding backgrounding, AddWatchMode mode)
+    {
+        this.client = client;
+        this.watching = watching;
+        this.backgrounding = backgrounding;
+        this.mode = mode;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground()
+    {
+        backgrounding = new Backgrounding();
+        return this;
+    }
+
+    @Override
+    public AddWatchBuilder2 withMode(AddWatchMode mode)
+    {
+        this.mode = mode;
+        return this;
+    }
+
+    @Override
+    public Pathable<Void> usingWatcher(Watcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    @Override
+    public Pathable<Void> usingWatcher(CuratorWatcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground(Object context)
+    {
+        backgrounding = new Backgrounding(context);
+        return this;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback)
+    {
+        backgrounding = new Backgrounding(callback);
+        return this;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context)
+    {
+        backgrounding = new Backgrounding(callback, context);
+        return this;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Executor executor)
+    {
+        backgrounding = new Backgrounding(callback, executor);
+        return this;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context, Executor executor)
+    {
+        backgrounding = new Backgrounding(client, callback, context, executor);
+        return this;
+    }
+
+    @Override
+    public Void forPath(String path) throws Exception
+    {
+        if ( backgrounding.inBackground() )
+        {
+            client.processBackgroundOperation(new OperationAndData<>(this, path, backgrounding.getCallback(), null, backgrounding.getContext(), watching), null);
+        }
+        else
+        {
+            pathInForeground(path);
+        }
+        return null;
+    }
+
+    @Override
+    public void performBackgroundOperation(final OperationAndData<String> data) throws Exception
+    {
+        String path = data.getData();
+        String fixedPath = client.fixForNamespace(path);
+        try
+        {
+            final OperationTrace   trace = client.getZookeeperClient().startAdvancedTracer("AddWatchBuilderImpl-Background");
+            if ( watching.isWatched() )
+            {
+                client.getZooKeeper().addWatch
+                    (
+                        fixedPath,
+                        mode,
+                        (rc, path1, ctx) -> {
+                            trace.setReturnCode(rc).setWithWatcher(true).setPath(path1).commit();
+                            CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.ADD_WATCH, rc, path1, null, ctx, null, null, null, null, null, null);
+                            client.processBackgroundOperation(data, event);
+                        },
+                        backgrounding.getContext()
+                    );
+            }
+            else
+            {
+                client.getZooKeeper().addWatch
+                    (
+                        fixedPath,
+                        watching.getWatcher(path),
+                        mode,
+                        (rc, path1, ctx) -> {
+                            trace.setReturnCode(rc).setWithWatcher(true).setPath(path1).commit();
+                            CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.ADD_WATCH, rc, path1, null, ctx, null, null, null, null, null, null);
+                            client.processBackgroundOperation(data, event);
+                        },
+                        backgrounding.getContext()
+                    );
+            }
+        }
+        catch ( Throwable e )
+        {
+            backgrounding.checkError(e, watching);
+        }
+    }
+
+    private void pathInForeground(final String path) throws Exception
+    {
+        final String fixedPath = client.fixForNamespace(path);
+        OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("AddWatchBuilderImpl-Foreground");
+        RetryLoop.callWithRetry
+        (
+            client.getZookeeperClient(), () -> {
+                if ( watching.isWatched() )
+                {
+                    client.getZooKeeper().addWatch(fixedPath, mode);
+                }
+                else
+                {
+                    client.getZooKeeper().addWatch(fixedPath, watching.getWatcher(path), mode);
+                }
+                return null;
+            });
+        trace.setPath(fixedPath).setWithWatcher(true).commit();
+    }
+}
\ No newline at end of file
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 2507e33..bfe61bf 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -41,6 +41,7 @@ import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.framework.state.ConnectionStateManager;
+import org.apache.curator.utils.Compatibility;
 import org.apache.curator.utils.DebugUtils;
 import org.apache.curator.utils.EnsurePath;
 import org.apache.curator.utils.ThreadUtils;
@@ -558,6 +559,13 @@ public class CuratorFrameworkImpl implements CuratorFramework
         return new RemoveWatchesBuilderImpl(this);
     }
 
+    @Override
+    public WatchesBuilder watchers()
+    {
+        Preconditions.checkState(Compatibility.hasPersistentWatchers(), "watchers() is not supported in the ZooKeeper library being used. Use watches() instead.");
+        return new WatchesBuilderImpl(this);
+    }
+
     protected void internalSync(CuratorFrameworkImpl impl, String path, Object context)
     {
         BackgroundOperation<String> operation = new BackgroundSyncImpl(impl, context);
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatchesBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatchesBuilderImpl.java
new file mode 100644
index 0000000..4a273c6
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatchesBuilderImpl.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.framework.api.AddWatchBuilder;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.api.WatchesBuilder;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.WatcherType;
+
+public class WatchesBuilderImpl extends RemoveWatchesBuilderImpl implements WatchesBuilder
+{
+    public WatchesBuilderImpl(CuratorFrameworkImpl client)
+    {
+        super(client);
+    }
+
+    public WatchesBuilderImpl(CuratorFrameworkImpl client, Watcher watcher, CuratorWatcher curatorWatcher, WatcherType watcherType, boolean guaranteed, boolean local, boolean quietly, Backgrounding backgrounding)
+    {
+        super(client, watcher, curatorWatcher, watcherType, guaranteed, local, quietly, backgrounding);
+    }
+
+    @Override
+    public AddWatchBuilder add()
+    {
+        return new AddWatchBuilderImpl(getClient());
+    }
+}
\ No newline at end of file
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
index 258428c..1ff2805 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
@@ -56,7 +56,6 @@ import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 
-@Test(groups = CuratorTestBase.zk35TestCompatibilityGroup)
 public class TestReconfiguration extends CuratorTestBase
 {
     private final Timing2 timing = new Timing2();
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatchesBuilder.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatchesBuilder.java
index a777e2f..6c27af1 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatchesBuilder.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatchesBuilder.java
@@ -33,11 +33,14 @@ import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.Timing;
 import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.ZookeeperFactory;
+import org.apache.zookeeper.AddWatchMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.WatcherType;
+import org.apache.zookeeper.ZooKeeper;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.concurrent.CountDownLatch;
@@ -610,6 +613,56 @@ public class TestWatchesBuilder extends CuratorTestBase
         }
     }
 
+    @Test(groups = CuratorTestBase.zk36Group)
+    public void testPersistentRecursiveWatch() throws Exception
+    {
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+        {
+            client.start();
+            client.blockUntilConnected();
+
+            CountDownLatch latch = new CountDownLatch(5);
+            Watcher watcher = event -> latch.countDown();
+            client.watchers().add().withMode(AddWatchMode.PERSISTENT_RECURSIVE).usingWatcher(watcher).forPath("/test");
+
+            client.create().forPath("/test");
+            client.create().forPath("/test/a");
+            client.create().forPath("/test/a/b");
+            client.create().forPath("/test/a/b/c");
+            client.create().forPath("/test/a/b/c/d");
+
+            Assert.assertTrue(timing.awaitLatch(latch));
+        }
+    }
+
+    @Test(groups = CuratorTestBase.zk36Group)
+    public void testPersistentRecursiveDefaultWatch() throws Exception
+    {
+        CountDownLatch latch = new CountDownLatch(6);   // 5 creates plus the initial sync
+        ZookeeperFactory zookeeperFactory = (connectString, sessionTimeout, watcher, canBeReadOnly) -> {
+            Watcher actualWatcher = event -> {
+                watcher.process(event);
+                latch.countDown();
+            };
+            return new ZooKeeper(connectString, sessionTimeout, actualWatcher);
+        };
+        try (CuratorFramework client = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).zookeeperFactory(zookeeperFactory).build() )
+        {
+            client.start();
+            client.blockUntilConnected();
+
+            client.watchers().add().withMode(AddWatchMode.PERSISTENT_RECURSIVE).forPath("/test");
+
+            client.create().forPath("/test");
+            client.create().forPath("/test/a");
+            client.create().forPath("/test/a/b");
+            client.create().forPath("/test/a/b/c");
+            client.create().forPath("/test/a/b/c/d");
+
+            Assert.assertTrue(timing.awaitLatch(latch));
+        }
+    }
+
     private static class CountDownWatcher implements Watcher {
         private String path;
         private EventType eventType;
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWithCluster.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWithCluster.java
index 061c6ae..bdcb30b 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWithCluster.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWithCluster.java
@@ -34,7 +34,6 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.concurrent.CountDownLatch;
 
-@Test(groups = CuratorTestBase.zk35TestCompatibilityGroup)
 public class TestWithCluster extends CuratorTestBase
 {
     @Test
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelectorEdges.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelectorEdges.java
index 0085968..9841677 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelectorEdges.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelectorEdges.java
@@ -26,6 +26,7 @@ import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.retry.RetryNTimes;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.server.ServerCnxnFactory;
diff --git a/curator-test-zk35/src/test/java/org/apache/curator/framework/TestCompatibility.java b/curator-test-zk35/src/test/java/org/apache/curator/framework/TestCompatibility.java
new file mode 100644
index 0000000..5112d41
--- /dev/null
+++ b/curator-test-zk35/src/test/java/org/apache/curator/framework/TestCompatibility.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework;
+
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.testng.annotations.Test;
+
+public class TestCompatibility extends CuratorTestBase
+{
+    @Test(expectedExceptions = IllegalStateException.class)
+    public void testPersistentWatchesNotAvailable() throws Exception
+    {
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+        {
+            client.start();
+            client.watchers().add().forPath("/foo");
+        }
+    }
+
+    @Test(expectedExceptions = IllegalStateException.class)
+    public void testPersistentWatchesNotAvailableAsync()
+    {
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+        {
+            client.start();
+
+            AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
+            async.addWatch().forPath("/foo");
+        }
+    }
+}
diff --git a/curator-test-zk35/src/test/java/org/apache/curator/zk35/TestIs35.java b/curator-test-zk35/src/test/java/org/apache/curator/zk35/TestIs35.java
index c1a18e2..4292bd0 100644
--- a/curator-test-zk35/src/test/java/org/apache/curator/zk35/TestIs35.java
+++ b/curator-test-zk35/src/test/java/org/apache/curator/zk35/TestIs35.java
@@ -29,6 +29,7 @@ public class TestIs35
     {
         Assert.assertFalse(Compatibility.hasGetReachableOrOneMethod());
         Assert.assertTrue(Compatibility.hasAddrField());
+        Assert.assertFalse(Compatibility.hasPersistentWatchers());
     }
 }
 
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
index bc66bb6..0919861 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
@@ -112,4 +112,13 @@ public interface AsyncCuratorFrameworkDsl extends WatchableAsyncCuratorFramework
      * @return builder object
      */
     AsyncRemoveWatchesBuilder removeWatches();
+
+    /**
+     * Start an add watch builder. Supported only when ZooKeeper JAR of version 3.6 or
+     * above is used, throws {@code IllegalStateException} for ZooKeeper JAR 3.5 or below
+     *
+     * @return builder object
+     * @throws IllegalStateException ZooKeeper JAR is 3.5 or below
+     */
+    AsyncWatchBuilder addWatch();
 }
diff --git a/curator-test-zk35/src/test/java/org/apache/curator/zk35/TestIs35.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder.java
similarity index 60%
copy from curator-test-zk35/src/test/java/org/apache/curator/zk35/TestIs35.java
copy to curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder.java
index c1a18e2..a5e86ec 100644
--- a/curator-test-zk35/src/test/java/org/apache/curator/zk35/TestIs35.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder.java
@@ -16,19 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.zk35;
 
-import org.apache.curator.utils.Compatibility;
-import org.testng.Assert;
-import org.testng.annotations.Test;
+package org.apache.curator.x.async.api;
 
-public class TestIs35
-{
-    @Test
-    public void testIsZk35()
-    {
-        Assert.assertFalse(Compatibility.hasGetReachableOrOneMethod());
-        Assert.assertTrue(Compatibility.hasAddrField());
-    }
-}
+import org.apache.curator.framework.api.AddWatchable;
+import org.apache.curator.x.async.AsyncStage;
+import org.apache.zookeeper.AddWatchMode;
 
+public interface AsyncWatchBuilder extends
+    AddWatchable<AsyncPathable<AsyncStage<Void>>>,
+    AsyncPathable<AsyncStage<Void>>
+{
+    /**
+     * The mode to use. By default, {@link org.apache.zookeeper.AddWatchMode#PERSISTENT_RECURSIVE} is used
+     *
+     * @param mode mode
+     * @return this
+     */
+    AsyncWatchBuilder2 withMode(AddWatchMode mode);
+}
\ No newline at end of file
diff --git a/curator-test-zk35/src/test/java/org/apache/curator/zk35/TestIs35.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder2.java
similarity index 70%
copy from curator-test-zk35/src/test/java/org/apache/curator/zk35/TestIs35.java
copy to curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder2.java
index c1a18e2..a97cd4b 100644
--- a/curator-test-zk35/src/test/java/org/apache/curator/zk35/TestIs35.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder2.java
@@ -16,19 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.zk35;
 
-import org.apache.curator.utils.Compatibility;
-import org.testng.Assert;
-import org.testng.annotations.Test;
+package org.apache.curator.x.async.api;
 
-public class TestIs35
-{
-    @Test
-    public void testIsZk35()
-    {
-        Assert.assertFalse(Compatibility.hasGetReachableOrOneMethod());
-        Assert.assertTrue(Compatibility.hasAddrField());
-    }
-}
+import org.apache.curator.framework.api.AddWatchable;
+import org.apache.curator.x.async.AsyncStage;
 
+public interface AsyncWatchBuilder2 extends
+    AddWatchable<AsyncPathable<AsyncStage<Void>>>,
+    AsyncPathable<AsyncStage<Void>>
+{
+}
\ No newline at end of file
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
index 07c3398..a248c0e 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.curator.x.async.details;
 
+import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.UnhandledErrorListener;
@@ -26,6 +27,7 @@ import org.apache.curator.framework.imps.CuratorFrameworkImpl;
 import org.apache.curator.framework.imps.CuratorMultiTransactionImpl;
 import org.apache.curator.framework.imps.GetACLBuilderImpl;
 import org.apache.curator.framework.imps.SyncBuilderImpl;
+import org.apache.curator.utils.Compatibility;
 import org.apache.curator.x.async.AsyncCuratorFramework;
 import org.apache.curator.x.async.AsyncStage;
 import org.apache.curator.x.async.WatchMode;
@@ -152,6 +154,13 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework
     }
 
     @Override
+    public AsyncWatchBuilder addWatch()
+    {
+        Preconditions.checkState(Compatibility.hasPersistentWatchers(), "addWatch() is not supported in the ZooKeeper library being used.");
+        return new AsyncWatchBuilderImpl(client, filters);
+    }
+
+    @Override
     public CuratorFramework unwrap()
     {
         return client;
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncWatchBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncWatchBuilderImpl.java
new file mode 100644
index 0000000..a10540e
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncWatchBuilderImpl.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.async.details;
+
+import org.apache.curator.framework.api.AddWatchable;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.imps.AddWatchBuilderImpl;
+import org.apache.curator.framework.imps.CuratorFrameworkImpl;
+import org.apache.curator.framework.imps.Watching;
+import org.apache.curator.x.async.AsyncStage;
+import org.apache.curator.x.async.api.AsyncPathable;
+import org.apache.curator.x.async.api.AsyncWatchBuilder;
+import org.apache.curator.x.async.api.AsyncWatchBuilder2;
+import org.apache.zookeeper.AddWatchMode;
+import org.apache.zookeeper.Watcher;
+
+import static org.apache.curator.x.async.details.BackgroundProcs.ignoredProc;
+import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
+
+class AsyncWatchBuilderImpl implements AsyncWatchBuilder, AsyncWatchBuilder2, AddWatchable<AsyncPathable<AsyncStage<Void>>>, AsyncPathable<AsyncStage<Void>>
+{
+    private final CuratorFrameworkImpl client;
+    private final Filters filters;
+    private Watching watching;
+    private AddWatchMode mode = AddWatchMode.PERSISTENT_RECURSIVE;
+
+    AsyncWatchBuilderImpl(CuratorFrameworkImpl client, Filters filters)
+    {
+        this.client = client;
+        this.filters = filters;
+        watching = new Watching(client, true);
+    }
+
+    @Override
+    public AsyncWatchBuilder2 withMode(AddWatchMode mode)
+    {
+        this.mode = mode;
+        return this;
+    }
+
+    @Override
+    public AsyncPathable<AsyncStage<Void>> usingWatcher(Watcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    @Override
+    public AsyncPathable<AsyncStage<Void>> usingWatcher(CuratorWatcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    @Override
+    public AsyncStage<Void> forPath(String path)
+    {
+        BuilderCommon<Void> common = new BuilderCommon<>(filters, ignoredProc);
+        AddWatchBuilderImpl builder = new AddWatchBuilderImpl(client, watching, common.backgrounding, mode);
+        return safeCall(common.internalCallback, () -> builder.forPath(path));
+    }
+}
\ No newline at end of file
diff --git a/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestAddWatch.java b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestAddWatch.java
new file mode 100644
index 0000000..bdf006a
--- /dev/null
+++ b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestAddWatch.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.apache.curator.utils.ZookeeperFactory;
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.zookeeper.AddWatchMode;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.CountDownLatch;
+
+public class TestAddWatch extends CuratorTestBase
+{
+    @Test
+    public void testPersistentRecursiveWatch() throws Exception
+    {
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+        {
+            client.start();
+            client.blockUntilConnected();
+
+            CountDownLatch latch = new CountDownLatch(5);
+            Watcher watcher = event -> latch.countDown();
+            AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
+            async.addWatch().withMode(AddWatchMode.PERSISTENT_RECURSIVE).usingWatcher(watcher).forPath("/test").toCompletableFuture().get();
+
+            client.create().forPath("/test");
+            client.create().forPath("/test/a");
+            client.create().forPath("/test/a/b");
+            client.create().forPath("/test/a/b/c");
+            client.create().forPath("/test/a/b/c/d");
+
+            Assert.assertTrue(timing.awaitLatch(latch));
+        }
+    }
+
+    @Test
+    public void testPersistentRecursiveDefaultWatch() throws Exception
+    {
+        CountDownLatch latch = new CountDownLatch(6);   // 5 creates plus the initial sync
+        ZookeeperFactory zookeeperFactory = (connectString, sessionTimeout, watcher, canBeReadOnly) -> {
+            Watcher actualWatcher = event -> {
+                watcher.process(event);
+                latch.countDown();
+            };
+            return new ZooKeeper(connectString, sessionTimeout, actualWatcher);
+        };
+        try (CuratorFramework client = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).zookeeperFactory(zookeeperFactory).build() )
+        {
+            client.start();
+            client.blockUntilConnected();
+
+            AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
+            async.addWatch().withMode(AddWatchMode.PERSISTENT_RECURSIVE).forPath("/test");
+
+            client.create().forPath("/test");
+            client.create().forPath("/test/a");
+            client.create().forPath("/test/a/b");
+            client.create().forPath("/test/a/b/c");
+            client.create().forPath("/test/a/b/c/d");
+
+            Assert.assertTrue(timing.awaitLatch(latch));
+        }
+    }
+}