Posted to commits@curator.apache.org by ra...@apache.org on 2019/11/03 18:25:39 UTC

[curator] branch zk36-recipes updated (7a2e002 -> 886f9ff)

This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a change to branch zk36-recipes
in repository https://gitbox.apache.org/repos/asf/curator.git.


 discard 7a2e002  Add recipes that use persistent watchers including new cache recipes to replace all others.
    omit c96a964  Support persistent watchers in ZK 3.6+ while maintaining backward compatibility with previous versions of ZK. Added a new module to make sure we maintain compatibility with ZK 3.5.x
     new 931ec04  Support persistent watchers in ZK 3.6+ while maintaining backward compatibility with previous versions of ZK. Added a new module to make sure we maintain compatibility with ZK 3.5.x
     new 886f9ff  Add recipes that use persistent watchers including new cache recipes to replace all others.

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (7a2e002)
            \
             N -- N -- N   refs/heads/zk36-recipes (886f9ff)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/curator/framework/CuratorFramework.java |  7 ++++
 .../framework/imps/CuratorFrameworkImpl.java       |  6 +++
 .../curator/framework/imps/TestAddWatch.java       | 18 +++++++++
 src/site/confluence/zk-compatibility.confluence    | 46 ++++------------------
 4 files changed, 39 insertions(+), 38 deletions(-)


[curator] 01/02: Support persistent watchers in ZK 3.6+ while maintaining backward compatibility with previous versions of ZK. Added a new module to make sure we maintain compatibility with ZK 3.5.x

Posted by ra...@apache.org.

randgalt pushed a commit to branch zk36-recipes
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 931ec045b775a8c9c0614896a131aefb173b0eb9
Author: randgalt <ra...@apache.org>
AuthorDate: Sat Nov 2 11:40:44 2019 -0500

    Support persistent watchers in ZK 3.6+ while maintaining backward compatibility with previous versions of ZK. Added a new module to make sure we maintain compatibility with ZK 3.5.x
---
 .../org/apache/curator/utils/Compatibility.java    |  35 ++++
 .../curator/utils/DefaultZookeeperFactory.java     |  17 ++
 .../apache/curator/framework/CuratorFramework.java |  16 ++
 .../curator/framework/api/AddWatchBuilder.java     |  18 +-
 .../curator/framework/api/AddWatchBuilder2.java    |  14 +-
 .../apache/curator/framework/api/AddWatchable.java |  28 ++-
 .../curator/framework/api/CuratorEventType.java    |   7 +-
 .../curator/framework/api/WatchesBuilder.java      |  20 ++-
 .../framework/imps/AddWatchBuilderImpl.java        | 197 +++++++++++++++++++++
 .../framework/imps/CuratorFrameworkImpl.java       |  23 +++
 .../imps/CuratorMultiTransactionRecord.java        |  34 ++--
 .../framework/imps/ReconfigBuilderImpl.java        |   5 +-
 .../framework/imps/RemoveWatchesBuilderImpl.java   |   9 +-
 .../curator/framework/imps/WatchesBuilderImpl.java |  26 ++-
 .../apache/curator/framework/imps/Watching.java    |  10 +-
 .../framework/imps/TestCreateReturningStat.java    |   3 +-
 .../curator/framework/imps/TestFramework.java      |   9 +-
 .../curator/framework/imps/TestFrameworkEdges.java |   4 +-
 .../framework/imps/TestReconfiguration.java        |   3 +-
 .../curator/framework/imps/TestTtlNodes.java       |   3 +-
 .../framework/imps/TestWatcherRemovalManager.java  |   3 +-
 ...tRemoveWatches.java => TestWatchesBuilder.java} |  64 ++++++-
 .../curator/framework/imps/TestWithCluster.java    |   2 +
 .../state/TestConnectionStateManager.java          |   2 +
 .../framework/recipes/cache/BaseTestTreeCache.java |   4 +-
 .../framework/recipes/cache/TestNodeCache.java     |   2 +
 .../recipes/cache/TestPathChildrenCache.java       |   2 +
 .../cache/TestPathChildrenCacheInCluster.java      |   2 +
 .../framework/recipes/cache/TestTreeCache.java     |   2 +
 .../recipes/leader/ChaosMonkeyCnxnFactory.java     |   8 +-
 .../framework/recipes/leader/TestLeaderLatch.java  |   2 +
 .../locks/TestInterProcessSemaphoreCluster.java    |   2 +
 .../recipes/nodes/TestPersistentTtlNode.java       |   3 +-
 curator-test-zk34/pom.xml                          |   2 +-
 {curator-test-zk34 => curator-test-zk35}/pom.xml   | 112 +++++-------
 .../curator/framework/TestCompatibility.java       |  49 +++++
 .../src/test/resources/log4j.properties            |  27 +++
 curator-test/pom.xml                               |  10 ++
 .../org/apache/curator/test/BaseClassForTests.java |   4 +-
 .../org/apache/curator/test/Compatibility.java     |  93 ++++++++++
 .../org/apache/curator/test/TestingCluster.java    |   2 +-
 .../apache/curator/test/TestingQuorumPeerMain.java |   2 +-
 .../apache/curator/test/TestingZooKeeperMain.java  |   6 +-
 .../test/compatibility/CuratorTestBase.java        |   5 +-
 .../test/compatibility/Zk35MethodInterceptor.java  |  56 ------
 .../x/async/api/AsyncCuratorFrameworkDsl.java      |   7 +
 .../curator/x/async/api/AsyncWatchBuilder.java     |  26 +--
 .../curator/x/async/api/AsyncWatchBuilder2.java    |  16 +-
 .../x/async/details/AsyncCuratorFrameworkImpl.java |  23 ++-
 .../x/async/details/AsyncWatchBuilderImpl.java     |  79 +++++++++
 .../curator/framework/imps/TestAddWatch.java       |  87 +++++++++
 pom.xml                                            |  45 +++--
 src/site/confluence/zk-compatibility.confluence    |  25 +--
 53 files changed, 991 insertions(+), 264 deletions(-)

diff --git a/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java b/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java
index 1ee2301..fd98d88 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java
@@ -31,7 +31,9 @@ import java.lang.reflect.Method;
 public class Compatibility
 {
     private static final boolean hasZooKeeperAdmin;
+    private static final boolean hasPersistentWatchers;
     private static final Method queueEventMethod;
+
     private static final Logger logger = LoggerFactory.getLogger(Compatibility.class);
 
     static
@@ -49,6 +51,19 @@ public class Compatibility
         }
         hasZooKeeperAdmin = localHasZooKeeperAdmin;
 
+        boolean localHasPersistentWatchers;
+        try
+        {
+            Class.forName("org.apache.zookeeper.AddWatchMode");
+            localHasPersistentWatchers = true;
+        }
+        catch ( ClassNotFoundException e )
+        {
+            localHasPersistentWatchers = false;
+            logger.info("Persistent Watchers are not available in the version of the ZooKeeper library being used");
+        }
+        hasPersistentWatchers = localHasPersistentWatchers;
+
         Method localQueueEventMethod;
         try
         {
@@ -74,6 +89,26 @@ public class Compatibility
     }
 
     /**
+     * Return true if the ZooKeeperAdmin class is available
+     *
+     * @return true/false
+     */
+    public static boolean hasZooKeeperAdmin()
+    {
+        return hasZooKeeperAdmin;
+    }
+
+    /**
+     * Return true if persistent watchers are available
+     *
+     * @return true/false
+     */
+    public static boolean hasPersistentWatchers()
+    {
+        return hasPersistentWatchers;
+    }
+
+    /**
      * For ZooKeeper 3.5.x, use the supported <code>zooKeeper.getTestable().injectSessionExpiration()</code>.
      * For ZooKeeper 3.4.x do the equivalent via reflection
      *
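Note on the hunk above: the static block detects persistent watcher support by probing for a class that only exists in newer ZooKeeper libraries. The following is a minimal standalone sketch of that probing idea, not Curator code. FeatureProbe and isClassPresent are hypothetical names, and unlike the diff it uses the three-argument Class.forName so that the probed class is not initialized.

```java
// Hypothetical helper for illustration only; it is not part of Curator.
final class FeatureProbe
{
    // Evaluated once when this class is initialized, similar to the static block in the diff.
    static final boolean HAS_PERSISTENT_WATCHERS = isClassPresent("org.apache.zookeeper.AddWatchMode");

    // Returns true if the named class can be found on the classpath.
    static boolean isClassPresent(String className)
    {
        try
        {
            // initialize=false: only check for presence, do not run static initializers
            Class.forName(className, false, FeatureProbe.class.getClassLoader());
            return true;
        }
        catch ( ClassNotFoundException | LinkageError e )
        {
            return false;
        }
    }

    public static void main(String[] args)
    {
        System.out.println("java.lang.String present: " + isClassPresent("java.lang.String"));
        System.out.println("persistent watchers available: " + HAS_PERSISTENT_WATCHERS);
    }
}
```

The value of HAS_PERSISTENT_WATCHERS depends on which ZooKeeper jar, if any, is on the classpath when the sketch runs.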
diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
index 42279d0..f936518 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
@@ -20,12 +20,29 @@ package org.apache.curator.utils;
 
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.admin.ZooKeeperAdmin;
 
 public class DefaultZookeeperFactory implements ZookeeperFactory
 {
+    // hide org.apache.zookeeper.admin.ZooKeeperAdmin in a nested class so that Curator continues to work with ZK 3.4.x
+    private static class ZooKeeperAdminMaker implements ZookeeperFactory
+    {
+        static final ZooKeeperAdminMaker instance = new ZooKeeperAdminMaker();
+
+        @Override
+        public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
+        {
+            return new ZooKeeperAdmin(connectString, sessionTimeout, watcher, canBeReadOnly);
+        }
+    }
+
     @Override
     public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
     {
+        if ( Compatibility.hasZooKeeperAdmin() )
+        {
+            return ZooKeeperAdminMaker.instance.newZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
+        }
         return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
     }
 }
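Note on the change above: the direct reference to the optional class is confined to a nested class that is only touched after the availability check passes. Below is a rough standalone sketch of that shape. ListFactory, OptionalMaker and hasOptionalClass are hypothetical names, and java.util.LinkedList merely stands in for a class that might be absent at runtime. Like the comment in the diff, the sketch assumes the JVM loads the nested class lazily on first use.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical example; LinkedList plays the role of the optional class.
final class ListFactory
{
    // Stand-in for Compatibility.hasZooKeeperAdmin(): true if the "optional" class can be found.
    static boolean hasOptionalClass()
    {
        try
        {
            Class.forName("java.util.LinkedList");
            return true;
        }
        catch ( ClassNotFoundException e )
        {
            return false;
        }
    }

    // Only this nested class mentions the optional type in executable code.
    private static final class OptionalMaker
    {
        static List<String> make()
        {
            return new LinkedList<>();
        }
    }

    static List<String> newList()
    {
        if ( hasOptionalClass() )
        {
            // OptionalMaker is first used here, after the guard has passed
            return OptionalMaker.make();
        }
        return new ArrayList<>();   // fallback when the optional class is missing
    }

    public static void main(String[] args)
    {
        System.out.println(newList().getClass().getName());
    }
}
```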
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 3737faa..d279373 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -194,11 +194,20 @@ public interface CuratorFramework extends Closeable
 
     /**
      * Start a remove watches builder.
+     *
      * @return builder object
+     * @deprecated use {@link #watchers()} in ZooKeeper 3.6+
      */
     public RemoveWatchesBuilder watches();
 
     /**
+     * Start a watches builder.
+     *
+     * @return builder object
+     */
+    public WatchesBuilder watchers();
+
+    /**
      * Returns the listenable interface for the Connect State
      *
      * @return listenable
@@ -332,6 +341,13 @@ public interface CuratorFramework extends Closeable
     boolean isZk34CompatibilityMode();
 
     /**
+     * Return true if this instance is running in ZK 3.5.x compatibility mode
+     *
+     * @return true/false
+     */
+    boolean isZk35CompatibilityMode();
+
+    /**
      * Calls {@link #notifyAll()} on the given object after first synchronizing on it. This is
      * done from the {@link #runSafe(Runnable)} thread.
      *
diff --git a/curator-test/src/main/java/org/apache/curator/test/compatibility/CuratorTestBase.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder.java
similarity index 68%
copy from curator-test/src/main/java/org/apache/curator/test/compatibility/CuratorTestBase.java
copy to curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder.java
index a3c2a29..ad6d434 100644
--- a/curator-test/src/main/java/org/apache/curator/test/compatibility/CuratorTestBase.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder.java
@@ -16,13 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.test.compatibility;
+package org.apache.curator.framework.api;
 
-import org.apache.curator.test.BaseClassForTests;
-import org.testng.annotations.Listeners;
+import org.apache.zookeeper.AddWatchMode;
 
-@Listeners(Zk35MethodInterceptor.class)
-public class CuratorTestBase extends BaseClassForTests
+public interface AddWatchBuilder extends AddWatchBuilder2
 {
-    protected final Timing2 timing = new Timing2();
-}
+    /**
+     * The mode to use. By default, {@link org.apache.zookeeper.AddWatchMode#PERSISTENT_RECURSIVE} is used
+     *
+     * @param mode mode to use
+     * @return this
+     */
+    AddWatchBuilder2 withMode(AddWatchMode mode);
+}
\ No newline at end of file
diff --git a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder2.java
similarity index 81%
copy from curator-test/src/main/java/org/apache/curator/test/Compatibility.java
copy to curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder2.java
index 5b4b53f..9114c00 100644
--- a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder2.java
@@ -16,12 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.test;
 
-public class Compatibility
+package org.apache.curator.framework.api;
+
+public interface AddWatchBuilder2 extends
+    Backgroundable<AddWatchable<Pathable<Void>>>,
+    AddWatchable<Pathable<Void>>,
+    Pathable<Void>
 {
-    public static boolean isZK34()
-    {
-        return false;
-    }
-}
+}
\ No newline at end of file
diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchable.java
similarity index 68%
copy from curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
copy to curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchable.java
index 42279d0..1f0646c 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchable.java
@@ -16,16 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.utils;
+
+package org.apache.curator.framework.api;
 
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
 
-public class DefaultZookeeperFactory implements ZookeeperFactory
+public interface AddWatchable<T>
 {
-    @Override
-    public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
-    {
-        return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
-    }
-}
+    /**
+     * Set a watcher for the operation
+     *
+     * @param watcher the watcher
+     * @return this
+     */
+    T usingWatcher(Watcher watcher);
+
+    /**
+     * Set a watcher for the operation
+     *
+     * @param watcher the watcher
+     * @return this
+     */
+    T usingWatcher(CuratorWatcher watcher);
+}
\ No newline at end of file
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
index 5dea211..89e1490 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
@@ -96,5 +96,10 @@ public enum CuratorEventType
     /**
      * Event sent when client is being closed
      */
-    CLOSING
+    CLOSING,
+
+    /**
+     * Corresponds to {@link org.apache.curator.framework.CuratorFramework#watchers()}
+     */
+    ADD_WATCH
 }
diff --git a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java b/curator-framework/src/main/java/org/apache/curator/framework/api/WatchesBuilder.java
similarity index 75%
copy from curator-test/src/main/java/org/apache/curator/test/Compatibility.java
copy to curator-framework/src/main/java/org/apache/curator/framework/api/WatchesBuilder.java
index 5b4b53f..3cd5528 100644
--- a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/WatchesBuilder.java
@@ -16,12 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.test;
 
-public class Compatibility
+package org.apache.curator.framework.api;
+
+/**
+ * Builder to allow watches to be added or removed
+ */
+public interface WatchesBuilder extends RemoveWatchesBuilder
 {
-    public static boolean isZK34()
-    {
-        return false;
-    }
-}
+    /**
+     * Start an add watch operation
+     *
+     * @return builder
+     */
+    AddWatchBuilder add();
+}
\ No newline at end of file
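Note on the builder interfaces above: each method returns a narrower interface than the one it was called on, so the compiler limits which calls may follow. The sketch below shows that staged-builder idea in isolation. Every name in it (PathStage, WatcherStage, ModeStage, SketchBuilder) is hypothetical, and strings stand in for the real AddWatchMode and Watcher types. Only the PERSISTENT_RECURSIVE default is taken from the javadoc in the diff.

```java
// Hypothetical stages; they loosely mirror Pathable, AddWatchable and AddWatchBuilder.
interface PathStage
{
    String forPath(String path);
}

interface WatcherStage extends PathStage
{
    PathStage usingWatcher(String watcherName);
}

interface ModeStage extends WatcherStage
{
    WatcherStage withMode(String mode);
}

final class SketchBuilder implements ModeStage
{
    private String mode = "PERSISTENT_RECURSIVE";   // default named in the AddWatchBuilder javadoc
    private String watcher = "default";

    @Override
    public WatcherStage withMode(String mode)
    {
        this.mode = mode;
        return this;    // the narrower return type hides withMode() from the next call
    }

    @Override
    public PathStage usingWatcher(String watcherName)
    {
        this.watcher = watcherName;
        return this;    // only forPath() remains callable
    }

    @Override
    public String forPath(String path)
    {
        // A real builder would register the watch here; the sketch just reports its settings.
        return mode + ":" + watcher + ":" + path;
    }

    public static void main(String[] args)
    {
        System.out.println(new SketchBuilder().withMode("PERSISTENT").usingWatcher("w1").forPath("/a"));
    }
}
```

With these stages, a chain such as withMode(...).withMode(...) does not compile, because WatcherStage has no withMode method.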
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddWatchBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddWatchBuilderImpl.java
new file mode 100644
index 0000000..9f119a3
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddWatchBuilderImpl.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.RetryLoop;
+import org.apache.curator.drivers.OperationTrace;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.api.Pathable;
+import org.apache.curator.framework.api.AddWatchBuilder;
+import org.apache.curator.framework.api.AddWatchBuilder2;
+import org.apache.curator.framework.api.AddWatchable;
+import org.apache.zookeeper.AddWatchMode;
+import org.apache.zookeeper.Watcher;
+import java.util.concurrent.Executor;
+
+public class AddWatchBuilderImpl implements AddWatchBuilder, Pathable<Void>, BackgroundOperation<String>
+{
+    private final CuratorFrameworkImpl client;
+    private Watching watching;
+    private Backgrounding backgrounding = new Backgrounding();
+    private AddWatchMode mode = AddWatchMode.PERSISTENT_RECURSIVE;
+
+    AddWatchBuilderImpl(CuratorFrameworkImpl client)
+    {
+        this.client = client;
+        watching = new Watching(client, true);
+    }
+
+    public AddWatchBuilderImpl(CuratorFrameworkImpl client, Watching watching, Backgrounding backgrounding, AddWatchMode mode)
+    {
+        this.client = client;
+        this.watching = watching;
+        this.backgrounding = backgrounding;
+        this.mode = mode;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground()
+    {
+        backgrounding = new Backgrounding(true);
+        return this;
+    }
+
+    @Override
+    public AddWatchBuilder2 withMode(AddWatchMode mode)
+    {
+        this.mode = mode;
+        return this;
+    }
+
+    @Override
+    public Pathable<Void> usingWatcher(Watcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    @Override
+    public Pathable<Void> usingWatcher(CuratorWatcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground(Object context)
+    {
+        backgrounding = new Backgrounding(context);
+        return this;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback)
+    {
+        backgrounding = new Backgrounding(callback);
+        return this;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context)
+    {
+        backgrounding = new Backgrounding(callback, context);
+        return this;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Executor executor)
+    {
+        backgrounding = new Backgrounding(callback, executor);
+        return this;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context, Executor executor)
+    {
+        backgrounding = new Backgrounding(client, callback, context, executor);
+        return this;
+    }
+
+    @Override
+    public Void forPath(String path) throws Exception
+    {
+        if ( backgrounding.inBackground() )
+        {
+            client.processBackgroundOperation(new OperationAndData<>(this, path, backgrounding.getCallback(), null, backgrounding.getContext(), watching), null);
+        }
+        else
+        {
+            pathInForeground(path);
+        }
+        return null;
+    }
+
+    @Override
+    public void performBackgroundOperation(final OperationAndData<String> data) throws Exception
+    {
+        String path = data.getData();
+        String fixedPath = client.fixForNamespace(path);
+        try
+        {
+            final OperationTrace   trace = client.getZookeeperClient().startAdvancedTracer("AddWatchBuilderImpl-Background");
+            if ( watching.isWatched() )
+            {
+                client.getZooKeeper().addWatch
+                    (
+                        fixedPath,
+                        mode,
+                        (rc, path1, ctx) -> {
+                            trace.setReturnCode(rc).setWithWatcher(true).setPath(path1).commit();
+                            CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.ADD_WATCH, rc, path1, null, ctx, null, null, null, null, null, null);
+                            client.processBackgroundOperation(data, event);
+                        },
+                        backgrounding.getContext()
+                    );
+            }
+            else
+            {
+                client.getZooKeeper().addWatch
+                    (
+                        fixedPath,
+                        watching.getWatcher(path),
+                        mode,
+                        (rc, path1, ctx) -> {
+                            trace.setReturnCode(rc).setWithWatcher(true).setPath(path1).commit();
+                            CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.ADD_WATCH, rc, path1, null, ctx, null, null, null, null, null, null);
+                            client.processBackgroundOperation(data, event);
+                        },
+                        backgrounding.getContext()
+                    );
+            }
+        }
+        catch ( Throwable e )
+        {
+            backgrounding.checkError(e, watching);
+        }
+    }
+
+    private void pathInForeground(final String path) throws Exception
+    {
+        final String fixedPath = client.fixForNamespace(path);
+        OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("AddWatchBuilderImpl-Foreground");
+        RetryLoop.callWithRetry
+        (
+            client.getZookeeperClient(), () -> {
+                if ( watching.isWatched() )
+                {
+                    client.getZooKeeper().addWatch(fixedPath, mode);
+                }
+                else
+                {
+                    client.getZooKeeper().addWatch(fixedPath, watching.getWatcher(path), mode);
+                }
+                return null;
+            });
+        trace.setPath(fixedPath).setWithWatcher(true).commit();
+    }
+}
\ No newline at end of file
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index e003bf0..5afc5ce 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -42,6 +42,7 @@ import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.framework.state.ConnectionStateManager;
+import org.apache.curator.utils.Compatibility;
 import org.apache.curator.utils.DebugUtils;
 import org.apache.curator.utils.EnsurePath;
 import org.apache.curator.utils.ThreadUtils;
@@ -571,6 +572,13 @@ public class CuratorFrameworkImpl implements CuratorFramework
         return new RemoveWatchesBuilderImpl(this);
     }
 
+    @Override
+    public WatchesBuilder watchers()
+    {
+        Preconditions.checkState(Compatibility.hasPersistentWatchers(), "addWatch() is not supported in the ZooKeeper library being used. Use watches() instead.");
+        return new WatchesBuilderImpl(this);
+    }
+
     protected void internalSync(CuratorFrameworkImpl impl, String path, Object context)
     {
         BackgroundOperation<String> operation = new BackgroundSyncImpl(impl, context);
@@ -620,6 +628,15 @@ public class CuratorFrameworkImpl implements CuratorFramework
         return client.getZooKeeper();
     }
 
+    Object getZooKeeperAdmin() throws Exception
+    {
+        if ( isZk34CompatibilityMode() )
+        {
+            Preconditions.checkState(!isZk34CompatibilityMode(), "getZooKeeperAdmin() is not supported when running in ZooKeeper 3.4 compatibility mode");
+        }
+        return client.getZooKeeper();
+    }
+
     CompressionProvider getCompressionProvider()
     {
         return compressionProvider;
@@ -830,6 +847,12 @@ public class CuratorFrameworkImpl implements CuratorFramework
         return zk34CompatibilityMode;
     }
 
+    @Override
+    public boolean isZk35CompatibilityMode()
+    {
+        return !zk34CompatibilityMode && !Compatibility.hasPersistentWatchers();
+    }
+
     EnsembleTracker getEnsembleTracker()
     {
         return ensembleTracker;
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java
index 3e72609..fbac6e6 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java
@@ -16,49 +16,57 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.imps;
 
 import com.google.common.collect.Lists;
 import org.apache.curator.framework.api.transaction.OperationType;
 import org.apache.curator.framework.api.transaction.TypeAndPath;
-import org.apache.zookeeper.MultiTransactionRecord;
 import org.apache.zookeeper.Op;
 import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
-class CuratorMultiTransactionRecord extends MultiTransactionRecord
+class CuratorMultiTransactionRecord implements Iterable<Op>
 {
-    private final List<TypeAndPath>     metadata = Lists.newArrayList();
-
-    @Override
-    public final void add(Op op)
-    {
-        throw new UnsupportedOperationException();
-    }
+    private final List<TypeAndPath> metadata = Lists.newArrayList();
+    private List<Op> ops = new ArrayList<>();
 
     void add(Op op, OperationType type, String forPath)
     {
-        super.add(op);
+        ops.add(op);
         metadata.add(new TypeAndPath(type, forPath));
     }
 
-    TypeAndPath     getMetadata(int index)
+    TypeAndPath getMetadata(int index)
     {
         return metadata.get(index);
     }
 
-    int             metadataSize()
+    int metadataSize()
     {
         return metadata.size();
     }
 
     void addToDigest(MessageDigest digest)
     {
-        for ( Op op : this )
+        for ( Op op : ops )
         {
             digest.update(op.getPath().getBytes());
             digest.update(Integer.toString(op.getType()).getBytes());
             digest.update(op.toRequestRecord().toString().getBytes());
         }
     }
+
+    @Override
+    public Iterator<Op> iterator()
+    {
+        return ops.iterator();
+    }
+
+    int size()
+    {
+        return ops.size();
+    }
 }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
index 97be59a..9f129ca 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
@@ -24,6 +24,7 @@ import org.apache.curator.RetryLoop;
 import org.apache.curator.TimeTrace;
 import org.apache.curator.framework.api.*;
 import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.admin.ZooKeeperAdmin;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.DataTree;
 import java.util.Arrays;
@@ -268,7 +269,7 @@ public class ReconfigBuilderImpl implements ReconfigBuilder, BackgroundOperation
                     client.processBackgroundOperation(data, event);
                 }
             };
-            client.getZooKeeper().reconfig(joining, leaving, newMembers, fromConfig, callback, backgrounding.getContext());
+            ((ZooKeeperAdmin)client.getZooKeeperAdmin()).reconfigure(joining, leaving, newMembers, fromConfig, callback, backgrounding.getContext());
         }
         catch ( Throwable e )
         {
@@ -287,7 +288,7 @@ public class ReconfigBuilderImpl implements ReconfigBuilder, BackgroundOperation
                     @Override
                     public byte[] call() throws Exception
                     {
-                        return client.getZooKeeper().reconfig(joining, leaving, newMembers, fromConfig, responseStat);
+                        return ((ZooKeeperAdmin)client.getZooKeeperAdmin()).reconfigure(joining, leaving, newMembers, fromConfig, responseStat);
                     }
                 }
             );
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
index e14deff..961d5f0 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
@@ -201,8 +201,13 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat
         }        
         
         return null;
-    }    
-    
+    }
+
+    protected CuratorFrameworkImpl getClient()
+    {
+        return client;
+    }
+
     private void pathInBackground(final String path)
     {
         OperationAndData.ErrorCallback<String>  errorCallback = null;
diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatchesBuilderImpl.java
similarity index 50%
copy from curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
copy to curator-framework/src/main/java/org/apache/curator/framework/imps/WatchesBuilderImpl.java
index 42279d0..4a273c6 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatchesBuilderImpl.java
@@ -16,16 +16,30 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.utils;
 
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.framework.api.AddWatchBuilder;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.api.WatchesBuilder;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher.WatcherType;
 
-public class DefaultZookeeperFactory implements ZookeeperFactory
+public class WatchesBuilderImpl extends RemoveWatchesBuilderImpl implements WatchesBuilder
 {
+    public WatchesBuilderImpl(CuratorFrameworkImpl client)
+    {
+        super(client);
+    }
+
+    public WatchesBuilderImpl(CuratorFrameworkImpl client, Watcher watcher, CuratorWatcher curatorWatcher, WatcherType watcherType, boolean guaranteed, boolean local, boolean quietly, Backgrounding backgrounding)
+    {
+        super(client, watcher, curatorWatcher, watcherType, guaranteed, local, quietly, backgrounding);
+    }
+
     @Override
-    public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
+    public AddWatchBuilder add()
     {
-        return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
+        return new AddWatchBuilderImpl(getClient());
     }
-}
+}
\ No newline at end of file
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
index daa5dd3..5bad7e7 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
@@ -23,7 +23,7 @@ import org.apache.curator.framework.api.CuratorWatcher;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
 
-class Watching
+public class Watching
 {
     private final Watcher watcher;
     private final CuratorWatcher curatorWatcher;
@@ -31,7 +31,7 @@ class Watching
     private final CuratorFrameworkImpl client;
     private NamespaceWatcher namespaceWatcher;
 
-    Watching(CuratorFrameworkImpl client, boolean watched)
+    public Watching(CuratorFrameworkImpl client, boolean watched)
     {
         this.client = client;
         this.watcher = null;
@@ -39,7 +39,7 @@ class Watching
         this.watched = watched;
     }
 
-    Watching(CuratorFrameworkImpl client, Watcher watcher)
+    public Watching(CuratorFrameworkImpl client, Watcher watcher)
     {
         this.client = client;
         this.watcher = watcher;
@@ -47,7 +47,7 @@ class Watching
         this.watched = false;
     }
 
-    Watching(CuratorFrameworkImpl client, CuratorWatcher watcher)
+    public Watching(CuratorFrameworkImpl client, CuratorWatcher watcher)
     {
         this.client = client;
         this.watcher = null;
@@ -55,7 +55,7 @@ class Watching
         this.watched = false;
     }
 
-    Watching(CuratorFrameworkImpl client)
+    public Watching(CuratorFrameworkImpl client)
     {
         this.client = client;
         watcher = null;
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCreateReturningStat.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCreateReturningStat.java
index 034791d..5a52cf0 100755
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCreateReturningStat.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCreateReturningStat.java
@@ -26,7 +26,6 @@ import org.apache.curator.framework.api.CuratorEventType;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.test.Timing;
-import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.zookeeper.data.Stat;
 import org.testng.Assert;
@@ -34,7 +33,7 @@ import org.testng.annotations.Test;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 
-@Test(groups = Zk35MethodInterceptor.zk35Group)
+@Test(groups = CuratorTestBase.zk35Group)
 public class TestCreateReturningStat extends CuratorTestBase
 {
     private CuratorFramework createClient()
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
index fe49ad7..97c8a33 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
@@ -31,8 +31,8 @@ import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.test.compatibility.Timing2;
-import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.EnsurePath;
 import org.apache.curator.utils.ZKPaths;
@@ -59,9 +59,10 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 @SuppressWarnings("deprecation")
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestFramework extends BaseClassForTests
 {
-    @BeforeMethod
+    @BeforeMethod(alwaysRun = true)
     @Override
     public void setup() throws Exception
     {
@@ -69,7 +70,7 @@ public class TestFramework extends BaseClassForTests
         super.setup();
     }
 
-    @AfterMethod
+    @AfterMethod(alwaysRun = true)
     @Override
     public void teardown() throws Exception
     {
@@ -77,7 +78,7 @@ public class TestFramework extends BaseClassForTests
         super.teardown();
     }
 
-    @Test(groups = Zk35MethodInterceptor.zk35Group)
+    @Test(groups = CuratorTestBase.zk35Group)
     public void testWaitForShutdownTimeoutMs() throws Exception
     {
         final BlockingQueue<Integer> timeoutQueue = new ArrayBlockingQueue<>(1);
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
index 7c6d156..f89fff9 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
@@ -40,13 +40,13 @@ import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.InstanceSpec;
 import org.apache.curator.test.TestingCluster;
 import org.apache.curator.test.TestingServer;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.test.compatibility.Timing2;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.Compatibility;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
@@ -63,9 +63,9 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestFrameworkEdges extends BaseClassForTests
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
index c6ff2bb..74b52fa 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
@@ -32,7 +32,6 @@ import org.apache.curator.test.TestingCluster;
 import org.apache.curator.test.TestingZooKeeperServer;
 import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.test.compatibility.Timing2;
-import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -57,7 +56,7 @@ import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 
-@Test(groups = Zk35MethodInterceptor.zk35Group)
+@Test(groups = {CuratorTestBase.zk35Group, CuratorTestBase.zk35CompatibilityGroup})
 public class TestReconfiguration extends CuratorTestBase
 {
     private final Timing2 timing = new Timing2();
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestTtlNodes.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestTtlNodes.java
index e2156df..6c5026a 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestTtlNodes.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestTtlNodes.java
@@ -25,7 +25,6 @@ import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.test.Timing;
-import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.zookeeper.CreateMode;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -34,7 +33,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 import java.util.concurrent.CountDownLatch;
 
-@Test(groups = Zk35MethodInterceptor.zk35Group)
+@Test(groups = CuratorTestBase.zk35Group)
 public class TestTtlNodes extends CuratorTestBase
 {
     @BeforeClass
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java
index 74aac1d..8968e3e 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java
@@ -27,7 +27,6 @@ import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.test.Timing;
 import org.apache.curator.test.WatchersDebug;
-import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -37,7 +36,7 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 
-@Test(groups = Zk35MethodInterceptor.zk35Group)
+@Test(groups = CuratorTestBase.zk35Group)
 public class TestWatcherRemovalManager extends CuratorTestBase
 {
     @Test
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatchesBuilder.java
similarity index 88%
rename from curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
rename to curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatchesBuilder.java
index 63d8931..26c41f1 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatchesBuilder.java
@@ -30,23 +30,25 @@ import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.retry.RetryOneTime;
-import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.test.Timing;
-import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.ZookeeperFactory;
+import org.apache.zookeeper.AddWatchMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.WatcherType;
+import org.apache.zookeeper.ZooKeeper;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-@Test(groups = Zk35MethodInterceptor.zk35Group)
-public class TestRemoveWatches extends CuratorTestBase
+@Test(groups = CuratorTestBase.zk35Group)
+public class TestWatchesBuilder extends CuratorTestBase
 {
     private AtomicReference<ConnectionState> registerConnectionStateListener(CuratorFramework client)
     {
@@ -610,8 +612,58 @@ public class TestRemoveWatches extends CuratorTestBase
         {
             CloseableUtils.closeQuietly(client);
         }
-    }    
-    
+    }
+
+    @Test(groups = CuratorTestBase.zk36Group)
+    public void testPersistentRecursiveWatch() throws Exception
+    {
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+        {
+            client.start();
+            client.blockUntilConnected();
+
+            CountDownLatch latch = new CountDownLatch(5);
+            Watcher watcher = event -> latch.countDown();
+            client.watchers().add().withMode(AddWatchMode.PERSISTENT_RECURSIVE).usingWatcher(watcher).forPath("/test");
+
+            client.create().forPath("/test");
+            client.create().forPath("/test/a");
+            client.create().forPath("/test/a/b");
+            client.create().forPath("/test/a/b/c");
+            client.create().forPath("/test/a/b/c/d");
+
+            Assert.assertTrue(timing.awaitLatch(latch));
+        }
+    }
+
+    @Test(groups = CuratorTestBase.zk36Group)
+    public void testPersistentRecursiveDefaultWatch() throws Exception
+    {
+        CountDownLatch latch = new CountDownLatch(6);   // 5 creates plus the initial sync
+        ZookeeperFactory zookeeperFactory = (connectString, sessionTimeout, watcher, canBeReadOnly) -> {
+            Watcher actualWatcher = event -> {
+                watcher.process(event);
+                latch.countDown();
+            };
+            return new ZooKeeper(connectString, sessionTimeout, actualWatcher);
+        };
+        try (CuratorFramework client = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).zookeeperFactory(zookeeperFactory).build() )
+        {
+            client.start();
+            client.blockUntilConnected();
+
+            client.watchers().add().withMode(AddWatchMode.PERSISTENT_RECURSIVE).forPath("/test");
+
+            client.create().forPath("/test");
+            client.create().forPath("/test/a");
+            client.create().forPath("/test/a/b");
+            client.create().forPath("/test/a/b/c");
+            client.create().forPath("/test/a/b/c/d");
+
+            Assert.assertTrue(timing.awaitLatch(latch));
+        }
+    }
+
     private static class CountDownWatcher implements Watcher {
         private String path;
         private EventType eventType;
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWithCluster.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWithCluster.java
index 7e8ffbb..fbd6e2a 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWithCluster.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWithCluster.java
@@ -18,6 +18,7 @@
  */
 package org.apache.curator.framework.imps;
 
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -33,6 +34,7 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.concurrent.CountDownLatch;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestWithCluster
 {
     @Test
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/state/TestConnectionStateManager.java b/curator-framework/src/test/java/org/apache/curator/framework/state/TestConnectionStateManager.java
index c929b41..63ccefe 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/state/TestConnectionStateManager.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/state/TestConnectionStateManager.java
@@ -23,6 +23,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.test.compatibility.Timing2;
 import org.apache.curator.utils.CloseableUtils;
 import org.testng.Assert;
@@ -30,6 +31,7 @@ import org.testng.annotations.Test;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestConnectionStateManager extends BaseClassForTests {
 
     @Test
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
index 175ccdf..246704f 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
@@ -96,7 +96,7 @@ public class BaseTestTreeCache extends BaseClassForTests
     }
 
     @Override
-    @BeforeMethod
+    @BeforeMethod(alwaysRun = true)
     public void setup() throws Exception
     {
         super.setup();
@@ -111,7 +111,7 @@ public class BaseTestTreeCache extends BaseClassForTests
     }
 
     @Override
-    @AfterMethod
+    @AfterMethod(alwaysRun = true)
     public void teardown() throws Exception
     {
         try
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java
index ff416d5..872eee2 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java
@@ -20,6 +20,7 @@ package org.apache.curator.framework.recipes.cache;
 
 import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.test.compatibility.Timing2;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
@@ -40,6 +41,7 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestNodeCache extends BaseClassForTests
 {
     @Test
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
index 78fabd5..ac02090 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
@@ -32,6 +32,7 @@ import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.ExecuteCalledWatchingExecutorService;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.Compatibility;
 import org.apache.zookeeper.CreateMode;
@@ -45,6 +46,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import static org.testng.AssertJUnit.assertNotNull;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestPathChildrenCache extends BaseClassForTests
 {
     @Test
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java
index cd87125..3895625 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java
@@ -20,6 +20,7 @@ package org.apache.curator.framework.recipes.cache;
 
 import com.google.common.collect.Queues;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -34,6 +35,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestPathChildrenCacheInCluster extends BaseClassForTests
 {
     @Test(enabled = false)  // this test is very flakey - it needs to be re-written at some point
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
index 1e97ce2..4c7052d 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableSet;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.Compatibility;
 import org.apache.zookeeper.CreateMode;
@@ -31,6 +32,7 @@ import org.testng.annotations.Test;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestTreeCache extends BaseTestTreeCache
 {
     @Test
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
index 4cb342c..07e9a17 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
@@ -19,11 +19,11 @@
 
 package org.apache.curator.framework.recipes.leader;
 
+import org.apache.curator.test.Compatibility;
 import org.apache.curator.test.TestingZooKeeperMain;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.proto.CreateRequest;
 import org.apache.zookeeper.server.ByteBufferInputStream;
-import org.apache.zookeeper.server.NIOServerCnxn;
 import org.apache.zookeeper.server.NIOServerCnxnFactory;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.ZooKeeperServer;
@@ -92,7 +92,7 @@ public class ChaosMonkeyCnxnFactory extends NIOServerCnxnFactory
                 log.debug("Rejected : " + si.toString());
                 // Still reject request
                 log.debug("Still not ready for " + remaining + "ms");
-                ((NIOServerCnxn)si.cnxn).close();
+                Compatibility.serverCnxnClose(si.cnxn);
                 return;
             }
             // Submit the request to the legacy Zookeeper server
@@ -113,13 +113,13 @@ public class ChaosMonkeyCnxnFactory extends NIOServerCnxnFactory
                         firstError = System.currentTimeMillis();
                         // The znode has been created, close the connection and don't tell it to client
                         log.warn("Closing connection right after " + createRequest.getPath() + " creation");
-                        ((NIOServerCnxn)si.cnxn).close();
+                        Compatibility.serverCnxnClose(si.cnxn);
                     }
                 }
                 catch ( Exception e )
                 {
                     // Should not happen
-                    ((NIOServerCnxn)si.cnxn).close();
+                    Compatibility.serverCnxnClose(si.cnxn);
                 }
             }
         }
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index 3d9e9b7..8f4e23f 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -37,6 +37,7 @@ import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.test.compatibility.Timing2;
 import org.apache.curator.utils.CloseableUtils;
 import org.testng.Assert;
@@ -60,6 +61,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestLeaderLatch extends BaseClassForTests
 {
     private static final String PATH_NAME = "/one/two/me";
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
index ed56f15..add547e 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
@@ -30,6 +30,7 @@ import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.InstanceSpec;
 import org.apache.curator.test.TestingCluster;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -46,6 +47,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestInterProcessSemaphoreCluster extends BaseClassForTests
 {
     @Test
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java
index 360c876..049e1cd 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java
@@ -26,7 +26,6 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.Timing;
 import org.apache.curator.test.compatibility.CuratorTestBase;
-import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.curator.utils.ZKPaths;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -38,7 +37,7 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode.BUILD_INITIAL_CACHE;
 
-@Test(groups = Zk35MethodInterceptor.zk35Group)
+@Test(groups = CuratorTestBase.zk35Group)
 public class TestPersistentTtlNode extends CuratorTestBase
 {
     private final Timing timing = new Timing();
diff --git a/curator-test-zk34/pom.xml b/curator-test-zk34/pom.xml
index 3d06cb0..606aba2 100644
--- a/curator-test-zk34/pom.xml
+++ b/curator-test-zk34/pom.xml
@@ -160,7 +160,7 @@
                         <dependency>org.apache.curator:curator-framework</dependency>
                         <dependency>org.apache.curator:curator-recipes</dependency>
                     </dependenciesToScan>
-                    <excludedGroups>zk35</excludedGroups>
+                    <excludedGroups>zk35,zk36</excludedGroups>
                 </configuration>
             </plugin>
 
diff --git a/curator-test-zk34/pom.xml b/curator-test-zk35/pom.xml
similarity index 72%
copy from curator-test-zk34/pom.xml
copy to curator-test-zk35/pom.xml
index 3d06cb0..5803893 100644
--- a/curator-test-zk34/pom.xml
+++ b/curator-test-zk35/pom.xml
@@ -1,27 +1,56 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>apache-curator</artifactId>
         <groupId>org.apache.curator</groupId>
+        <artifactId>apache-curator</artifactId>
         <version>4.2.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>curator-test-zk34</artifactId>
-
-    <name>Curator ZooKeeper 3.4 Testing</name>
-    <description>Tests for ZoKeeper 3.4 compatibility</description>
-    <inceptionYear>2017</inceptionYear>
+    <artifactId>curator-test-zk35</artifactId>
 
     <properties>
-        <zookeeper-34-version>3.4.8</zookeeper-34-version>
+        <zookeeper-35-version>3.5.6</zookeeper-35-version>
     </properties>
 
     <dependencies>
         <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-x-async</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-recipes</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.zookeeper</groupId>
             <artifactId>zookeeper</artifactId>
-            <version>${zookeeper-34-version}</version>
+            <version>${zookeeper-35-version}</version>
             <exclusions>
                 <exclusion>
                     <groupId>com.sun.jmx</groupId>
@@ -49,7 +78,6 @@
         <dependency>
             <groupId>org.apache.curator</groupId>
             <artifactId>curator-test</artifactId>
-            <version>2.12.0</version>
             <exclusions>
                 <exclusion>
                     <groupId>org.apache.zookeeper</groupId>
@@ -61,31 +89,6 @@
 
         <dependency>
             <groupId>org.apache.curator</groupId>
-            <artifactId>curator-framework</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.zookeeper</groupId>
-                    <artifactId>zookeeper</artifactId>
-                </exclusion>
-            </exclusions>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-framework</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.zookeeper</groupId>
-                    <artifactId>zookeeper</artifactId>
-                </exclusion>
-            </exclusions>
-            <type>test-jar</type>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.curator</groupId>
             <artifactId>curator-recipes</artifactId>
             <exclusions>
                 <exclusion>
@@ -93,12 +96,13 @@
                     <artifactId>zookeeper</artifactId>
                 </exclusion>
             </exclusions>
+            <type>test-jar</type>
             <scope>test</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.curator</groupId>
-            <artifactId>curator-recipes</artifactId>
+            <artifactId>curator-framework</artifactId>
             <exclusions>
                 <exclusion>
                     <groupId>org.apache.zookeeper</groupId>
@@ -120,18 +124,6 @@
             <artifactId>slf4j-log4j12</artifactId>
             <scope>test</scope>
         </dependency>
-
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-core</artifactId>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-math</artifactId>
-            <scope>test</scope>
-        </dependency>
     </dependencies>
 
     <build>
@@ -160,32 +152,10 @@
                         <dependency>org.apache.curator:curator-framework</dependency>
                         <dependency>org.apache.curator:curator-recipes</dependency>
                     </dependenciesToScan>
-                    <excludedGroups>zk35</excludedGroups>
+                    <groups>zk35,zk35Compatibility</groups>
+                    <excludedGroups>zk36</excludedGroups>
                 </configuration>
             </plugin>
-
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-resources-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <id>copy-curator-test-classes</id>
-                        <phase>validate</phase>
-                        <goals>
-                            <goal>copy-resources</goal>
-                        </goals>
-                        <configuration>
-                            <outputDirectory>${basedir}/target/generated-test-sources/test-annotations/org/apache/curator/test/compatibility</outputDirectory>
-                            <resources>
-                                <resource>
-                                    <directory>../curator-test/src/main/java/org/apache/curator/test/compatibility</directory>
-                                    <filtering>false</filtering>
-                                </resource>
-                            </resources>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
         </plugins>
     </build>
 </project>
diff --git a/curator-test-zk35/src/test/java/org/apache/curator/framework/TestCompatibility.java b/curator-test-zk35/src/test/java/org/apache/curator/framework/TestCompatibility.java
new file mode 100644
index 0000000..5112d41
--- /dev/null
+++ b/curator-test-zk35/src/test/java/org/apache/curator/framework/TestCompatibility.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework;
+
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.testng.annotations.Test;
+
+public class TestCompatibility extends CuratorTestBase
+{
+    @Test(expectedExceptions = IllegalStateException.class)
+    public void testPersistentWatchesNotAvailable() throws Exception
+    {
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+        {
+            client.start();
+            client.watchers().add().forPath("/foo");
+        }
+    }
+
+    @Test(expectedExceptions = IllegalStateException.class)
+    public void testPersistentWatchesNotAvailableAsync()
+    {
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+        {
+            client.start();
+
+            AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
+            async.addWatch().forPath("/foo");
+        }
+    }
+}
diff --git a/curator-test-zk35/src/test/resources/log4j.properties b/curator-test-zk35/src/test/resources/log4j.properties
new file mode 100644
index 0000000..2a85e0d
--- /dev/null
+++ b/curator-test-zk35/src/test/resources/log4j.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=ERROR, console
+
+log4j.logger.org.apache.curator=DEBUG, console
+log4j.additivity.org.apache.curator=false
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%-5p %c %x %m [%t]%n
diff --git a/curator-test/pom.xml b/curator-test/pom.xml
index 3683b7d..4f0c5a2 100644
--- a/curator-test/pom.xml
+++ b/curator-test/pom.xml
@@ -41,6 +41,16 @@
         </dependency>
 
         <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.xerial.snappy</groupId>
+            <artifactId>snappy-java</artifactId>
+        </dependency>
+
+        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
diff --git a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
index 51af821..79c8b9a 100644
--- a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
+++ b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
@@ -113,7 +113,7 @@ public class BaseClassForTests
         context.getSuite().addListener(methodListener2);
     }
 
-    @BeforeMethod
+    @BeforeMethod(alwaysRun = true)
     public void setup() throws Exception
     {
         if ( INTERNAL_PROPERTY_DONT_LOG_CONNECTION_ISSUES != null )
@@ -142,7 +142,7 @@ public class BaseClassForTests
         }
     }
 
-    @AfterMethod
+    @AfterMethod(alwaysRun = true)
     public void teardown() throws Exception
     {
         System.clearProperty(INTERNAL_PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY);
diff --git a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java b/curator-test/src/main/java/org/apache/curator/test/Compatibility.java
index 5b4b53f..1f8d181 100644
--- a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java
+++ b/curator-test/src/main/java/org/apache/curator/test/Compatibility.java
@@ -18,10 +18,103 @@
  */
 package org.apache.curator.test;
 
+import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import java.lang.reflect.Method;
+
+@SuppressWarnings("unchecked")
 public class Compatibility
 {
+    private static final Method closeAllWithReasonMethod;
+    private static final Method closeAllMethod;
+    private static final Method closeWithReasonMethod;
+    private static final Method closeMethod;
+    private static final Object disconnectReasonObj;
+
+    static
+    {
+        Object localDisconnectReasonObj;
+        Method localCloseAllWithReasonMethod;
+        Method localCloseAllMethod;
+        Method localCloseWithReasonMethod;
+        Method localCloseMethod;
+        try
+        {
+            Class disconnectReasonClass = Class.forName("org.apache.zookeeper.server.ServerCnxn$DisconnectReason");
+            localDisconnectReasonObj = Enum.valueOf(disconnectReasonClass, "UNKNOWN");
+            localCloseAllWithReasonMethod = ServerCnxnFactory.class.getDeclaredMethod("closeAll", disconnectReasonClass);
+            localCloseWithReasonMethod = ServerCnxn.class.getDeclaredMethod("close", disconnectReasonClass);
+            localCloseAllMethod = null;
+            localCloseMethod = null;
+
+            localCloseAllWithReasonMethod.setAccessible(true);
+            localCloseWithReasonMethod.setAccessible(true);
+        }
+        catch ( Throwable e )
+        {
+            localDisconnectReasonObj = null;
+            localCloseAllWithReasonMethod = null;
+            localCloseWithReasonMethod = null;
+            try
+            {
+                localCloseAllMethod = ServerCnxnFactory.class.getDeclaredMethod("closeAll");
+                localCloseMethod = ServerCnxn.class.getDeclaredMethod("close");
+
+                localCloseAllMethod.setAccessible(true);
+                localCloseMethod.setAccessible(true);
+            }
+            catch ( Throwable ex )
+            {
+                throw new IllegalStateException("Could not reflectively find ServerCnxnFactory/ServerCnxn close methods", ex);
+            }
+        }
+        disconnectReasonObj = localDisconnectReasonObj;
+        closeAllWithReasonMethod = localCloseAllWithReasonMethod;
+        closeAllMethod = localCloseAllMethod;
+        closeMethod = localCloseMethod;
+        closeWithReasonMethod = localCloseWithReasonMethod;
+    }
+
     public static boolean isZK34()
     {
         return false;
     }
+
+    public static void serverCnxnFactoryCloseAll(ServerCnxnFactory factory)
+    {
+        try
+        {
+            if ( closeAllMethod != null )
+            {
+                closeAllMethod.invoke(factory);
+            }
+            else
+            {
+                closeAllWithReasonMethod.invoke(factory, disconnectReasonObj);
+            }
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException("Could not close factory", e);
+        }
+    }
+
+    public static void serverCnxnClose(ServerCnxn cnxn)
+    {
+        try
+        {
+            if ( closeMethod != null )
+            {
+                closeMethod.invoke(cnxn);
+            }
+            else
+            {
+                closeWithReasonMethod.invoke(cnxn, disconnectReasonObj);
+            }
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException("Could not close connection", e);
+        }
+    }
 }
diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java b/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java
index 3d38fe1..58da2c0 100644
--- a/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java
+++ b/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java
@@ -225,7 +225,7 @@ public class TestingCluster implements Closeable
      */
     public InstanceSpec findConnectionInstance(ZooKeeper client) throws Exception
     {
-        Method              m = client.getClass().getDeclaredMethod("testableRemoteSocketAddress");
+        Method              m = ZooKeeper.class.getDeclaredMethod("testableRemoteSocketAddress");
         m.setAccessible(true);
         InetSocketAddress   address = (InetSocketAddress)m.invoke(client);
         if ( address != null )
diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java b/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java
index 3b3ab26..7489527 100644
--- a/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java
+++ b/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java
@@ -39,7 +39,7 @@ class TestingQuorumPeerMain extends QuorumPeerMain implements ZooKeeperMainFace
                 Field               cnxnFactoryField = QuorumPeer.class.getDeclaredField("cnxnFactory");
                 cnxnFactoryField.setAccessible(true);
                 ServerCnxnFactory   cnxnFactory = (ServerCnxnFactory)cnxnFactoryField.get(quorumPeer);
-                cnxnFactory.closeAll();
+                Compatibility.serverCnxnFactoryCloseAll(cnxnFactory);
 
                 Field               ssField = cnxnFactory.getClass().getDeclaredField("ss");
                 ssField.setAccessible(true);
diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java b/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java
index 841df77..91f185f 100644
--- a/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java
+++ b/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java
@@ -81,7 +81,7 @@ public class TestingZooKeeperMain implements ZooKeeperMainFace
         {
             if ( cnxnFactory != null )
             {
-                cnxnFactory.closeAll();
+                Compatibility.serverCnxnFactoryCloseAll(cnxnFactory);
 
                 Field ssField = cnxnFactory.getClass().getDeclaredField("ss");
                 ssField.setAccessible(true);
@@ -262,7 +262,9 @@ public class TestingZooKeeperMain implements ZooKeeperMainFace
     {
         public TestZooKeeperServer(FileTxnSnapLog txnLog, ServerConfig config)
         {
-            super(txnLog, config.getTickTime(), config.getMinSessionTimeout(), config.getMaxSessionTimeout(), null);
+            this.setTxnLogFactory(txnLog);
+            this.setMinSessionTimeout(config.getMinSessionTimeout());
+            this.setMaxSessionTimeout(config.getMaxSessionTimeout());
         }
 
         private final AtomicBoolean isRunning = new AtomicBoolean(false);
diff --git a/curator-test/src/main/java/org/apache/curator/test/compatibility/CuratorTestBase.java b/curator-test/src/main/java/org/apache/curator/test/compatibility/CuratorTestBase.java
index a3c2a29..c5d4018 100644
--- a/curator-test/src/main/java/org/apache/curator/test/compatibility/CuratorTestBase.java
+++ b/curator-test/src/main/java/org/apache/curator/test/compatibility/CuratorTestBase.java
@@ -21,8 +21,11 @@ package org.apache.curator.test.compatibility;
 import org.apache.curator.test.BaseClassForTests;
 import org.testng.annotations.Listeners;
 
-@Listeners(Zk35MethodInterceptor.class)
 public class CuratorTestBase extends BaseClassForTests
 {
+    public static final String zk35Group = "zk35";
+    public static final String zk36Group = "zk36";
+    public static final String zk35CompatibilityGroup = "zk35Compatibility";
+
     protected final Timing2 timing = new Timing2();
 }
diff --git a/curator-test/src/main/java/org/apache/curator/test/compatibility/Zk35MethodInterceptor.java b/curator-test/src/main/java/org/apache/curator/test/compatibility/Zk35MethodInterceptor.java
deleted file mode 100644
index 8072b68..0000000
--- a/curator-test/src/main/java/org/apache/curator/test/compatibility/Zk35MethodInterceptor.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.test.compatibility;
-
-import org.apache.curator.test.Compatibility;
-import org.testng.IMethodInstance;
-import org.testng.IMethodInterceptor;
-import org.testng.ITestContext;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-public class Zk35MethodInterceptor implements IMethodInterceptor
-{
-    public static final String zk35Group = "zk35";
-
-    @Override
-    public List<IMethodInstance> intercept(List<IMethodInstance> methods, ITestContext context)
-    {
-        if ( !Compatibility.isZK34() )
-        {
-            return methods;
-        }
-
-        List<IMethodInstance> filteredMethods = new ArrayList<>();
-        for ( IMethodInstance method : methods )
-        {
-            if ( !isInGroup(method.getMethod().getGroups()) )
-            {
-                filteredMethods.add(method);
-            }
-        }
-        return filteredMethods;
-    }
-
-    private boolean isInGroup(String[] groups)
-    {
-        return (groups != null) && Arrays.asList(groups).contains(zk35Group);
-    }
-}
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
index bc66bb6..e54b148 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
@@ -112,4 +112,11 @@ public interface AsyncCuratorFrameworkDsl extends WatchableAsyncCuratorFramework
      * @return builder object
      */
     AsyncRemoveWatchesBuilder removeWatches();
+
+    /**
+     * Start an add watch builder
+     *
+     * @return builder object
+     */
+    AsyncWatchBuilder addWatch();
 }
diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder.java
similarity index 60%
copy from curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
copy to curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder.java
index 42279d0..a5e86ec 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder.java
@@ -16,16 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.utils;
 
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
+package org.apache.curator.x.async.api;
 
-public class DefaultZookeeperFactory implements ZookeeperFactory
+import org.apache.curator.framework.api.AddWatchable;
+import org.apache.curator.x.async.AsyncStage;
+import org.apache.zookeeper.AddWatchMode;
+
+public interface AsyncWatchBuilder extends
+    AddWatchable<AsyncPathable<AsyncStage<Void>>>,
+    AsyncPathable<AsyncStage<Void>>
 {
-    @Override
-    public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
-    {
-        return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
-    }
-}
+    /**
+     * The mode to use. By default, {@link org.apache.zookeeper.AddWatchMode#PERSISTENT_RECURSIVE} is used
+     *
+     * @param mode mode
+     * @return this
+     */
+    AsyncWatchBuilder2 withMode(AddWatchMode mode);
+}
\ No newline at end of file
diff --git a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder2.java
similarity index 74%
copy from curator-test/src/main/java/org/apache/curator/test/Compatibility.java
copy to curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder2.java
index 5b4b53f..a97cd4b 100644
--- a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder2.java
@@ -16,12 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.test;
 
-public class Compatibility
+package org.apache.curator.x.async.api;
+
+import org.apache.curator.framework.api.AddWatchable;
+import org.apache.curator.x.async.AsyncStage;
+
+public interface AsyncWatchBuilder2 extends
+    AddWatchable<AsyncPathable<AsyncStage<Void>>>,
+    AsyncPathable<AsyncStage<Void>>
 {
-    public static boolean isZK34()
-    {
-        return false;
-    }
-}
+}
\ No newline at end of file
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
index 167cf50..a248c0e 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.curator.x.async.details;
 
+import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.UnhandledErrorListener;
@@ -26,7 +27,10 @@ import org.apache.curator.framework.imps.CuratorFrameworkImpl;
 import org.apache.curator.framework.imps.CuratorMultiTransactionImpl;
 import org.apache.curator.framework.imps.GetACLBuilderImpl;
 import org.apache.curator.framework.imps.SyncBuilderImpl;
-import org.apache.curator.x.async.*;
+import org.apache.curator.utils.Compatibility;
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.curator.x.async.AsyncStage;
+import org.apache.curator.x.async.WatchMode;
 import org.apache.curator.x.async.api.*;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.data.ACL;
@@ -150,6 +154,13 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework
     }
 
     @Override
+    public AsyncWatchBuilder addWatch()
+    {
+        Preconditions.checkState(Compatibility.hasPersistentWatchers(), "addWatch() is not supported in the ZooKeeper library being used.");
+        return new AsyncWatchBuilderImpl(client, filters);
+    }
+
+    @Override
     public CuratorFramework unwrap()
     {
         return client;
@@ -221,6 +232,16 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework
         return new AsyncGetConfigBuilderImpl(client, filters, getBuilderWatchMode());
     }
 
+    Filters getFilters()
+    {
+        return filters;
+    }
+
+    CuratorFrameworkImpl getClient()
+    {
+        return client;
+    }
+
     private WatchMode getBuilderWatchMode()
     {
         return watched ? watchMode : null;
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncWatchBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncWatchBuilderImpl.java
new file mode 100644
index 0000000..a10540e
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncWatchBuilderImpl.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.async.details;
+
+import org.apache.curator.framework.api.AddWatchable;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.imps.AddWatchBuilderImpl;
+import org.apache.curator.framework.imps.CuratorFrameworkImpl;
+import org.apache.curator.framework.imps.Watching;
+import org.apache.curator.x.async.AsyncStage;
+import org.apache.curator.x.async.api.AsyncPathable;
+import org.apache.curator.x.async.api.AsyncWatchBuilder;
+import org.apache.curator.x.async.api.AsyncWatchBuilder2;
+import org.apache.zookeeper.AddWatchMode;
+import org.apache.zookeeper.Watcher;
+
+import static org.apache.curator.x.async.details.BackgroundProcs.ignoredProc;
+import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
+
+class AsyncWatchBuilderImpl implements AsyncWatchBuilder, AsyncWatchBuilder2, AddWatchable<AsyncPathable<AsyncStage<Void>>>, AsyncPathable<AsyncStage<Void>>
+{
+    private final CuratorFrameworkImpl client;
+    private final Filters filters;
+    private Watching watching;
+    private AddWatchMode mode = AddWatchMode.PERSISTENT_RECURSIVE;
+
+    AsyncWatchBuilderImpl(CuratorFrameworkImpl client, Filters filters)
+    {
+        this.client = client;
+        this.filters = filters;
+        watching = new Watching(client, true);
+    }
+
+    @Override
+    public AsyncWatchBuilder2 withMode(AddWatchMode mode)
+    {
+        this.mode = mode;
+        return this;
+    }
+
+    @Override
+    public AsyncPathable<AsyncStage<Void>> usingWatcher(Watcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    @Override
+    public AsyncPathable<AsyncStage<Void>> usingWatcher(CuratorWatcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    @Override
+    public AsyncStage<Void> forPath(String path)
+    {
+        BuilderCommon<Void> common = new BuilderCommon<>(filters, ignoredProc);
+        AddWatchBuilderImpl builder = new AddWatchBuilderImpl(client, watching, common.backgrounding, mode);
+        return safeCall(common.internalCallback, () -> builder.forPath(path));
+    }
+}
\ No newline at end of file
diff --git a/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestAddWatch.java b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestAddWatch.java
new file mode 100644
index 0000000..bdf006a
--- /dev/null
+++ b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestAddWatch.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.apache.curator.utils.ZookeeperFactory;
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.zookeeper.AddWatchMode;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.CountDownLatch;
+
+public class TestAddWatch extends CuratorTestBase
+{
+    @Test
+    public void testPersistentRecursiveWatch() throws Exception
+    {
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+        {
+            client.start();
+            client.blockUntilConnected();
+
+            CountDownLatch latch = new CountDownLatch(5);
+            Watcher watcher = event -> latch.countDown();
+            AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
+            async.addWatch().withMode(AddWatchMode.PERSISTENT_RECURSIVE).usingWatcher(watcher).forPath("/test").toCompletableFuture().get();
+
+            client.create().forPath("/test");
+            client.create().forPath("/test/a");
+            client.create().forPath("/test/a/b");
+            client.create().forPath("/test/a/b/c");
+            client.create().forPath("/test/a/b/c/d");
+
+            Assert.assertTrue(timing.awaitLatch(latch));
+        }
+    }
+
+    @Test
+    public void testPersistentRecursiveDefaultWatch() throws Exception
+    {
+        CountDownLatch latch = new CountDownLatch(6);   // 5 creates plus the initial sync
+        ZookeeperFactory zookeeperFactory = (connectString, sessionTimeout, watcher, canBeReadOnly) -> {
+            Watcher actualWatcher = event -> {
+                watcher.process(event);
+                latch.countDown();
+            };
+            return new ZooKeeper(connectString, sessionTimeout, actualWatcher);
+        };
+        try (CuratorFramework client = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).zookeeperFactory(zookeeperFactory).build() )
+        {
+            client.start();
+            client.blockUntilConnected();
+
+            AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
+            async.addWatch().withMode(AddWatchMode.PERSISTENT_RECURSIVE).forPath("/test").toCompletableFuture().get();
+
+            client.create().forPath("/test");
+            client.create().forPath("/test/a");
+            client.create().forPath("/test/a/b");
+            client.create().forPath("/test/a/b/c");
+            client.create().forPath("/test/a/b/c/d");
+
+            Assert.assertTrue(timing.awaitLatch(latch));
+        }
+    }
+}
diff --git a/pom.xml b/pom.xml
index 3a5e152..e223de6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -60,7 +60,7 @@
         <jdk-version>1.${short-jdk-version}</jdk-version>
 
         <!-- versions -->
-        <zookeeper-version>3.5.5</zookeeper-version>
+        <zookeeper-version>3.6.0-SNAPSHOT</zookeeper-version>
         <maven-bundle-plugin-version>4.1.0</maven-bundle-plugin-version>
         <maven-javadoc-plugin-version>3.0.1</maven-javadoc-plugin-version>
         <doxia-module-confluence-version>1.8</doxia-module-confluence-version>
@@ -85,25 +85,27 @@
         <guava-failureaccess-version>1.0.1</guava-failureaccess-version>
         <testng-version>6.14.3</testng-version>
         <swift-version>0.23.1</swift-version>
-        <dropwizard-version>1.3.7</dropwizard-version>
         <maven-shade-plugin-version>3.2.1</maven-shade-plugin-version>
         <slf4j-version>1.7.25</slf4j-version>
         <clirr-maven-plugin-version>2.8</clirr-maven-plugin-version>
+        <dropwizard-version>3.2.5</dropwizard-version>
+        <snappy-version>1.1.7</snappy-version>
 
         <!-- OSGi Properties -->
-        <osgi.export.package />
-        <osgi.import.package />
-        <osgi.private.package />
-        <osgi.dynamic.import />
-        <osgi.require.bundle />
-        <osgi.export.service />
-        <osgi.activator />
+        <osgi.export.package/>
+        <osgi.import.package/>
+        <osgi.private.package/>
+        <osgi.dynamic.import/>
+        <osgi.require.bundle/>
+        <osgi.export.service/>
+        <osgi.activator/>
     </properties>
 
     <scm>
         <url>https://github.com/apache/curator.git</url>
         <connection>scm:git:https://gitbox.apache.org/repos/asf/curator.git</connection>
-        <developerConnection>scm:git:https://gitbox.apache.org/repos/asf/curator.git</developerConnection>
+        <developerConnection>scm:git:https://gitbox.apache.org/repos/asf/curator.git
+        </developerConnection>
         <tag>apache-curator-3.2.0</tag>
     </scm>
 
@@ -318,6 +320,7 @@
         <module>curator-x-discovery-server</module>
         <module>curator-x-async</module>
         <module>curator-test-zk34</module>
+        <module>curator-test-zk35</module>
     </modules>
 
     <dependencyManagement>
@@ -567,6 +570,24 @@
                 <artifactId>dropwizard-logging</artifactId>
                 <version>${dropwizard-version}</version>
             </dependency>
+
+            <dependency>
+                <groupId>io.dropwizard.metrics</groupId>
+                <artifactId>metrics-core</artifactId>
+                <version>${dropwizard-version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.slf4j</groupId>
+                        <artifactId>slf4j-api</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
+            <dependency>
+                <groupId>org.xerial.snappy</groupId>
+                <artifactId>snappy-java</artifactId>
+                <version>${snappy-version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
@@ -582,7 +603,6 @@
                 <artifactId>maven-javadoc-plugin</artifactId>
                 <version>${maven-javadoc-plugin-version}</version>
                 <configuration>
-                    <aggregate>true</aggregate>
                     <additionalJOptions>
                         <additionalJOption>-J-Xmx1g</additionalJOption>
                     </additionalJOptions>
@@ -872,7 +892,8 @@
                             <relocations>
                                 <relocation>
                                     <pattern>com.google</pattern>
-                                    <shadedPattern>org.apache.curator.shaded.com.google</shadedPattern>
+                                    <shadedPattern>org.apache.curator.shaded.com.google
+                                    </shadedPattern>
                                     <excludes>
                                         <exclude>com.google.common.base.Function</exclude>
                                         <exclude>com.google.common.base.Predicate</exclude>
diff --git a/src/site/confluence/zk-compatibility.confluence b/src/site/confluence/zk-compatibility.confluence
index ed6e32e..240734a 100644
--- a/src/site/confluence/zk-compatibility.confluence
+++ b/src/site/confluence/zk-compatibility.confluence
@@ -1,18 +1,19 @@
 h1. ZooKeeper Version Compatibility
 
-While ZooKeeper 3.5.x is still considered "beta" by the ZooKeeper development team, the reality is that it is
-used in production by many users. However, ZooKeeper 3.4.x is also used in production. Prior to Apache Curator
-4.0, both versions of ZooKeeper were supported via two versions of Apache Curator. Starting with Curator 4.0
-both versions of ZooKeeper are supported via the same Curator libraries.
+There are multiple active version lines of ZooKeeper used in production by users. Curator can act
+as a client for any of these versions.
 
-h2. ZooKeeper 3.5.x
+h2. ZooKeeper 3.6.x
 
-* Curator 4.0 has a hard dependency on ZooKeeper 3.5.x
-* If you are using ZooKeeper 3.5.x there's nothing additional to do \- just use Curator 4.0
+ZooKeeper 3.6.x is the newest version of ZooKeeper, which adds new features such as
+persistent/recursive watchers.
 
-h2. ZooKeeper 3.4.x
+* Curator 4.3+ has a hard dependency on ZooKeeper 3.6.x
+* If you are using ZooKeeper 3.6.x there's nothing additional to do
 
-Curator 4.0 supports ZooKeeper 3.4.x ensembles in a soft\-compatibility mode. To use this mode
+h2. ZooKeeper 3.5.x and ZooKeeper 3.4.x
+
+Curator 4.0 supports earlier versions of ZooKeeper in a soft\-compatibility mode. To use this mode
 you must exclude ZooKeeper when adding Curator to your dependency management tool.
 
 _Maven_
@@ -39,11 +40,11 @@ compile('org.apache.curator:curator-recipes:$curatorVersion') {
 }
 {code}
 
-You must add a dependency on ZooKeeper 3.4.x also.
+You must also add a dependency to the version of ZooKeeper you want to use.
 
 Curator will detect which ZooKeeper library is in use and automatically set ZooKeeper 3.4 compatibility
-mode as needed. In this mode, all features not supported by 3.4 are disabled. It is up to your
-application code to "do the right thing" and not use these features. Use the {{isZk34CompatibilityMode()}}
+or ZooKeeper 3.5 compatibility mode as needed. In either mode, all unsupported features are disabled or disallowed. It is up to your
+application code to "do the right thing" and not use these features. Use the {{isZk34CompatibilityMode()}} or {{isZk35CompatibilityMode()}}
 method to determine which mode Curator is using at runtime.
 
 h2. Testing With ZooKeeper 3.4.x

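Note on what TestAddWatch exercises: a PERSISTENT_RECURSIVE watch stays
registered after it fires and covers the watched path plus all of its
descendants, which is why five creates at and below /test are expected to
count the latch down five times. The following is a minimal in-memory sketch
of that behaviour in plain Java. RecursiveWatchSketch and its methods are
hypothetical stand-ins for illustration only; they are not ZooKeeper or
Curator API, and the sketch ignores sessions, ordering and the "/" root.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a server; not part of Curator or ZooKeeper.
final class RecursiveWatchSketch
{
    private final List<String> watchedRoots = new ArrayList<>();
    private final List<Consumer<String>> watchers = new ArrayList<>();

    // Registers a watcher that is never removed when it fires (persistent) and
    // that covers the root path plus every descendant (recursive).
    void addPersistentRecursiveWatch(String root, Consumer<String> watcher)
    {
        watchedRoots.add(root);
        watchers.add(watcher);
    }

    // Simulates creating a node: every watcher whose root covers the path is notified.
    void create(String path)
    {
        for ( int i = 0; i < watchedRoots.size(); ++i )
        {
            String root = watchedRoots.get(i);
            if ( path.equals(root) || path.startsWith(root + "/") )
            {
                watchers.get(i).accept(path);
            }
        }
    }

    // Same shape as testPersistentRecursiveWatch: five creates, five events.
    static boolean demo()
    {
        RecursiveWatchSketch server = new RecursiveWatchSketch();
        CountDownLatch latch = new CountDownLatch(5);
        server.addPersistentRecursiveWatch("/test", path -> latch.countDown());

        server.create("/test");
        server.create("/test/a");
        server.create("/test/a/b");
        server.create("/test/a/b/c");
        server.create("/test/a/b/c/d");
        server.create("/other");    // outside the watched subtree, so no event

        return latch.getCount() == 0;
    }
}
```

With a one-shot, non-recursive watch only the first create would be seen,
so a latch of five would never be released.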

[curator] 02/02: Add recipes that use persistent watchers including new cache recipes to replace all others.

Posted by ra...@apache.org.

randgalt pushed a commit to branch zk36-recipes
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 886f9ffc36edf78a51e530215db5064343bc98bb
Author: randgalt <ra...@apache.org>
AuthorDate: Sun Nov 3 12:45:21 2019 -0500

    Add recipes that use persistent watchers including new cache recipes to replace all others.
---
 .../framework/recipes/cache/CuratorCache.java      | 114 +++++++
 .../recipes/cache/CuratorCacheBuilder.java         |  63 ++++
 .../recipes/cache/CuratorCacheBuilderImpl.java     |  74 ++++
 .../framework/recipes/cache/CuratorCacheImpl.java  | 303 +++++++++++++++++
 .../recipes/cache/CuratorCacheListener.java        |  78 +++++
 .../recipes/cache/CuratorCacheListenerBuilder.java | 129 +++++++
 .../cache/CuratorCacheListenerBuilderImpl.java     | 161 +++++++++
 .../recipes/cache/CuratorCacheStorage.java         | 114 +++++++
 .../recipes/cache/NodeCacheListenerWrapper.java    |  46 +++
 .../cache/PathChildrenCacheListenerWrapper.java    |  78 +++++
 .../recipes/cache/StandardCuratorCacheStorage.java |  88 +++++
 .../recipes/cache/TreeCacheListenerWrapper.java    |  81 +++++
 .../framework/recipes/watch/PersistentWatcher.java | 169 ++++++++++
 .../framework/recipes/cache/TestCuratorCache.java  | 248 ++++++++++++++
 .../recipes/cache/TestCuratorCacheConsistency.java | 373 +++++++++++++++++++++
 .../cache/TestCuratorCacheEventOrdering.java       |  52 +++
 .../recipes/cache/TestCuratorCacheWrappers.java    | 162 +++++++++
 .../recipes/cache/TestWrappedNodeCache.java        | 172 ++++++++++
 .../recipes/watch/TestPersistentWatcher.java       | 105 ++++++
 19 files changed, 2610 insertions(+)
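
Reading aid for CuratorCacheImpl in the diff below: putStorage() reports
NODE_CREATED when the path was not cached before, NODE_CHANGED only when the
cached mzxid differs from the new one, and nothing when the mzxid is the
same; removeStorage() reports NODE_DELETED only if the path was cached. A
minimal sketch of that decision in plain Java follows. CacheEventSketch, its
map and its string events are hypothetical stand-ins and not Curator API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the cache storage plus its listeners.
final class CacheEventSketch
{
    private final Map<String, Long> mzxidByPath = new HashMap<>();
    final List<String> events = new ArrayList<>();

    // No previous entry means created; a previous entry with a different
    // mzxid means changed; an identical mzxid produces no event.
    void put(String path, long mzxid)
    {
        Long previous = mzxidByPath.put(path, mzxid);
        if ( previous == null )
        {
            events.add("NODE_CREATED " + path);
        }
        else if ( previous.longValue() != mzxid )
        {
            events.add("NODE_CHANGED " + path);
        }
    }

    // A delete is only reported for a path that was actually cached.
    void remove(String path)
    {
        if ( mzxidByPath.remove(path) != null )
        {
            events.add("NODE_DELETED " + path);
        }
    }
}
```

Comparing the mzxid is what lets a rebuild after a reconnect re-read every
cached node without emitting spurious change events for unchanged nodes.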

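A second reading aid for CuratorCacheImpl: initialized() is driven by a
counter of outstanding background reads. Each callback issues the reads for
its children before decrementing its own count, so the counter can only
reach zero once the whole initial tree has been loaded; the counter is then
dropped so that later refreshes are not counted. A minimal single-threaded
sketch, assuming a hypothetical InitializedSketch class (the real code
creates the counter in start() and notifies listeners through an executor):

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in; only the counting logic mirrors the diff.
final class InitializedSketch
{
    private volatile AtomicLong outstandingOps = new AtomicLong(0);
    int initializedCalls = 0;

    // Called just before a background read is issued.
    void opStarted()
    {
        AtomicLong local = outstandingOps;
        if ( local != null )
        {
            local.incrementAndGet();
        }
    }

    // Called at the end of a background callback.
    void opFinished()
    {
        AtomicLong local = outstandingOps;
        if ( (local != null) && (local.decrementAndGet() == 0) )
        {
            outstandingOps = null;  // stop counting after the initial load
            ++initializedCalls;
        }
    }

    static int demo()
    {
        InitializedSketch cache = new InitializedSketch();
        cache.opStarted();   // read of the root node is issued
        cache.opStarted();   // root callback issues the read for child 1 ...
        cache.opStarted();   // ... and for child 2 before finishing itself
        cache.opFinished();  // root done, 2 outstanding
        cache.opFinished();  // child 1 done, 1 outstanding
        cache.opFinished();  // child 2 done, 0 outstanding: initialized
        cache.opStarted();   // later refreshes are ignored by the counter
        cache.opFinished();
        return cache.initializedCalls;
    }
}
```
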
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java
new file mode 100644
index 0000000..ebe06e0
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.listen.Listenable;
+import java.io.Closeable;
+
+/**
+ * <p>
+ *     A utility that attempts to keep the data from a node locally cached. Optionally the entire
+ *     tree of children below the node can also be cached. Will respond to update/create/delete events, pull
+ *     down the data, etc. You can register listeners that will get notified when changes occur.
+ * </p>
+ *
+ * <p>
+ *     <b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must
+ *     be prepared for false-positives and false-negatives. Additionally, always use the version number
+ *     when updating data to avoid overwriting another process' change.
+ * </p>
+ */
+public interface CuratorCache extends Closeable
+{
+    /**
+     * cache build options
+     */
+    enum Options
+    {
+        /**
+         * Normally the entire tree of nodes starting at the given node is cached. This option
+         * causes only the given node to be cached (i.e. a single node cache)
+         */
+        SINGLE_NODE_CACHE,
+
+        /**
+         * Decompress data via {@link org.apache.curator.framework.api.GetDataBuilder#decompressed()}
+         */
+        COMPRESSED_DATA,
+
+        /**
+         * Normally, when the cache is closed via {@link CuratorCache#close()}, the storage is cleared
+         * via {@link CuratorCacheStorage#clear()}. This option prevents the storage from being cleared.
+         */
+        DO_NOT_CLEAR_ON_CLOSE
+    }
+
+    /**
+     * Return a Curator Cache for the given path with the given options using a standard storage instance
+     *
+     * @param client Curator client
+     * @param path path to cache
+     * @param options any options
+     * @return cache (note it must be started via {@link #start()})
+     */
+    static CuratorCache build(CuratorFramework client, String path, Options... options)
+    {
+        return builder(client, path).withOptions(options).build();
+    }
+
+    /**
+     * Start a Curator Cache builder
+     *
+     * @param client Curator client
+     * @param path path to cache
+     * @return builder
+     */
+    static CuratorCacheBuilder builder(CuratorFramework client, String path)
+    {
+        return new CuratorCacheBuilderImpl(client, path);
+    }
+
+    /**
+     * Start the cache. This will cause a complete refresh from the cache's root node and generate
+     * events for all nodes found, etc.
+     */
+    void start();
+
+    /**
+     * Close the cache, stop responding to events, etc.
+     */
+    @Override
+    void close();
+
+    /**
+     * Return the storage instance being used
+     *
+     * @return storage
+     */
+    CuratorCacheStorage storage();
+
+    /**
+     * Return the listener container so that listeners can be registered to be notified of changes to the cache
+     *
+     * @return listener container
+     */
+    Listenable<CuratorCacheListener> listenable();
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java
new file mode 100644
index 0000000..35a5f26
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+public interface CuratorCacheBuilder
+{
+    /**
+     * @param options any options
+     * @return this
+     */
+    CuratorCacheBuilder withOptions(CuratorCache.Options... options);
+
+    /**
+     * Alternate storage to use. If not specified, {@link CuratorCacheStorage#standard()} is used
+     *
+     * @param storage storage instance to use
+     * @return this
+     */
+    CuratorCacheBuilder withStorage(CuratorCacheStorage storage);
+
+    /**
+     * By default, any unexpected exception is handled by logging it. Use this method
+     * to have a handler called instead. Under normal circumstances, this shouldn't be necessary.
+     *
+     * @param exceptionHandler exception handler to use
+     */
+    CuratorCacheBuilder withExceptionHandler(Consumer<Exception> exceptionHandler);
+
+    /**
+     * Normally, listeners are wrapped in {@link org.apache.curator.framework.CuratorFramework#runSafe(Runnable)}. Use this
+     * method to set a different executor.
+     *
+     * @param executor to use
+     */
+    CuratorCacheBuilder withExecutor(Executor executor);
+
+    /**
+     * Return a new Curator Cache based on the builder methods that have been called
+     *
+     * @return new Curator Cache
+     */
+    CuratorCache build();
+}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java
new file mode 100644
index 0000000..9f9e03d
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+class CuratorCacheBuilderImpl implements CuratorCacheBuilder
+{
+    private final CuratorFramework client;
+    private final String path;
+    private CuratorCacheStorage storage;
+    private Consumer<Exception> exceptionHandler;
+    private Executor executor;
+    private CuratorCache.Options[] options;
+
+    CuratorCacheBuilderImpl(CuratorFramework client, String path)
+    {
+        this.client = client;
+        this.path = path;
+    }
+
+    @Override
+    public CuratorCacheBuilder withOptions(CuratorCache.Options... options)
+    {
+        this.options = options;
+        return this;
+    }
+
+    @Override
+    public CuratorCacheBuilder withStorage(CuratorCacheStorage storage)
+    {
+        this.storage = storage;
+        return this;
+    }
+
+    @Override
+    public CuratorCacheBuilder withExceptionHandler(Consumer<Exception> exceptionHandler)
+    {
+        this.exceptionHandler = exceptionHandler;
+        return this;
+    }
+
+    @Override
+    public CuratorCacheBuilder withExecutor(Executor executor)
+    {
+        this.executor = executor;
+        return this;
+    }
+
+    @Override
+    public CuratorCache build()
+    {
+        return new CuratorCacheImpl(client, storage, path, options, executor, exceptionHandler);
+    }
+}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
new file mode 100644
index 0000000..6349530
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
+import org.apache.curator.framework.recipes.watch.PersistentWatcher;
+import org.apache.curator.utils.ThreadUtils;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.*;
+import static org.apache.zookeeper.KeeperException.Code.NONODE;
+import static org.apache.zookeeper.KeeperException.Code.OK;
+
+class CuratorCacheImpl implements CuratorCache
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
+    private final PersistentWatcher persistentWatcher;
+    private final CuratorFramework client;
+    private final CuratorCacheStorage storage;
+    private final String path;
+    private final boolean recursive;
+    private final boolean compressedData;
+    private final boolean clearOnClose;
+    private final StandardListenerManager<CuratorCacheListener> listenerManager = StandardListenerManager.standard();
+    private final Consumer<Exception> exceptionHandler;
+    private final Executor executor;
+
+    private volatile AtomicLong outstandingOps;
+
+    private enum State
+    {
+        LATENT,
+        STARTED,
+        CLOSED
+    }
+
+    CuratorCacheImpl(CuratorFramework client, CuratorCacheStorage storage, String path, Options[] optionsArg, Executor executor, Consumer<Exception> exceptionHandler)
+    {
+        Set<Options> options = (optionsArg != null) ? Sets.newHashSet(optionsArg) : Collections.emptySet();
+        this.client = client;
+        this.storage = (storage != null) ? storage : CuratorCacheStorage.standard();
+        this.path = path;
+        recursive = !options.contains(Options.SINGLE_NODE_CACHE);
+        compressedData = options.contains(Options.COMPRESSED_DATA);
+        clearOnClose = !options.contains(Options.DO_NOT_CLEAR_ON_CLOSE);
+        persistentWatcher = new PersistentWatcher(client, path, recursive);
+        persistentWatcher.getListenable().addListener(this::processEvent);
+        persistentWatcher.getResetListenable().addListener(this::rebuild);
+        this.exceptionHandler = (exceptionHandler != null) ? exceptionHandler : e -> log.error("CuratorCache error", e);
+        this.executor = (executor != null) ? executor : client::runSafe;
+    }
+
+    @Override
+    public void start()
+    {
+        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
+        outstandingOps = new AtomicLong(0);
+        persistentWatcher.start();
+    }
+
+    @Override
+    public void close()
+    {
+        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+        {
+            persistentWatcher.close();
+            if ( clearOnClose )
+            {
+                storage.clear();
+            }
+        }
+    }
+
+    @Override
+    public CuratorCacheStorage storage()
+    {
+        return storage;
+    }
+
+    @Override
+    public Listenable<CuratorCacheListener> listenable()
+    {
+        return listenerManager;
+    }
+
+    private void rebuild()
+    {
+        if ( state.get() != State.STARTED )
+        {
+            return;
+        }
+
+        nodeChanged(path);
+        storage.stream()
+            .map(ChildData::getPath)
+            .filter(p -> !p.equals(path))
+            .forEach(this::nodeChanged);
+    }
+
+    private void processEvent(WatchedEvent event)
+    {
+        if ( state.get() != State.STARTED )
+        {
+            return;
+        }
+
+        switch ( event.getType() )
+        {
+        case NodeDataChanged:
+        case NodeCreated:
+        {
+            nodeChanged(event.getPath());
+            break;
+        }
+
+        case NodeDeleted:
+        {
+            removeStorage(event.getPath());
+            break;
+        }
+
+        case NodeChildrenChanged:
+        {
+            nodeChildrenChanged(event.getPath());
+            break;
+        }
+        }
+    }
+
+    private void nodeChildrenChanged(String fromPath)
+    {
+        if ( (state.get() != State.STARTED) || !recursive )
+        {
+            return;
+        }
+
+        try
+        {
+            BackgroundCallback callback = (__, event) -> {
+                if ( event.getResultCode() == OK.intValue() )
+                {
+                    event.getChildren().forEach(child -> nodeChanged(ZKPaths.makePath(fromPath, child)));
+                }
+                else if ( event.getResultCode() == NONODE.intValue() )
+                {
+                    removeStorage(event.getPath());
+                }
+                else
+                {
+                    handleException(event);
+                }
+                checkDecrementOutstandingOps();
+            };
+
+            checkIncrementOutstandingOps();
+            client.getChildren().inBackground(callback).forPath(fromPath);
+        }
+        catch ( Exception e )
+        {
+            handleException(e);
+        }
+    }
+
+    private void nodeChanged(String fromPath)
+    {
+        if ( state.get() != State.STARTED )
+        {
+            return;
+        }
+
+        try
+        {
+            BackgroundCallback callback = (__, event) -> {
+                if ( event.getResultCode() == OK.intValue() )
+                {
+                    putStorage(new ChildData(event.getPath(), event.getStat(), event.getData()));
+                    nodeChildrenChanged(event.getPath());
+                }
+                else if ( event.getResultCode() == NONODE.intValue() )
+                {
+                    removeStorage(event.getPath());
+                }
+                else
+                {
+                    handleException(event);
+                }
+                checkDecrementOutstandingOps();
+            };
+
+            checkIncrementOutstandingOps();
+            if ( compressedData )
+            {
+                client.getData().decompressed().inBackground(callback).forPath(fromPath);
+            }
+            else
+            {
+                client.getData().inBackground(callback).forPath(fromPath);
+            }
+        }
+        catch ( Exception e )
+        {
+            handleException(e);
+        }
+    }
+
+    public static volatile boolean JZTEST = false;
+
+    private void putStorage(ChildData data)
+    {
+        Optional<ChildData> previousData = storage.put(data);
+        if ( previousData.isPresent() )
+        {
+            if ( previousData.get().getStat().getMzxid() != data.getStat().getMzxid() )
+            {
+                callListeners(l -> l.event(NODE_CHANGED, previousData.get(), data));
+            }
+        }
+        else
+        {
+            callListeners(l -> l.event(NODE_CREATED, null, data));
+        }
+    }
+
+    private void removeStorage(String path)
+    {
+        storage.remove(path).ifPresent(previousData -> callListeners(l -> l.event(NODE_DELETED, previousData, null)));
+    }
+
+    private void callListeners(Consumer<CuratorCacheListener> proc)
+    {
+        if ( state.get() == State.STARTED )
+        {
+            executor.execute(() -> listenerManager.forEach(proc));
+        }
+    }
+
+    private void handleException(CuratorEvent event)
+    {
+        handleException(KeeperException.create(KeeperException.Code.get(event.getResultCode())));
+    }
+
+    private void handleException(Exception e)
+    {
+        ThreadUtils.checkInterrupted(e);
+        exceptionHandler.accept(e);
+    }
+
+    private void checkIncrementOutstandingOps()
+    {
+        AtomicLong localOutstandingOps = outstandingOps;
+        if ( localOutstandingOps != null )
+        {
+            localOutstandingOps.incrementAndGet();
+        }
+    }
+
+    private void checkDecrementOutstandingOps()
+    {
+        AtomicLong localOutstandingOps = outstandingOps;
+        if ( localOutstandingOps != null )
+        {
+            if ( localOutstandingOps.decrementAndGet() == 0 )
+            {
+                outstandingOps = null;
+                callListeners(CuratorCacheListener::initialized);
+            }
+        }
+    }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListener.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListener.java
new file mode 100644
index 0000000..620e471
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListener.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+/**
+ * Listener for {@link CuratorCache} events. The main functional interface is general purpose
+ * but you can build event specific listeners, etc. using the builder. Note: all listeners
+ * are wrapped in {@link org.apache.curator.framework.CuratorFramework#runSafe(Runnable)} when called.
+ */
+@FunctionalInterface
+public interface CuratorCacheListener
+{
+    /**
+     * An enumerated type that describes a change
+     */
+    enum Type
+    {
+        /**
+         * A new node was added to the cache
+         */
+        NODE_CREATED,
+
+        /**
+         * A node already in the cache has changed
+         */
+        NODE_CHANGED,
+
+        /**
+         * A node already in the cache was deleted
+         */
+        NODE_DELETED
+    }
+
+    /**
+     * Called when a node is created, changed or deleted.
+     *
+     * @param type the type of event
+     * @param oldData the old data or null
+     * @param data the new data or null
+     */
+    void event(Type type, ChildData oldData, ChildData data);
+
+    /**
+     * When the cache is started, the initial nodes are tracked and when they are finished loading
+     * into the cache this method is called.
+     */
+    default void initialized()
+    {
+        // NOP
+    }
+
+    /**
+     * Returns a builder for type-specific and special-purpose listeners.
+     *
+     * @return builder
+     */
+    static CuratorCacheListenerBuilder builder()
+    {
+        return new CuratorCacheListenerBuilderImpl();
+    }
+}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilder.java
new file mode 100644
index 0000000..c57e881
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilder.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type;
+import java.util.function.Consumer;
+
+public interface CuratorCacheListenerBuilder
+{
+    /**
+     * Add a standard listener
+     *
+     * @param listener listener to add
+     * @return this
+     */
+    CuratorCacheListenerBuilder forAll(CuratorCacheListener listener);
+
+    /**
+     * Add a listener only for {@link Type#NODE_CREATED}
+     *
+     * @param listener listener to add
+     * @return this
+     */
+    CuratorCacheListenerBuilder forCreates(Consumer<ChildData> listener);
+
+    @FunctionalInterface
+    interface ChangeListener
+    {
+        void event(ChildData oldNode, ChildData node);
+    }
+
+    /**
+     * Add a listener only for {@link Type#NODE_CHANGED}
+     *
+     * @param listener listener to add
+     * @return this
+     */
+    CuratorCacheListenerBuilder forChanges(ChangeListener listener);
+
+    /**
+     * Add a listener for both {@link Type#NODE_CREATED} and {@link Type#NODE_CHANGED}
+     *
+     * @param listener listener to add
+     * @return this
+     */
+    CuratorCacheListenerBuilder forCreatesAndChanges(ChangeListener listener);
+
+    /**
+     * Add a listener only for {@link Type#NODE_DELETED}
+     *
+     * @param listener listener to add
+     * @return this
+     */
+    CuratorCacheListenerBuilder forDeletes(Consumer<ChildData> listener);
+
+    /**
+     * Add a listener only for {@link CuratorCacheListener#initialized()}
+     *
+     * @param listener listener to add
+     * @return this
+     */
+    CuratorCacheListenerBuilder forInitialized(Runnable listener);
+
+    /**
+     * Bridge listener. You can reuse old-style {@link org.apache.curator.framework.recipes.cache.PathChildrenCacheListener}s
+     * with CuratorCache. IMPORTANT: the connection state methods in the listener will never be called as CuratorCache
+     * does not register the listener with the connection state listener container. Also note that CuratorCache
+     * behaves differently than {@link org.apache.curator.framework.recipes.cache.PathChildrenCache} so
+     * things such as event ordering will likely be different.
+     *
+     * @param client the curator client
+     * @param listener the listener to wrap
+     * @return a CuratorCacheListener that forwards to the given listener
+     */
+    CuratorCacheListenerBuilder forPathChildrenCache(CuratorFramework client, PathChildrenCacheListener listener);
+
+    /**
+     * Bridge listener. You can reuse old-style {@link org.apache.curator.framework.recipes.cache.TreeCacheListener}s
+     * with CuratorCache. IMPORTANT: the connection state methods in the listener will never be called as CuratorCache
+     * does not register the listener with the connection state listener container. Also note that CuratorCache
+     * behaves differently than {@link org.apache.curator.framework.recipes.cache.TreeCache} so
+     * things such as event ordering will likely be different.
+     *
+     * @param client the curator client
+     * @param listener the listener to wrap
+     * @return a CuratorCacheListener that forwards to the given listener
+     */
+    CuratorCacheListenerBuilder forTreeCache(CuratorFramework client, TreeCacheListener listener);
+
+    /**
+     * Bridge listener. You can reuse old-style {@link org.apache.curator.framework.recipes.cache.NodeCacheListener}s
+     * with CuratorCache.
+     *
+     * @param listener the listener to wrap
+     * @return a CuratorCacheListener that forwards to the given listener
+     */
+    CuratorCacheListenerBuilder forNodeCache(NodeCacheListener listener);
+
+    /**
+     * Make the built listener active only after {@link CuratorCacheListener#initialized()} has been called,
+     * i.e. changes that occur while the cache is initializing are not sent to the listener.
+     */
+    CuratorCacheListenerBuilder afterInitialized();
+
+    /**
+     * Build and return a new listener based on the methods that have been previously called
+     *
+     * @return new listener
+     */
+    CuratorCacheListener build();
+}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilderImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilderImpl.java
new file mode 100644
index 0000000..4873868
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilderImpl.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+class CuratorCacheListenerBuilderImpl implements CuratorCacheListenerBuilder
+{
+    private final List<CuratorCacheListener> listeners = new ArrayList<>();
+    private boolean afterInitializedOnly = false;
+
+    @Override
+    public CuratorCacheListenerBuilder forAll(CuratorCacheListener listener)
+    {
+        listeners.add(listener);
+        return this;
+    }
+
+    @Override
+    public CuratorCacheListenerBuilder forCreates(Consumer<ChildData> listener)
+    {
+        listeners.add((type, oldNode, node) -> {
+            if ( type == CuratorCacheListener.Type.NODE_CREATED )
+            {
+                listener.accept(node);
+            }
+        });
+        return this;
+    }
+
+    @Override
+    public CuratorCacheListenerBuilder forChanges(ChangeListener listener)
+    {
+        listeners.add((type, oldNode, node) -> {
+            if ( type == CuratorCacheListener.Type.NODE_CHANGED )
+            {
+                listener.event(oldNode, node);
+            }
+        });
+        return this;
+    }
+
+    @Override
+    public CuratorCacheListenerBuilder forCreatesAndChanges(ChangeListener listener)
+    {
+        listeners.add((type, oldNode, node) -> {
+            if ( (type == CuratorCacheListener.Type.NODE_CHANGED) || (type == CuratorCacheListener.Type.NODE_CREATED) )
+            {
+                listener.event(oldNode, node);
+            }
+        });
+        return this;
+    }
+
+    @Override
+    public CuratorCacheListenerBuilder forDeletes(Consumer<ChildData> listener)
+    {
+        listeners.add((type, oldNode, node) -> {
+            if ( type == CuratorCacheListener.Type.NODE_DELETED )
+            {
+                listener.accept(oldNode);
+            }
+        });
+        return this;
+    }
+
+    @Override
+    public CuratorCacheListenerBuilder forInitialized(Runnable listener)
+    {
+        CuratorCacheListener localListener = new CuratorCacheListener()
+        {
+            @Override
+            public void event(Type type, ChildData oldData, ChildData data)
+            {
+                // NOP
+            }
+
+            @Override
+            public void initialized()
+            {
+                listener.run();
+            }
+        };
+        listeners.add(localListener);
+        return this;
+    }
+
+    @Override
+    public CuratorCacheListenerBuilder forPathChildrenCache(CuratorFramework client, PathChildrenCacheListener listener)
+    {
+        listeners.add(new PathChildrenCacheListenerWrapper(client, listener));
+        return this;
+    }
+
+    @Override
+    public CuratorCacheListenerBuilder forTreeCache(CuratorFramework client, TreeCacheListener listener)
+    {
+        listeners.add(new TreeCacheListenerWrapper(client, listener));
+        return this;
+    }
+
+    @Override
+    public CuratorCacheListenerBuilder forNodeCache(NodeCacheListener listener)
+    {
+        listeners.add(new NodeCacheListenerWrapper(listener));
+        return this;
+    }
+
+    @Override
+    public CuratorCacheListenerBuilder afterInitialized()
+    {
+        afterInitializedOnly = true;
+        return this;
+    }
+
+    @Override
+    public CuratorCacheListener build()
+    {
+        List<CuratorCacheListener> copy = new ArrayList<>(listeners);
+        return new CuratorCacheListener()
+        {
+            private volatile boolean isInitialized = !afterInitializedOnly;
+
+            @Override
+            public void event(Type type, ChildData oldData, ChildData data)
+            {
+                if ( isInitialized )
+                {
+                    copy.forEach(l -> l.event(type, oldData, data));
+                }
+            }
+
+            @Override
+            public void initialized()
+            {
+                isInitialized = true;
+                copy.forEach(CuratorCacheListener::initialized);
+            }
+        };
+    }
+}
\ No newline at end of file
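
Reviewer note: the afterInitialized() gating in build() above can be tried in isolation. The following is a minimal sketch, not Curator code. The Listener interface, the gated() helper and the String event payloads are hypothetical stand-ins for CuratorCacheListener and ChildData, assumed here only so the example runs without ZooKeeper.

```java
import java.util.ArrayList;
import java.util.List;

public class AfterInitializedSketch
{
    // Hypothetical stand-in for CuratorCacheListener (not the Curator type)
    interface Listener
    {
        void event(String type, String path);

        default void initialized()
        {
            // NOP
        }
    }

    // Wraps a delegate so that events are dropped until initialized() has been
    // called, mirroring the isInitialized flag used by build() in the diff
    static Listener gated(Listener delegate)
    {
        return new Listener()
        {
            private volatile boolean isInitialized = false;

            @Override
            public void event(String type, String path)
            {
                if ( isInitialized )
                {
                    delegate.event(type, path);
                }
            }

            @Override
            public void initialized()
            {
                isInitialized = true;
                delegate.initialized();
            }
        };
    }

    // Sends one event before and one after initialization and returns
    // what the wrapped delegate actually received
    static List<String> demo()
    {
        List<String> seen = new ArrayList<>();
        Listener listener = gated((type, path) -> seen.add(type + " " + path));
        listener.event("NODE_CREATED", "/a");   // dropped: not yet initialized
        listener.initialized();
        listener.event("NODE_CHANGED", "/a");   // delivered
        return seen;
    }

    public static void main(String[] args)
    {
        System.out.println(demo());   // prints [NODE_CHANGED /a]
    }
}
```

With the real builder, the equivalent behavior would presumably come from calling afterInitialized() before build(); events raised while the cache is still loading its initial nodes are then never replayed to the listener.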
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
new file mode 100644
index 0000000..f3d870d
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Interface for maintaining data in a {@link CuratorCache}
+ */
+public interface CuratorCacheStorage
+{
+    /**
+     * Return a new standard storage instance
+     *
+     * @return storage instance
+     */
+    static CuratorCacheStorage standard() {
+        return new StandardCuratorCacheStorage(true);
+    }
+
+    /**
+     * Return a new storage instance that does not retain the data bytes. i.e. ChildData objects
+     * returned by this storage will always return {@code null} for {@link ChildData#getData()}.
+     *
+     * @return storage instance that does not retain data bytes
+     */
+    static CuratorCacheStorage bytesNotCached() {
+        return new StandardCuratorCacheStorage(false);
+    }
+
+    /**
+     * Add an entry to storage and return any previous entry at that path
+     *
+     * @param data entry to add
+     * @return previous entry or {@code empty()}
+     */
+    Optional<ChildData> put(ChildData data);
+
+    /**
+     * Remove the entry from storage and return any previous entry at that path
+     *
+     * @param path path to remove
+     * @return previous entry or {@code empty()}
+     */
+    Optional<ChildData> remove(String path);
+
+    /**
+     * Return an entry from storage
+     *
+     * @param path path to get
+     * @return entry or {@code empty()}
+     */
+    Optional<ChildData> get(String path);
+
+    /**
+     * Return the current number of entries in storage
+     *
+     * @return number of entries
+     */
+    int size();
+
+    /**
+     * Return a stream over the storage entries. Note: for a standard storage instance, the stream
+     * behaves like a stream returned by {@link java.util.concurrent.ConcurrentHashMap#values()}
+     *
+     * @return stream over entries
+     */
+    Stream<ChildData> stream();
+
+    /**
+     * Return a stream over the storage entries that are the immediate children of the given node.
+     *
+     * @return stream over entries
+     */
+    Stream<ChildData> streamImmediateChildren(String fromParent);
+
+    /**
+     * Utility - given a stream of child nodes, build a map. Note: it is assumed that each child
+     * data in the stream has a unique path
+     *
+     * @param stream stream of child nodes with unique paths
+     * @return map
+     */
+    static Map<String, ChildData> toMap(Stream<ChildData> stream)
+    {
+        return stream.map(data -> new AbstractMap.SimpleEntry<>(data.getPath(), data))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+
+    /**
+     * Reset the storage to zero entries
+     */
+    void clear();
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheListenerWrapper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheListenerWrapper.java
new file mode 100644
index 0000000..468049b
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheListenerWrapper.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.NodeCacheListener;
+
+class NodeCacheListenerWrapper implements CuratorCacheListener
+{
+    private final NodeCacheListener listener;
+
+    NodeCacheListenerWrapper(NodeCacheListener listener)
+    {
+        this.listener = listener;
+    }
+
+    @Override
+    public void event(Type type, ChildData oldData, ChildData data)
+    {
+        try
+        {
+            listener.nodeChanged();
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
new file mode 100644
index 0000000..a9123c1
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+
+class PathChildrenCacheListenerWrapper implements CuratorCacheListener
+{
+    private final PathChildrenCacheListener listener;
+    private final CuratorFramework client;
+
+    PathChildrenCacheListenerWrapper(CuratorFramework client, PathChildrenCacheListener listener)
+    {
+        this.listener = listener;
+        this.client = client;
+    }
+
+    @Override
+    public void event(Type type, ChildData oldData, ChildData data)
+    {
+        switch ( type )
+        {
+            case NODE_CREATED:
+            {
+                sendEvent(data, PathChildrenCacheEvent.Type.CHILD_ADDED);
+                break;
+            }
+
+            case NODE_CHANGED:
+            {
+                sendEvent(data, PathChildrenCacheEvent.Type.CHILD_UPDATED);
+                break;
+            }
+
+            case NODE_DELETED:
+            {
+                sendEvent(oldData, PathChildrenCacheEvent.Type.CHILD_REMOVED);
+                break;
+            }
+        }
+    }
+
+    @Override
+    public void initialized()
+    {
+        sendEvent(null, PathChildrenCacheEvent.Type.INITIALIZED);
+    }
+
+    private void sendEvent(ChildData node, PathChildrenCacheEvent.Type type)
+    {
+        PathChildrenCacheEvent event = new PathChildrenCacheEvent(type, node);
+        try
+        {
+            listener.childEvent(client, event);
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/StandardCuratorCacheStorage.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/StandardCuratorCacheStorage.java
new file mode 100644
index 0000000..bbdb21d
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/StandardCuratorCacheStorage.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.utils.ZKPaths;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Stream;
+
+class StandardCuratorCacheStorage implements CuratorCacheStorage
+{
+    private final Map<String, ChildData> dataMap;
+    private final boolean cacheBytes;
+
+    StandardCuratorCacheStorage(boolean cacheBytes)
+    {
+        this.dataMap = new ConcurrentHashMap<>();
+        this.cacheBytes = cacheBytes;
+    }
+
+    @Override
+    public Optional<ChildData> put(ChildData data)
+    {
+        ChildData localData = cacheBytes ? data : new ChildData(data.getPath(), data.getStat(), null);
+        return Optional.ofNullable(dataMap.put(data.getPath(), localData));
+    }
+
+    @Override
+    public Optional<ChildData> remove(String path)
+    {
+        return Optional.ofNullable(dataMap.remove(path));
+    }
+
+    @Override
+    public Optional<ChildData> get(String path)
+    {
+        return Optional.ofNullable(dataMap.get(path));
+    }
+
+    @Override
+    public int size()
+    {
+        return dataMap.size();
+    }
+
+    @Override
+    public Stream<ChildData> stream()
+    {
+        return dataMap.values().stream();
+    }
+
+    @Override
+    public Stream<ChildData> streamImmediateChildren(String fromParent)
+    {
+        return dataMap.entrySet()
+            .stream()
+            .filter(entry -> {
+                ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(entry.getKey());
+                return pathAndNode.getPath().equals(fromParent);
+            })
+            .map(Map.Entry::getValue);
+    }
+
+    @Override
+    public void clear()
+    {
+        dataMap.clear();
+    }
+}
\ No newline at end of file
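
Reviewer note: streamImmediateChildren() above filters the whole map by comparing each entry's parent path with fromParent. The following is a rough sketch of that filter over plain strings. The parentOf() helper is a hypothetical approximation of what ZKPaths.getPathAndNode(...).getPath() returns for ordinary paths, and the map values are placeholder strings rather than ChildData.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class ImmediateChildrenSketch
{
    // Hypothetical helper: the parent of "/a/b" is "/a"; the parent of "/a" is "/"
    static String parentOf(String path)
    {
        int i = path.lastIndexOf('/');
        return (i > 0) ? path.substring(0, i) : "/";
    }

    // Keeps only the paths whose parent equals fromParent. Like the diff, this
    // scans every entry, so the cost grows with the size of the whole cache.
    static List<String> immediateChildren(Map<String, String> dataMap, String fromParent)
    {
        return dataMap.keySet()
            .stream()
            .filter(path -> parentOf(path).equals(fromParent))
            .sorted()   // sorted only to make the demo output deterministic
            .collect(Collectors.toList());
    }

    static List<String> demo()
    {
        Map<String, String> dataMap = new ConcurrentHashMap<>();
        dataMap.put("/a", "root of the cached subtree");
        dataMap.put("/a/b", "child");
        dataMap.put("/a/c", "child");
        dataMap.put("/a/b/d", "grandchild");
        dataMap.put("/x", "unrelated");
        return immediateChildren(dataMap, "/a");
    }

    public static void main(String[] args)
    {
        System.out.println(demo());   // prints [/a/b, /a/c]
    }
}
```

The grandchild /a/b/d is excluded because its parent is /a/b, not /a.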
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java
new file mode 100644
index 0000000..570799b
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+
+class TreeCacheListenerWrapper implements CuratorCacheListener
+{
+    private final CuratorFramework client;
+    private final TreeCacheListener listener;
+
+    TreeCacheListenerWrapper(CuratorFramework client, TreeCacheListener listener)
+    {
+        this.client = client;
+        this.listener = listener;
+    }
+
+    @Override
+    public void event(Type type, ChildData oldData, ChildData data)
+    {
+        switch ( type )
+        {
+            case NODE_CREATED:
+            {
+                sendEvent(data, TreeCacheEvent.Type.NODE_ADDED);
+                break;
+            }
+
+            case NODE_CHANGED:
+            {
+                sendEvent(data, TreeCacheEvent.Type.NODE_UPDATED);
+                break;
+            }
+
+            case NODE_DELETED:
+            {
+                sendEvent(oldData, TreeCacheEvent.Type.NODE_REMOVED);
+                break;
+            }
+        }
+    }
+
+    @Override
+    public void initialized()
+    {
+        sendEvent(null, TreeCacheEvent.Type.INITIALIZED);
+    }
+
+    private void sendEvent(ChildData node, TreeCacheEvent.Type type)
+    {
+        TreeCacheEvent event = new TreeCacheEvent(type, node);
+        try
+        {
+            listener.childEvent(client, event);
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
new file mode 100644
index 0000000..87ecb6e
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import com.google.common.base.Preconditions;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.ThreadUtils;
+import org.apache.zookeeper.AddWatchMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Closeable;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A managed persistent watcher. The watch will be managed such that it stays set through
+ * connection lapses, etc.
+ */
+public class PersistentWatcher implements Closeable
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
+    private final StandardListenerManager<Watcher> listeners = StandardListenerManager.standard();
+    private final StandardListenerManager<Runnable> resetListeners = StandardListenerManager.standard();
+    private final ConnectionStateListener connectionStateListener = (client, newState) -> {
+        if ( newState.isConnected() )
+        {
+            reset();
+        }
+    };
+    private final Watcher watcher = event -> listeners.forEach(w -> w.process(event));
+    private final CuratorFramework client;
+    private final String basePath;
+    private final boolean recursive;
+
+    private enum State
+    {
+        LATENT,
+        STARTED,
+        CLOSED
+    }
+
+    /**
+     * @param client client
+     * @param basePath path to set the watch on
+     * @param recursive ZooKeeper persistent watches can optionally be recursive
+     */
+    public PersistentWatcher(CuratorFramework client, String basePath, boolean recursive)
+    {
+        this.client = Objects.requireNonNull(client, "client cannot be null");
+        this.basePath = Objects.requireNonNull(basePath, "basePath cannot be null");
+        this.recursive = recursive;
+    }
+
+    /**
+     * Start watching
+     */
+    public void start()
+    {
+        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
+        client.getConnectionStateListenable().addListener(connectionStateListener);
+        reset();
+    }
+
+    /**
+     * Remove the watcher
+     */
+    @Override
+    public void close()
+    {
+        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+        {
+            listeners.clear();
+            client.getConnectionStateListenable().removeListener(connectionStateListener);
+            try
+            {
+                client.watches().remove(watcher).guaranteed().inBackground().forPath(basePath);
+            }
+            catch ( Exception e )
+            {
+                ThreadUtils.checkInterrupted(e);
+                log.debug(String.format("Could not remove watcher for path: %s", basePath), e);
+            }
+        }
+    }
+
+    /**
+     * Container for setting listeners
+     *
+     * @return listener container
+     */
+    public Listenable<Watcher> getListenable()
+    {
+        return listeners;
+    }
+
+    /**
+     * Listeners are called when the persistent watcher has been successfully registered
+     * or re-registered after a connection disruption
+     *
+     * @return listener container
+     */
+    public StandardListenerManager<Runnable> getResetListenable()
+    {
+        return resetListeners;
+    }
+
+    private void reset()
+    {
+        try
+        {
+            BackgroundCallback callback = (__, event) -> {
+                if ( event.getResultCode() != KeeperException.Code.OK.intValue() )
+                {
+                    reset();
+                }
+                else
+                {
+                    resetListeners.forEach(Runnable::run);
+                }
+            };
+            client.watchers().add().withMode(recursive ? AddWatchMode.PERSISTENT_RECURSIVE : AddWatchMode.PERSISTENT).inBackground(callback).usingWatcher(watcher).forPath(basePath);
+        }
+        catch ( Exception e )
+        {
+            log.error("Could not reset persistent watch at path: " + basePath, e);
+        }
+    }
+}
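
Note on the lifecycle guard in PersistentWatcher above: start() and close()
both rely on a compare-and-set over an AtomicReference<State>, so start() fails
fast on a second call and close() runs its cleanup at most once. The following
is only a minimal, self-contained sketch of that pattern. LifecycleSketch and
closeActions() are hypothetical names used for illustration and are not part of
Curator; the real class additionally registers a connection state listener and
adds or removes the ZooKeeper watch.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration class; not part of Curator.
class LifecycleSketch implements AutoCloseable
{
    private enum State
    {
        LATENT,
        STARTED,
        CLOSED
    }

    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
    // Counts how many times the cleanup branch actually ran.
    private final AtomicInteger closeActions = new AtomicInteger(0);

    void start()
    {
        // Only one caller can win LATENT -> STARTED; any other call fails fast.
        if ( !state.compareAndSet(State.LATENT, State.STARTED) )
        {
            throw new IllegalStateException("Already started");
        }
    }

    @Override
    public void close()
    {
        // Cleanup runs only for the caller that wins STARTED -> CLOSED, so
        // closing twice, or closing an instance that never started, is a no-op.
        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
        {
            closeActions.incrementAndGet();
        }
    }

    int closeActions()
    {
        return closeActions.get();
    }

    public static void main(String[] args)
    {
        LifecycleSketch sketch = new LifecycleSketch();
        sketch.start();
        sketch.close();
        sketch.close();
        System.out.println("cleanup ran " + sketch.closeActions() + " time(s)");
    }
}
```

One consequence of this design is that an instance cannot be restarted after
close(): the state never returns to LATENT, so a later start() throws.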
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
new file mode 100644
index 0000000..8560f87
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.DO_NOT_CLEAR_ON_CLOSE;
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.builder;
+
+@Test(groups = CuratorTestBase.zk36Group)
+public class TestCuratorCache extends CuratorTestBase
+{
+    @Test
+    public void testServerLoss() throws Exception   // mostly copied from TestPathChildrenCacheInCluster
+    {
+        try (TestingCluster cluster = new TestingCluster(3))
+        {
+            cluster.start();
+
+            try (CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+            {
+                client.start();
+                client.create().creatingParentsIfNeeded().forPath("/test");
+
+                try (CuratorCache cache = CuratorCache.build(client, "/test"))
+                {
+                    cache.start();
+
+                    CountDownLatch reconnectLatch = new CountDownLatch(1);
+                    client.getConnectionStateListenable().addListener((__, newState) -> {
+                        if ( newState == ConnectionState.RECONNECTED )
+                        {
+                            reconnectLatch.countDown();
+                        }
+                    });
+                    CountDownLatch latch = new CountDownLatch(3);
+                    cache.listenable().addListener((__, ___, ____) -> latch.countDown());
+
+                    client.create().forPath("/test/one");
+                    client.create().forPath("/test/two");
+                    client.create().forPath("/test/three");
+
+                    Assert.assertTrue(timing.awaitLatch(latch));
+
+                    InstanceSpec connectionInstance = cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper());
+                    cluster.killServer(connectionInstance);
+
+                    Assert.assertTrue(timing.awaitLatch(reconnectLatch));
+
+                    timing.sleepABit();
+
+                    Assert.assertEquals(cache.storage().stream().count(), 4);
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testUpdateWhenNotCachingData() throws Exception // mostly copied from TestPathChildrenCache
+    {
+        CuratorCacheStorage storage = new StandardCuratorCacheStorage(false);
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+        {
+            client.start();
+            final CountDownLatch updatedLatch = new CountDownLatch(1);
+            final CountDownLatch addedLatch = new CountDownLatch(1);
+            client.create().creatingParentsIfNeeded().forPath("/test");
+            try (CuratorCache cache = CuratorCache.builder(client, "/test").withStorage(storage).build())
+            {
+                cache.listenable().addListener(builder().forChanges((__, ___) -> updatedLatch.countDown()).build());
+                cache.listenable().addListener(builder().forCreates(__ -> addedLatch.countDown()).build());
+                cache.start();
+
+                client.create().forPath("/test/foo", "first".getBytes());
+                Assert.assertTrue(timing.awaitLatch(addedLatch));
+
+                client.setData().forPath("/test/foo", "something new".getBytes());
+                Assert.assertTrue(timing.awaitLatch(updatedLatch));
+            }
+        }
+    }
+
+    @Test
+    public void testAfterInitialized() throws Exception
+    {
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+        {
+            client.start();
+            client.create().creatingParentsIfNeeded().forPath("/test");
+            client.create().creatingParentsIfNeeded().forPath("/test/one");
+            client.create().creatingParentsIfNeeded().forPath("/test/one/two");
+            client.create().creatingParentsIfNeeded().forPath("/test/one/two/three");
+            try (CuratorCache cache = CuratorCache.build(client, "/test"))
+            {
+                CountDownLatch initializedLatch = new CountDownLatch(1);
+                AtomicInteger eventCount = new AtomicInteger(0);
+                CuratorCacheListener listener = new CuratorCacheListener()
+                {
+                    @Override
+                    public void event(Type type, ChildData oldData, ChildData data)
+                    {
+                        eventCount.incrementAndGet();
+                    }
+
+                    @Override
+                    public void initialized()
+                    {
+                        initializedLatch.countDown();
+                    }
+                };
+                cache.listenable().addListener(builder().forAll(listener).afterInitialized().build());
+                cache.start();
+                Assert.assertTrue(timing.awaitLatch(initializedLatch));
+
+                Assert.assertEquals(initializedLatch.getCount(), 0);
+                Assert.assertEquals(cache.storage().size(), 4);
+                Assert.assertTrue(cache.storage().get("/test").isPresent());
+                Assert.assertTrue(cache.storage().get("/test/one").isPresent());
+                Assert.assertTrue(cache.storage().get("/test/one/two").isPresent());
+                Assert.assertTrue(cache.storage().get("/test/one/two/three").isPresent());
+            }
+        }
+    }
+
+    @Test
+    public void testListenerBuilder() throws Exception
+    {
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+        {
+            client.start();
+            try (CuratorCache cache = CuratorCache.build(client, "/test"))
+            {
+                Semaphore all = new Semaphore(0);
+                Semaphore deletes = new Semaphore(0);
+                Semaphore changes = new Semaphore(0);
+                Semaphore creates = new Semaphore(0);
+                Semaphore createsAndChanges = new Semaphore(0);
+
+                CuratorCacheListener listener = builder().forAll((__, ___, ____) -> all.release()).forDeletes(__ -> deletes.release()).forChanges((__, ___) -> changes.release()).forCreates(__ -> creates.release()).forCreatesAndChanges((__, ___) -> createsAndChanges.release()).build();
+                cache.listenable().addListener(listener);
+                cache.start();
+
+                client.create().forPath("/test");
+                Assert.assertTrue(timing.acquireSemaphore(all, 1));
+                Assert.assertTrue(timing.acquireSemaphore(creates, 1));
+                Assert.assertTrue(timing.acquireSemaphore(createsAndChanges, 1));
+                Assert.assertEquals(changes.availablePermits(), 0);
+                Assert.assertEquals(deletes.availablePermits(), 0);
+
+                client.setData().forPath("/test", "new".getBytes());
+                Assert.assertTrue(timing.acquireSemaphore(all, 1));
+                Assert.assertTrue(timing.acquireSemaphore(changes, 1));
+                Assert.assertTrue(timing.acquireSemaphore(createsAndChanges, 1));
+                Assert.assertEquals(creates.availablePermits(), 0);
+                Assert.assertEquals(deletes.availablePermits(), 0);
+
+                client.delete().forPath("/test");
+                Assert.assertTrue(timing.acquireSemaphore(all, 1));
+                Assert.assertTrue(timing.acquireSemaphore(deletes, 1));
+                Assert.assertEquals(creates.availablePermits(), 0);
+                Assert.assertEquals(changes.availablePermits(), 0);
+                Assert.assertEquals(createsAndChanges.availablePermits(), 0);
+            }
+        }
+    }
+
+    @Test
+    public void testOverrideExecutor() throws Exception
+    {
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+        {
+            client.start();
+            CountDownLatch latch = new CountDownLatch(2);
+            Executor executor = proc -> {
+                latch.countDown();
+                proc.run();
+            };
+            try ( CuratorCache cache = CuratorCache.builder(client, "/test").withExecutor(executor).build() )
+            {
+                cache.listenable().addListener((type, oldData, data) -> latch.countDown());
+                cache.start();
+
+                client.create().forPath("/test");
+
+                Assert.assertTrue(timing.awaitLatch(latch));
+            }
+        }
+    }
+
+    @Test
+    public void testClearOnClose() throws Exception
+    {
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+        {
+            CuratorCacheStorage storage;
+            client.start();
+
+            try ( CuratorCache cache = CuratorCache.builder(client, "/test").withOptions(DO_NOT_CLEAR_ON_CLOSE).build() )
+            {
+                cache.start();
+                storage = cache.storage();
+
+                client.create().forPath("/test", "foo".getBytes());
+                client.create().forPath("/test/bar", "bar".getBytes());
+                timing.sleepABit();
+            }
+            Assert.assertEquals(storage.size(), 2);
+
+            try ( CuratorCache cache = CuratorCache.build(client, "/test") )
+            {
+                cache.start();
+                storage = cache.storage();
+
+                timing.sleepABit();
+            }
+            Assert.assertEquals(storage.size(), 0);
+        }
+    }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheConsistency.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheConsistency.java
new file mode 100644
index 0000000..ab9dbe4
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheConsistency.java
@@ -0,0 +1,373 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.apache.curator.utils.ZKPaths;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.io.Closeable;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.DO_NOT_CLEAR_ON_CLOSE;
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.builder;
+
+/**
+ * Randomly create nodes in a tree while a set of CuratorCaches listens. Afterwards, validate
+ * that the caches contain the same values as ZK itself
+ */
+@Test(groups = CuratorTestBase.zk36Group)
+public class TestCuratorCacheConsistency extends CuratorTestBase
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final ThreadLocalRandom random = ThreadLocalRandom.current();
+
+    private static final Duration testLength = Duration.ofSeconds(30);
+    private static final Duration thirdOfTestLength = Duration.ofMillis(testLength.toMillis() / 3);
+    private static final Duration sleepLength = Duration.ofMillis(5);
+    private static final int nodesPerLevel = 10;
+    private static final int clusterSize = 5;
+    private static final int maxServerKills = 2;
+
+    private static final String BASE_PATH = "/test";
+
+    private class Client implements Closeable
+    {
+        private final CuratorFramework client;
+        private final CuratorCache cache;
+        private final int index;
+        private final Map<String, ChildData> listenerDataMap = new HashMap<>();
+
+        Client(int index, String connectionString, AtomicReference<Exception> errorSignal)
+        {
+            this.index = index;
+            client = buildClient(connectionString);
+            cache = CuratorCache.builder(client, BASE_PATH).withOptions(DO_NOT_CLEAR_ON_CLOSE).withExceptionHandler(errorSignal::set).build();
+
+            // listenerDataMap is a local data map that will hold values sent by listeners
+            // this way, the listener code can be tested for validity and consistency
+            CuratorCacheListener listener = builder().forCreates(node -> {
+                ChildData previous = listenerDataMap.put(node.getPath(), node);
+                if ( previous != null )
+                {
+                    errorSignal.set(new Exception(String.format("Client: %d - Create for existing node: %s", index, node.getPath())));
+                }
+            }).forChanges((oldNode, node) -> {
+                ChildData previous = listenerDataMap.put(node.getPath(), node);
+                if ( (previous == null) || !Arrays.equals(previous.getData(), oldNode.getData()) )
+                {
+                    errorSignal.set(new Exception(String.format("Client: %d - Bad old value for change node: %s", index, node.getPath())));
+                }
+            }).forDeletes(node -> {
+                ChildData previous = listenerDataMap.remove(node.getPath());
+                if ( previous == null )
+                {
+                    errorSignal.set(new Exception(String.format("Client: %d - Delete for non-existent node: %s", index, node.getPath())));
+                }
+            }).build();
+            cache.listenable().addListener(listener);
+        }
+
+        void start()
+        {
+            client.start();
+            cache.start();
+        }
+
+        @Override
+        public void close()
+        {
+            cache.close();
+            client.close();
+        }
+    }
+
+    @Test
+    public void testConsistencyAfterSimulation() throws Exception
+    {
+        int clientQty = random.nextInt(10, 20);
+        int maxDepth = random.nextInt(5, 10);
+
+        log.info("clientQty: {}, maxDepth: {}", clientQty, maxDepth);
+
+        List<Client> clients = Collections.emptyList();
+        Map<String, String> actualTree;
+
+        AtomicReference<Exception> errorSignal = new AtomicReference<>();
+        try (TestingCluster cluster = new TestingCluster(clusterSize))
+        {
+            cluster.start();
+
+            initializeBasePath(cluster);
+            try
+            {
+                clients = buildClients(cluster, clientQty, errorSignal);
+                workLoop(cluster, clients, maxDepth, errorSignal);
+
+                log.info("Test complete - sleeping to allow events to complete");
+                timing.sleepABit();
+            }
+            finally
+            {
+                clients.forEach(Client::close);
+            }
+
+            actualTree = buildActual(cluster);
+        }
+
+        log.info("client qty: {}", clientQty);
+
+        Map<Integer, List<String>> errorsList = clients.stream()
+            .map(client ->  findErrors(client, actualTree))
+            .filter(errorsEntry -> !errorsEntry.getValue().isEmpty())
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+        if ( !errorsList.isEmpty() )
+        {
+            log.error("{} clients had errors", errorsList.size());
+            errorsList.forEach((index, errorList) -> {
+                log.error("Client {}", index);
+                errorList.forEach(log::error);
+                log.error("");
+            });
+
+            Assert.fail("Errors found");
+        }
+    }
+
+    // build a data map recursively from the actual values in ZK
+    private Map<String, String> buildActual(TestingCluster cluster)
+    {
+        Map<String, String> actual = new HashMap<>();
+        try (CuratorFramework client = buildClient(cluster.getConnectString()))
+        {
+            client.start();
+            buildActual(client, actual, BASE_PATH);
+        }
+        return actual;
+    }
+
+    private void buildActual(CuratorFramework client, Map<String, String> actual, String fromPath)
+    {
+        try
+        {
+            byte[] bytes = client.getData().forPath(fromPath);
+            actual.put(fromPath, new String(bytes));
+            client.getChildren().forPath(fromPath).forEach(child -> buildActual(client, actual, ZKPaths.makePath(fromPath, child)));
+        }
+        catch ( Exception e )
+        {
+            Assert.fail("Could not read: " + fromPath, e);
+        }
+    }
+
+    private List<Client> buildClients(TestingCluster cluster, int clientQty, AtomicReference<Exception> errorSignal)
+    {
+        return IntStream.range(0, clientQty)
+            .mapToObj(index -> new Client(index, cluster.getConnectString(), errorSignal))
+            .peek(Client::start)
+            .collect(Collectors.toList());
+    }
+
+    private void initializeBasePath(TestingCluster cluster) throws Exception
+    {
+        try (CuratorFramework client = buildClient(cluster.getConnectString()))
+        {
+            client.start();
+            client.create().forPath(BASE_PATH, "".getBytes());
+        }
+    }
+
+    private void workLoop(TestingCluster cluster, List<Client> clients, int maxDepth, AtomicReference<Exception> errorSignal) throws Exception
+    {
+        Instant start = Instant.now();
+        Instant lastServerKill = Instant.now();
+        int serverKillIndex = 0;
+        while ( true )
+        {
+            Duration elapsed = Duration.between(start, Instant.now());
+            if ( elapsed.compareTo(testLength) >= 0 )
+            {
+                break;
+            }
+
+            Exception errorSignalException = errorSignal.get();
+            if ( errorSignalException != null )
+            {
+                Assert.fail("A client's error handler was called", errorSignalException);
+            }
+
+            Duration elapsedFromLastServerKill = Duration.between(lastServerKill, Instant.now());
+            if ( elapsedFromLastServerKill.compareTo(thirdOfTestLength) >= 0 )
+            {
+                lastServerKill = Instant.now();
+                if ( serverKillIndex < maxServerKills )
+                {
+                    doKillServer(cluster, serverKillIndex++);
+                }
+            }
+
+            int thisDepth = random.nextInt(0, maxDepth);
+            String thisPath = randomPath(thisDepth);
+            CuratorFramework client = randomClient(clients);
+            if ( random.nextBoolean() )
+            {
+                doDelete(client, thisPath);
+            }
+            else
+            {
+                doChange(client, thisPath);
+            }
+
+            Thread.sleep(sleepLength.toMillis());
+        }
+    }
+
+    private void doChange(CuratorFramework client, String thisPath)
+    {
+        try
+        {
+            String thisData = Long.toString(random.nextLong());
+            client.create().orSetData().creatingParentsIfNeeded().forPath(thisPath, thisData.getBytes());
+        }
+        catch ( Exception e )
+        {
+            Assert.fail("Could not create/set: " + thisPath, e);
+        }
+    }
+
+    private void doDelete(CuratorFramework client, String thisPath)
+    {
+        if ( thisPath.equals(BASE_PATH) )
+        {
+            return;
+        }
+        try
+        {
+            client.delete().quietly().deletingChildrenIfNeeded().forPath(thisPath);
+        }
+        catch ( Exception e )
+        {
+            Assert.fail("Could not delete: " + thisPath, e);
+        }
+    }
+
+    private void doKillServer(TestingCluster cluster, int serverKillIndex) throws Exception
+    {
+        log.info("Killing server {}", serverKillIndex);
+        InstanceSpec killSpec = new ArrayList<>(cluster.getInstances()).get(serverKillIndex);
+        cluster.killServer(killSpec);
+    }
+
+    private CuratorFramework randomClient(List<Client> clients)
+    {
+        return clients.get(random.nextInt(clients.size())).client;
+    }
+
+    private Map.Entry<Integer, List<String>> findErrors(Client client, Map<String, String> tree)
+    {
+        CuratorCacheStorage storage = client.cache.storage();
+        List<String> errors = new ArrayList<>();
+        if ( tree.size() != storage.size() )
+        {
+            errors.add(String.format("Size mismatch. Expected: %d - Actual: %d", tree.size(), storage.size()));
+        }
+        tree.keySet().forEach(path -> {
+            if ( !storage.get(path).isPresent() )
+            {
+                errors.add(String.format("Path %s in master but not client", path));
+            }
+        });
+        storage.stream().forEach(data -> {
+            String treeValue = tree.get(data.getPath());
+            if ( treeValue != null )
+            {
+                if ( !treeValue.equals(new String(data.getData())) )
+                {
+                    errors.add(String.format("Data at %s is not the same", data.getPath()));
+                }
+
+                ChildData listenersMapData = client.listenerDataMap.get(data.getPath());
+                if ( listenersMapData == null )
+                {
+                    errors.add(String.format("listenersMap missing data at: %s", data.getPath()));
+                }
+                else if ( !treeValue.equals(new String(listenersMapData.getData())) )
+                {
+                    errors.add(String.format("Data at %s in listenersMap is not the same", data.getPath()));
+                }
+            }
+            else
+            {
+                errors.add(String.format("Path %s in client but not master", data.getPath()));
+            }
+        });
+
+        client.listenerDataMap.keySet().forEach(path -> {
+            if ( !storage.get(path).isPresent() )
+            {
+                errors.add(String.format("Path %s in listenersMap but not storage", path));
+            }
+        });
+
+        return new AbstractMap.SimpleEntry<>(client.index, errors);
+    }
+
+    private String randomPath(int depth)
+    {
+        StringBuilder str = new StringBuilder(BASE_PATH);
+        while ( depth-- > 0 )
+        {
+            int levelNodeName = random.nextInt(nodesPerLevel);
+            str.append("/").append(levelNodeName);
+        }
+        return str.toString();
+    }
+
+    @Override
+    protected void createServer()
+    {
+        // do nothing - we'll be using TestingCluster instead
+    }
+
+    private CuratorFramework buildClient(String connectionString)
+    {
+        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(100, 100);
+        return CuratorFrameworkFactory.newClient(connectionString, timing.session(), timing.connection(), retryPolicy);
+    }
+}
\ No newline at end of file
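
Note on the validation step in TestCuratorCacheConsistency above: findErrors()
compares the tree read back from ZooKeeper with each client's cache in both
directions, so that missing paths, extra paths, and differing data are each
reported. The following is only a simplified sketch of that two-way comparison
using plain maps. ConsistencySketch and its map-based signature are
hypothetical; the real test reads from CuratorCacheStorage and also checks the
per-client listenerDataMap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration class; not part of Curator.
class ConsistencySketch
{
    // Returns one message per discrepancy between the expected tree and a cache snapshot.
    static List<String> findErrors(Map<String, String> expected, Map<String, String> cached)
    {
        List<String> errors = new ArrayList<>();
        if ( expected.size() != cached.size() )
        {
            errors.add(String.format("Size mismatch. Expected: %d - Actual: %d", expected.size(), cached.size()));
        }
        // Direction 1: everything expected must be present in the cache.
        expected.keySet().forEach(path -> {
            if ( !cached.containsKey(path) )
            {
                errors.add(String.format("Path %s expected but not cached", path));
            }
        });
        // Direction 2: everything cached must be expected and carry the same data.
        cached.forEach((path, value) -> {
            String expectedValue = expected.get(path);
            if ( expectedValue == null )
            {
                errors.add(String.format("Path %s cached but not expected", path));
            }
            else if ( !expectedValue.equals(value) )
            {
                errors.add(String.format("Data at %s is not the same", path));
            }
        });
        return errors;
    }

    public static void main(String[] args)
    {
        Map<String, String> expected = new TreeMap<>();
        expected.put("/test", "");
        expected.put("/test/1", "a");

        Map<String, String> cached = new TreeMap<>();
        cached.put("/test", "");
        cached.put("/test/1", "b");     // stale data
        cached.put("/test/2", "c");     // node that no longer exists

        findErrors(expected, cached).forEach(System.out::println);
    }
}
```

Checking both directions matters because a size comparison alone cannot tell a
missing node from an extra one, and equal sizes can still hide stale data.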
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEventOrdering.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEventOrdering.java
new file mode 100644
index 0000000..8baf2e2
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEventOrdering.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.testng.annotations.Test;
+import java.util.concurrent.BlockingQueue;
+
+@Test(groups = CuratorTestBase.zk36Group)
+public class TestCuratorCacheEventOrdering extends TestEventOrdering<CuratorCache>
+{
+    @Override
+    protected int getActualQty(CuratorCache cache)
+    {
+        return cache.storage().size();
+    }
+
+    @Override
+    protected CuratorCache newCache(CuratorFramework client, String path, BlockingQueue<Event> events)
+    {
+        CuratorCache cache = CuratorCache.build(client, path);
+        cache.listenable().addListener((type, oldNode, node) -> {
+            if ( type == CuratorCacheListener.Type.NODE_CREATED )
+            {
+                events.add(new Event(EventType.ADDED, node.getPath()));
+            }
+            else if ( type == CuratorCacheListener.Type.NODE_DELETED )
+            {
+                events.add(new Event(EventType.DELETED, oldNode.getPath()));
+            }
+        });
+        cache.start();
+        return cache;
+    }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java
new file mode 100644
index 0000000..4a75acf
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.SINGLE_NODE_CACHE;
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.builder;
+import static org.apache.curator.framework.recipes.cache.CuratorCacheStorage.toMap;
+
+@Test(groups = CuratorTestBase.zk36Group)
+public class TestCuratorCacheWrappers extends CuratorTestBase
+{
+    @Test
+    public void testPathChildrenCache() throws Exception    // copied from TestPathChildrenCache#testBasics()
+    {
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+        {
+            client.start();
+            client.create().forPath("/test");
+
+            final BlockingQueue<PathChildrenCacheEvent.Type> events = new LinkedBlockingQueue<>();
+            try (CuratorCache cache = CuratorCache.build(client, "/test"))
+            {
+                PathChildrenCacheListener listener = (__, event) -> {
+                    if ( event.getData().getPath().equals("/test/one") )
+                    {
+                        events.offer(event.getType());
+                    }
+                };
+                cache.listenable().addListener(builder().forPathChildrenCache(client, listener).build());
+                cache.start();
+
+                client.create().forPath("/test/one", "hey there".getBytes());
+                Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
+
+                client.setData().forPath("/test/one", "sup!".getBytes());
+                Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
+                Assert.assertEquals(new String(cache.storage().get("/test/one").orElseThrow(AssertionError::new).getData()), "sup!");
+
+                client.delete().forPath("/test/one");
+                Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
+            }
+        }
+    }
+
+    @Test
+    public void testTreeCache() throws Exception    // copied from TestTreeCache#testBasics()
+    {
+        BaseTestTreeCache treeCacheBase = new BaseTestTreeCache();
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+        {
+            client.start();
+            client.create().forPath("/test");
+
+            try (CuratorCache cache = CuratorCache.build(client, "/test"))
+            {
+                cache.listenable().addListener(builder().forTreeCache(client, treeCacheBase.eventListener).build());
+                cache.start();
+
+                treeCacheBase.assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test");
+                treeCacheBase.assertEvent(TreeCacheEvent.Type.INITIALIZED);
+                Assert.assertEquals(toMap(cache.storage().streamImmediateChildren("/test")).keySet(), ImmutableSet.of());
+                Assert.assertEquals(cache.storage().streamImmediateChildren("/t").count(), 0);
+                Assert.assertEquals(cache.storage().streamImmediateChildren("/testing").count(), 0);
+
+                client.create().forPath("/test/one", "hey there".getBytes());
+                treeCacheBase.assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/one");
+                Assert.assertEquals(toMap(cache.storage().streamImmediateChildren("/test")).keySet(), ImmutableSet.of("/test/one"));
+                Assert.assertEquals(new String(cache.storage().get("/test/one").orElseThrow(AssertionError::new).getData()), "hey there");
+                Assert.assertEquals(toMap(cache.storage().streamImmediateChildren("/test/one")).keySet(), ImmutableSet.of());
+                Assert.assertEquals(cache.storage().streamImmediateChildren("/test/o").count(), 0);
+                Assert.assertEquals(cache.storage().streamImmediateChildren("/test/onely").count(), 0);
+
+                client.setData().forPath("/test/one", "sup!".getBytes());
+                treeCacheBase.assertEvent(TreeCacheEvent.Type.NODE_UPDATED, "/test/one");
+                Assert.assertEquals(toMap(cache.storage().streamImmediateChildren("/test")).keySet(), ImmutableSet.of("/test/one"));
+                Assert.assertEquals(new String(cache.storage().get("/test/one").orElseThrow(AssertionError::new).getData()), "sup!");
+
+                client.delete().forPath("/test/one");
+                treeCacheBase.assertEvent(TreeCacheEvent.Type.NODE_REMOVED, "/test/one", "sup!".getBytes());
+                Assert.assertEquals(toMap(cache.storage().streamImmediateChildren("/test")).keySet(), ImmutableSet.of());
+            }
+        }
+    }
+
+    @Test
+    public void testNodeCache() throws Exception    // copied from TestNodeCache#testBasics()
+    {
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)) )
+        {
+            client.start();
+            client.create().forPath("/test");
+
+            try (CuratorCache cache = CuratorCache.build(client, "/test/node", SINGLE_NODE_CACHE))
+            {
+                Supplier<ChildData> getRootData = () -> cache.storage().get("/test/node").orElseThrow(() -> new AssertionError("is not present"));
+                cache.start();
+
+                final Semaphore semaphore = new Semaphore(0);
+                cache.listenable().addListener(builder().forNodeCache(semaphore::release).build());
+                try
+                {
+                    getRootData.get();
+                    Assert.fail("Should have thrown");
+                }
+                catch ( AssertionError expected )
+                {
+                    // expected: /test/node has not been created yet, so the cache holds no entry for it
+                }
+
+                client.create().forPath("/test/node", "a".getBytes());
+                Assert.assertTrue(timing.acquireSemaphore(semaphore));
+                Assert.assertEquals(getRootData.get().getData(), "a".getBytes());
+
+                client.setData().forPath("/test/node", "b".getBytes());
+                Assert.assertTrue(timing.acquireSemaphore(semaphore));
+                Assert.assertEquals(getRootData.get().getData(), "b".getBytes());
+
+                client.delete().forPath("/test/node");
+                Assert.assertTrue(timing.acquireSemaphore(semaphore));
+                try
+                {
+                    getRootData.get();
+                    Assert.fail("Should have thrown");
+                }
+                catch ( AssertionError expected )
+                {
+                    // expected: /test/node was deleted, so the cache no longer holds an entry for it
+                }
+            }
+        }
+    }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestWrappedNodeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestWrappedNodeCache.java
new file mode 100644
index 0000000..3e81b63
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestWrappedNodeCache.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.Compatibility;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.function.Supplier;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.builder;
+
+@Test(groups = CuratorTestBase.zk36Group)
+public class TestWrappedNodeCache extends CuratorTestBase
+{
+    @Test
+    public void testDeleteThenCreate() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+            client.create().creatingParentsIfNeeded().forPath("/test/foo", "one".getBytes());
+
+            final Semaphore semaphore = new Semaphore(0);
+            cache = CuratorCache.build(client, "/test/foo");
+            NodeCacheListener listener = semaphore::release;
+            cache.listenable().addListener(builder().forNodeCache(listener).build());
+
+            Supplier<Optional<ChildData>> rootData = getRootDataProc(cache, "/test/foo");
+
+            cache.start();
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+
+            Assert.assertTrue(rootData.get().isPresent());
+            Assert.assertEquals(rootData.get().get().getData(), "one".getBytes());
+
+            client.delete().forPath("/test/foo");
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+            client.create().forPath("/test/foo", "two".getBytes());
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+
+            Assert.assertTrue(rootData.get().isPresent());
+            Assert.assertEquals(rootData.get().get().getData(), "two".getBytes());
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    @Test
+    public void testKilledSession() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = null;
+        try
+        {
+            client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+            client.start();
+            client.create().creatingParentsIfNeeded().forPath("/test/node", "start".getBytes());
+
+            CountDownLatch lostLatch = new CountDownLatch(1);
+            client.getConnectionStateListenable().addListener((__, newState) -> {
+                if ( newState == ConnectionState.LOST )
+                {
+                    lostLatch.countDown();
+                }
+            });
+
+            cache = CuratorCache.build(client, "/test/node");
+
+            Semaphore latch = new Semaphore(0);
+            NodeCacheListener listener = latch::release;
+            cache.listenable().addListener(builder().forNodeCache(listener).build());
+
+            Supplier<Optional<ChildData>> rootData = getRootDataProc(cache, "/test/node");
+
+            cache.start();
+            Assert.assertTrue(timing.acquireSemaphore(latch));
+
+            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+            Assert.assertTrue(timing.awaitLatch(lostLatch));
+
+            Assert.assertTrue(rootData.get().isPresent());
+            Assert.assertEquals(rootData.get().get().getData(), "start".getBytes());
+
+            client.setData().forPath("/test/node", "new data".getBytes());
+            Assert.assertTrue(timing.acquireSemaphore(latch));
+            Assert.assertTrue(rootData.get().isPresent());
+            Assert.assertEquals(rootData.get().get().getData(), "new data".getBytes());
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    @SuppressWarnings("ConstantConditions")
+    @Test
+    public void testBasics() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+            client.create().forPath("/test");
+
+            cache = CuratorCache.build(client, "/test/node");
+            cache.start();
+
+            Supplier<Optional<ChildData>> rootData = getRootDataProc(cache, "/test/node");
+
+            final Semaphore semaphore = new Semaphore(0);
+            NodeCacheListener listener = semaphore::release;
+            cache.listenable().addListener(builder().forNodeCache(listener).build());
+
+            Assert.assertNull(rootData.get().orElse(null));
+
+            client.create().forPath("/test/node", "a".getBytes());
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+            Assert.assertEquals(rootData.get().orElse(null).getData(), "a".getBytes());
+
+            client.setData().forPath("/test/node", "b".getBytes());
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+            Assert.assertEquals(rootData.get().orElse(null).getData(), "b".getBytes());
+
+            client.delete().forPath("/test/node");
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+            Assert.assertNull(rootData.get().orElse(null));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    private Supplier<Optional<ChildData>> getRootDataProc(CuratorCache cache, String rootPath)
+    {
+        return () -> cache.storage().get(rootPath);
+    }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
new file mode 100644
index 0000000..534c365
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.watch;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class TestPersistentWatcher extends CuratorTestBase
+{
+    @Test
+    public void testConnectionLostRecursive() throws Exception
+    {
+        internalTest(true);
+    }
+
+    @Test
+    public void testConnectionLost() throws Exception
+    {
+        internalTest(false);
+    }
+
+    private void internalTest(boolean recursive) throws Exception
+    {
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)) )
+        {
+            CountDownLatch lostLatch = new CountDownLatch(1);
+            CountDownLatch reconnectedLatch = new CountDownLatch(1);
+            client.start();
+            client.getConnectionStateListenable().addListener((__, newState) -> {
+                if ( newState == ConnectionState.LOST )
+                {
+                    lostLatch.countDown();
+                }
+                else if ( newState == ConnectionState.RECONNECTED )
+                {
+                    reconnectedLatch.countDown();
+                }
+            });
+
+            try ( PersistentWatcher persistentWatcher = new PersistentWatcher(client, "/top/main", recursive) )
+            {
+                persistentWatcher.start();
+
+                BlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>();
+                persistentWatcher.getListenable().addListener(events::add);
+
+                client.create().creatingParentsIfNeeded().forPath("/top/main/a");
+                Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main");
+                if ( recursive )
+                {
+                    Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main/a");
+                }
+                else
+                {
+                    Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main");   // non-recursive: the child creation is reported on the parent path
+                }
+
+                server.stop();
+                Assert.assertEquals(timing.takeFromQueue(events).getState(), Watcher.Event.KeeperState.Disconnected);
+                Assert.assertTrue(timing.awaitLatch(lostLatch));
+
+                server.restart();
+                Assert.assertTrue(timing.awaitLatch(reconnectedLatch));
+
+                timing.sleepABit();     // allow time for the watcher to be reset after reconnecting
+                events.clear();
+
+                if ( recursive )
+                {
+                    client.setData().forPath("/top/main/a", "foo".getBytes());
+                    Assert.assertEquals(timing.takeFromQueue(events).getType(), Watcher.Event.EventType.NodeDataChanged);
+                }
+                client.setData().forPath("/top/main", "bar".getBytes());
+                Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main");
+            }
+        }
+    }
+}
\ No newline at end of file