You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/10/07 12:49:10 UTC

[curator] 01/01: wip

This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch persistent-watcher-cache
in repository https://gitbox.apache.org/repos/asf/curator.git

commit c2590fcbf581d2ec1c8e4a329c865ece821972e6
Author: randgalt <ra...@apache.org>
AuthorDate: Mon Oct 7 15:48:57 2019 +0300

    wip
---
 .../framework/recipes/cache/CuratorCache.java      | 132 +++++++++
 .../framework/recipes/cache/CuratorCacheImpl.java  | 249 +++++++++++++++++
 .../recipes/cache/CuratorCacheListener.java        | 217 +++++++++++++++
 .../recipes/cache/CuratorCacheStorage.java         |  56 ++++
 .../curator/framework/recipes/cache/NodeCache.java |   3 +
 .../recipes/cache/NodeCacheListenerWrapper.java    |  41 +++
 .../framework/recipes/cache/PathChildrenCache.java |   3 +
 .../cache/PathChildrenCacheListenerWrapper.java    |  46 ++++
 .../recipes/cache/StandardCuratorCacheStorage.java |  98 +++++++
 .../curator/framework/recipes/cache/TreeCache.java |   3 +
 .../recipes/cache/TreeCacheListenerWrapper.java    |  46 ++++
 .../framework/recipes/watch/PersistentWatcher.java |  39 ++-
 .../framework/recipes/cache/TestCuratorCache.java  |  93 +++++++
 .../recipes/cache/TestCuratorCacheConsistency.java | 301 +++++++++++++++++++++
 .../recipes/cache/TestWrappedNodeCache.java        | 147 ++++++++++
 .../recipes/watch/TestPersistentWatcher.java       |  20 ++
 .../org/apache/curator/test/TestingCluster.java    |   2 +-
 .../x/async/api/AsyncPersistentWatchBuilder.java   |  18 ++
 .../details/AsyncPersistentWatchBuilderImpl.java   |  75 -----
 19 files changed, 1511 insertions(+), 78 deletions(-)

diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java
new file mode 100644
index 0000000..ef8d290
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.listen.Listenable;
+import java.io.Closeable;
+import java.util.Optional;
+
+/**
+ * <p>
+ *     A utility that attempts to keep the data from a node locally cached. Optionally the entire
+ *     tree of children below the node can also be cached. Will respond to update/create/delete events, pull
+ *     down the data, etc. You can register listeners that will get notified when changes occur.
+ * </p>
+ *
+ * <p>
+ *     <b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must
+ *     be prepared for false-positives and false-negatives. Additionally, always use the version number
+ *     when updating data to avoid overwriting another process' change.
+ * </p>
+ */
+public interface CuratorCache extends Closeable
+{
+    /**
+     * cache build options
+     */
+    enum Options
+    {
+        /**
+         * Cache the entire tree of nodes starting at the given node
+         */
+        RECURSIVE,
+
+        /**
+         * Decompress data via {@link org.apache.curator.framework.api.GetDataBuilder#decompressed()}
+         */
+        COMPRESSED_DATA
+    }
+
+    /**
+     * Return a Curator cache for the given path with the given options using a standard storage instance
+     *
+     * @param client Curator client
+     * @param path path to cache
+     * @param options any options
+     * @return cache (note it must be started via {@link #start()}
+     */
+    static CuratorCache build(CuratorFramework client, String path, Options... options)
+    {
+        return build(client, CuratorCacheStorage.standard(), path, options);
+    }
+
+    /**
+     * Return a Curator cache for the given path with the given options and the given storage instance
+     *
+     * @param client Curator client
+     * @param storage storage to use
+     * @param path path to cache
+     * @param options any options
+     * @return cache (note it must be started via {@link #start()}
+     */
+    static CuratorCache build(CuratorFramework client, CuratorCacheStorage storage, String path, Options... options)
+    {
+        return new CuratorCacheImpl(client, storage, path, options);
+    }
+
+    /**
+     * Start the cache. This will cause a complete refresh from the cache's root node and generate
+     * events for all nodes found, etc.
+     */
+    void start();
+
+    /**
+     * Close the cache, stop responding to events, etc. Note: also calls {@link CuratorCacheStorage#close()}
+     */
+    @Override
+    void close();
+
+    /**
+     * Utility to force a rebuild of the cache. Normally, this should not ever be needed
+     */
+    void forceRebuild();
+
+    /**
+     * Return the storage instance being used
+     *
+     * @return storage
+     */
+    CuratorCacheStorage storage();
+
+    /**
+     * Return the root node being cached (i.e. the node passed to the builder)
+     *
+     * @return root node path
+     */
+    String getRootPath();
+
+    /**
+     * Convenience to return the root node data
+     *
+     * @return data (if it's in the cache)
+     */
+    default Optional<ChildData> getRootData()
+    {
+        return storage().get(getRootPath());
+    }
+
+    /**
+     * Return the listener container so that listeners can be registered to be notified of changes to the cache
+     *
+     * @return listener container
+     */
+    Listenable<CuratorCacheListener> listenable();
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
new file mode 100644
index 0000000..ef37ebe
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
+import org.apache.curator.framework.recipes.watch.PersistentWatcher;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.WatchedEvent;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import static org.apache.zookeeper.KeeperException.Code.NONODE;
+import static org.apache.zookeeper.KeeperException.Code.OK;
+
+class CuratorCacheImpl implements CuratorCache
+{
+    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
+    private final PersistentWatcher persistentWatcher;
+    private final CuratorFramework client;
+    private final CuratorCacheStorage storage;
+    private final String path;
+    private final boolean recursive;
+    private final boolean compressedData;
+    private final StandardListenerManager<CuratorCacheListener> listenerManager = StandardListenerManager.standard();
+
+    private enum State
+    {
+        LATENT,
+        STARTED,
+        CLOSED
+    }
+
+    CuratorCacheImpl(CuratorFramework client, CuratorCacheStorage storage, String path, Options... optionsArg)
+    {
+        Set<Options> options = (optionsArg != null) ? Sets.newHashSet(optionsArg) : Collections.emptySet();
+        this.client = client;
+        this.storage = storage;
+        this.path = path;
+        this.recursive = options.contains(Options.RECURSIVE);
+        this.compressedData = options.contains(Options.COMPRESSED_DATA);
+        persistentWatcher = new PersistentWatcher(client, path, recursive);
+        persistentWatcher.getListenable().addListener(this::processEvent);
+        persistentWatcher.getResetListenable().addListener(this::forceRebuild);
+    }
+
+    @Override
+    public void start()
+    {
+        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
+        persistentWatcher.start();
+    }
+
+    @Override
+    public void close()
+    {
+        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+        {
+            persistentWatcher.close();
+            storage.close();
+        }
+    }
+
+    @Override
+    public void forceRebuild()
+    {
+        if ( state.get() != State.STARTED )
+        {
+            return;
+        }
+
+        nodeChanged(path);
+        storage.stream()
+            .map(ChildData::getPath)
+            .filter(p -> !p.equals(path))
+            .forEach(this::nodeChanged);
+    }
+
+    @Override
+    public String getRootPath()
+    {
+        return path;
+    }
+
+    @Override
+    public CuratorCacheStorage storage()
+    {
+        return storage;
+    }
+
+    @Override
+    public Listenable<CuratorCacheListener> listenable()
+    {
+        return listenerManager;
+    }
+
+    protected void processEvent(WatchedEvent event)
+    {
+        if ( state.get() != State.STARTED )
+        {
+            return;
+        }
+
+        switch ( event.getType() )
+        {
+            case NodeDataChanged:
+            case NodeCreated:
+            {
+                nodeChanged(event.getPath());
+                break;
+            }
+
+            case NodeDeleted:
+            {
+                removeStorage(event.getPath());
+                break;
+            }
+
+            case NodeChildrenChanged:
+            {
+                nodeChildrenChanged(event.getPath());
+                break;
+            }
+        }
+    }
+
+    private void nodeChildrenChanged(String fromPath)
+    {
+        if ( (state.get() != State.STARTED) || !recursive )
+        {
+            return;
+        }
+
+        try
+        {
+            BackgroundCallback callback = (__, event) -> {
+                if ( event.getResultCode() == OK.intValue() )
+                {
+                    event.getChildren().forEach(child -> nodeChanged(ZKPaths.makePath(fromPath, child)));
+                }
+                else if ( event.getResultCode() == NONODE.intValue() )
+                {
+                    removeStorage(event.getPath());
+                }
+                else
+                {
+                    System.err.println(event);  // TODO
+                }
+            };
+            client.getChildren().inBackground(callback).forPath(fromPath);
+        }
+        catch ( Exception e )
+        {
+            e.printStackTrace();  // TODO
+        }
+    }
+
+    private void nodeChanged(String fromPath)
+    {
+        if ( state.get() != State.STARTED )
+        {
+            return;
+        }
+
+        try
+        {
+            BackgroundCallback callback = (__, event) -> {
+                if ( event.getResultCode() == OK.intValue() )
+                {
+                    putStorage(new ChildData(event.getPath(), event.getStat(), event.getData()));
+                    nodeChildrenChanged(event.getPath());
+                }
+                else if ( event.getResultCode() == NONODE.intValue() )
+                {
+                    removeStorage(event.getPath());
+                }
+                else
+                {
+                    System.err.println(event);  // TODO
+                }
+            };
+            if ( compressedData )
+            {
+                client.getData().decompressed().inBackground(callback).forPath(fromPath);
+            }
+            else
+            {
+                client.getData().inBackground(callback).forPath(fromPath);
+            }
+        }
+        catch ( Exception e )
+        {
+            e.printStackTrace();  // TODO
+            // TODO
+        }
+    }
+
+    private void putStorage(ChildData data)
+    {
+        Optional<ChildData> previousData = storage.put(data);
+        if ( previousData.isPresent() )
+        {
+            if ( previousData.get().getStat().getMzxid() != data.getStat().getMzxid() )
+            {
+                callListeners(l -> l.nodeChanged(previousData.get(), data));
+            }
+        }
+        else
+        {
+            callListeners(l -> l.nodeCreated(data));
+        }
+    }
+
+    private void removeStorage(String path)
+    {
+        storage.remove(path).ifPresent(previousData -> callListeners(l -> l.nodeDeleted(previousData)));
+    }
+
+    private void callListeners(Consumer<CuratorCacheListener> proc)
+    {
+        if ( state.get() == State.STARTED )
+        {
+            client.runSafe(() -> listenerManager.forEach(proc));
+        }
+    }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListener.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListener.java
new file mode 100644
index 0000000..0d516f1
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListener.java
@@ -0,0 +1,217 @@
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+/**
+ * Listener for {@link CuratorCache} events
+ */
+@SuppressWarnings("deprecation")
+public interface CuratorCacheListener
+{
+    /**
+     * Called when a node that was in the cache was created
+     *
+     * @param node the value of the node
+     */
+    default void nodeCreated(ChildData node)
+    {
+        // NOP
+    }
+
+    /**
+     * Called when a node that was in the cache has changed
+     *
+     * @param oldNode the previous value of the node
+     * @param node the new value of the node
+     */
+    default void nodeChanged(ChildData oldNode, ChildData node)
+    {
+        // NOP
+    }
+
+    /**
+     * Called when a node that was in the cache was deleted
+     *
+     * @param oldNode the pre-deleted value of the node
+     */
+    default void nodeDeleted(ChildData oldNode)
+    {
+        // NOP
+    }
+
+    @FunctionalInterface
+    interface ChangeListener {
+        /**
+         * Called when a node changes or is deleted
+         *
+         * @param oldNode the old node or null
+         * @param node the new node or null
+         */
+        void nodeChanged(ChildData oldNode, ChildData node);
+    }
+
+    /**
+     * Utility - returns a CuratorCacheListener that calls the given listener for when a node
+     * changes
+     *
+     * @param listener the listener
+     * @return a CuratorCacheListener
+     */
+    static CuratorCacheListener forNodeChanges(ChangeListener listener)
+    {
+        return new CuratorCacheListener()
+        {
+            @Override
+            public void nodeChanged(ChildData oldNode, ChildData node)
+            {
+                listener.nodeChanged(oldNode, node);
+            }
+        };
+    }
+
+    /**
+     * Utility - returns a CuratorCacheListener that calls the given listener when a node
+     * is created
+     *
+     * @param listener the listener
+     * @return a CuratorCacheListener
+     */
+    static CuratorCacheListener forNodeCreates(Consumer<ChildData> listener)
+    {
+        return new CuratorCacheListener()
+        {
+            @Override
+            public void nodeCreated(ChildData node)
+            {
+                listener.accept(node);
+            }
+        };
+    }
+
+    /**
+     * Utility - returns a CuratorCacheListener that calls the given listener for either
+     * new nodes or changed nodes
+     *
+     * @param listener the listener
+     * @return a CuratorCacheListener
+     */
+    static CuratorCacheListener forNodeCreatesChanges(ChangeListener listener)
+    {
+        return new CuratorCacheListener()
+        {
+            @Override
+            public void nodeCreated(ChildData node)
+            {
+                listener.nodeChanged(null, node);
+            }
+
+            @Override
+            public void nodeChanged(ChildData oldNode, ChildData node)
+            {
+                listener.nodeChanged(oldNode, node);
+            }
+        };
+    }
+
+    /**
+     * Utility - returns a CuratorCacheListener that calls the given listener when a node
+     * is deleted
+     *
+     * @param listener the listener
+     * @return a CuratorCacheListener
+     */
+    static CuratorCacheListener forNodeDeletes(Consumer<ChildData> listener)
+    {
+        return new CuratorCacheListener()
+        {
+            @Override
+            public void nodeDeleted(ChildData oldNode)
+            {
+                listener.accept(oldNode);
+            }
+        };
+    }
+
+    /**
+     * Utility - returns a CuratorCacheListener that calls the given listener for all events.
+     * Some may prefer a lambda-style interface over individual methods for each event. If
+     * both {@code oldNode} and {@code node} are non-null, it is a change. If only {@code oldNode}
+     * is null it is a creation. If only {@code node} is null it is a deletion.
+     *
+     * @param listener the listener
+     * @return a CuratorCacheListener
+     */
+    static CuratorCacheListener forAll(BiConsumer<ChildData, ChildData> listener)
+    {
+        return new CuratorCacheListener()
+        {
+            @Override
+            public void nodeDeleted(ChildData oldNode)
+            {
+                listener.accept(oldNode, null);
+            }
+
+            @Override
+            public void nodeCreated(ChildData node)
+            {
+                listener.accept(null, node);
+            }
+
+            @Override
+            public void nodeChanged(ChildData oldNode, ChildData node)
+            {
+                listener.accept(oldNode, node);
+            }
+        };
+    }
+
+    /**
+     * Bridge listener. You can reuse old-style {@link org.apache.curator.framework.recipes.cache.PathChildrenCacheListener}s
+     * with CuratorCache. IMPORTANT: the connection state methods in the listener will never be called as CuratorCache
+     * does not register the listener with the connection state listener container. Also note that CuratorCache
+     * behaves differently than {@link org.apache.curator.framework.recipes.cache.PathChildrenCache} so
+     * things such as event ordering will likely be different.
+     *
+     * @param client the curator client
+     * @param listener the listener to wrap
+     * @return a CuratorCacheListener that forwards to the given listener
+     */
+    static CuratorCacheListener wrap(CuratorFramework client, PathChildrenCacheListener listener)
+    {
+        return new PathChildrenCacheListenerWrapper(listener, client);
+    }
+
+    /**
+     * Bridge listener. You can reuse old-style {@link org.apache.curator.framework.recipes.cache.TreeCacheListener}s
+     * with CuratorCache. IMPORTANT: the connection state methods in the listener will never be called as CuratorCache
+     * does not register the listener with the connection state listener container. Also note that CuratorCache
+     * behaves differently than {@link org.apache.curator.framework.recipes.cache.TreeCache} so
+     * things such as event ordering will likely be different.
+     *
+     * @param client the curator client
+     * @param listener the listener to wrap
+     * @return a CuratorCacheListener that forwards to the given listener
+     */
+    static CuratorCacheListener wrap(CuratorFramework client, TreeCacheListener listener)
+    {
+        return new TreeCacheListenerWrapper(client, listener);
+    }
+
+    /**
+     * Bridge listener. You can reuse old-style {@link org.apache.curator.framework.recipes.cache.NodeCacheListener}s
+     * with CuratorCache. IMPORTANT: the connection state methods in the listener will never be called as CuratorCache
+     * does not register the listener with the connection state listener container. Also note that CuratorCache
+     * behaves differently than {@link org.apache.curator.framework.recipes.cache.NodeCache} so
+     * things such as event ordering will likely be different.
+     *
+     * @param listener the listener to wrap
+     * @return a CuratorCacheListener that forwards to the given listener
+     */
+    static CuratorCacheListener wrap(NodeCacheListener listener)
+    {
+        return new NodeCacheListenerWrapper(listener);
+    }
+
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
new file mode 100644
index 0000000..88a8ebf
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import java.io.Closeable;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+public interface CuratorCacheStorage extends Closeable
+{
+    static CuratorCacheStorage standard() {
+        return new StandardCuratorCacheStorage(true);
+    }
+
+    static CuratorCacheStorage bytesNotCached() {
+        return new StandardCuratorCacheStorage(false);
+    }
+
+    CuratorCacheStorage doNotClearOnClose();
+
+    Optional<ChildData> put(ChildData data);
+
+    Optional<ChildData> remove(String path);
+
+    Optional<ChildData> get(String path);
+
+    boolean containsPath(String path);
+
+    int size();
+
+    Stream<ChildData> stream();
+
+    void clear();
+
+    @Override
+    default void close()
+    {
+        clear();
+    }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
index 9687e1b..f730f78 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
@@ -53,7 +53,10 @@ import java.util.concurrent.atomic.AtomicReference;
  * <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must
  * be prepared for false-positives and false-negatives. Additionally, always use the version number
  * when updating data to avoid overwriting another process' change.</p>
+ *
+ * @deprecated Use {@link CuratorCache}
  */
+@Deprecated
 public class NodeCache implements Closeable
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheListenerWrapper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheListenerWrapper.java
new file mode 100644
index 0000000..0811f05
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheListenerWrapper.java
@@ -0,0 +1,41 @@
+package org.apache.curator.framework.recipes.cache;
+
+public class NodeCacheListenerWrapper implements CuratorCacheListener
+{
+    private final NodeCacheListener listener;
+
+    public NodeCacheListenerWrapper(NodeCacheListener listener)
+    {
+        this.listener = listener;
+    }
+
+    @Override
+    public void nodeCreated(ChildData node)
+    {
+        callListener();
+    }
+
+    @Override
+    public void nodeChanged(ChildData oldNode, ChildData node)
+    {
+        callListener();
+    }
+
+    @Override
+    public void nodeDeleted(ChildData oldNode)
+    {
+        callListener();
+    }
+
+    private void callListener()
+    {
+        try
+        {
+            listener.nodeChanged();
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index bdc73cc..eb37936 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -64,8 +64,11 @@ import java.util.concurrent.atomic.AtomicReference;
  * <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must
  * be prepared for false-positives and false-negatives. Additionally, always use the version number
  * when updating data to avoid overwriting another process' change.</p>
+ *
+ * @deprecated Use {@link CuratorCache}
  */
 @SuppressWarnings("NullableProblems")
+@Deprecated
 public class PathChildrenCache implements Closeable
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
new file mode 100644
index 0000000..237a0e8
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
@@ -0,0 +1,46 @@
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+
+public class PathChildrenCacheListenerWrapper implements CuratorCacheListener
+{
+    private final PathChildrenCacheListener listener;
+    private final CuratorFramework client;
+
+    public PathChildrenCacheListenerWrapper(PathChildrenCacheListener listener, CuratorFramework client)
+    {
+        this.listener = listener;
+        this.client = client;
+    }
+
+    @Override
+    public void nodeCreated(ChildData node)
+    {
+        sendEvent(node, PathChildrenCacheEvent.Type.CHILD_ADDED);
+    }
+
+    @Override
+    public void nodeChanged(ChildData oldData, ChildData node)
+    {
+        sendEvent(node, PathChildrenCacheEvent.Type.CHILD_UPDATED);
+    }
+
+    @Override
+    public void nodeDeleted(ChildData oldNode)
+    {
+        sendEvent(oldNode, PathChildrenCacheEvent.Type.CHILD_REMOVED);
+    }
+
+    private void sendEvent(ChildData node, PathChildrenCacheEvent.Type type)
+    {
+        PathChildrenCacheEvent event = new PathChildrenCacheEvent(type, node);
+        try
+        {
+            listener.childEvent(client, event);
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/StandardCuratorCacheStorage.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/StandardCuratorCacheStorage.java
new file mode 100644
index 0000000..5049770
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/StandardCuratorCacheStorage.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Stream;
+
+class StandardCuratorCacheStorage implements CuratorCacheStorage
+{
+    private final Map<String, ChildData> dataMap;
+    private final boolean cacheBytes;
+
+    StandardCuratorCacheStorage(boolean cacheBytes)
+    {
+        this(new ConcurrentHashMap<>(), cacheBytes);
+    }
+
+    @Override
+    public CuratorCacheStorage doNotClearOnClose()
+    {
+        Map<String, ChildData> copyMap = new ConcurrentHashMap<>(dataMap);
+        return new StandardCuratorCacheStorage(copyMap, cacheBytes)
+        {
+            @Override
+            public void close()
+            {
+                // NOP
+            }
+        };
+    }
+
+    @Override
+    public Optional<ChildData> put(ChildData data)
+    {
+        ChildData localData = cacheBytes ? data : new ChildData(data.getPath(), data.getStat(), null);
+        return Optional.ofNullable(dataMap.put(data.getPath(), localData));
+    }
+
+    @Override
+    public Optional<ChildData> remove(String path)
+    {
+        return Optional.ofNullable(dataMap.remove(path));
+    }
+
+    @Override
+    public Optional<ChildData> get(String path)
+    {
+        return Optional.ofNullable(dataMap.get(path));
+    }
+
+    @Override
+    public boolean containsPath(String path)
+    {
+        return dataMap.containsKey(path);
+    }
+
+    @Override
+    public int size()
+    {
+        return dataMap.size();
+    }
+
+    @Override
+    public Stream<ChildData> stream()
+    {
+        return dataMap.values().stream();
+    }
+
+    @Override
+    public void clear()
+    {
+        dataMap.clear();
+    }
+
+    private StandardCuratorCacheStorage(Map<String, ChildData> dataMap, boolean cacheBytes)
+    {
+        this.dataMap = dataMap;
+        this.cacheBytes = cacheBytes;
+    }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
index f42c1d5..e321eba 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -71,7 +71,10 @@ import static org.apache.curator.utils.PathUtils.validatePath;
  * <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must
  * be prepared for false-positives and false-negatives. Additionally, always use the version number
  * when updating data to avoid overwriting another process' change.</p>
+ *
+ * @deprecated Use {@link CuratorCache}
  */
+@Deprecated
 public class TreeCache implements Closeable
 {
     private static final Logger LOG = LoggerFactory.getLogger(TreeCache.class);
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java
new file mode 100644
index 0000000..51a02cb
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java
@@ -0,0 +1,46 @@
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+
+public class TreeCacheListenerWrapper implements CuratorCacheListener
+{
+    private final CuratorFramework client;
+    private final TreeCacheListener listener;
+
+    public TreeCacheListenerWrapper(CuratorFramework client, TreeCacheListener listener)
+    {
+        this.client = client;
+        this.listener = listener;
+    }
+
+    @Override
+    public void nodeCreated(ChildData node)
+    {
+        sendEvent(node, TreeCacheEvent.Type.NODE_ADDED);
+    }
+
+    @Override
+    public void nodeChanged(ChildData oldData, ChildData node)
+    {
+        sendEvent(node, TreeCacheEvent.Type.NODE_UPDATED);
+    }
+
+    @Override
+    public void nodeDeleted(ChildData oldNode)
+    {
+        sendEvent(oldNode, TreeCacheEvent.Type.NODE_REMOVED);
+    }
+
+    private void sendEvent(ChildData node, TreeCacheEvent.Type type)
+    {
+        TreeCacheEvent event = new TreeCacheEvent(type, node);
+        try
+        {
+            listener.childEvent(client, event);
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
index 2c97490..40174fa 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
@@ -1,4 +1,22 @@
 /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
@@ -43,6 +61,7 @@ import java.util.concurrent.atomic.AtomicReference;
      private final Logger log = LoggerFactory.getLogger(getClass());
      private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
      private final StandardListenerManager<Watcher> listeners = StandardListenerManager.standard();
+     private final StandardListenerManager<Runnable> resetListeners = StandardListenerManager.standard();
      private final ConnectionStateListener connectionStateListener = (client, newState) -> {
          if ( newState.isConnected() )
          {
@@ -115,13 +134,29 @@ import java.util.concurrent.atomic.AtomicReference;
          return listeners;
      }
 
+     /**
+      * Listeners are called when the persistent watcher has been successfully registered
+      * or re-registered after a connection disruption
+      *
+      * @return listener container
+      */
+     public StandardListenerManager<Runnable> getResetListenable()
+     {
+         return resetListeners;
+     }
+
      private void reset()
      {
          try
          {
              BackgroundCallback callback = (__, event) -> {
-                 if ( event.getResultCode() != KeeperException.Code.OK.intValue() ) {
-                     client.runSafe(this::reset);
+                 if ( event.getResultCode() != KeeperException.Code.OK.intValue() )
+                 {
+                     reset();
+                 }
+                 else
+                 {
+                     resetListeners.forEach(Runnable::run);
                  }
              };
              client.addWatch().withMode(recursive ? AddWatchMode.PERSISTENT_RECURSIVE : AddWatchMode.PERSISTENT).inBackground(callback).usingWatcher(watcher).forPath(basePath);
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
new file mode 100644
index 0000000..c451118
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
@@ -0,0 +1,93 @@
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.compatibility.Timing2;
+import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.CountDownLatch;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.RECURSIVE;
+
+@Test(groups = Zk35MethodInterceptor.zk35Group)
+public class TestCuratorCache extends BaseClassForTests
+{
+    private final Timing2 timing = new Timing2();
+
+    @Test
+    public void testServerLoss() throws Exception   // mostly copied from TestPathChildrenCacheInCluster
+    {
+        try ( TestingCluster cluster = new TestingCluster(3) )
+        {
+            cluster.start();
+
+            try ( CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)) )
+            {
+                client.start();
+                client.create().creatingParentsIfNeeded().forPath("/test");
+
+                try (CuratorCache cache = CuratorCache.build(client, "/test", RECURSIVE))
+                {
+                    cache.start();
+
+                    CountDownLatch reconnectLatch = new CountDownLatch(1);
+                    client.getConnectionStateListenable().addListener((__, newState) -> {
+                        if ( newState == ConnectionState.RECONNECTED )
+                        {
+                            reconnectLatch.countDown();
+                        }
+                    });
+                    CountDownLatch latch = new CountDownLatch(3);
+                    CuratorCacheListener listener = CuratorCacheListener.forNodeCreatesChanges((__, ___) -> latch.countDown());
+                    cache.listenable().addListener(listener);
+
+                    client.create().forPath("/test/one");
+                    client.create().forPath("/test/two");
+                    client.create().forPath("/test/three");
+
+                    Assert.assertTrue(timing.awaitLatch(latch));
+
+                    InstanceSpec connectionInstance = cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper());
+                    cluster.killServer(connectionInstance);
+
+                    Assert.assertTrue(timing.awaitLatch(reconnectLatch));
+
+                    timing.sleepABit();
+
+                    Assert.assertEquals(cache.storage().stream().count(), 4);
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testUpdateWhenNotCachingData() throws Exception // mostly copied from TestPathChildrenCache
+    {
+        CuratorCacheStorage storage = new StandardCuratorCacheStorage(false);
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)) )
+        {
+            client.start();
+            final CountDownLatch updatedLatch = new CountDownLatch(1);
+            final CountDownLatch addedLatch = new CountDownLatch(1);
+            client.create().creatingParentsIfNeeded().forPath("/test");
+            try (CuratorCache cache = CuratorCache.build(client, storage, "/test", RECURSIVE))
+            {
+                cache.listenable().addListener(CuratorCacheListener.forNodeChanges((__, ___) -> updatedLatch.countDown()));
+                cache.listenable().addListener(CuratorCacheListener.forNodeCreates(__ -> addedLatch.countDown()));
+                cache.start();
+
+                client.create().forPath("/test/foo", "first".getBytes());
+                Assert.assertTrue(timing.awaitLatch(addedLatch));
+
+                client.setData().forPath("/test/foo", "something new".getBytes());
+                Assert.assertTrue(timing.awaitLatch(updatedLatch));
+            }
+        }
+    }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheConsistency.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheConsistency.java
new file mode 100644
index 0000000..e7448d9
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheConsistency.java
@@ -0,0 +1,301 @@
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheStorage;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.compatibility.Timing2;
+import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
+import org.apache.curator.utils.ZKPaths;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.io.Closeable;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.RECURSIVE;
+
+@Test(groups = Zk35MethodInterceptor.zk35Group)
+public class TestCuratorCacheConsistency extends BaseClassForTests
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final ThreadLocalRandom random = ThreadLocalRandom.current();
+
+    private static final Timing2 timing = new Timing2();
+    private static final Duration testLength = Duration.ofSeconds(30);
+    private static final Duration thirdOfTestLength = Duration.ofMillis(testLength.toMillis() / 3);
+    private static final Duration sleepLength = Duration.ofMillis(5);
+    private static final int nodesPerLevel = 10;
+    private static final int clusterSize = 5;
+    private static final int maxServerKills = 2;
+
+    private static final String BASE_PATH = "/test";
+
+    private static class Client implements Closeable
+    {
+        private final CuratorFramework client;
+        private final CuratorCache cache;
+        private final int index;
+
+        Client(int index, String connectionString)
+        {
+            this.index = index;
+            client = buildClient(connectionString);
+            cache = CuratorCache.build(client, CuratorCacheStorage.standard().doNotClearOnClose(), BASE_PATH, RECURSIVE);
+        }
+
+        void start()
+        {
+            client.start();
+            cache.start();
+        }
+
+        @Override
+        public void close()
+        {
+            cache.close();
+            client.close();
+        }
+    }
+
+    @Test
+    public void testConsistencyAfterSimulation() throws Exception
+    {
+        int clientQty = random.nextInt(10, 20);
+        int maxDepth = random.nextInt(5, 10);
+
+        log.info("clientQty: {}, maxDepth: {}", clientQty, maxDepth);
+
+        List<Client> clients = Collections.emptyList();
+        Map<String, String> actualTree;
+
+        try ( TestingCluster cluster = new TestingCluster(clusterSize) )
+        {
+            cluster.start();
+
+            initializeBasePath(cluster);
+            try
+            {
+                clients = buildClients(cluster, clientQty);
+                workLoop(cluster, clients, maxDepth);
+
+                log.info("Test complete - sleeping to allow events to complete");
+                timing.sleepABit();
+            }
+            finally
+            {
+                clients.forEach(Client::close);
+            }
+
+            actualTree = buildActual(cluster);
+        }
+
+        log.info("client qty: {}", clientQty);
+
+        Map<Integer, List<String>> errorsList = clients.stream()
+            .map(client -> findErrors(client.index, actualTree, client.cache.storage()))
+            .filter(errorsEntry -> !errorsEntry.getValue().isEmpty())
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+        if ( !errorsList.isEmpty() )
+        {
+            log.error("{} clients had errors", errorsList.size());
+            errorsList.forEach((index, errorList) -> {
+                log.error("Client {}", index);
+                errorList.forEach(log::error);
+                log.error("");
+            });
+
+            Assert.fail("Errors found");
+        }
+    }
+
+    private Map<String, String> buildActual(TestingCluster cluster)
+    {
+        Map<String, String> actual = new HashMap<>();
+        try ( CuratorFramework client = buildClient(cluster.getConnectString()) )
+        {
+            client.start();
+            buildActual(client, actual, BASE_PATH);
+        }
+        return actual;
+    }
+
+    private void buildActual(CuratorFramework client, Map<String, String> actual, String fromPath)
+    {
+        try
+        {
+            byte[] bytes = client.getData().forPath(fromPath);
+            actual.put(fromPath, new String(bytes));
+            client.getChildren().forPath(fromPath).forEach(child -> buildActual(client, actual, ZKPaths.makePath(fromPath, child)));
+        }
+        catch ( Exception e )
+        {
+            Assert.fail("", e);
+        }
+    }
+
+    private List<Client> buildClients(TestingCluster cluster, int clientQty)
+    {
+        return IntStream.range(0, clientQty)
+            .mapToObj(index -> new Client(index, cluster.getConnectString()))
+            .peek(Client::start)
+            .collect(Collectors.toList());
+    }
+
+    private void initializeBasePath(TestingCluster cluster) throws Exception
+    {
+        try ( CuratorFramework client = buildClient(cluster.getConnectString()) )
+        {
+            client.start();
+            client.create().forPath(BASE_PATH, "".getBytes());
+        }
+    }
+
+    private void workLoop(TestingCluster cluster, List<Client> clients, int maxDepth) throws Exception
+    {
+        Instant start = Instant.now();
+        Instant lastServerKill = Instant.now();
+        int serverKillIndex = 0;
+        while ( true )
+        {
+            Duration elapsed = Duration.between(start, Instant.now());
+            if ( elapsed.compareTo(testLength) >= 0 )
+            {
+                break;
+            }
+
+            Duration elapsedFromLastServerKill = Duration.between(lastServerKill, Instant.now());
+            if ( elapsedFromLastServerKill.compareTo(thirdOfTestLength) >= 0 )
+            {
+                lastServerKill = Instant.now();
+                if ( serverKillIndex < maxServerKills )
+                {
+                    doKillServer(cluster, serverKillIndex++);
+                }
+            }
+
+            int thisDepth = random.nextInt(0, maxDepth);
+            String thisPath = buildPath(thisDepth);
+            CuratorFramework client = randomClient(clients);
+            if ( random.nextBoolean() )
+            {
+                doDelete(client, thisPath);
+            }
+            else
+            {
+                doChange(client, thisPath);
+            }
+
+            Thread.sleep(sleepLength.toMillis());
+        }
+    }
+
+    private void doChange(CuratorFramework client, String thisPath)
+    {
+        try
+        {
+            String thisData = Long.toString(random.nextLong());
+            client.create().orSetData().creatingParentsIfNeeded().forPath(thisPath, thisData.getBytes());
+        }
+        catch ( Exception e )
+        {
+            Assert.fail("Could not create/set: " + thisPath);
+        }
+    }
+
+    private void doDelete(CuratorFramework client, String thisPath)
+    {
+        if ( thisPath.equals(BASE_PATH) )
+        {
+            return;
+        }
+        try
+        {
+            client.delete().quietly().deletingChildrenIfNeeded().forPath(thisPath);
+        }
+        catch ( Exception e )
+        {
+            Assert.fail("Could not delete: " + thisPath);
+        }
+    }
+
+    private void doKillServer(TestingCluster cluster, int serverKillIndex) throws Exception
+    {
+        log.info("Killing server {}", serverKillIndex);
+        InstanceSpec killSpec = new ArrayList<>(cluster.getInstances()).get(serverKillIndex);
+        cluster.killServer(killSpec);
+    }
+
+    private CuratorFramework randomClient(List<Client> clients)
+    {
+        return clients.get(random.nextInt(clients.size())).client;
+    }
+
+    private Map.Entry<Integer, List<String>> findErrors(int index, Map<String, String> tree, CuratorCacheStorage storage)
+    {
+        List<String> errors = new ArrayList<>();
+        if ( tree.size() != storage.size() )
+        {
+            errors.add(String.format("Size mismatch. Expected: %d - Actual: %d", tree.size(), storage.size()));
+        }
+        tree.keySet().forEach(path -> {
+            if ( !storage.containsPath(path) )
+            {
+                errors.add(String.format("Path %s in master but not client", path));
+            }
+        });
+        storage.stream().forEach(data -> {
+            String treeValue = tree.get(data.getPath());
+            if ( treeValue != null )
+            {
+                if ( !treeValue.equals(new String(data.getData())) )
+                {
+                    errors.add(String.format("Data at %s is not the same", data.getPath()));
+                }
+            }
+            else
+            {
+                errors.add(String.format("Path %s in client but not master", data.getPath()));
+            }
+        });
+        return new AbstractMap.SimpleEntry<>(index, errors);
+    }
+
+    private String buildPath(int depth)
+    {
+        StringBuilder str = new StringBuilder(BASE_PATH);
+        while ( depth-- > 0 )
+        {
+            int levelNodeName = random.nextInt(nodesPerLevel);
+            str.append("/").append(levelNodeName);
+        }
+        return str.toString();
+    }
+
+    @Override
+    protected void createServer()
+    {
+        // do nothing
+    }
+
+    private static CuratorFramework buildClient(String connectionString)
+    {
+        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(100, 100);
+        return CuratorFrameworkFactory.newClient(connectionString, timing.session(), timing.connection(), retryPolicy);
+    }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestWrappedNodeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestWrappedNodeCache.java
new file mode 100644
index 0000000..d94318b
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestWrappedNodeCache.java
@@ -0,0 +1,147 @@
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.compatibility.Timing2;
+import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.Compatibility;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.RECURSIVE;
+
+@Test(groups = Zk35MethodInterceptor.zk35Group)
+public class TestWrappedNodeCache extends BaseClassForTests
+{
+    private final Timing2 timing = new Timing2();
+
+    @Test
+    public void testDeleteThenCreate() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+            client.create().creatingParentsIfNeeded().forPath("/test/foo", "one".getBytes());
+
+            final Semaphore semaphore = new Semaphore(0);
+            cache = CuratorCache.build(client, "/test/foo", RECURSIVE);
+            NodeCacheListener listener = semaphore::release;
+            cache.listenable().addListener(CuratorCacheListener.wrap(listener));
+
+            cache.start();
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+
+            Assert.assertTrue(cache.getRootData().isPresent());
+            Assert.assertEquals(cache.getRootData().get().getData(), "one".getBytes());
+
+            client.delete().forPath("/test/foo");
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+            client.create().forPath("/test/foo", "two".getBytes());
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+
+            Assert.assertTrue(cache.getRootData().isPresent());
+            Assert.assertEquals(cache.getRootData().get().getData(), "two".getBytes());
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    @Test
+    public void testKilledSession() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = null;
+        try
+        {
+            client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+            client.start();
+            client.create().creatingParentsIfNeeded().forPath("/test/node", "start".getBytes());
+
+            CountDownLatch lostLatch = new CountDownLatch(1);
+            client.getConnectionStateListenable().addListener((__, newState) -> {
+                if ( newState == ConnectionState.LOST )
+                {
+                    lostLatch.countDown();
+                }
+            });
+
+            cache = CuratorCache.build(client,"/test/node", RECURSIVE);
+
+            Semaphore latch = new Semaphore(0);
+            NodeCacheListener listener = latch::release;
+            cache.listenable().addListener(CuratorCacheListener.wrap(listener));
+
+            cache.start();
+            Assert.assertTrue(timing.acquireSemaphore(latch));
+
+            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+            Assert.assertTrue(timing.awaitLatch(lostLatch));
+
+            Assert.assertTrue(cache.getRootData().isPresent());
+            Assert.assertEquals(cache.getRootData().get().getData(), "start".getBytes());
+
+            client.setData().forPath("/test/node", "new data".getBytes());
+            Assert.assertTrue(timing.acquireSemaphore(latch));
+            Assert.assertTrue(cache.getRootData().isPresent());
+            Assert.assertEquals(cache.getRootData().get().getData(), "new data".getBytes());
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    @Test
+    public void testBasics() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+            client.create().forPath("/test");
+
+            cache = CuratorCache.build(client, "/test/node", RECURSIVE);
+            cache.start();
+
+            final Semaphore semaphore = new Semaphore(0);
+            NodeCacheListener listener = semaphore::release;
+            cache.listenable().addListener(CuratorCacheListener.wrap(listener));
+
+            Assert.assertNull(cache.getRootData().orElse(null));
+
+            client.create().forPath("/test/node", "a".getBytes());
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+            Assert.assertEquals(cache.getRootData().orElse(null).getData(), "a".getBytes());
+
+            client.setData().forPath("/test/node", "b".getBytes());
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+            Assert.assertEquals(cache.getRootData().orElse(null).getData(), "b".getBytes());
+
+            client.delete().forPath("/test/node");
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+            Assert.assertNull(cache.getRootData().orElse(null));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
index df18de5..b3d5701 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.curator.framework.recipes.watch;
 
 import org.apache.curator.framework.CuratorFramework;
@@ -6,6 +24,7 @@ import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.compatibility.Timing2;
+import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.testng.Assert;
@@ -14,6 +33,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 
+@Test(groups = Zk35MethodInterceptor.zk35Group)
 public class TestPersistentWatcher extends BaseClassForTests
 {
     private final Timing2 timing = new Timing2();
diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java b/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java
index 3d38fe1..58da2c0 100644
--- a/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java
+++ b/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java
@@ -225,7 +225,7 @@ public class TestingCluster implements Closeable
      */
     public InstanceSpec findConnectionInstance(ZooKeeper client) throws Exception
     {
-        Method              m = client.getClass().getDeclaredMethod("testableRemoteSocketAddress");
+        Method              m = ZooKeeper.class.getDeclaredMethod("testableRemoteSocketAddress");
         m.setAccessible(true);
         InetSocketAddress   address = (InetSocketAddress)m.invoke(client);
         if ( address != null )
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
index 0f29233..143a2c8 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
@@ -1,4 +1,22 @@
 /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java
deleted file mode 100644
index 14f3e30..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one
-  * or more contributor license agreements.  See the NOTICE file
-  * distributed with this work for additional information
-  * regarding copyright ownership.  The ASF licenses this file
-  * to you under the Apache License, Version 2.0 (the
-  * "License"); you may not use this file except in compliance
-  * with the License.  You may obtain a copy of the License at
-  *
-  *   http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing,
-  * software distributed under the License is distributed on an
-  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  * KIND, either express or implied.  See the License for the
-  * specific language governing permissions and limitations
-  * under the License.
-  */
- package org.apache.curator.x.async.details;
-
- import org.apache.curator.framework.api.AddPersistentWatchable;
- import org.apache.curator.framework.api.CuratorWatcher;
- import org.apache.curator.framework.imps.AddPersistentWatchBuilderImpl;
- import org.apache.curator.framework.imps.CuratorFrameworkImpl;
- import org.apache.curator.framework.imps.Watching;
- import org.apache.curator.x.async.AsyncStage;
- import org.apache.curator.x.async.api.AsyncPathable;
- import org.apache.curator.x.async.api.AsyncPersistentWatchBuilder;
- import org.apache.zookeeper.Watcher;
-
- import static org.apache.curator.x.async.details.BackgroundProcs.ignoredProc;
- import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
-
- class AsyncPersistentWatchBuilderImpl implements AsyncPersistentWatchBuilder, AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>>, AsyncPathable<AsyncStage<Void>>
- {
-     private final CuratorFrameworkImpl client;
-     private final Filters filters;
-     private Watching watching = null;
-     private boolean recursive = false;
-
-     AsyncPersistentWatchBuilderImpl(CuratorFrameworkImpl client, Filters filters)
-     {
-         this.client = client;
-         this.filters = filters;
-     }
-
-     @Override
-     public AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>> recursive()
-     {
-         recursive = true;
-         return this;
-     }
-
-     @Override
-     public AsyncPathable<AsyncStage<Void>> usingWatcher(Watcher watcher)
-     {
-         watching = new Watching(client, watcher);
-         return this;
-     }
-
-     @Override
-     public AsyncPathable<AsyncStage<Void>> usingWatcher(CuratorWatcher watcher)
-     {
-         watching = new Watching(client, watcher);
-         return this;
-     }
-
-     @Override
-     public AsyncStage<Void> forPath(String path)
-     {
-         BuilderCommon<Void> common = new BuilderCommon<>(filters, ignoredProc);
-         AddPersistentWatchBuilderImpl builder = new AddPersistentWatchBuilderImpl(client, watching, common.backgrounding, recursive);
-         return safeCall(common.internalCallback, () -> builder.forPath(path));
-     }
- }
\ No newline at end of file