You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/10/08 06:39:43 UTC

[curator] branch persistent-watcher-cache updated (0210ab7 -> f1f8719)

This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a change to branch persistent-watcher-cache
in repository https://gitbox.apache.org/repos/asf/curator.git.


 discard 0210ab7  wip
     new f1f8719  wip

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (0210ab7)
            \
             N -- N -- N   refs/heads/persistent-watcher-cache (f1f8719)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../recipes/cache/TestCuratorCacheWrappers.java    | 41 +++++++++++++++++++---
 1 file changed, 37 insertions(+), 4 deletions(-)


[curator] 01/01: wip

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch persistent-watcher-cache
in repository https://gitbox.apache.org/repos/asf/curator.git

commit f1f8719942ff1bfe0a29573b71f714ea638e46b4
Author: randgalt <ra...@apache.org>
AuthorDate: Mon Oct 7 15:48:57 2019 +0300

    wip
---
 .../framework/recipes/cache/CuratorCache.java      | 141 ++++++++
 .../framework/recipes/cache/CuratorCacheImpl.java  | 273 +++++++++++++++
 .../recipes/cache/CuratorCacheListener.java        | 236 +++++++++++++
 .../recipes/cache/CuratorCacheStorage.java         | 140 ++++++++
 .../curator/framework/recipes/cache/NodeCache.java |   3 +
 .../recipes/cache/NodeCacheListenerWrapper.java    |  47 +++
 .../framework/recipes/cache/PathChildrenCache.java |   3 +
 .../cache/PathChildrenCacheListenerWrapper.java    |  71 ++++
 .../recipes/cache/StandardCuratorCacheStorage.java | 111 ++++++
 .../curator/framework/recipes/cache/TreeCache.java |   3 +
 .../recipes/cache/TreeCacheListenerWrapper.java    |  71 ++++
 .../framework/recipes/watch/PersistentWatcher.java |  39 ++-
 .../framework/recipes/cache/TestCuratorCache.java  | 110 ++++++
 .../recipes/cache/TestCuratorCacheConsistency.java | 384 +++++++++++++++++++++
 .../cache/TestCuratorCacheEventOrdering.java       |  49 +++
 .../recipes/cache/TestCuratorCacheWrappers.java    | 147 ++++++++
 .../recipes/cache/TestWrappedNodeCache.java        | 165 +++++++++
 .../recipes/watch/TestPersistentWatcher.java       |  20 ++
 .../org/apache/curator/test/TestingCluster.java    |   2 +-
 .../x/async/api/AsyncPersistentWatchBuilder.java   |  18 +
 .../details/AsyncPersistentWatchBuilderImpl.java   |  75 ----
 21 files changed, 2030 insertions(+), 78 deletions(-)

diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java
new file mode 100644
index 0000000..44465c8
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.listen.Listenable;
+import java.io.Closeable;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+/**
+ * <p>
+ *     A utility that attempts to keep the data from a node locally cached. Optionally the entire
+ *     tree of children below the node can also be cached. Will respond to update/create/delete events, pull
+ *     down the data, etc. You can register listeners that will get notified when changes occur.
+ * </p>
+ *
+ * <p>
+ *     <b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must
+ *     be prepared for false-positives and false-negatives. Additionally, always use the version number
+ *     when updating data to avoid overwriting another process' change.
+ * </p>
+ */
+public interface CuratorCache extends Closeable
+{
+    /**
+     * cache build options
+     */
+    enum Options
+    {
+        /**
+         * Cache the entire tree of nodes starting at the given node
+         */
+        RECURSIVE,
+
+        /**
+         * Decompress data via {@link org.apache.curator.framework.api.GetDataBuilder#decompressed()}
+         */
+        COMPRESSED_DATA
+    }
+
+    /**
+     * Return a Curator cache for the given path with the given options using a standard storage instance
+     *
+     * @param client Curator client
+     * @param path path to cache
+     * @param options any options
+     * @return cache (note it must be started via {@link #start()})
+     */
+    static CuratorCache build(CuratorFramework client, String path, Options... options)
+    {
+        return build(client, CuratorCacheStorage.standard(), path, options);
+    }
+
+    /**
+     * Return a Curator cache for the given path with the given options and the given storage instance
+     *
+     * @param client Curator client
+     * @param storage storage to use
+     * @param path path to cache
+     * @param options any options
+     * @return cache (note it must be started via {@link #start()})
+     */
+    static CuratorCache build(CuratorFramework client, CuratorCacheStorage storage, String path, Options... options)
+    {
+        return new CuratorCacheImpl(client, storage, path, options);
+    }
+
+    /**
+     * Start the cache. This will cause a complete refresh from the cache's root node and generate
+     * events for all nodes found, etc.
+     */
+    void start();
+
+    /**
+     * Close the cache, stop responding to events, etc. Note: also calls {@link CuratorCacheStorage#close()}
+     */
+    @Override
+    void close();
+
+    /**
+     * Utility to force a rebuild of the cache. Normally, this should not ever be needed
+     */
+    void forceRebuild();
+
+    /**
+     * Return the storage instance being used
+     *
+     * @return storage
+     */
+    CuratorCacheStorage storage();
+
+    /**
+     * Return the root node being cached (i.e. the node passed to the builder)
+     *
+     * @return root node path
+     */
+    String getRootPath();
+
+    /**
+     * Convenience to return the root node data
+     *
+     * @return data (if it's in the cache)
+     */
+    default Optional<ChildData> getRootData()
+    {
+        return storage().get(getRootPath());
+    }
+
+    /**
+     * Return the listener container so that listeners can be registered to be notified of changes to the cache
+     *
+     * @return listener container
+     */
+    Listenable<CuratorCacheListener> listenable();
+
+    /**
+     * By default any unexpected exception is handled by logging the exception. You can change
+     * this so that a handler is called instead. Under normal circumstances, this shouldn't be necessary.
+     *
+     * @param newHandler new exception handler or {@code null} to reset to the default logging
+     */
+    void setExceptionHandler(Consumer<Exception> newHandler);
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
new file mode 100644
index 0000000..a31f841
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
+import org.apache.curator.framework.recipes.watch.PersistentWatcher;
+import org.apache.curator.utils.ThreadUtils;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import static org.apache.zookeeper.KeeperException.Code.NONODE;
+import static org.apache.zookeeper.KeeperException.Code.OK;
+
+class CuratorCacheImpl implements CuratorCache
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
+    private final PersistentWatcher persistentWatcher;
+    private final CuratorFramework client;
+    private final CuratorCacheStorage storage;
+    private final String path;
+    private final boolean recursive;
+    private final boolean compressedData;
+    private final StandardListenerManager<CuratorCacheListener> listenerManager = StandardListenerManager.standard();
+    private volatile Consumer<Exception> exceptionHandler;
+
+    // lifecycle of the cache; the state is only changed via compare-and-set in start() and close()
+    private enum State
+    {
+        LATENT,     // initial state, before start() has been called
+        STARTED,    // set by start(); the only state in which events are processed
+        CLOSED      // set by close(), reachable only from STARTED
+    }
+
+    CuratorCacheImpl(CuratorFramework client, CuratorCacheStorage storage, String path, Options... optionsArg)
+    {
+        // a null varargs array is treated the same as "no options"
+        Set<Options> enabled = (optionsArg == null) ? Collections.emptySet() : Sets.newHashSet(optionsArg);
+
+        this.client = client;
+        this.storage = storage;
+        this.path = path;
+        recursive = enabled.contains(Options.RECURSIVE);
+        compressedData = enabled.contains(Options.COMPRESSED_DATA);
+
+        // watcher events feed the cache; a watcher reset triggers a full rebuild
+        persistentWatcher = new PersistentWatcher(client, path, recursive);
+        persistentWatcher.getListenable().addListener(this::processEvent);
+        persistentWatcher.getResetListenable().addListener(this::forceRebuild);
+
+        // installs the default (logging) exception handler
+        setExceptionHandler(null);
+    }
+
+    @Override
+    public void start()
+    {
+        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
+        persistentWatcher.start();
+    }
+
+    @Override
+    public void close()
+    {
+        // only a started cache is closed; the compare-and-set also makes repeated calls no-ops
+        if ( !state.compareAndSet(State.STARTED, State.CLOSED) )
+        {
+            return;
+        }
+
+        persistentWatcher.close();
+        storage.close();
+    }
+
+    @Override
+    public void forceRebuild()
+    {
+        if ( state.get() == State.STARTED )
+        {
+            // reload the root first, then every other path currently held in storage
+            nodeChanged(path);
+            storage.stream()
+                .map(ChildData::getPath)
+                .filter(cachedPath -> !cachedPath.equals(path))
+                .forEach(this::nodeChanged);
+        }
+    }
+
+    @Override
+    public String getRootPath()
+    {
+        // the path that was passed to the constructor
+        return path;
+    }
+
+    @Override
+    public CuratorCacheStorage storage()
+    {
+        // the storage instance that was passed to the constructor
+        return storage;
+    }
+
+    @Override
+    public Listenable<CuratorCacheListener> listenable()
+    {
+        // the container holding the listeners notified by callListeners()
+        return listenerManager;
+    }
+
+    @Override
+    public void setExceptionHandler(Consumer<Exception> newHandler)
+    {
+        if ( newHandler == null )
+        {
+            // reset to the default behavior: log the error
+            exceptionHandler = e -> log.error("CuratorCache error", e);
+        }
+        else
+        {
+            exceptionHandler = newHandler;
+        }
+    }
+
+    /**
+     * Dispatch a watcher event to the matching cache update. Events are ignored unless
+     * the cache is started.
+     *
+     * @param event event delivered by the persistent watcher
+     */
+    private void processEvent(WatchedEvent event)
+    {
+        if ( state.get() == State.STARTED )
+        {
+            switch ( event.getType() )
+            {
+                case NodeCreated:
+                case NodeDataChanged:
+                    nodeChanged(event.getPath());
+                    break;
+
+                case NodeDeleted:
+                    removeStorage(event.getPath());
+                    break;
+
+                case NodeChildrenChanged:
+                    nodeChildrenChanged(event.getPath());
+                    break;
+
+                default:
+                    // any other event type does not affect the cache
+                    break;
+            }
+        }
+    }
+
+    /**
+     * For a recursive cache, asynchronously list the children of {@code fromPath} and
+     * (re)load each one. A no-op when the cache is not started or not recursive.
+     *
+     * @param fromPath parent path whose children should be loaded
+     */
+    private void nodeChildrenChanged(String fromPath)
+    {
+        if ( !recursive || (state.get() != State.STARTED) )
+        {
+            return;
+        }
+
+        BackgroundCallback childrenCallback = (__, event) -> {
+            int resultCode = event.getResultCode();
+            if ( resultCode == OK.intValue() )
+            {
+                for ( String child : event.getChildren() )
+                {
+                    nodeChanged(ZKPaths.makePath(fromPath, child));
+                }
+            }
+            else if ( resultCode == NONODE.intValue() )
+            {
+                // the parent no longer exists, so drop it from storage
+                removeStorage(event.getPath());
+            }
+            else
+            {
+                handleException(event);
+            }
+        };
+        try
+        {
+            client.getChildren().inBackground(childrenCallback).forPath(fromPath);
+        }
+        catch ( Exception e )
+        {
+            handleException(e);
+        }
+    }
+
+    /**
+     * Asynchronously read the data of {@code fromPath} and update storage from the result.
+     * Does nothing unless the cache is started.
+     *
+     * @param fromPath path of the node to (re)load
+     */
+    private void nodeChanged(String fromPath)
+    {
+        if ( state.get() != State.STARTED )
+        {
+            return;
+        }
+
+        try
+        {
+            BackgroundCallback callback = (__, event) -> {
+                if ( event.getResultCode() == OK.intValue() )
+                {
+                    // store the fresh data, then descend into the children (a no-op unless recursive)
+                    putStorage(new ChildData(event.getPath(), event.getStat(), event.getData()));
+                    nodeChildrenChanged(event.getPath());
+                }
+                else if ( event.getResultCode() == NONODE.intValue() )
+                {
+                    // the node does not exist, so make sure it is not in storage either
+                    removeStorage(event.getPath());
+                }
+                else
+                {
+                    handleException(event);
+                }
+            };
+            // the COMPRESSED_DATA option selects the decompressing variant of the read
+            if ( compressedData )
+            {
+                client.getData().decompressed().inBackground(callback).forPath(fromPath);
+            }
+            else
+            {
+                client.getData().inBackground(callback).forPath(fromPath);
+            }
+        }
+        catch ( Exception e )
+        {
+            handleException(e);
+        }
+    }
+
+    /**
+     * Store {@code data} and notify listeners: as a creation when the path was not previously
+     * stored, as a change when it was stored with a different modification zxid. Nothing is
+     * reported when the previous entry has the same modification zxid.
+     *
+     * @param data new entry for its path
+     */
+    private void putStorage(ChildData data)
+    {
+        ChildData previous = storage.put(data).orElse(null);
+        if ( previous == null )
+        {
+            callListeners(l -> l.nodeChanged(null, data));
+        }
+        else if ( previous.getStat().getMzxid() != data.getStat().getMzxid() )
+        {
+            callListeners(l -> l.nodeChanged(previous, data));
+        }
+    }
+
+    /**
+     * Remove {@code path} from storage and, if an entry was actually stored, report the removal to listeners.
+     *
+     * @param path path to remove
+     */
+    private void removeStorage(String path)
+    {
+        Optional<ChildData> removed = storage.remove(path);
+        if ( removed.isPresent() )
+        {
+            ChildData previousData = removed.get();
+            callListeners(l -> l.nodeChanged(previousData, null));
+        }
+    }
+
+    /**
+     * Apply {@code proc} to every registered listener via {@code client.runSafe()}.
+     * Listeners are only notified while the cache is started.
+     *
+     * @param proc action to run against each listener
+     */
+    private void callListeners(Consumer<CuratorCacheListener> proc)
+    {
+        if ( state.get() != State.STARTED )
+        {
+            return;
+        }
+        client.runSafe(() -> listenerManager.forEach(proc));
+    }
+
+    private void handleException(CuratorEvent event)
+    {
+        // turn the result code of the failed background operation into a KeeperException
+        KeeperException.Code code = KeeperException.Code.get(event.getResultCode());
+        handleException(KeeperException.create(code));
+    }
+
+    // Route an unexpected error to the currently installed exception handler
+    private void handleException(Exception e)
+    {
+        // NOTE(review): presumably restores the thread's interrupt flag when e is an InterruptedException - confirm in ThreadUtils
+        ThreadUtils.checkInterrupted(e);
+        exceptionHandler.accept(e);
+    }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListener.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListener.java
new file mode 100644
index 0000000..cd9b66d
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListener.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import java.util.function.Consumer;
+
+/**
+ * Listener for {@link CuratorCache} events. The main functional interface is general purpose
+ * but various event specific modifiers are provided. See the doc for each one for details
+ */
+@SuppressWarnings("deprecation")
+@FunctionalInterface
+public interface CuratorCacheListener
+{
+    /**
+     * <p>
+     *     Called when a node is created, changed or deleted.
+     *     The general contract is that the type of change is implied by the arguments. If
+     *     both {@code oldNode} and {@code node} are non-null, this is an existing node in the cache
+     *     that has changed. If only {@code oldNode} is non-null, this is an existing node that is
+     *     being removed from the cache. If only {@code node} is non-null, this is a new node. Note:
+     *     {@code oldNode} and {@code node} will never both be null.
+     * </p>
+     *
+     * <p>
+     *     You can get a descriptive
+     *     type for the operation by calling {@link org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type#get(ChildData, ChildData)}
+     *     with the {@code oldNode} and {@code node} arguments
+     * </p>
+     *
+     * @param oldNode the old node or null
+     * @param node the new node or null
+     */
+    void nodeChanged(ChildData oldNode, ChildData node);
+
+    /**
+     * An enumerated type that describes a change
+     */
+    enum Type
+    {
+        /**
+         * A new node was added to the cache
+         */
+        NODE_CREATED,
+
+        /**
+         * A node already in the cache has changed
+         */
+        NODE_CHANGED,
+
+        /**
+         * A node already in the cache was deleted
+         */
+        NODE_DELETED
+        ;
+
+        /**
+         * Given arguments to {@link org.apache.curator.framework.recipes.cache.CuratorCacheListener#nodeChanged(ChildData, ChildData)}
+         * return the enumerated event type
+         *
+         * @param oldNode the old node or null
+         * @param node the main node or null
+         * @return event type
+         * @throws IllegalArgumentException if both oldNode and node are null
+         */
+        public static Type get(ChildData oldNode, ChildData node)
+        {
+            if ( (oldNode != null) && (node != null) )
+            {
+                return NODE_CHANGED;
+            }
+            else if ( oldNode != null )
+            {
+                return NODE_DELETED;
+            }
+            else if ( node != null )
+            {
+                return NODE_CREATED;
+            }
+            throw new IllegalArgumentException("oldNode and node cannot both be null");
+        }
+    }
+
+    /**
+     * Build a listener that reacts to {@link Type#NODE_CREATED} events only
+     *
+     * @param listener receives the newly created node
+     * @return wrapped listener
+     */
+    static CuratorCacheListener forCreates(Consumer<ChildData> listener)
+    {
+        return (oldNode, node) -> {
+            if ( Type.get(oldNode, node) != Type.NODE_CREATED )
+            {
+                return;
+            }
+            listener.accept(node);
+        };
+    }
+
+    /**
+     * Build a listener that reacts to {@link Type#NODE_CHANGED} events only
+     *
+     * @param listener receives the old and the new node
+     * @return wrapped listener
+     */
+    static CuratorCacheListener forChanges(CuratorCacheListener listener)
+    {
+        return (oldNode, node) -> {
+            if ( Type.get(oldNode, node) != Type.NODE_CHANGED )
+            {
+                return;
+            }
+            listener.nodeChanged(oldNode, node);
+        };
+    }
+
+    /**
+     * Build a listener that reacts to {@link Type#NODE_DELETED} events only
+     *
+     * @param listener receives the node that was removed
+     * @return wrapped listener
+     */
+    static CuratorCacheListener forDeletes(Consumer<ChildData> listener)
+    {
+        return (oldNode, node) -> {
+            if ( Type.get(oldNode, node) != Type.NODE_DELETED )
+            {
+                return;
+            }
+            listener.accept(oldNode);
+        };
+    }
+
+    /**
+     * Build a listener that routes each event type to its own handler. A {@code null}
+     * handler means events of that type are dropped.
+     *
+     * @param createsListener if not null, called for {@link Type#NODE_CREATED} events
+     * @param changesListener if not null, called for {@link Type#NODE_CHANGED} events
+     * @param deletesListener if not null, called for {@link Type#NODE_DELETED} events
+     * @return wrapped listener
+     */
+    static CuratorCacheListener forAll(Consumer<ChildData> createsListener, CuratorCacheListener changesListener, Consumer<ChildData> deletesListener)
+    {
+        return (oldNode, node) -> {
+            Type type = Type.get(oldNode, node);
+            if ( type == Type.NODE_CREATED )
+            {
+                if ( createsListener != null )
+                {
+                    createsListener.accept(node);
+                }
+            }
+            else if ( type == Type.NODE_CHANGED )
+            {
+                if ( changesListener != null )
+                {
+                    changesListener.nodeChanged(oldNode, node);
+                }
+            }
+            else if ( type == Type.NODE_DELETED )
+            {
+                if ( deletesListener != null )
+                {
+                    deletesListener.accept(oldNode);
+                }
+            }
+        };
+    }
+
+    /**
+     * Bridge listener. You can reuse old-style {@link org.apache.curator.framework.recipes.cache.PathChildrenCacheListener}s
+     * with CuratorCache. IMPORTANT: the connection state methods in the listener will never be called as CuratorCache
+     * does not register the listener with the connection state listener container. Also note that CuratorCache
+     * behaves differently than {@link org.apache.curator.framework.recipes.cache.PathChildrenCache} so
+     * things such as event ordering will likely be different.
+     *
+     * @param client the curator client
+     * @param listener the listener to wrap
+     * @return a CuratorCacheListener that forwards to the given listener
+     */
+    static CuratorCacheListener wrap(CuratorFramework client, PathChildrenCacheListener listener)
+    {
+        // NOTE(review): argument order here is (listener, client) while the TreeCacheListener
+        // bridge passes (client, listener) - confirm against the two wrapper constructors
+        return new PathChildrenCacheListenerWrapper(listener, client);
+    }
+
+    /**
+     * Bridge listener. You can reuse old-style {@link org.apache.curator.framework.recipes.cache.TreeCacheListener}s
+     * with CuratorCache. IMPORTANT: the connection state methods in the listener will never be called as CuratorCache
+     * does not register the listener with the connection state listener container. Also note that CuratorCache
+     * behaves differently than {@link org.apache.curator.framework.recipes.cache.TreeCache} so
+     * things such as event ordering will likely be different.
+     *
+     * @param client the curator client
+     * @param listener the listener to wrap
+     * @return a CuratorCacheListener that forwards to the given listener
+     */
+    static CuratorCacheListener wrap(CuratorFramework client, TreeCacheListener listener)
+    {
+        return new TreeCacheListenerWrapper(client, listener);
+    }
+
+    /**
+     * Bridge listener. You can reuse old-style {@link org.apache.curator.framework.recipes.cache.NodeCacheListener}s
+     * with CuratorCache.
+     *
+     * @param listener the listener to wrap
+     * @return a CuratorCacheListener that forwards to the given listener
+     */
+    static CuratorCacheListener wrap(NodeCacheListener listener)
+    {
+        return new NodeCacheListenerWrapper(listener);
+    }
+
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
new file mode 100644
index 0000000..1044a4a
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import java.io.Closeable;
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Interface for maintaining data in a {@link org.apache.curator.framework.recipes.cache.CuratorCache}
+ */
+public interface CuratorCacheStorage extends Closeable
+{
+    /**
+     * Create a fresh standard storage instance
+     *
+     * @return storage instance
+     */
+    static CuratorCacheStorage standard()
+    {
+        // true here, false in bytesNotCached(), which is documented as not retaining the data bytes
+        boolean cacheBytes = true;
+        return new StandardCuratorCacheStorage(cacheBytes);
+    }
+
+    /**
+     * Create a fresh storage instance that does not retain the data bytes. i.e. ChildData objects
+     * returned by this storage will always return {@code null} for {@link ChildData#getData()}.
+     *
+     * @return storage instance that does not retain data bytes
+     */
+    static CuratorCacheStorage bytesNotCached()
+    {
+        boolean cacheBytes = false;
+        return new StandardCuratorCacheStorage(cacheBytes);
+    }
+
+    /**
+     * Returns a new copy of this storage that does not clear its internal data when it is closed.
+     * Useful for retaining the cache after it is closed.
+     *
+     * @return new copy that does not clear on close
+     */
+    CuratorCacheStorage doNotClearOnClose();
+
+    /**
+     * Add an entry to storage and return any previous entry at that path
+     *
+     * @param data entry to add
+     * @return previous entry or {@code empty()}
+     */
+    Optional<ChildData> put(ChildData data);
+
+    /**
+     * Remove the entry from storage and return any previous entry at that path
+     *
+     * @param path path to remove
+     * @return previous entry or {@code empty()}
+     */
+    Optional<ChildData> remove(String path);
+
+    /**
+     * Return an entry from storage
+     *
+     * @param path path to get
+     * @return entry or {@code empty()}
+     */
+    Optional<ChildData> get(String path);
+
+    /**
+     * Return true if the storage currently has an entry for the given path
+     *
+     * @param path path to check
+     * @return true/false
+     */
+    boolean containsPath(String path);
+
+    /**
+     * Return the current number of entries in storage
+     *
+     * @return number of entries
+     */
+    int size();
+
+    /**
+     * Return a stream over the storage entries. Note: for a standard storage instance, the stream
+     * behaves like a stream returned by {@link java.util.concurrent.ConcurrentHashMap#entrySet()}
+     *
+     * @return stream over entries
+     */
+    Stream<ChildData> stream();
+
+    /**
+     * Return a stream over the storage entries that are the immediate children of the given node.
+     *
+     * @param fromParent path of the node whose immediate children are streamed
+     * @return stream over entries
+     */
+    Stream<ChildData> streamImmediateChildren(String fromParent);
+
+    /**
+     * Utility - collect a stream of child nodes into a map keyed by node path. Note: each child
+     * data in the stream is assumed to have a unique path
+     *
+     * @param stream stream of child nodes with unique paths
+     * @return map of path to child data
+     */
+    static Map<String, ChildData> toMap(Stream<ChildData> stream)
+    {
+        return stream.collect(Collectors.toMap(ChildData::getPath, data -> data));
+    }
+
+    /**
+     * Reset the storage to zero entries
+     */
+    void clear();
+
+    /**
+     * Close the storage. For a standard storage instance, the storage is cleared.
+     */
+    @Override
+    default void close()
+    {
+        clear();
+    }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
index 9687e1b..f730f78 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
@@ -53,7 +53,10 @@ import java.util.concurrent.atomic.AtomicReference;
  * <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must
  * be prepared for false-positives and false-negatives. Additionally, always use the version number
  * when updating data to avoid overwriting another process' change.</p>
+ *
+ * @deprecated Use {@link CuratorCache}
  */
+@Deprecated
 public class NodeCache implements Closeable
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheListenerWrapper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheListenerWrapper.java
new file mode 100644
index 0000000..89360aa
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheListenerWrapper.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+public class NodeCacheListenerWrapper implements CuratorCacheListener
+{
+    private final NodeCacheListener listener;
+
+    public NodeCacheListenerWrapper(NodeCacheListener listener)
+    {
+        this.listener = listener;
+    }
+
+    @Override
+    public void nodeChanged(ChildData oldNode, ChildData node)
+    {
+        callListener();
+    }
+
+    private void callListener()
+    {
+        try
+        {
+            listener.nodeChanged();
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index bdc73cc..eb37936 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -64,8 +64,11 @@ import java.util.concurrent.atomic.AtomicReference;
  * <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must
  * be prepared for false-positives and false-negatives. Additionally, always use the version number
  * when updating data to avoid overwriting another process' change.</p>
+ *
+ * @deprecated Use {@link CuratorCache}
  */
 @SuppressWarnings("NullableProblems")
+@Deprecated
 public class PathChildrenCache implements Closeable
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
new file mode 100644
index 0000000..0393204
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+
+public class PathChildrenCacheListenerWrapper implements CuratorCacheListener
+{
+    private final PathChildrenCacheListener listener;
+    private final CuratorFramework client;
+
+    public PathChildrenCacheListenerWrapper(PathChildrenCacheListener listener, CuratorFramework client)
+    {
+        this.listener = listener;
+        this.client = client;
+    }
+
+    @Override
+    public void nodeChanged(ChildData oldData, ChildData node)
+    {
+        switch ( CuratorCacheListener.Type.get(oldData, node) )
+        {
+            case NODE_CREATED:
+            {
+                sendEvent(node, PathChildrenCacheEvent.Type.CHILD_ADDED);
+                break;
+            }
+
+            case NODE_CHANGED:
+            {
+                sendEvent(node, PathChildrenCacheEvent.Type.CHILD_UPDATED);
+                break;
+            }
+
+            case NODE_DELETED:
+            {
+                sendEvent(oldData, PathChildrenCacheEvent.Type.CHILD_REMOVED);
+                break;
+            }
+        }
+    }
+
+    private void sendEvent(ChildData node, PathChildrenCacheEvent.Type type)
+    {
+        PathChildrenCacheEvent event = new PathChildrenCacheEvent(type, node);
+        try
+        {
+            listener.childEvent(client, event);
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/StandardCuratorCacheStorage.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/StandardCuratorCacheStorage.java
new file mode 100644
index 0000000..d92eddf
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/StandardCuratorCacheStorage.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.utils.ZKPaths;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Stream;
+
+class StandardCuratorCacheStorage implements CuratorCacheStorage
+{
+    private final Map<String, ChildData> dataMap;
+    private final boolean cacheBytes;
+
+    StandardCuratorCacheStorage(boolean cacheBytes)
+    {
+        this(new ConcurrentHashMap<>(), cacheBytes);
+    }
+
+    @Override
+    public CuratorCacheStorage doNotClearOnClose()
+    {
+        Map<String, ChildData> copyMap = new ConcurrentHashMap<>(dataMap);
+        return new StandardCuratorCacheStorage(copyMap, cacheBytes)
+        {
+            @Override
+            public void close()
+            {
+                // NOP
+            }
+        };
+    }
+
+    @Override
+    public Optional<ChildData> put(ChildData data)
+    {
+        ChildData localData = cacheBytes ? data : new ChildData(data.getPath(), data.getStat(), null);
+        return Optional.ofNullable(dataMap.put(data.getPath(), localData));
+    }
+
+    @Override
+    public Optional<ChildData> remove(String path)
+    {
+        return Optional.ofNullable(dataMap.remove(path));
+    }
+
+    @Override
+    public Optional<ChildData> get(String path)
+    {
+        return Optional.ofNullable(dataMap.get(path));
+    }
+
+    @Override
+    public boolean containsPath(String path)
+    {
+        return dataMap.containsKey(path);
+    }
+
+    @Override
+    public int size()
+    {
+        return dataMap.size();
+    }
+
+    @Override
+    public Stream<ChildData> stream()
+    {
+        return dataMap.values().stream();
+    }
+
+    @Override
+    public Stream<ChildData> streamImmediateChildren(String fromParent)
+    {
+        return dataMap.entrySet()
+            .stream()
+            .filter(entry -> {
+                ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(entry.getKey());
+                return pathAndNode.getPath().equals(fromParent);
+            })
+            .map(Map.Entry::getValue);
+    }
+
+    @Override
+    public void clear()
+    {
+        dataMap.clear();
+    }
+
+    private StandardCuratorCacheStorage(Map<String, ChildData> dataMap, boolean cacheBytes)
+    {
+        this.dataMap = dataMap;
+        this.cacheBytes = cacheBytes;
+    }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
index f42c1d5..e321eba 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -71,7 +71,10 @@ import static org.apache.curator.utils.PathUtils.validatePath;
  * <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must
  * be prepared for false-positives and false-negatives. Additionally, always use the version number
  * when updating data to avoid overwriting another process' change.</p>
+ *
+ * @deprecated Use {@link CuratorCache}
  */
+@Deprecated
 public class TreeCache implements Closeable
 {
     private static final Logger LOG = LoggerFactory.getLogger(TreeCache.class);
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java
new file mode 100644
index 0000000..0c3a75b
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+
+public class TreeCacheListenerWrapper implements CuratorCacheListener
+{
+    private final CuratorFramework client;
+    private final TreeCacheListener listener;
+
+    public TreeCacheListenerWrapper(CuratorFramework client, TreeCacheListener listener)
+    {
+        this.client = client;
+        this.listener = listener;
+    }
+
+    @Override
+    public void nodeChanged(ChildData oldData, ChildData node)
+    {
+        switch ( CuratorCacheListener.Type.get(oldData, node) )
+        {
+        case NODE_CREATED:
+        {
+            sendEvent(node, TreeCacheEvent.Type.NODE_ADDED);
+            break;
+        }
+
+        case NODE_CHANGED:
+        {
+            sendEvent(node, TreeCacheEvent.Type.NODE_UPDATED);
+            break;
+        }
+
+        case NODE_DELETED:
+        {
+            sendEvent(oldData, TreeCacheEvent.Type.NODE_REMOVED);
+            break;
+        }
+        }
+    }
+
+    private void sendEvent(ChildData node, TreeCacheEvent.Type type)
+    {
+        TreeCacheEvent event = new TreeCacheEvent(type, node);
+        try
+        {
+            listener.childEvent(client, event);
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
index 2c97490..40174fa 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
@@ -1,4 +1,22 @@
 /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
@@ -43,6 +61,7 @@ import java.util.concurrent.atomic.AtomicReference;
      private final Logger log = LoggerFactory.getLogger(getClass());
      private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
      private final StandardListenerManager<Watcher> listeners = StandardListenerManager.standard();
+     private final StandardListenerManager<Runnable> resetListeners = StandardListenerManager.standard();
      private final ConnectionStateListener connectionStateListener = (client, newState) -> {
          if ( newState.isConnected() )
          {
@@ -115,13 +134,29 @@ import java.util.concurrent.atomic.AtomicReference;
          return listeners;
      }
 
+     /**
+      * Listeners are called when the persistent watcher has been successfully registered
+      * or re-registered after a connection disruption
+      *
+      * @return listener container
+      */
+     public StandardListenerManager<Runnable> getResetListenable()
+     {
+         return resetListeners;
+     }
+
      private void reset()
      {
          try
          {
              BackgroundCallback callback = (__, event) -> {
-                 if ( event.getResultCode() != KeeperException.Code.OK.intValue() ) {
-                     client.runSafe(this::reset);
+                 if ( event.getResultCode() != KeeperException.Code.OK.intValue() )
+                 {
+                     reset();
+                 }
+                 else
+                 {
+                     resetListeners.forEach(Runnable::run);
                  }
              };
              client.addWatch().withMode(recursive ? AddWatchMode.PERSISTENT_RECURSIVE : AddWatchMode.PERSISTENT).inBackground(callback).usingWatcher(watcher).forPath(basePath);
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
new file mode 100644
index 0000000..ae7a1d8
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.compatibility.Timing2;
+import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.CountDownLatch;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.RECURSIVE;
+
+@Test(groups = Zk35MethodInterceptor.zk35Group)
+public class TestCuratorCache extends BaseClassForTests
+{
+    private final Timing2 timing = new Timing2();
+
+    @Test
+    public void testServerLoss() throws Exception   // mostly copied from TestPathChildrenCacheInCluster
+    {
+        try ( TestingCluster cluster = new TestingCluster(3) )
+        {
+            cluster.start();
+
+            try ( CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)) )
+            {
+                client.start();
+                client.create().creatingParentsIfNeeded().forPath("/test");
+
+                try (CuratorCache cache = CuratorCache.build(client, "/test", RECURSIVE))
+                {
+                    cache.start();
+
+                    CountDownLatch reconnectLatch = new CountDownLatch(1);
+                    client.getConnectionStateListenable().addListener((__, newState) -> {
+                        if ( newState == ConnectionState.RECONNECTED )
+                        {
+                            reconnectLatch.countDown();
+                        }
+                    });
+                    CountDownLatch latch = new CountDownLatch(3);
+                    cache.listenable().addListener((__, ___) -> latch.countDown());
+
+                    client.create().forPath("/test/one");
+                    client.create().forPath("/test/two");
+                    client.create().forPath("/test/three");
+
+                    Assert.assertTrue(timing.awaitLatch(latch));
+
+                    InstanceSpec connectionInstance = cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper());
+                    cluster.killServer(connectionInstance);
+
+                    Assert.assertTrue(timing.awaitLatch(reconnectLatch));
+
+                    timing.sleepABit();
+
+                    Assert.assertEquals(cache.storage().stream().count(), 4);
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testUpdateWhenNotCachingData() throws Exception // mostly copied from TestPathChildrenCache
+    {
+        CuratorCacheStorage storage = new StandardCuratorCacheStorage(false);
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)) )
+        {
+            client.start();
+            final CountDownLatch updatedLatch = new CountDownLatch(1);
+            final CountDownLatch addedLatch = new CountDownLatch(1);
+            client.create().creatingParentsIfNeeded().forPath("/test");
+            try (CuratorCache cache = CuratorCache.build(client, storage, "/test", RECURSIVE))
+            {
+                cache.listenable().addListener(CuratorCacheListener.forChanges((__, ___) -> updatedLatch.countDown()));
+                cache.listenable().addListener(CuratorCacheListener.forCreates(__ -> addedLatch.countDown()));
+                cache.start();
+
+                client.create().forPath("/test/foo", "first".getBytes());
+                Assert.assertTrue(timing.awaitLatch(addedLatch));
+
+                client.setData().forPath("/test/foo", "something new".getBytes());
+                Assert.assertTrue(timing.awaitLatch(updatedLatch));
+            }
+        }
+    }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheConsistency.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheConsistency.java
new file mode 100644
index 0000000..dcb1ff3
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheConsistency.java
@@ -0,0 +1,384 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.compatibility.Timing2;
+import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
+import org.apache.curator.utils.ZKPaths;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.io.Closeable;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.RECURSIVE;
+
+/**
+ * Randomly create, change and delete nodes in a tree (killing a few servers along the way) while
+ * a set of CuratorCaches listens. Afterwards, validate that the caches contain the same values as ZK itself
+ */
+@Test(groups = Zk35MethodInterceptor.zk35Group)
+public class TestCuratorCacheConsistency extends BaseClassForTests
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final ThreadLocalRandom random = ThreadLocalRandom.current();
+
+    private static final Timing2 timing = new Timing2();
+    private static final Duration testLength = Duration.ofSeconds(30);
+    private static final Duration thirdOfTestLength = Duration.ofMillis(testLength.toMillis() / 3);
+    private static final Duration sleepLength = Duration.ofMillis(5);
+    private static final int nodesPerLevel = 10;
+    private static final int clusterSize = 5;
+    private static final int maxServerKills = 2;
+
+    private static final String BASE_PATH = "/test";
+
+    private static class Client implements Closeable
+    {
+        private final CuratorFramework client;
+        private final CuratorCache cache;
+        private final int index;
+        private final Map<String, ChildData> listenerDataMap = new HashMap<>();
+
+        Client(int index, String connectionString, AtomicReference<Exception> errorSignal)
+        {
+            this.index = index;
+            client = buildClient(connectionString);
+            cache = CuratorCache.build(client, CuratorCacheStorage.standard().doNotClearOnClose(), BASE_PATH, RECURSIVE);
+            cache.setExceptionHandler(errorSignal::set);
+
+            // listenerDataMap is a local data map that will hold values sent by listeners
+            // this way, the listener code can be tested for validity and consistency
+            cache.listenable().addListener(CuratorCacheListener.forAll(
+                // creates
+                node -> {
+                    ChildData previous = listenerDataMap.put(node.getPath(), node);
+                    if ( previous != null )
+                    {
+                        errorSignal.set(new Exception(String.format("Client: %d - Create for existing node: %s", index, node.getPath())));
+                    }
+                },
+
+                // changes
+                (oldNode, node) -> {
+                    ChildData previous = listenerDataMap.put(node.getPath(), node);
+                    if ( (previous == null) || !Arrays.equals(previous.getData(), oldNode.getData()) )
+                    {
+                        errorSignal.set(new Exception(String.format("Client: %d - Bad old value for change node: %s", index, node.getPath())));
+                    }
+                },
+
+                // deletes
+                node -> {
+                    ChildData previous = listenerDataMap.remove(node.getPath());
+                    if ( previous == null )
+                    {
+                        errorSignal.set(new Exception(String.format("Client: %d - Delete for non-existent node: %s", index, node.getPath())));
+                    }
+                }
+            ));
+        }
+
+        void start()
+        {
+            client.start();
+            cache.start();
+        }
+
+        @Override
+        public void close()
+        {
+            cache.close();
+            client.close();
+        }
+    }
+
+    @Test
+    public void testConsistencyAfterSimulation() throws Exception
+    {
+        int clientQty = random.nextInt(10, 20);
+        int maxDepth = random.nextInt(5, 10);
+
+        log.info("clientQty: {}, maxDepth: {}", clientQty, maxDepth);
+
+        List<Client> clients = Collections.emptyList();
+        Map<String, String> actualTree;
+
+        AtomicReference<Exception> errorSignal = new AtomicReference<>();
+        try (TestingCluster cluster = new TestingCluster(clusterSize))
+        {
+            cluster.start();
+
+            initializeBasePath(cluster);
+            try
+            {
+                clients = buildClients(cluster, clientQty, errorSignal);
+                workLoop(cluster, clients, maxDepth, errorSignal);
+
+                log.info("Test complete - sleeping to allow events to complete");
+                timing.sleepABit();
+            }
+            finally
+            {
+                clients.forEach(Client::close);
+            }
+
+            actualTree = buildActual(cluster);
+        }
+
+        log.info("client qty: {}", clientQty);
+
+        Map<Integer, List<String>> errorsList = clients.stream()
+            .map(client ->  findErrors(client, actualTree))
+            .filter(errorsEntry -> !errorsEntry.getValue().isEmpty())
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+        if ( !errorsList.isEmpty() )
+        {
+            log.error("{} clients had errors", errorsList.size());
+            errorsList.forEach((index, errorList) -> {
+                log.error("Client {}", index);
+                errorList.forEach(log::error);
+                log.error("");
+            });
+
+            Assert.fail("Errors found");
+        }
+    }
+
+    // build a data map recursively from the actual values in ZK
+    private Map<String, String> buildActual(TestingCluster cluster)
+    {
+        Map<String, String> actual = new HashMap<>();
+        try (CuratorFramework client = buildClient(cluster.getConnectString()))
+        {
+            client.start();
+            buildActual(client, actual, BASE_PATH);
+        }
+        return actual;
+    }
+
+    private void buildActual(CuratorFramework client, Map<String, String> actual, String fromPath)
+    {
+        try
+        {
+            byte[] bytes = client.getData().forPath(fromPath);
+            actual.put(fromPath, new String(bytes));    // record this node, then recurse into each child
+            client.getChildren().forPath(fromPath).forEach(child -> buildActual(client, actual, ZKPaths.makePath(fromPath, child)));
+        }
+        catch ( Exception e )
+        {
+            Assert.fail("Could not read: " + fromPath, e);  // name the failing path instead of an empty message
+        }
+    }
+
+    private List<Client> buildClients(TestingCluster cluster, int clientQty, AtomicReference<Exception> errorSignal)
+    {
+        return IntStream.range(0, clientQty)
+            .mapToObj(index -> new Client(index, cluster.getConnectString(), errorSignal))
+            .peek(Client::start)
+            .collect(Collectors.toList());
+    }
+
+    private void initializeBasePath(TestingCluster cluster) throws Exception
+    {
+        try (CuratorFramework client = buildClient(cluster.getConnectString()))
+        {
+            client.start();
+            client.create().forPath(BASE_PATH, "".getBytes());
+        }
+    }
+
+    /**
+     * Drives the randomized workload until {@code testLength} has elapsed. Every pass checks the
+     * error signal, possibly kills a server, performs one random delete or create/set through a
+     * random client and then sleeps for {@code sleepLength}.
+     *
+     * @param cluster cluster whose servers may be killed during the run
+     * @param clients clients from which one is picked at random for each operation
+     * @param maxDepth exclusive upper bound for the random depth of the path operated on
+     * @param errorSignal fails the test as soon as it holds an exception
+     * @throws Exception if killing a server fails or the sleep is interrupted
+     */
+    private void workLoop(TestingCluster cluster, List<Client> clients, int maxDepth, AtomicReference<Exception> errorSignal) throws Exception
+    {
+        Instant start = Instant.now();
+        Instant lastServerKill = Instant.now();
+        int serverKillIndex = 0;
+        while ( Duration.between(start, Instant.now()).compareTo(testLength) < 0 )
+        {
+            Exception reported = errorSignal.get();
+            if ( reported != null )
+            {
+                Assert.fail("A client's error handler was called", reported);
+            }
+
+            // once a third of the test length has passed since the last check point, restart the
+            // interval and kill the next server unless the kill budget is already used up
+            Duration sinceLastKill = Duration.between(lastServerKill, Instant.now());
+            boolean killIntervalElapsed = sinceLastKill.compareTo(thirdOfTestLength) >= 0;
+            if ( killIntervalElapsed )
+            {
+                lastServerKill = Instant.now();
+                if ( serverKillIndex < maxServerKills )
+                {
+                    doKillServer(cluster, serverKillIndex);
+                    ++serverKillIndex;
+                }
+            }
+
+            String thisPath = randomPath(random.nextInt(0, maxDepth));
+            CuratorFramework client = randomClient(clients);
+            boolean delete = random.nextBoolean();
+            if ( delete )
+            {
+                doDelete(client, thisPath);
+            }
+            else
+            {
+                doChange(client, thisPath);
+            }
+
+            Thread.sleep(sleepLength.toMillis());
+        }
+    }
+
+    /**
+     * Writes a random long (as text) to {@code thisPath} using a create-or-set-data call that
+     * also creates parents if needed. Any exception fails the test.
+     *
+     * @param client client used for the write
+     * @param thisPath path to create or update
+     */
+    private void doChange(CuratorFramework client, String thisPath)
+    {
+        try
+        {
+            String thisData = Long.toString(random.nextLong());
+            client.create().orSetData().creatingParentsIfNeeded().forPath(thisPath, thisData.getBytes());
+        }
+        catch ( Exception e )
+        {
+            // pass the cause along so the failure report shows why the write failed
+            Assert.fail("Could not create/set: " + thisPath, e);
+        }
+    }
+
+    /**
+     * Deletes {@code thisPath} using a quiet delete that also deletes children if needed.
+     * {@code BASE_PATH} itself is never deleted. Any exception fails the test.
+     *
+     * @param client client used for the delete
+     * @param thisPath path to delete; a no-op when it equals {@code BASE_PATH}
+     */
+    private void doDelete(CuratorFramework client, String thisPath)
+    {
+        if ( thisPath.equals(BASE_PATH) )
+        {
+            return;
+        }
+        try
+        {
+            client.delete().quietly().deletingChildrenIfNeeded().forPath(thisPath);
+        }
+        catch ( Exception e )
+        {
+            // pass the cause along so the failure report shows why the delete failed
+            Assert.fail("Could not delete: " + thisPath, e);
+        }
+    }
+
+    /**
+     * Kills the cluster instance found at position {@code serverKillIndex} of the cluster's instances.
+     *
+     * @param cluster cluster that owns the instance
+     * @param serverKillIndex position of the instance to kill within {@code cluster.getInstances()}
+     * @throws Exception if the server cannot be killed
+     */
+    private void doKillServer(TestingCluster cluster, int serverKillIndex) throws Exception
+    {
+        log.info("Killing server {}", serverKillIndex);
+        // copy into a list so the instance can be picked by position
+        List<InstanceSpec> instances = new ArrayList<>(cluster.getInstances());
+        InstanceSpec victim = instances.get(serverKillIndex);
+        cluster.killServer(victim);
+    }
+
+    /**
+     * Picks one of {@code clients} at random and returns its {@code client} field.
+     *
+     * @param clients candidates to choose from
+     * @return the {@code CuratorFramework} of the chosen entry
+     */
+    private CuratorFramework randomClient(List<Client> clients)
+    {
+        int pick = random.nextInt(clients.size());
+        Client chosen = clients.get(pick);
+        return chosen.client;
+    }
+
+    /**
+     * Compares one client's cache storage and its {@code listenerDataMap} against the expected tree.
+     *
+     * @param client client whose {@code cache} storage and {@code listenerDataMap} are checked
+     * @param tree expected path -> data mapping
+     * @return an entry whose key is the client's index and whose value lists every discrepancy
+     *         found (empty when there are none)
+     */
+    private Map.Entry<Integer, List<String>> findErrors(Client client, Map<String, String> tree)
+    {
+        CuratorCacheStorage storage = client.cache.storage();
+        List<String> errors = new ArrayList<>();
+
+        boolean sameSize = tree.size() == storage.size();
+        if ( !sameSize )
+        {
+            errors.add(String.format("Size mismatch. Expected: %d - Actual: %d", tree.size(), storage.size()));
+        }
+
+        // expected paths that the cache storage does not hold
+        tree.keySet().stream()
+            .filter(path -> !storage.containsPath(path))
+            .map(path -> String.format("Path %s in master but not client", path))
+            .forEach(errors::add);
+
+        // every stored node must be expected, and must match in both storage and listenerDataMap
+        storage.stream().forEach(data -> {
+            String treeValue = tree.get(data.getPath());
+            if ( treeValue == null )
+            {
+                errors.add(String.format("Path %s in client but not master", data.getPath()));
+                return;
+            }
+
+            if ( !treeValue.equals(new String(data.getData())) )
+            {
+                errors.add(String.format("Data at %s is not the same", data.getPath()));
+            }
+
+            ChildData listenersMapData = client.listenerDataMap.get(data.getPath());
+            if ( listenersMapData == null )
+            {
+                errors.add(String.format("listenersMap missing data at: %s", data.getPath()));
+                return;
+            }
+            if ( !treeValue.equals(new String(listenersMapData.getData())) )
+            {
+                errors.add(String.format("Data at %s in listenersMap is not the same", data.getPath()));
+            }
+        });
+
+        // listenerDataMap paths that the cache storage does not hold
+        client.listenerDataMap.keySet().stream()
+            .filter(path -> !storage.containsPath(path))
+            .map(path -> String.format("Path %s in listenersMap but not storage", path))
+            .forEach(errors::add);
+
+        return new AbstractMap.SimpleEntry<>(client.index, errors);
+    }
+
+    /**
+     * Builds a path that starts at {@code BASE_PATH} and descends {@code depth} levels, naming
+     * each level with a random int drawn from {@code random.nextInt(nodesPerLevel)}.
+     *
+     * @param depth number of levels to append; zero or less yields {@code BASE_PATH} itself
+     * @return the generated path
+     */
+    private String randomPath(int depth)
+    {
+        StringBuilder path = new StringBuilder(BASE_PATH);
+        for ( int level = 0; level < depth; ++level )
+        {
+            path.append("/");
+            path.append(random.nextInt(nodesPerLevel));
+        }
+        return path.toString();
+    }
+
+    @Override
+    protected void createServer()
+    {
+        // intentionally empty - this test runs against a TestingCluster instead, so no server is created here
+    }
+
+    /**
+     * Creates (but does not start) a client for {@code connectionString} that uses the shared
+     * {@code timing} session and connection values and an {@code ExponentialBackoffRetry(100, 100)}.
+     *
+     * @param connectionString connect string to use
+     * @return the new, unstarted client
+     */
+    private static CuratorFramework buildClient(String connectionString)
+    {
+        return CuratorFrameworkFactory.newClient(
+            connectionString,
+            timing.session(),
+            timing.connection(),
+            new ExponentialBackoffRetry(100, 100)
+        );
+    }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEventOrdering.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEventOrdering.java
new file mode 100644
index 0000000..915e6aa
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEventOrdering.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import java.util.concurrent.BlockingQueue;
+
+public class TestCuratorCacheEventOrdering extends TestEventOrdering<CuratorCache>
+{
+    @Override
+    protected int getActualQty(CuratorCache cache)
+    {
+        return cache.storage().size();
+    }
+
+    @Override
+    protected CuratorCache newCache(CuratorFramework client, String path, BlockingQueue<Event> events)
+    {
+        CuratorCache cache = CuratorCache.build(client, path, CuratorCache.Options.RECURSIVE);
+        cache.listenable().addListener((oldNode, node) -> {
+            if ( (oldNode == null) && (node != null) )
+            {
+                events.add(new Event(EventType.ADDED, node.getPath()));
+            }
+            else if ( (oldNode != null) && (node == null) )
+            {
+                events.add(new Event(EventType.DELETED, oldNode.getPath()));
+            }
+        });
+        cache.start();
+        return cache;
+    }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java
new file mode 100644
index 0000000..d45f4f5
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.compatibility.Timing2;
+import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.RECURSIVE;
+import static org.apache.curator.framework.recipes.cache.CuratorCacheStorage.toMap;
+
+@Test(groups = Zk35MethodInterceptor.zk35Group)
+public class TestCuratorCacheWrappers extends BaseClassForTests    // checks listeners passed through CuratorCacheListener.wrap() against a CuratorCache; each test is adapted from an older cache recipe's test
+{
+    private final Timing2 timing = new Timing2();
+
+    @Test
+    public void testPathChildrenCache() throws Exception    // copied from TestPathChildrenCache#testBasics()
+    {
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+        {
+            client.start();
+            client.create().forPath("/test");
+
+            final BlockingQueue<PathChildrenCacheEvent.Type> events = new LinkedBlockingQueue<>();
+            try (CuratorCache cache = CuratorCache.build(client, "/test", RECURSIVE))
+            {
+                PathChildrenCacheListener listener = (__, event) -> {
+                    if ( event.getData().getPath().equals("/test/one") )    // only events for /test/one are recorded
+                    {
+                        events.offer(event.getType());
+                    }
+                };
+                cache.listenable().addListener(CuratorCacheListener.wrap(client, listener));
+                cache.start();    // the listener is registered before the cache is started
+
+                client.create().forPath("/test/one", "hey there".getBytes());
+                Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
+
+                client.setData().forPath("/test/one", "sup!".getBytes());
+                Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
+                Assert.assertEquals(new String(cache.storage().get("/test/one").orElse(null).getData()), "sup!");    // storage must already hold the updated data
+
+                client.delete().forPath("/test/one");
+                Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
+            }
+        }
+    }
+
+    @Test
+    public void testTreeCache() throws Exception    // copied from TestTreeCache#testBasics()
+    {
+        BaseTestTreeCache treeCacheBase = new BaseTestTreeCache();    // used here for its eventListener and assertEvent() helpers
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+        {
+            client.start();
+            client.create().forPath("/test");
+
+            try (CuratorCache cache = CuratorCache.build(client, "/test", RECURSIVE))
+            {
+                cache.listenable().addListener(CuratorCacheListener.wrap(client, treeCacheBase.eventListener));
+                cache.start();
+
+                treeCacheBase.assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test");
+                // assertEvent(TreeCacheEvent.Type.INITIALIZED); CuratorCache doesn't support this
+                Assert.assertEquals(toMap(cache.storage().streamImmediateChildren("/test")).keySet(), ImmutableSet.of());
+                Assert.assertEquals(cache.storage().streamImmediateChildren("/t").count(), 0);    // a path that is only a name prefix of /test has no children
+                Assert.assertEquals(cache.storage().streamImmediateChildren("/testing").count(), 0);    // nor does a path that merely starts with /test
+
+                client.create().forPath("/test/one", "hey there".getBytes());
+                treeCacheBase.assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/one");
+                Assert.assertEquals(toMap(cache.storage().streamImmediateChildren("/test")).keySet(), ImmutableSet.of("/test/one"));
+                Assert.assertEquals(new String(cache.storage().get("/test/one").orElse(null).getData()), "hey there");
+                Assert.assertEquals(toMap(cache.storage().streamImmediateChildren("/test/one")).keySet(), ImmutableSet.of());
+                Assert.assertEquals(cache.storage().streamImmediateChildren("/test/o").count(), 0);    // same prefix checks, one level down
+                Assert.assertEquals(cache.storage().streamImmediateChildren("/test/onely").count(), 0);
+
+                client.setData().forPath("/test/one", "sup!".getBytes());
+                treeCacheBase.assertEvent(TreeCacheEvent.Type.NODE_UPDATED, "/test/one");
+                Assert.assertEquals(toMap(cache.storage().streamImmediateChildren("/test")).keySet(), ImmutableSet.of("/test/one"));
+                Assert.assertEquals(new String(cache.storage().get("/test/one").orElse(null).getData()), "sup!");
+
+                client.delete().forPath("/test/one");
+                treeCacheBase.assertEvent(TreeCacheEvent.Type.NODE_REMOVED, "/test/one", "sup!".getBytes());    // the removal event is expected to carry the last data
+                Assert.assertEquals(toMap(cache.storage().streamImmediateChildren("/test")).keySet(), ImmutableSet.of());
+            }
+        }
+    }
+
+    @Test
+    public void testNodeCache() throws Exception    // copied from TestNodeCache#testBasics()
+    {
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)) )
+        {
+            client.start();
+            client.create().forPath("/test");
+
+            try (CuratorCache cache = CuratorCache.build(client, "/test/node"))    // built without RECURSIVE, unlike the other two tests
+            {
+                cache.start();
+
+                final Semaphore semaphore = new Semaphore(0);
+                cache.listenable().addListener(CuratorCacheListener.wrap(semaphore::release));    // each listener callback releases one permit
+                Assert.assertFalse(cache.getRootData().isPresent());    // /test/node has not been created yet
+
+                client.create().forPath("/test/node", "a".getBytes());
+                Assert.assertTrue(timing.acquireSemaphore(semaphore));
+                Assert.assertEquals(cache.getRootData().orElse(null).getData(), "a".getBytes());
+
+                client.setData().forPath("/test/node", "b".getBytes());
+                Assert.assertTrue(timing.acquireSemaphore(semaphore));
+                Assert.assertEquals(cache.getRootData().orElse(null).getData(), "b".getBytes());
+
+                client.delete().forPath("/test/node");
+                Assert.assertTrue(timing.acquireSemaphore(semaphore));
+                Assert.assertFalse(cache.getRootData().isPresent());
+            }
+        }
+    }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestWrappedNodeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestWrappedNodeCache.java
new file mode 100644
index 0000000..d595e3c
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestWrappedNodeCache.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.compatibility.Timing2;
+import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.Compatibility;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.RECURSIVE;
+
+@Test(groups = Zk35MethodInterceptor.zk35Group)
+public class TestWrappedNodeCache extends BaseClassForTests    // exercises NodeCacheListener instances wrapped with CuratorCacheListener.wrap() on a CuratorCache
+{
+    private final Timing2 timing = new Timing2();
+
+    @Test
+    public void testDeleteThenCreate() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+            client.create().creatingParentsIfNeeded().forPath("/test/foo", "one".getBytes());
+
+            final Semaphore semaphore = new Semaphore(0);
+            cache = CuratorCache.build(client, "/test/foo", RECURSIVE);
+            NodeCacheListener listener = semaphore::release;    // one permit is released per listener callback
+            cache.listenable().addListener(CuratorCacheListener.wrap(listener));
+
+            cache.start();
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));    // first callback after start; /test/foo already exists at this point
+
+            Assert.assertTrue(cache.getRootData().isPresent());
+            Assert.assertEquals(cache.getRootData().get().getData(), "one".getBytes());
+
+            client.delete().forPath("/test/foo");
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));    // callback for the delete
+            client.create().forPath("/test/foo", "two".getBytes());
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));    // callback for the re-create
+
+            Assert.assertTrue(cache.getRootData().isPresent());
+            Assert.assertEquals(cache.getRootData().get().getData(), "two".getBytes());
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    @Test
+    public void testKilledSession() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = null;
+        try
+        {
+            client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+            client.start();
+            client.create().creatingParentsIfNeeded().forPath("/test/node", "start".getBytes());
+
+            CountDownLatch lostLatch = new CountDownLatch(1);
+            client.getConnectionStateListenable().addListener((__, newState) -> {
+                if ( newState == ConnectionState.LOST )    // counted down once the connection state becomes LOST
+                {
+                    lostLatch.countDown();
+                }
+            });
+
+            cache = CuratorCache.build(client,"/test/node", RECURSIVE);
+
+            Semaphore latch = new Semaphore(0);    // despite the name this is a semaphore: one permit per listener callback
+            NodeCacheListener listener = latch::release;
+            cache.listenable().addListener(CuratorCacheListener.wrap(listener));
+
+            cache.start();
+            Assert.assertTrue(timing.acquireSemaphore(latch));    // first callback after start; /test/node already exists at this point
+
+            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+            Assert.assertTrue(timing.awaitLatch(lostLatch));    // wait until LOST has been observed
+
+            Assert.assertTrue(cache.getRootData().isPresent());    // the cache must still hold the data from before the session expiration
+            Assert.assertEquals(cache.getRootData().get().getData(), "start".getBytes());
+
+            client.setData().forPath("/test/node", "new data".getBytes());
+            Assert.assertTrue(timing.acquireSemaphore(latch));    // the listener must still be called after the session expiration
+            Assert.assertTrue(cache.getRootData().isPresent());
+            Assert.assertEquals(cache.getRootData().get().getData(), "new data".getBytes());
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    @Test
+    public void testBasics() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+            client.create().forPath("/test");
+
+            cache = CuratorCache.build(client, "/test/node", RECURSIVE);
+            cache.start();    // NOTE(review): the listener is added after start(); /test/node does not exist yet at this point
+
+            final Semaphore semaphore = new Semaphore(0);
+            NodeCacheListener listener = semaphore::release;    // one permit is released per listener callback
+            cache.listenable().addListener(CuratorCacheListener.wrap(listener));
+
+            Assert.assertNull(cache.getRootData().orElse(null));
+
+            client.create().forPath("/test/node", "a".getBytes());
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+            Assert.assertEquals(cache.getRootData().orElse(null).getData(), "a".getBytes());
+
+            client.setData().forPath("/test/node", "b".getBytes());
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+            Assert.assertEquals(cache.getRootData().orElse(null).getData(), "b".getBytes());
+
+            client.delete().forPath("/test/node");
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+            Assert.assertNull(cache.getRootData().orElse(null));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
index df18de5..b3d5701 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.curator.framework.recipes.watch;
 
 import org.apache.curator.framework.CuratorFramework;
@@ -6,6 +24,7 @@ import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.compatibility.Timing2;
+import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.testng.Assert;
@@ -14,6 +33,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 
+@Test(groups = Zk35MethodInterceptor.zk35Group)
 public class TestPersistentWatcher extends BaseClassForTests
 {
     private final Timing2 timing = new Timing2();
diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java b/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java
index 3d38fe1..58da2c0 100644
--- a/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java
+++ b/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java
@@ -225,7 +225,7 @@ public class TestingCluster implements Closeable
      */
     public InstanceSpec findConnectionInstance(ZooKeeper client) throws Exception
     {
-        Method              m = client.getClass().getDeclaredMethod("testableRemoteSocketAddress");
+        Method              m = ZooKeeper.class.getDeclaredMethod("testableRemoteSocketAddress");
         m.setAccessible(true);
         InetSocketAddress   address = (InetSocketAddress)m.invoke(client);
         if ( address != null )
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
index 0f29233..143a2c8 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
@@ -1,4 +1,22 @@
 /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java
deleted file mode 100644
index 14f3e30..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one
-  * or more contributor license agreements.  See the NOTICE file
-  * distributed with this work for additional information
-  * regarding copyright ownership.  The ASF licenses this file
-  * to you under the Apache License, Version 2.0 (the
-  * "License"); you may not use this file except in compliance
-  * with the License.  You may obtain a copy of the License at
-  *
-  *   http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing,
-  * software distributed under the License is distributed on an
-  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  * KIND, either express or implied.  See the License for the
-  * specific language governing permissions and limitations
-  * under the License.
-  */
- package org.apache.curator.x.async.details;
-
- import org.apache.curator.framework.api.AddPersistentWatchable;
- import org.apache.curator.framework.api.CuratorWatcher;
- import org.apache.curator.framework.imps.AddPersistentWatchBuilderImpl;
- import org.apache.curator.framework.imps.CuratorFrameworkImpl;
- import org.apache.curator.framework.imps.Watching;
- import org.apache.curator.x.async.AsyncStage;
- import org.apache.curator.x.async.api.AsyncPathable;
- import org.apache.curator.x.async.api.AsyncPersistentWatchBuilder;
- import org.apache.zookeeper.Watcher;
-
- import static org.apache.curator.x.async.details.BackgroundProcs.ignoredProc;
- import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
-
- class AsyncPersistentWatchBuilderImpl implements AsyncPersistentWatchBuilder, AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>>, AsyncPathable<AsyncStage<Void>>
- {
-     private final CuratorFrameworkImpl client;
-     private final Filters filters;
-     private Watching watching = null;
-     private boolean recursive = false;
-
-     AsyncPersistentWatchBuilderImpl(CuratorFrameworkImpl client, Filters filters)
-     {
-         this.client = client;
-         this.filters = filters;
-     }
-
-     @Override
-     public AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>> recursive()
-     {
-         recursive = true;
-         return this;
-     }
-
-     @Override
-     public AsyncPathable<AsyncStage<Void>> usingWatcher(Watcher watcher)
-     {
-         watching = new Watching(client, watcher);
-         return this;
-     }
-
-     @Override
-     public AsyncPathable<AsyncStage<Void>> usingWatcher(CuratorWatcher watcher)
-     {
-         watching = new Watching(client, watcher);
-         return this;
-     }
-
-     @Override
-     public AsyncStage<Void> forPath(String path)
-     {
-         BuilderCommon<Void> common = new BuilderCommon<>(filters, ignoredProc);
-         AddPersistentWatchBuilderImpl builder = new AddPersistentWatchBuilderImpl(client, watching, common.backgrounding, recursive);
-         return safeCall(common.internalCallback, () -> builder.forPath(path));
-     }
- }
\ No newline at end of file