Posted to commits@curator.apache.org by ra...@apache.org on 2019/10/03 16:16:50 UTC

[curator] branch persistent-watcher-recipe updated (3068468 -> 5593808)

This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a change to branch persistent-watcher-recipe
in repository https://gitbox.apache.org/repos/asf/curator.git.


 discard 3068468  Added support for a PersistentWatcher recipe based on new persistent watch APIs
 discard ecd06e4  Support ZK 3.6 and add support for upcoming Persistent Recursive Watch APIs.
     new ed3f517  Support ZK 3.6 and add support for upcoming Persistent Recursive Watch APIs.
     new 5593808  Added support for a PersistentWatcher recipe based on new persistent watch APIs

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (3068468)
            \
             N -- N -- N   refs/heads/persistent-watcher-recipe (5593808)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/curator/framework/CuratorFramework.java |  4 +-
 .../api/{ACLable.java => AddWatchBuilder.java}     | 14 +++---
 ...PathAndBytesable.java => AddWatchBuilder2.java} |  7 ++-
 ...dPersistentWatchable.java => AddWatchable.java} |  2 +-
 .../curator/framework/api/CuratorEventType.java    |  4 +-
 ...chBuilderImpl.java => AddWatchBuilderImpl.java} | 46 ++++++++++---------
 .../framework/imps/CuratorFrameworkImpl.java       |  4 +-
 .../curator/framework/imps/TestFramework.java      |  9 ++--
 .../framework/recipes/watch/PersistentWatcher.java |  5 +--
 .../x/async/api/AsyncCuratorFrameworkDsl.java      |  4 +-
 .../curator/x/async/api/AsyncWatchBuilder.java     | 52 ++++++++++++++++++++++
 .../x/async/details/AsyncCuratorFrameworkImpl.java |  4 +-
 ...BuilderImpl.java => AsyncWatchBuilderImpl.java} | 37 +++++++++++----
 .../curator/framework/imps/TestFramework.java      | 10 +++--
 14 files changed, 137 insertions(+), 65 deletions(-)
 copy curator-framework/src/main/java/org/apache/curator/framework/api/{ACLable.java => AddWatchBuilder.java} (75%)
 copy curator-framework/src/main/java/org/apache/curator/framework/api/{ACLPathAndBytesable.java => AddWatchBuilder2.java} (87%)
 copy curator-framework/src/main/java/org/apache/curator/framework/api/{AddPersistentWatchable.java => AddWatchable.java} (96%)
 rename curator-framework/src/main/java/org/apache/curator/framework/imps/{AddPersistentWatchBuilderImpl.java => AddWatchBuilderImpl.java} (71%)
 create mode 100644 curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder.java
 copy curator-x-async/src/main/java/org/apache/curator/x/async/details/{AsyncPersistentWatchBuilderImpl.java => AsyncWatchBuilderImpl.java} (58%)


[curator] 01/02: Support ZK 3.6 and add support for upcoming Persistent Recursive Watch APIs.

Posted by ra...@apache.org.

randgalt pushed a commit to branch persistent-watcher-recipe
in repository https://gitbox.apache.org/repos/asf/curator.git

commit ed3f517aa3f1b8d83f1561b9646c38ed9800bdcb
Author: randgalt <ra...@apache.org>
AuthorDate: Wed Oct 2 20:09:32 2019 -0500

    Support ZK 3.6 and add support for upcoming Persistent Recursive Watch APIs.
---
 .../curator/utils/DefaultZookeeperFactory.java     |   3 +-
 .../apache/curator/framework/CuratorFramework.java |   6 +
 .../framework/api/AddPersistentWatchBuilder.java   |  21 ++-
 .../framework/api/AddPersistentWatchBuilder2.java  |  16 +-
 .../framework/api/AddPersistentWatchable.java      |  27 ++--
 .../curator/framework/api/AddWatchBuilder.java     |  21 +--
 .../curator/framework/api/AddWatchBuilder2.java    |  15 +-
 .../apache/curator/framework/api/AddWatchable.java |  27 ++--
 .../curator/framework/api/CuratorEventType.java    |   7 +-
 .../imps/AddPersistentWatchBuilderImpl.java        | 169 ++++++++++++++++++++
 .../framework/imps/AddWatchBuilderImpl.java        | 171 +++++++++++++++++++++
 .../framework/imps/CuratorFrameworkImpl.java       |   7 +
 .../imps/CuratorMultiTransactionRecord.java        |  34 ++--
 .../framework/imps/ReconfigBuilderImpl.java        |  18 ++-
 .../apache/curator/framework/imps/Watching.java    |  10 +-
 .../curator/framework/imps/TestFramework.java      |  51 ++++++
 .../recipes/leader/ChaosMonkeyCnxnFactory.java     |   7 +-
 curator-test/pom.xml                               |  10 ++
 .../apache/curator/test/TestingQuorumPeerMain.java |   3 +-
 .../apache/curator/test/TestingZooKeeperMain.java  |   5 +-
 .../org/apache/curator/test/WatchersDebug.java     |   9 ++
 .../x/async/api/AsyncCuratorFrameworkDsl.java      |   7 +
 .../x/async/api/AsyncPersistentWatchBuilder.java   |  33 ++++
 .../curator/x/async/api/AsyncWatchBuilder.java     |  52 +++++++
 .../x/async/details/AsyncCuratorFrameworkImpl.java |   6 +
 .../details/AsyncPersistentWatchBuilderImpl.java   |  75 +++++++++
 .../x/async/details/AsyncWatchBuilderImpl.java     |  94 +++++++++++
 .../curator/framework/imps/TestFramework.java      |  59 +++++++
 pom.xml                                            |  23 ++-
 29 files changed, 895 insertions(+), 91 deletions(-)
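
The AddWatchBuilder, AddWatchBuilder2 and AddWatchable interfaces in the
diff below form a staged builder: the mode is optional, a watcher must be
supplied, and only then does the chain expose forPath(). The following is a
minimal stdlib-only sketch of that shape, not the Curator classes
themselves. Every type here (AddWatchSketch, Registry, Mode, and the
simplified Watcher and Pathable) is a hypothetical stand-in, and the
inBackground() stage is left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the staged builder; none of these types are Curator's.
class AddWatchSketch {
    // Stand-in for org.apache.zookeeper.AddWatchMode (only two values modeled).
    enum Mode { PERSISTENT, PERSISTENT_RECURSIVE }

    // Stand-in for a watcher callback.
    interface Watcher { void process(String path); }

    // Final stage: only a path can be supplied.
    interface Pathable { void forPath(String path); }

    // Middle stage: a watcher has to be chosen before a path.
    interface AddWatchable { Pathable usingWatcher(Watcher watcher); }

    // First stage: the mode is optional, so usingWatcher is reachable here too.
    interface AddWatchBuilder extends AddWatchable { AddWatchable withMode(Mode mode); }

    // Records each registration so the effect of a chain can be inspected.
    static final class Registry {
        final List<String> registered = new ArrayList<>();

        AddWatchBuilder addWatch() { return new Builder(this); }
    }

    // One object implements every stage and returns itself from each call.
    private static final class Builder implements AddWatchBuilder, Pathable {
        private final Registry registry;
        private Mode mode = Mode.PERSISTENT_RECURSIVE; // same default as in the diff
        private Watcher watcher;

        Builder(Registry registry) { this.registry = registry; }

        @Override public AddWatchable withMode(Mode mode) { this.mode = mode; return this; }
        @Override public Pathable usingWatcher(Watcher watcher) { this.watcher = watcher; return this; }
        @Override public void forPath(String path) { registry.registered.add(mode + ":" + path); }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        Watcher watcher = path -> System.out.println("event at " + path);
        registry.addWatch().usingWatcher(watcher).forPath("/a");
        registry.addWatch().withMode(Mode.PERSISTENT).usingWatcher(watcher).forPath("/b");
        System.out.println(registry.registered);
    }
}
```

Because each stage returns a narrower interface, a chain that skips
usingWatcher() or calls withMode() after it does not compile in this
sketch; the interfaces in the diff appear to aim for the same guarantee.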

diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
index 42279d0..acd32e7 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
@@ -20,12 +20,13 @@ package org.apache.curator.utils;
 
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.admin.ZooKeeperAdmin;
 
 public class DefaultZookeeperFactory implements ZookeeperFactory
 {
     @Override
     public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
     {
-        return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
+        return new ZooKeeperAdmin(connectString, sessionTimeout, watcher, canBeReadOnly);
     }
 }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 3737faa..77efeb8 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -199,6 +199,12 @@ public interface CuratorFramework extends Closeable
     public RemoveWatchesBuilder watches();
 
     /**
+     * Start an add watch builder
+     *
+     * @return builder object
+     */
+    public AddWatchBuilder addWatch();
+    /**
      * Returns the listenable interface for the Connect State
      *
      * @return listenable
diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder.java
similarity index 65%
copy from curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
copy to curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder.java
index 42279d0..a167174 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder.java
@@ -16,16 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.utils;
+package org.apache.curator.framework.api;
 
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-
-public class DefaultZookeeperFactory implements ZookeeperFactory
+public interface AddPersistentWatchBuilder extends AddPersistentWatchBuilder2
 {
-    @Override
-    public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
-    {
-        return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
-    }
-}
+    /**
+     * ZooKeeper persistent watches can optionally be recursive. See
+     * {@link org.apache.zookeeper.ZooKeeper#addPersistentWatch(String, org.apache.zookeeper.Watcher, boolean)}
+     *
+     * @return this
+     */
+    AddPersistentWatchBuilder2 recursive();
+}
\ No newline at end of file
diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder2.java
similarity index 65%
copy from curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
copy to curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder2.java
index 42279d0..15cea4f 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder2.java
@@ -16,16 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.utils;
+package org.apache.curator.framework.api;
 
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-
-public class DefaultZookeeperFactory implements ZookeeperFactory
+public interface AddPersistentWatchBuilder2 extends
+    Backgroundable<AddPersistentWatchable<Pathable<Void>>>,
+    AddPersistentWatchable<Pathable<Void>>
 {
-    @Override
-    public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
-    {
-        return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
-    }
-}
+}
\ No newline at end of file
diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchable.java
similarity index 68%
copy from curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
copy to curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchable.java
index 42279d0..faa8906 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchable.java
@@ -16,16 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.utils;
+package org.apache.curator.framework.api;
 
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
 
-public class DefaultZookeeperFactory implements ZookeeperFactory
+public interface AddPersistentWatchable<T>
 {
-    @Override
-    public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
-    {
-        return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
-    }
-}
+    /**
+     * Set a watcher for the operation
+     *
+     * @param watcher the watcher
+     * @return this
+     */
+    T usingWatcher(Watcher watcher);
+
+    /**
+     * Set a watcher for the operation
+     *
+     * @param watcher the watcher
+     * @return this
+     */
+    T usingWatcher(CuratorWatcher watcher);
+}
\ No newline at end of file
diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder.java
similarity index 65%
copy from curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
copy to curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder.java
index 42279d0..ad6d434 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder.java
@@ -16,16 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.utils;
+package org.apache.curator.framework.api;
 
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.AddWatchMode;
 
-public class DefaultZookeeperFactory implements ZookeeperFactory
+public interface AddWatchBuilder extends AddWatchBuilder2
 {
-    @Override
-    public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
-    {
-        return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
-    }
-}
+    /**
+     * The mode to use. By default, {@link org.apache.zookeeper.AddWatchMode#PERSISTENT_RECURSIVE} is used
+     *
+     * @param mode mode to use
+     * @return this
+     */
+    AddWatchBuilder2 withMode(AddWatchMode mode);
+}
\ No newline at end of file
diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder2.java
similarity index 65%
copy from curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
copy to curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder2.java
index 42279d0..b784fc5 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder2.java
@@ -16,16 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.utils;
+package org.apache.curator.framework.api;
 
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-
-public class DefaultZookeeperFactory implements ZookeeperFactory
+public interface AddWatchBuilder2 extends
+    Backgroundable<AddWatchable<Pathable<Void>>>, AddWatchable<Pathable<Void>>
 {
-    @Override
-    public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
-    {
-        return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
-    }
-}
+}
\ No newline at end of file
diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchable.java
similarity index 68%
copy from curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
copy to curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchable.java
index 42279d0..dbd2666 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchable.java
@@ -16,16 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.utils;
+package org.apache.curator.framework.api;
 
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
 
-public class DefaultZookeeperFactory implements ZookeeperFactory
+public interface AddWatchable<T>
 {
-    @Override
-    public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
-    {
-        return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
-    }
-}
+    /**
+     * Set a watcher for the operation
+     *
+     * @param watcher the watcher
+     * @return this
+     */
+    T usingWatcher(Watcher watcher);
+
+    /**
+     * Set a watcher for the operation
+     *
+     * @param watcher the watcher
+     * @return this
+     */
+    T usingWatcher(CuratorWatcher watcher);
+}
\ No newline at end of file
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
index 5dea211..22cc181 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
@@ -96,5 +96,10 @@ public enum CuratorEventType
     /**
      * Event sent when client is being closed
      */
-    CLOSING
+    CLOSING,
+
+    /**
+     * Corresponds to {@link CuratorFramework#addWatch()}
+     */
+    ADD_WATCH
 }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
new file mode 100644
index 0000000..acb70c8
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.RetryLoop;
+import org.apache.curator.drivers.OperationTrace;
+import org.apache.curator.framework.api.AddPersistentWatchBuilder;
+import org.apache.curator.framework.api.AddPersistentWatchBuilder2;
+import org.apache.curator.framework.api.AddPersistentWatchable;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.api.Pathable;
+import org.apache.zookeeper.Watcher;
+import java.util.concurrent.Executor;
+
+public class AddPersistentWatchBuilderImpl implements AddPersistentWatchBuilder, Pathable<Void>, BackgroundOperation<String>
+{
+    private final CuratorFrameworkImpl client;
+    private Watching watching = null;
+    private Backgrounding backgrounding = new Backgrounding();
+    private boolean recursive = false;
+
+    AddPersistentWatchBuilderImpl(CuratorFrameworkImpl client)
+    {
+        this.client = client;
+    }
+
+    public AddPersistentWatchBuilderImpl(CuratorFrameworkImpl client, Watching watching, Backgrounding backgrounding, boolean recursive)
+    {
+        this.client = client;
+        this.watching = watching;
+        this.backgrounding = backgrounding;
+        this.recursive = recursive;
+    }
+
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> inBackground()
+    {
+        backgrounding = new Backgrounding(true);
+        return this;
+    }
+
+    @Override
+    public AddPersistentWatchBuilder2 recursive()
+    {
+        recursive = true;
+        return this;
+    }
+
+    @Override
+    public Pathable<Void> usingWatcher(Watcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    @Override
+    public Pathable<Void> usingWatcher(CuratorWatcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> inBackground(Object context)
+    {
+        backgrounding = new Backgrounding(context);
+        return this;
+    }
+
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback)
+    {
+        backgrounding = new Backgrounding(callback);
+        return this;
+    }
+
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context)
+    {
+        backgrounding = new Backgrounding(callback, context);
+        return this;
+    }
+
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Executor executor)
+    {
+        backgrounding = new Backgrounding(callback, executor);
+        return this;
+    }
+
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context, Executor executor)
+    {
+        backgrounding = new Backgrounding(client, callback, context, executor);
+        return this;
+    }
+
+    @Override
+    public Void forPath(String path) throws Exception
+    {
+        if ( backgrounding.inBackground() )
+        {
+            client.processBackgroundOperation(new OperationAndData<>(this, path, backgrounding.getCallback(), null, backgrounding.getContext(), watching), null);
+        }
+        else
+        {
+            pathInForeground(path);
+        }
+        return null;
+    }
+
+    @Override
+    public void performBackgroundOperation(final OperationAndData<String> data) throws Exception
+    {
+        String path = data.getData();
+        String fixedPath = client.fixForNamespace(path);
+        try
+        {
+            final OperationTrace   trace = client.getZookeeperClient().startAdvancedTracer("AddPersistentWatchBuilderImpl-Background");
+            client.getZooKeeper().addPersistentWatch
+                (
+                    fixedPath,
+                    watching.getWatcher(path),
+                    recursive, (rc, path1, ctx) -> {
+                        trace.setReturnCode(rc).setWithWatcher(true).setPath(path1).commit();
+                        CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.ADD_PERSISTENT_WATCH, rc, path1, null, ctx, null, null, null, null, null, null);
+                        client.processBackgroundOperation(data, event);
+                    },
+                    backgrounding.getContext()
+                );
+        }
+        catch ( Throwable e )
+        {
+            backgrounding.checkError(e, watching);
+        }
+    }
+
+    private void pathInForeground(final String path) throws Exception
+    {
+        final String fixedPath = client.fixForNamespace(path);
+        OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("AddPersistentWatchBuilderImpl-Foreground");
+        RetryLoop.callWithRetry
+        (
+            client.getZookeeperClient(), () -> {
+                client.getZooKeeper().addPersistentWatch(fixedPath, watching.getWatcher(path), recursive);
+                return null;
+            });
+        trace.setPath(fixedPath).setWithWatcher(true).commit();
+    }
+}
\ No newline at end of file
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddWatchBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddWatchBuilderImpl.java
new file mode 100644
index 0000000..64e1de5
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddWatchBuilderImpl.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.RetryLoop;
+import org.apache.curator.drivers.OperationTrace;
+import org.apache.curator.framework.api.AddWatchBuilder;
+import org.apache.curator.framework.api.AddWatchBuilder2;
+import org.apache.curator.framework.api.AddWatchable;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.api.Pathable;
+import org.apache.zookeeper.AddWatchMode;
+import org.apache.zookeeper.Watcher;
+import java.util.concurrent.Executor;
+
+public class AddWatchBuilderImpl implements AddWatchBuilder, Pathable<Void>, BackgroundOperation<String>
+{
+    private final CuratorFrameworkImpl client;
+    private Watching watching = null;
+    private Backgrounding backgrounding = new Backgrounding();
+    private AddWatchMode mode = AddWatchMode.PERSISTENT_RECURSIVE;
+
+    AddWatchBuilderImpl(CuratorFrameworkImpl client)
+    {
+        this.client = client;
+    }
+
+    public AddWatchBuilderImpl(CuratorFrameworkImpl client, Watching watching, Backgrounding backgrounding, AddWatchMode mode)
+    {
+        this.client = client;
+        this.watching = watching;
+        this.backgrounding = backgrounding;
+        this.mode = mode;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground()
+    {
+        backgrounding = new Backgrounding(true);
+        return this;
+    }
+
+    @Override
+    public AddWatchBuilder2 withMode(AddWatchMode mode)
+    {
+        this.mode = mode;
+        return this;
+    }
+
+    @Override
+    public Pathable<Void> usingWatcher(Watcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    @Override
+    public Pathable<Void> usingWatcher(CuratorWatcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground(Object context)
+    {
+        backgrounding = new Backgrounding(context);
+        return this;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback)
+    {
+        backgrounding = new Backgrounding(callback);
+        return this;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context)
+    {
+        backgrounding = new Backgrounding(callback, context);
+        return this;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Executor executor)
+    {
+        backgrounding = new Backgrounding(callback, executor);
+        return this;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context, Executor executor)
+    {
+        backgrounding = new Backgrounding(client, callback, context, executor);
+        return this;
+    }
+
+    @Override
+    public Void forPath(String path) throws Exception
+    {
+        if ( backgrounding.inBackground() )
+        {
+            client.processBackgroundOperation(new OperationAndData<>(this, path, backgrounding.getCallback(), null, backgrounding.getContext(), watching), null);
+        }
+        else
+        {
+            pathInForeground(path);
+        }
+        return null;
+    }
+
+    @Override
+    public void performBackgroundOperation(final OperationAndData<String> data) throws Exception
+    {
+        String path = data.getData();
+        String fixedPath = client.fixForNamespace(path);
+        try
+        {
+            final OperationTrace   trace = client.getZookeeperClient().startAdvancedTracer("AddWatchBuilderImpl-Background");
+            client.getZooKeeper().addWatch
+                (
+                    fixedPath,
+                    watching.getWatcher(path),
+                    mode,
+                    (rc, path1, ctx) -> {
+                        trace.setReturnCode(rc).setWithWatcher(true).setPath(path1).commit();
+                        CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.ADD_WATCH, rc, path1, null, ctx, null, null, null, null, null, null);
+                        client.processBackgroundOperation(data, event);
+                    },
+                    backgrounding.getContext()
+                );
+        }
+        catch ( Throwable e )
+        {
+            backgrounding.checkError(e, watching);
+        }
+    }
+
+    private void pathInForeground(final String path) throws Exception
+    {
+        final String fixedPath = client.fixForNamespace(path);
+        OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("AddWatchBuilderImpl-Foreground");
+        RetryLoop.callWithRetry
+        (
+            client.getZookeeperClient(), () -> {
+                client.getZooKeeper().addWatch(fixedPath, watching.getWatcher(path), mode);
+                return null;
+            });
+        trace.setPath(fixedPath).setWithWatcher(true).commit();
+    }
+}
\ No newline at end of file
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index e003bf0..8b3294a 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -571,6 +571,13 @@ public class CuratorFrameworkImpl implements CuratorFramework
         return new RemoveWatchesBuilderImpl(this);
     }
 
+    @Override
+    public AddWatchBuilder addWatch()
+    {
+        Preconditions.checkState(!isZk34CompatibilityMode(), "Persistent watch APIs are not supported when running in ZooKeeper 3.4 compatibility mode");
+        return new AddWatchBuilderImpl(this);
+    }
+
     protected void internalSync(CuratorFrameworkImpl impl, String path, Object context)
     {
         BackgroundOperation<String> operation = new BackgroundSyncImpl(impl, context);
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java
index 3e72609..fbac6e6 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java
@@ -16,49 +16,57 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.imps;
 
 import com.google.common.collect.Lists;
 import org.apache.curator.framework.api.transaction.OperationType;
 import org.apache.curator.framework.api.transaction.TypeAndPath;
-import org.apache.zookeeper.MultiTransactionRecord;
 import org.apache.zookeeper.Op;
 import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
-class CuratorMultiTransactionRecord extends MultiTransactionRecord
+class CuratorMultiTransactionRecord implements Iterable<Op>
 {
-    private final List<TypeAndPath>     metadata = Lists.newArrayList();
-
-    @Override
-    public final void add(Op op)
-    {
-        throw new UnsupportedOperationException();
-    }
+    private final List<TypeAndPath> metadata = Lists.newArrayList();
+    private List<Op> ops = new ArrayList<>();
 
     void add(Op op, OperationType type, String forPath)
     {
-        super.add(op);
+        ops.add(op);
         metadata.add(new TypeAndPath(type, forPath));
     }
 
-    TypeAndPath     getMetadata(int index)
+    TypeAndPath getMetadata(int index)
     {
         return metadata.get(index);
     }
 
-    int             metadataSize()
+    int metadataSize()
     {
         return metadata.size();
     }
 
     void addToDigest(MessageDigest digest)
     {
-        for ( Op op : this )
+        for ( Op op : ops )
         {
             digest.update(op.getPath().getBytes());
             digest.update(Integer.toString(op.getType()).getBytes());
             digest.update(op.toRequestRecord().toString().getBytes());
         }
     }
+
+    @Override
+    public Iterator<Op> iterator()
+    {
+        return ops.iterator();
+    }
+
+    int size()
+    {
+        return ops.size();
+    }
 }
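
The CuratorMultiTransactionRecord hunk above stops extending ZooKeeper's MultiTransactionRecord: the class now holds its own list of ops and exposes them through Iterable<Op>. The following is a minimal, stdlib-only sketch of that shape, not the Curator class itself. RecordSketch is a hypothetical name, and plain Strings stand in for Op and TypeAndPath.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the refactored record; Strings replace Op and TypeAndPath.
final class RecordSketch implements Iterable<String>
{
    private final List<String> metadata = new ArrayList<>();
    private final List<String> ops = new ArrayList<>();

    // Single entry point for additions, so ops and metadata stay index-aligned.
    void add(String op, String type, String forPath)
    {
        ops.add(op);
        metadata.add(type + ":" + forPath);
    }

    String getMetadata(int index)
    {
        return metadata.get(index);
    }

    int size()
    {
        return ops.size();
    }

    // Iteration is delegated to the owned list instead of being inherited.
    @Override
    public Iterator<String> iterator()
    {
        return ops.iterator();
    }
}
```

Because the only mutator takes the op and its metadata together, there is no inherited add(Op) left to disable with UnsupportedOperationException, which is what the removed lines in the hunk did.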
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
index 97be59a..f8b78da 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
@@ -24,6 +24,8 @@ import org.apache.curator.RetryLoop;
 import org.apache.curator.TimeTrace;
 import org.apache.curator.framework.api.*;
 import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.admin.ZooKeeperAdmin;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.DataTree;
 import java.util.Arrays;
@@ -268,7 +270,7 @@ public class ReconfigBuilderImpl implements ReconfigBuilder, BackgroundOperation
                     client.processBackgroundOperation(data, event);
                 }
             };
-            client.getZooKeeper().reconfig(joining, leaving, newMembers, fromConfig, callback, backgrounding.getContext());
+            admin().reconfigure(joining, leaving, newMembers, fromConfig, callback, backgrounding.getContext());
         }
         catch ( Throwable e )
         {
@@ -276,6 +278,18 @@ public class ReconfigBuilderImpl implements ReconfigBuilder, BackgroundOperation
         }
     }
 
+    private ZooKeeperAdmin admin() throws Exception
+    {
+        try
+        {
+            return (ZooKeeperAdmin)client.getZooKeeper();
+        }
+        catch ( ClassCastException e )
+        {
+            throw new Exception("ZooKeeper instance is not an instance of ZooKeeperAdmin");
+        }
+    }
+
     private byte[] ensembleInForeground() throws Exception
     {
         TimeTrace trace = client.getZookeeperClient().startTracer("ReconfigBuilderImpl-Foreground");
@@ -287,7 +301,7 @@ public class ReconfigBuilderImpl implements ReconfigBuilder, BackgroundOperation
                     @Override
                     public byte[] call() throws Exception
                     {
-                        return client.getZooKeeper().reconfig(joining, leaving, newMembers, fromConfig, responseStat);
+                        return admin().reconfigure(joining, leaving, newMembers, fromConfig, responseStat);
                     }
                 }
             );
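
The admin() helper in the ReconfigBuilderImpl hunk above casts the client's ZooKeeper handle to ZooKeeperAdmin and converts a ClassCastException into a generic Exception. One possible alternative, sketched here with stdlib-only code, is an explicit instanceof check that reports the actual runtime type. BaseClient, AdminClient and AdminCastSketch are hypothetical stand-ins, not ZooKeeper or Curator types, and this is not what the commit does.

```java
// Hypothetical stand-ins for ZooKeeper and ZooKeeperAdmin.
class BaseClient
{
}

class AdminClient extends BaseClient
{
    String reconfigure()
    {
        return "reconfigured";
    }
}

final class AdminCastSketch
{
    // Narrow the handle with an explicit type check instead of catching ClassCastException.
    static AdminClient admin(BaseClient client)
    {
        if ( client instanceof AdminClient )
        {
            return (AdminClient)client;
        }
        String actual = (client == null) ? "null" : client.getClass().getName();
        throw new IllegalStateException("Client is not an AdminClient: " + actual);
    }
}
```

Naming the actual class in the message makes a misconfigured client factory easier to diagnose than a fixed string would.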
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
index daa5dd3..5bad7e7 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
@@ -23,7 +23,7 @@ import org.apache.curator.framework.api.CuratorWatcher;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
 
-class Watching
+public class Watching
 {
     private final Watcher watcher;
     private final CuratorWatcher curatorWatcher;
@@ -31,7 +31,7 @@ class Watching
     private final CuratorFrameworkImpl client;
     private NamespaceWatcher namespaceWatcher;
 
-    Watching(CuratorFrameworkImpl client, boolean watched)
+    public Watching(CuratorFrameworkImpl client, boolean watched)
     {
         this.client = client;
         this.watcher = null;
@@ -39,7 +39,7 @@ class Watching
         this.watched = watched;
     }
 
-    Watching(CuratorFrameworkImpl client, Watcher watcher)
+    public Watching(CuratorFrameworkImpl client, Watcher watcher)
     {
         this.client = client;
         this.watcher = watcher;
@@ -47,7 +47,7 @@ class Watching
         this.watched = false;
     }
 
-    Watching(CuratorFrameworkImpl client, CuratorWatcher watcher)
+    public Watching(CuratorFrameworkImpl client, CuratorWatcher watcher)
     {
         this.client = client;
         this.watcher = null;
@@ -55,7 +55,7 @@ class Watching
         this.watched = false;
     }
 
-    Watching(CuratorFrameworkImpl client)
+    public Watching(CuratorFrameworkImpl client)
     {
         this.client = client;
         watcher = null;
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
index fe49ad7..9486286 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
@@ -37,6 +37,7 @@ import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.EnsurePath;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.curator.utils.ZookeeperFactory;
+import org.apache.zookeeper.AddWatchMode;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
@@ -1265,4 +1266,54 @@ public class TestFramework extends BaseClassForTests
             CloseableUtils.closeQuietly(client);
         }
     }
+
+    @Test(groups = Zk35MethodInterceptor.zk35Group)
+    public void testPersistentRecursiveWatch() throws Exception
+    {
+        Timing2 timing = new Timing2();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        client.start();
+        try
+        {
+            BlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>();
+            Watcher watcher = events::add;
+            client.addWatch().usingWatcher(watcher).forPath("/top/main");
+
+            client.create().creatingParentsIfNeeded().forPath("/top/main/a");
+            Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main");
+            Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main/a");
+            client.setData().forPath("/top/main/a", "foo".getBytes());
+            Assert.assertEquals(timing.takeFromQueue(events).getType(), Watcher.Event.EventType.NodeDataChanged);
+            client.setData().forPath("/top/main", "bar".getBytes());
+            Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main");
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test(groups = Zk35MethodInterceptor.zk35Group)
+    public void testPersistentWatch() throws Exception
+    {
+        Timing2 timing = new Timing2();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        client.start();
+        try
+        {
+            BlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>();
+            Watcher watcher = events::add;
+            client.addWatch().withMode(AddWatchMode.PERSISTENT).usingWatcher(watcher).forPath("/top/main");
+
+            client.create().creatingParentsIfNeeded().forPath("/top/main/a");
+            Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main");
+            client.setData().forPath("/top/main/a", "foo".getBytes());
+            client.setData().forPath("/top/main", "bar".getBytes());
+            Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main");
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
 }
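
Both new tests above use the same pattern: the watcher is the method reference events::add on a BlockingQueue, and each assertion takes the next event from the queue with a timeout. A stdlib-only sketch of that pattern follows. EventQueueSketch, takeFromQueue and fireAsync are hypothetical names; takeFromQueue only approximates what Timing2.takeFromQueue is assumed to do, and a Consumer<String> stands in for a ZooKeeper Watcher.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class EventQueueSketch
{
    // Wait a bounded time for the next event and fail loudly instead of hanging the test.
    static <T> T takeFromQueue(BlockingQueue<T> queue, long timeoutMs)
    {
        try
        {
            T value = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if ( value == null )
            {
                throw new AssertionError("Timed out waiting for an event");
            }
            return value;
        }
        catch ( InterruptedException e )
        {
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting for an event", e);
        }
    }

    // Simulates a callback source that delivers events on another thread.
    static Thread fireAsync(Consumer<String> watcher, String... paths)
    {
        Thread thread = new Thread(() -> {
            for ( String path : paths )
            {
                watcher.accept(path);
            }
        });
        thread.start();
        return thread;
    }
}
```

A caller would declare `BlockingQueue<String> events = new LinkedBlockingQueue<>();` and `Consumer<String> watcher = events::add;`, pass the watcher to fireAsync, and then assert on successive takeFromQueue results. Events arrive in delivery order, so the assertions double as an ordering check.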
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
index 4cb342c..f3dda40 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
@@ -26,6 +26,7 @@ import org.apache.zookeeper.server.ByteBufferInputStream;
 import org.apache.zookeeper.server.NIOServerCnxn;
 import org.apache.zookeeper.server.NIOServerCnxnFactory;
 import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.ServerCnxn;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -92,7 +93,7 @@ public class ChaosMonkeyCnxnFactory extends NIOServerCnxnFactory
                 log.debug("Rejected : " + si.toString());
                 // Still reject request
                 log.debug("Still not ready for " + remaining + "ms");
-                ((NIOServerCnxn)si.cnxn).close();
+                ((NIOServerCnxn)si.cnxn).close(ServerCnxn.DisconnectReason.UNKNOWN);
                 return;
             }
             // Submit the request to the legacy Zookeeper server
@@ -113,13 +114,13 @@ public class ChaosMonkeyCnxnFactory extends NIOServerCnxnFactory
                         firstError = System.currentTimeMillis();
                         // The znode has been created, close the connection and don't tell it to client
                         log.warn("Closing connection right after " + createRequest.getPath() + " creation");
-                        ((NIOServerCnxn)si.cnxn).close();
+                        ((NIOServerCnxn)si.cnxn).close(ServerCnxn.DisconnectReason.UNKNOWN);
                     }
                 }
                 catch ( Exception e )
                 {
                     // Should not happen
-                    ((NIOServerCnxn)si.cnxn).close();
+                    ((NIOServerCnxn)si.cnxn).close(ServerCnxn.DisconnectReason.UNKNOWN);
                 }
             }
         }
diff --git a/curator-test/pom.xml b/curator-test/pom.xml
index 3683b7d..4f0c5a2 100644
--- a/curator-test/pom.xml
+++ b/curator-test/pom.xml
@@ -41,6 +41,16 @@
         </dependency>
 
         <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.xerial.snappy</groupId>
+            <artifactId>snappy-java</artifactId>
+        </dependency>
+
+        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java b/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java
index 3b3ab26..4baaff9 100644
--- a/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java
+++ b/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java
@@ -18,6 +18,7 @@
  */
 package org.apache.curator.test;
 
+import org.apache.zookeeper.server.ServerCnxn;
 import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
 import org.apache.zookeeper.server.quorum.QuorumPeerMain;
@@ -39,7 +40,7 @@ class TestingQuorumPeerMain extends QuorumPeerMain implements ZooKeeperMainFace
                 Field               cnxnFactoryField = QuorumPeer.class.getDeclaredField("cnxnFactory");
                 cnxnFactoryField.setAccessible(true);
                 ServerCnxnFactory   cnxnFactory = (ServerCnxnFactory)cnxnFactoryField.get(quorumPeer);
-                cnxnFactory.closeAll();
+                cnxnFactory.closeAll(ServerCnxn.DisconnectReason.UNKNOWN);
 
                 Field               ssField = cnxnFactory.getClass().getDeclaredField("ss");
                 ssField.setAccessible(true);
diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java b/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java
index 841df77..8e5ffee 100644
--- a/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java
+++ b/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java
@@ -23,6 +23,7 @@ import org.apache.zookeeper.jmx.MBeanRegistry;
 import org.apache.zookeeper.jmx.ZKMBeanInfo;
 import org.apache.zookeeper.server.ContainerManager;
 import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.ServerCnxn;
 import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.ServerConfig;
 import org.apache.zookeeper.server.ZKDatabase;
@@ -81,7 +82,7 @@ public class TestingZooKeeperMain implements ZooKeeperMainFace
         {
             if ( cnxnFactory != null )
             {
-                cnxnFactory.closeAll();
+                cnxnFactory.closeAll(ServerCnxn.DisconnectReason.UNKNOWN);
 
                 Field ssField = cnxnFactory.getClass().getDeclaredField("ss");
                 ssField.setAccessible(true);
@@ -262,7 +263,7 @@ public class TestingZooKeeperMain implements ZooKeeperMainFace
     {
         public TestZooKeeperServer(FileTxnSnapLog txnLog, ServerConfig config)
         {
-            super(txnLog, config.getTickTime(), config.getMinSessionTimeout(), config.getMaxSessionTimeout(), null);
+            super(txnLog, config.getTickTime(), config.getMinSessionTimeout(), config.getMaxSessionTimeout(), config.getClientPortListenBacklog(), new ZKDatabase(txnLog), "");
         }
 
         private final AtomicBoolean isRunning = new AtomicBoolean(false);
diff --git a/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java b/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java
index e4c3b7e..e884b8c 100644
--- a/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java
+++ b/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java
@@ -27,16 +27,19 @@ public class WatchersDebug
     private static final Method getDataWatches;
     private static final Method getExistWatches;
     private static final Method getChildWatches;
+    private static final Method getPersistentWatches;
     static
     {
         Method localGetDataWatches = null;
         Method localGetExistWatches = null;
         Method localGetChildWatches = null;
+        Method localGetPersistentWatches = null;
         try
         {
             localGetDataWatches = getMethod("getDataWatches");
             localGetExistWatches = getMethod("getExistWatches");
             localGetChildWatches = getMethod("getChildWatches");
+            localGetPersistentWatches = getMethod("getPersistentWatches");
         }
         catch ( NoSuchMethodException e )
         {
@@ -45,6 +48,7 @@ public class WatchersDebug
         getDataWatches = localGetDataWatches;
         getExistWatches = localGetExistWatches;
         getChildWatches = localGetChildWatches;
+        getPersistentWatches = localGetPersistentWatches;
     }
 
     public static List<String> getDataWatches(ZooKeeper zooKeeper)
@@ -62,6 +66,11 @@ public class WatchersDebug
         return callMethod(zooKeeper, getChildWatches);
     }
 
+    public static List<String> getPersistentWatches(ZooKeeper zooKeeper)
+    {
+        return callMethod(zooKeeper, getPersistentWatches);
+    }
+
     private WatchersDebug()
     {
     }
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
index bc66bb6..d70c02c 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
@@ -84,6 +84,13 @@ public interface AsyncCuratorFrameworkDsl extends WatchableAsyncCuratorFramework
     AsyncReconfigBuilder reconfig();
 
     /**
+     * Start an add watch builder
+     *
+     * @return builder object
+     */
+    AsyncWatchBuilder addWatch();
+
+    /**
      * Start a transaction builder
      *
      * @return builder object
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
new file mode 100644
index 0000000..0f29233
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
@@ -0,0 +1,33 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.curator.x.async.api;
+
+ import org.apache.curator.framework.api.AddPersistentWatchable;
+ import org.apache.curator.x.async.AsyncStage;
+
+ public interface AsyncPersistentWatchBuilder extends AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>>
+ {
+     /**
+      * ZooKeeper persistent watches can optionally be recursive. See
+      * {@link org.apache.zookeeper.ZooKeeper#addPersistentWatch(String, org.apache.zookeeper.Watcher, boolean)}
+      *
+      * @return this
+      */
+     AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>> recursive();
+ }
\ No newline at end of file
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder.java
new file mode 100644
index 0000000..f782a4c
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.curator.x.async.api;
+
+ import org.apache.curator.framework.api.AddWatchable;
+ import org.apache.curator.x.async.AsyncStage;
+ import org.apache.zookeeper.AddWatchMode;
+
+public interface AsyncWatchBuilder extends AddWatchable<AsyncPathable<AsyncStage<Void>>>
+ {
+     /**
+      * The mode to use. By default, {@link org.apache.zookeeper.AddWatchMode#PERSISTENT_RECURSIVE} is used
+      *
+      * @param mode mode
+      * @return this
+      */
+     AddWatchable<AsyncPathable<AsyncStage<Void>>> withMode(AddWatchMode mode);
+ }
\ No newline at end of file
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
index 167cf50..41ddee2 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
@@ -124,6 +124,12 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework
     }
 
     @Override
+    public AsyncWatchBuilder addWatch()
+    {
+        return new AsyncWatchBuilderImpl(client, filters);
+    }
+
+    @Override
     public AsyncMultiTransaction transaction()
     {
         return operations -> {
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java
new file mode 100644
index 0000000..14f3e30
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java
@@ -0,0 +1,75 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.curator.x.async.details;
+
+ import org.apache.curator.framework.api.AddPersistentWatchable;
+ import org.apache.curator.framework.api.CuratorWatcher;
+ import org.apache.curator.framework.imps.AddPersistentWatchBuilderImpl;
+ import org.apache.curator.framework.imps.CuratorFrameworkImpl;
+ import org.apache.curator.framework.imps.Watching;
+ import org.apache.curator.x.async.AsyncStage;
+ import org.apache.curator.x.async.api.AsyncPathable;
+ import org.apache.curator.x.async.api.AsyncPersistentWatchBuilder;
+ import org.apache.zookeeper.Watcher;
+
+ import static org.apache.curator.x.async.details.BackgroundProcs.ignoredProc;
+ import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
+
+ class AsyncPersistentWatchBuilderImpl implements AsyncPersistentWatchBuilder, AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>>, AsyncPathable<AsyncStage<Void>>
+ {
+     private final CuratorFrameworkImpl client;
+     private final Filters filters;
+     private Watching watching = null;
+     private boolean recursive = false;
+
+     AsyncPersistentWatchBuilderImpl(CuratorFrameworkImpl client, Filters filters)
+     {
+         this.client = client;
+         this.filters = filters;
+     }
+
+     @Override
+     public AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>> recursive()
+     {
+         recursive = true;
+         return this;
+     }
+
+     @Override
+     public AsyncPathable<AsyncStage<Void>> usingWatcher(Watcher watcher)
+     {
+         watching = new Watching(client, watcher);
+         return this;
+     }
+
+     @Override
+     public AsyncPathable<AsyncStage<Void>> usingWatcher(CuratorWatcher watcher)
+     {
+         watching = new Watching(client, watcher);
+         return this;
+     }
+
+     @Override
+     public AsyncStage<Void> forPath(String path)
+     {
+         BuilderCommon<Void> common = new BuilderCommon<>(filters, ignoredProc);
+         AddPersistentWatchBuilderImpl builder = new AddPersistentWatchBuilderImpl(client, watching, common.backgrounding, recursive);
+         return safeCall(common.internalCallback, () -> builder.forPath(path));
+     }
+ }
\ No newline at end of file
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncWatchBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncWatchBuilderImpl.java
new file mode 100644
index 0000000..3092f51
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncWatchBuilderImpl.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.curator.x.async.details;
+
+ import org.apache.curator.framework.api.AddWatchable;
+ import org.apache.curator.framework.api.CuratorWatcher;
+ import org.apache.curator.framework.imps.AddWatchBuilderImpl;
+ import org.apache.curator.framework.imps.CuratorFrameworkImpl;
+ import org.apache.curator.framework.imps.Watching;
+ import org.apache.curator.x.async.AsyncStage;
+ import org.apache.curator.x.async.api.AsyncPathable;
+ import org.apache.curator.x.async.api.AsyncWatchBuilder;
+ import org.apache.zookeeper.AddWatchMode;
+ import org.apache.zookeeper.Watcher;
+
+ import static org.apache.curator.x.async.details.BackgroundProcs.ignoredProc;
+ import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
+
+ class AsyncWatchBuilderImpl implements AsyncWatchBuilder, AddWatchable<AsyncPathable<AsyncStage<Void>>>, AsyncPathable<AsyncStage<Void>>
+ {
+     private final CuratorFrameworkImpl client;
+     private final Filters filters;
+     private Watching watching = null;
+     private AddWatchMode mode = AddWatchMode.PERSISTENT_RECURSIVE;
+
+     AsyncWatchBuilderImpl(CuratorFrameworkImpl client, Filters filters)
+     {
+         this.client = client;
+         this.filters = filters;
+     }
+
+     @Override
+     public AddWatchable<AsyncPathable<AsyncStage<Void>>> withMode(AddWatchMode mode)
+     {
+         this.mode = mode;
+         return this;
+     }
+
+     @Override
+     public AsyncPathable<AsyncStage<Void>> usingWatcher(Watcher watcher)
+     {
+         watching = new Watching(client, watcher);
+         return this;
+     }
+
+     @Override
+     public AsyncPathable<AsyncStage<Void>> usingWatcher(CuratorWatcher watcher)
+     {
+         watching = new Watching(client, watcher);
+         return this;
+     }
+
+     @Override
+     public AsyncStage<Void> forPath(String path)
+     {
+         BuilderCommon<Void> common = new BuilderCommon<>(filters, ignoredProc);
+         AddWatchBuilderImpl builder = new AddWatchBuilderImpl(client, watching, common.backgrounding, mode);
+         return safeCall(common.internalCallback, () -> builder.forPath(path));
+     }
+ }
\ No newline at end of file
diff --git a/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFramework.java b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFramework.java
index 27a84d0..f2b51fe 100644
--- a/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFramework.java
+++ b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFramework.java
@@ -30,6 +30,8 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.Timing2;
+import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.curator.x.async.AsyncCuratorFramework;
@@ -37,8 +39,11 @@ import org.apache.curator.x.async.AsyncStage;
 import org.apache.curator.x.async.api.CreateOption;
 import org.apache.curator.x.async.api.DeleteOption;
 import org.apache.curator.x.async.api.ExistsOption;
+import org.apache.zookeeper.AddWatchMode;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.testng.Assert;
@@ -657,4 +662,58 @@ public class TestFramework extends BaseClassForTests
             CloseableUtils.closeQuietly(client);
         }
     }
+
+    @Test(groups = Zk35MethodInterceptor.zk35Group)
+    public void testPersistentRecursiveWatch() throws Exception
+    {
+        Timing2 timing = new Timing2();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+            AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
+
+            BlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>();
+            Watcher watcher = events::add;
+            async.addWatch().usingWatcher(watcher).forPath("/top/main").toCompletableFuture().get();
+
+            client.create().creatingParentsIfNeeded().forPath("/top/main/a");
+            Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main");
+            Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main/a");
+            client.setData().forPath("/top/main/a", "foo".getBytes());
+            Assert.assertEquals(timing.takeFromQueue(events).getType(), Watcher.Event.EventType.NodeDataChanged);
+            client.setData().forPath("/top/main", "bar".getBytes());
+            Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main");
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test(groups = Zk35MethodInterceptor.zk35Group)
+    public void testPersistentWatch() throws Exception
+    {
+        Timing2 timing = new Timing2();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+            AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
+
+            BlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>();
+            Watcher watcher = events::add;
+            async.addWatch().withMode(AddWatchMode.PERSISTENT).usingWatcher(watcher).forPath("/top/main").toCompletableFuture().get();
+
+            client.create().creatingParentsIfNeeded().forPath("/top/main/a");
+            Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main");
+            client.setData().forPath("/top/main/a", "foo".getBytes());
+            client.setData().forPath("/top/main", "bar".getBytes());
+            Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main");
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
 }
diff --git a/pom.xml b/pom.xml
index 3a5e152..27b34cd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -60,7 +60,7 @@
         <jdk-version>1.${short-jdk-version}</jdk-version>
 
         <!-- versions -->
-        <zookeeper-version>3.5.5</zookeeper-version>
+        <zookeeper-version>3.6.0-SNAPSHOT</zookeeper-version>
         <maven-bundle-plugin-version>4.1.0</maven-bundle-plugin-version>
         <maven-javadoc-plugin-version>3.0.1</maven-javadoc-plugin-version>
         <doxia-module-confluence-version>1.8</doxia-module-confluence-version>
@@ -85,10 +85,11 @@
         <guava-failureaccess-version>1.0.1</guava-failureaccess-version>
         <testng-version>6.14.3</testng-version>
         <swift-version>0.23.1</swift-version>
-        <dropwizard-version>1.3.7</dropwizard-version>
         <maven-shade-plugin-version>3.2.1</maven-shade-plugin-version>
         <slf4j-version>1.7.25</slf4j-version>
         <clirr-maven-plugin-version>2.8</clirr-maven-plugin-version>
+        <dropwizard-version>3.2.5</dropwizard-version>
+        <snappy-version>1.1.7</snappy-version>
 
         <!-- OSGi Properties -->
         <osgi.export.package />
@@ -567,6 +568,24 @@
                 <artifactId>dropwizard-logging</artifactId>
                 <version>${dropwizard-version}</version>
             </dependency>
+
+            <dependency>
+                <groupId>io.dropwizard.metrics</groupId>
+                <artifactId>metrics-core</artifactId>
+                <version>${dropwizard-version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.slf4j</groupId>
+                        <artifactId>slf4j-api</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
+            <dependency>
+                <groupId>org.xerial.snappy</groupId>
+                <artifactId>snappy-java</artifactId>
+                <version>${snappy-version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 


[curator] 02/02: Added support for a PersistentWatcher recipe based on new persistent watch APIs

Posted by ra...@apache.org.

randgalt pushed a commit to branch persistent-watcher-recipe
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 55938084e46950578b2466b7dfe689dd2a77d3c4
Author: randgalt <ra...@apache.org>
AuthorDate: Wed Oct 2 23:24:14 2019 -0500

    Added support for a PersistentWatcher recipe based on new persistent watch APIs
---
 .../imps/AddPersistentWatchBuilderImpl.java        | 169 ---------------------
 .../framework/recipes/watch/PersistentWatcher.java | 134 ++++++++++++++++
 .../recipes/watch/TestPersistentWatcher.java       |  86 +++++++++++
 3 files changed, 220 insertions(+), 169 deletions(-)

diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
deleted file mode 100644
index acb70c8..0000000
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.imps;
-
-import org.apache.curator.RetryLoop;
-import org.apache.curator.drivers.OperationTrace;
-import org.apache.curator.framework.api.AddPersistentWatchBuilder;
-import org.apache.curator.framework.api.AddPersistentWatchBuilder2;
-import org.apache.curator.framework.api.AddPersistentWatchable;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.CuratorEventType;
-import org.apache.curator.framework.api.CuratorWatcher;
-import org.apache.curator.framework.api.Pathable;
-import org.apache.zookeeper.Watcher;
-import java.util.concurrent.Executor;
-
-public class AddPersistentWatchBuilderImpl implements AddPersistentWatchBuilder, Pathable<Void>, BackgroundOperation<String>
-{
-    private final CuratorFrameworkImpl client;
-    private Watching watching = null;
-    private Backgrounding backgrounding = new Backgrounding();
-    private boolean recursive = false;
-
-    AddPersistentWatchBuilderImpl(CuratorFrameworkImpl client)
-    {
-        this.client = client;
-    }
-
-    public AddPersistentWatchBuilderImpl(CuratorFrameworkImpl client, Watching watching, Backgrounding backgrounding, boolean recursive)
-    {
-        this.client = client;
-        this.watching = watching;
-        this.backgrounding = backgrounding;
-        this.recursive = recursive;
-    }
-
-    @Override
-    public AddPersistentWatchable<Pathable<Void>> inBackground()
-    {
-        backgrounding = new Backgrounding();
-        return this;
-    }
-
-    @Override
-    public AddPersistentWatchBuilder2 recursive()
-    {
-        recursive = true;
-        return this;
-    }
-
-    @Override
-    public Pathable<Void> usingWatcher(Watcher watcher)
-    {
-        watching = new Watching(client, watcher);
-        return this;
-    }
-
-    @Override
-    public Pathable<Void> usingWatcher(CuratorWatcher watcher)
-    {
-        watching = new Watching(client, watcher);
-        return this;
-    }
-
-    @Override
-    public AddPersistentWatchable<Pathable<Void>> inBackground(Object context)
-    {
-        backgrounding = new Backgrounding(context);
-        return this;
-    }
-
-    @Override
-    public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback)
-    {
-        backgrounding = new Backgrounding(callback);
-        return this;
-    }
-
-    @Override
-    public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context)
-    {
-        backgrounding = new Backgrounding(callback, context);
-        return this;
-    }
-
-    @Override
-    public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Executor executor)
-    {
-        backgrounding = new Backgrounding(callback, executor);
-        return this;
-    }
-
-    @Override
-    public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context, Executor executor)
-    {
-        backgrounding = new Backgrounding(client, callback, context, executor);
-        return this;
-    }
-
-    @Override
-    public Void forPath(String path) throws Exception
-    {
-        if ( backgrounding.inBackground() )
-        {
-            client.processBackgroundOperation(new OperationAndData<>(this, path, backgrounding.getCallback(), null, backgrounding.getContext(), watching), null);
-        }
-        else
-        {
-            pathInForeground(path);
-        }
-        return null;
-    }
-
-    @Override
-    public void performBackgroundOperation(final OperationAndData<String> data) throws Exception
-    {
-        String path = data.getData();
-        String fixedPath = client.fixForNamespace(path);
-        try
-        {
-            final OperationTrace   trace = client.getZookeeperClient().startAdvancedTracer("AddPersistentWatchBuilderImpl-Background");
-            client.getZooKeeper().addPersistentWatch
-                (
-                    fixedPath,
-                    watching.getWatcher(path),
-                    recursive, (rc, path1, ctx) -> {
-                        trace.setReturnCode(rc).setWithWatcher(true).setPath(path1).commit();
-                        CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.ADD_PERSISTENT_WATCH, rc, path1, null, ctx, null, null, null, null, null, null);
-                        client.processBackgroundOperation(data, event);
-                    },
-                    backgrounding.getContext()
-                );
-        }
-        catch ( Throwable e )
-        {
-            backgrounding.checkError(e, watching);
-        }
-    }
-
-    private void pathInForeground(final String path) throws Exception
-    {
-        final String fixedPath = client.fixForNamespace(path);
-        OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("AddPersistentWatchBuilderImpl-Foreground");
-        RetryLoop.callWithRetry
-        (
-            client.getZookeeperClient(), () -> {
-                client.getZooKeeper().addPersistentWatch(fixedPath, watching.getWatcher(path), recursive);
-                return null;
-            });
-        trace.setPath(fixedPath).setWithWatcher(true).commit();
-    }
-}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
new file mode 100644
index 0000000..2c97490
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
@@ -0,0 +1,134 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.curator.framework.recipes.watch;
+
+import com.google.common.base.Preconditions;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.ThreadUtils;
+import org.apache.zookeeper.AddWatchMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Closeable;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+  * A managed persistent watcher. The watch is set again each time the connection is
+  * (re)established, so it stays in place across connection lapses.
+  */
+ public class PersistentWatcher implements Closeable
+ {
+     private final Logger log = LoggerFactory.getLogger(getClass());
+     private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
+     private final StandardListenerManager<Watcher> listeners = StandardListenerManager.standard();
+     private final ConnectionStateListener connectionStateListener = (client, newState) -> {
+         if ( newState.isConnected() )
+         {
+             reset();
+         }
+     };
+     private final Watcher watcher = event -> listeners.forEach(w -> w.process(event));
+     private final CuratorFramework client;
+     private final String basePath;
+     private final boolean recursive;
+
+     private enum State
+     {
+         LATENT,
+         STARTED,
+         CLOSED
+     }
+
+     /**
+      * @param client client
+      * @param basePath path to set the watch on
+      * @param recursive if true, use a recursive watch (AddWatchMode.PERSISTENT_RECURSIVE) instead of AddWatchMode.PERSISTENT
+      */
+     public PersistentWatcher(CuratorFramework client, String basePath, boolean recursive)
+     {
+         this.client = Objects.requireNonNull(client, "client cannot be null");
+         this.basePath = Objects.requireNonNull(basePath, "basePath cannot be null");
+         this.recursive = recursive;
+     }
+
+     /**
+      * Start watching
+      */
+     public void start()
+     {
+         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
+         client.getConnectionStateListenable().addListener(connectionStateListener);
+         reset();
+     }
+
+     /**
+      * Stop watching: clears the listeners and removes the watch in the background
+      */
+     @Override
+     public void close()
+     {
+         if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+         {
+             listeners.clear();
+             client.getConnectionStateListenable().removeListener(connectionStateListener);
+             try
+             {
+                 client.watches().remove(watcher).guaranteed().inBackground().forPath(basePath);
+             }
+             catch ( Exception e )
+             {
+                 ThreadUtils.checkInterrupted(e);
+                 log.debug(String.format("Could not remove watcher for path: %s", basePath), e);
+             }
+         }
+     }
+
+     /**
+      * Container for setting listeners
+      *
+      * @return listener container
+      */
+     public Listenable<Watcher> getListenable()
+     {
+         return listeners;
+     }
+
+     private void reset()
+     {
+         try
+         {
+             BackgroundCallback callback = (__, event) -> {
+                 if ( event.getResultCode() != KeeperException.Code.OK.intValue() ) {
+                     client.runSafe(this::reset);
+                 }
+             };
+             client.addWatch().withMode(recursive ? AddWatchMode.PERSISTENT_RECURSIVE : AddWatchMode.PERSISTENT).inBackground(callback).usingWatcher(watcher).forPath(basePath);
+         }
+         catch ( Exception e )
+         {
+             log.error("Could not reset persistent watch at path: " + basePath, e);
+         }
+     }
+ }
\ No newline at end of file
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
new file mode 100644
index 0000000..df18de5
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
@@ -0,0 +1,86 @@
+package org.apache.curator.framework.recipes.watch;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.compatibility.Timing2;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class TestPersistentWatcher extends BaseClassForTests
+{
+    private final Timing2 timing = new Timing2();
+
+    @Test
+    public void testConnectionLostRecursive() throws Exception
+    {
+        internalTest(true);
+    }
+
+    @Test
+    public void testConnectionLost() throws Exception
+    {
+        internalTest(false);
+    }
+
+    private void internalTest(boolean recursive) throws Exception
+    {
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)) )
+        {
+            CountDownLatch lostLatch = new CountDownLatch(1);
+            CountDownLatch reconnectedLatch = new CountDownLatch(1);
+            client.start();
+            client.getConnectionStateListenable().addListener((__, newState) -> {
+                if ( newState == ConnectionState.LOST ) {
+                    lostLatch.countDown();
+                } else if ( newState == ConnectionState.RECONNECTED ) {
+                    reconnectedLatch.countDown();
+                }
+            });
+
+            try ( PersistentWatcher persistentWatcher = new PersistentWatcher(client, "/top/main", recursive) )
+            {
+                persistentWatcher.start();
+
+                BlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>();
+                persistentWatcher.getListenable().addListener(events::add);
+
+                client.create().creatingParentsIfNeeded().forPath("/top/main/a");
+                Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main");
+                if ( recursive )
+                {
+                    Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main/a");
+                }
+                else
+                {
+                    Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main");   // child added
+                }
+
+                server.stop();
+                Assert.assertEquals(timing.takeFromQueue(events).getState(), Watcher.Event.KeeperState.Disconnected);
+                Assert.assertTrue(timing.awaitLatch(lostLatch));
+
+                server.restart();
+                Assert.assertTrue(timing.awaitLatch(reconnectedLatch));
+
+                timing.sleepABit();     // time to allow watcher to get reset
+                events.clear();
+
+                if ( recursive )
+                {
+                    client.setData().forPath("/top/main/a", "foo".getBytes());
+                    Assert.assertEquals(timing.takeFromQueue(events).getType(), Watcher.Event.EventType.NodeDataChanged);
+                }
+                client.setData().forPath("/top/main", "bar".getBytes());
+                Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main");
+            }
+        }
+    }
+}
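
The lifecycle that PersistentWatcher follows in the diff above (an AtomicReference state guard, re-registering the watch on every connect, fanning events out to listeners) can be sketched with the JDK alone. This is only a rough model under stated assumptions: WatcherLifecycleSketch, onConnected, dispatch and registrationCount are hypothetical names that are not part of Curator, events are plain strings rather than WatchedEvent, and reset() merely counts registrations instead of calling client.addWatch().

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical stand-in for PersistentWatcher; no Curator or ZooKeeper types are used.
class WatcherLifecycleSketch implements AutoCloseable
{
    private enum State { LATENT, STARTED, CLOSED }

    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();
    private final AtomicInteger registrations = new AtomicInteger();

    // Like start() in the recipe: only the LATENT -> STARTED transition is allowed.
    void start()
    {
        if ( !state.compareAndSet(State.LATENT, State.STARTED) )
        {
            throw new IllegalStateException("Already started");
        }
        reset();
    }

    // Models the connection state listener: each (re)connect sets the watch again.
    // The STARTED check is this sketch's own guard; the recipe instead removes
    // its connection listener in close().
    void onConnected()
    {
        if ( state.get() == State.STARTED )
        {
            reset();
        }
    }

    // Fans one event out to every registered listener.
    void dispatch(String event)
    {
        listeners.forEach(listener -> listener.accept(event));
    }

    void addListener(Consumer<String> listener)
    {
        listeners.add(listener);
    }

    // Like close() in the recipe: only the STARTED -> CLOSED transition does any work.
    @Override
    public void close()
    {
        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
        {
            listeners.clear();
        }
    }

    int registrationCount()
    {
        return registrations.get();
    }

    // Stand-in for the addWatch() call; here it only counts registrations.
    private void reset()
    {
        registrations.incrementAndGet();
    }
}
```

In this model, calling start() and then onConnected() registers the watch twice, a second start() throws, and after close() further connects or dispatched events have no effect. The real recipe additionally retries reset() from its background callback when the result code is not OK, which this sketch leaves out.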