You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/11/03 18:25:41 UTC

[curator] 02/02: Add recipes that use persistent watchers including new cache recipes to replace all others.

This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch zk36-recipes
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 886f9ffc36edf78a51e530215db5064343bc98bb
Author: randgalt <ra...@apache.org>
AuthorDate: Sun Nov 3 12:45:21 2019 -0500

    Add recipes that use persistent watchers including new cache recipes to replace all others.
---
 .../framework/recipes/cache/CuratorCache.java      | 114 +++++++
 .../recipes/cache/CuratorCacheBuilder.java         |  63 ++++
 .../recipes/cache/CuratorCacheBuilderImpl.java     |  74 ++++
 .../framework/recipes/cache/CuratorCacheImpl.java  | 303 +++++++++++++++++
 .../recipes/cache/CuratorCacheListener.java        |  78 +++++
 .../recipes/cache/CuratorCacheListenerBuilder.java | 129 +++++++
 .../cache/CuratorCacheListenerBuilderImpl.java     | 161 +++++++++
 .../recipes/cache/CuratorCacheStorage.java         | 114 +++++++
 .../recipes/cache/NodeCacheListenerWrapper.java    |  46 +++
 .../cache/PathChildrenCacheListenerWrapper.java    |  78 +++++
 .../recipes/cache/StandardCuratorCacheStorage.java |  88 +++++
 .../recipes/cache/TreeCacheListenerWrapper.java    |  81 +++++
 .../framework/recipes/watch/PersistentWatcher.java | 169 ++++++++++
 .../framework/recipes/cache/TestCuratorCache.java  | 248 ++++++++++++++
 .../recipes/cache/TestCuratorCacheConsistency.java | 373 +++++++++++++++++++++
 .../cache/TestCuratorCacheEventOrdering.java       |  52 +++
 .../recipes/cache/TestCuratorCacheWrappers.java    | 162 +++++++++
 .../recipes/cache/TestWrappedNodeCache.java        | 172 ++++++++++
 .../recipes/watch/TestPersistentWatcher.java       | 105 ++++++
 19 files changed, 2610 insertions(+)

diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java
new file mode 100644
index 0000000..ebe06e0
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.listen.Listenable;
+import java.io.Closeable;
+
+/**
+ * <p>
+ *     A utility that attempts to keep the data from a node locally cached. Optionally the entire
+ *     tree of children below the node can also be cached. Will respond to update/create/delete events, pull
+ *     down the data, etc. You can register listeners that will get notified when changes occur.
+ * </p>
+ *
+ * <p>
+ *     <b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must
+ *     be prepared for false-positives and false-negatives. Additionally, always use the version number
+ *     when updating data to avoid overwriting another process' change.
+ * </p>
+ */
+public interface CuratorCache extends Closeable
+{
+    /**
+     * cache build options
+     */
+    enum Options
+    {
+        /**
+         * Normally the entire tree of nodes starting at the given node are cached. This option
+         * causes only the given node to be cached (i.e. a single node cache)
+         */
+        SINGLE_NODE_CACHE,
+
+        /**
+         * Decompress data via {@link org.apache.curator.framework.api.GetDataBuilder#decompressed()}
+         */
+        COMPRESSED_DATA,
+
+        /**
+         * Normally, when the cache is closed via {@link CuratorCache#close()}, the storage is cleared
+         * via {@link CuratorCacheStorage#clear()}. This option prevents the storage from being cleared.
+         */
+        DO_NOT_CLEAR_ON_CLOSE
+    }
+
+    /**
+     * Return a Curator Cache for the given path with the given options using a standard storage instance
+     *
+     * @param client Curator client
+     * @param path path to cache
+     * @param options any options
+     * @return cache (note it must be started via {@link #start()}
+     */
+    static CuratorCache build(CuratorFramework client, String path, Options... options)
+    {
+        return builder(client, path).withOptions(options).build();
+    }
+
+    /**
+     * Start a Curator Cache builder
+     *
+     * @param client Curator client
+     * @param path path to cache
+     * @return builder
+     */
+    static CuratorCacheBuilder builder(CuratorFramework client, String path)
+    {
+        return new CuratorCacheBuilderImpl(client, path);
+    }
+
+    /**
+     * Start the cache. This will cause a complete refresh from the cache's root node and generate
+     * events for all nodes found, etc.
+     */
+    void start();
+
+    /**
+     * Close the cache, stop responding to events, etc.
+     */
+    @Override
+    void close();
+
+    /**
+     * Return the storage instance being used
+     *
+     * @return storage
+     */
+    CuratorCacheStorage storage();
+
+    /**
+     * Return the listener container so that listeners can be registered to be notified of changes to the cache
+     *
+     * @return listener container
+     */
+    Listenable<CuratorCacheListener> listenable();
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java
new file mode 100644
index 0000000..35a5f26
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+public interface CuratorCacheBuilder
+{
+    /**
+     * @param options any options
+     * @return this
+     */
+    CuratorCacheBuilder withOptions(CuratorCache.Options... options);
+
+    /**
+     * Alternate storage to use. If not specified, {@link StandardCuratorCacheStorage#standard()} is used
+     *
+     * @param storage storage instance to use
+     * @return this
+     */
+    CuratorCacheBuilder withStorage(CuratorCacheStorage storage);
+
+    /**
+     * By default any unexpected exception is handled by logging the exception. You can change
+     * so that a handler is called instead. Under normal circumstances, this shouldn't be necessary.
+     *
+     * @param exceptionHandler exception handler to use
+     */
+    CuratorCacheBuilder withExceptionHandler(Consumer<Exception> exceptionHandler);
+
+    /**
+     * Normally, listeners are wrapped in {@link org.apache.curator.framework.CuratorFramework#runSafe(Runnable)}. Use this
+     * method to set a different executor.
+     *
+     * @param executor to use
+     */
+    CuratorCacheBuilder withExecutor(Executor executor);
+
+    /**
+     * Return a new Curator Cache based on the builder methods that have been called
+     *
+     * @return new Curator Cache
+     */
+    CuratorCache build();
+}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java
new file mode 100644
index 0000000..9f9e03d
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+class CuratorCacheBuilderImpl implements CuratorCacheBuilder
+{
+    private final CuratorFramework client;
+    private final String path;
+    private CuratorCacheStorage storage;
+    private Consumer<Exception> exceptionHandler;
+    private Executor executor;
+    private CuratorCache.Options[] options;
+
+    CuratorCacheBuilderImpl(CuratorFramework client, String path)
+    {
+        this.client = client;
+        this.path = path;
+    }
+
+    @Override
+    public CuratorCacheBuilder withOptions(CuratorCache.Options... options)
+    {
+        this.options = options;
+        return this;
+    }
+
+    @Override
+    public CuratorCacheBuilder withStorage(CuratorCacheStorage storage)
+    {
+        this.storage = storage;
+        return this;
+    }
+
+    @Override
+    public CuratorCacheBuilder withExceptionHandler(Consumer<Exception> exceptionHandler)
+    {
+        this.exceptionHandler = exceptionHandler;
+        return this;
+    }
+
+    @Override
+    public CuratorCacheBuilder withExecutor(Executor executor)
+    {
+        this.executor = executor;
+        return this;
+    }
+
+    @Override
+    public CuratorCache build()
+    {
+        return new CuratorCacheImpl(client, storage, path, options, executor, exceptionHandler);
+    }
+}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
new file mode 100644
index 0000000..6349530
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
+import org.apache.curator.framework.recipes.watch.PersistentWatcher;
+import org.apache.curator.utils.ThreadUtils;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.*;
+import static org.apache.zookeeper.KeeperException.Code.NONODE;
+import static org.apache.zookeeper.KeeperException.Code.OK;
+
+class CuratorCacheImpl implements CuratorCache
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
+    private final PersistentWatcher persistentWatcher;
+    private final CuratorFramework client;
+    private final CuratorCacheStorage storage;
+    private final String path;
+    private final boolean recursive;
+    private final boolean compressedData;
+    private final boolean clearOnClose;
+    private final StandardListenerManager<CuratorCacheListener> listenerManager = StandardListenerManager.standard();
+    private final Consumer<Exception> exceptionHandler;
+    private final Executor executor;
+
+    private volatile AtomicLong outstandingOps;
+
+    private enum State
+    {
+        LATENT,
+        STARTED,
+        CLOSED
+    }
+
+    CuratorCacheImpl(CuratorFramework client, CuratorCacheStorage storage, String path, Options[] optionsArg, Executor executor, Consumer<Exception> exceptionHandler)
+    {
+        Set<Options> options = (optionsArg != null) ? Sets.newHashSet(optionsArg) : Collections.emptySet();
+        this.client = client;
+        this.storage = (storage != null) ? storage : CuratorCacheStorage.standard();
+        this.path = path;
+        recursive = !options.contains(Options.SINGLE_NODE_CACHE);
+        compressedData = options.contains(Options.COMPRESSED_DATA);
+        clearOnClose = !options.contains(Options.DO_NOT_CLEAR_ON_CLOSE);
+        persistentWatcher = new PersistentWatcher(client, path, recursive);
+        persistentWatcher.getListenable().addListener(this::processEvent);
+        persistentWatcher.getResetListenable().addListener(this::rebuild);
+        this.exceptionHandler = (exceptionHandler != null) ? exceptionHandler : e -> log.error("CuratorCache error", e);
+        this.executor = (executor != null) ? executor : client::runSafe;
+    }
+
+    @Override
+    public void start()
+    {
+        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
+        outstandingOps = new AtomicLong(0);
+        persistentWatcher.start();
+    }
+
+    @Override
+    public void close()
+    {
+        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+        {
+            persistentWatcher.close();
+            if ( clearOnClose )
+            {
+                storage.clear();
+            }
+        }
+    }
+
+    @Override
+    public CuratorCacheStorage storage()
+    {
+        return storage;
+    }
+
+    @Override
+    public Listenable<CuratorCacheListener> listenable()
+    {
+        return listenerManager;
+    }
+
+    private void rebuild()
+    {
+        if ( state.get() != State.STARTED )
+        {
+            return;
+        }
+
+        nodeChanged(path);
+        storage.stream()
+            .map(ChildData::getPath)
+            .filter(p -> !p.equals(path))
+            .forEach(this::nodeChanged);
+    }
+
+    private void processEvent(WatchedEvent event)
+    {
+        if ( state.get() != State.STARTED )
+        {
+            return;
+        }
+
+        switch ( event.getType() )
+        {
+        case NodeDataChanged:
+        case NodeCreated:
+        {
+            nodeChanged(event.getPath());
+            break;
+        }
+
+        case NodeDeleted:
+        {
+            removeStorage(event.getPath());
+            break;
+        }
+
+        case NodeChildrenChanged:
+        {
+            nodeChildrenChanged(event.getPath());
+            break;
+        }
+        }
+    }
+
+    private void nodeChildrenChanged(String fromPath)
+    {
+        if ( (state.get() != State.STARTED) || !recursive )
+        {
+            return;
+        }
+
+        try
+        {
+            BackgroundCallback callback = (__, event) -> {
+                if ( event.getResultCode() == OK.intValue() )
+                {
+                    event.getChildren().forEach(child -> nodeChanged(ZKPaths.makePath(fromPath, child)));
+                }
+                else if ( event.getResultCode() == NONODE.intValue() )
+                {
+                    removeStorage(event.getPath());
+                }
+                else
+                {
+                    handleException(event);
+                }
+                checkDecrementOutstandingOps();
+            };
+
+            checkIncrementOutstandingOps();
+            client.getChildren().inBackground(callback).forPath(fromPath);
+        }
+        catch ( Exception e )
+        {
+            handleException(e);
+        }
+    }
+
+    private void nodeChanged(String fromPath)
+    {
+        if ( state.get() != State.STARTED )
+        {
+            return;
+        }
+
+        try
+        {
+            BackgroundCallback callback = (__, event) -> {
+                if ( event.getResultCode() == OK.intValue() )
+                {
+                    putStorage(new ChildData(event.getPath(), event.getStat(), event.getData()));
+                    nodeChildrenChanged(event.getPath());
+                }
+                else if ( event.getResultCode() == NONODE.intValue() )
+                {
+                    removeStorage(event.getPath());
+                }
+                else
+                {
+                    handleException(event);
+                }
+                checkDecrementOutstandingOps();
+            };
+
+            checkIncrementOutstandingOps();
+            if ( compressedData )
+            {
+                client.getData().decompressed().inBackground(callback).forPath(fromPath);
+            }
+            else
+            {
+                client.getData().inBackground(callback).forPath(fromPath);
+            }
+        }
+        catch ( Exception e )
+        {
+            handleException(e);
+        }
+    }
+
+    public static volatile boolean JZTEST = false;
+
+    private void putStorage(ChildData data)
+    {
+        Optional<ChildData> previousData = storage.put(data);
+        if ( previousData.isPresent() )
+        {
+            if ( previousData.get().getStat().getMzxid() != data.getStat().getMzxid() )
+            {
+                callListeners(l -> l.event(NODE_CHANGED, previousData.get(), data));
+            }
+        }
+        else
+        {
+            callListeners(l -> l.event(NODE_CREATED, null, data));
+        }
+    }
+
+    private void removeStorage(String path)
+    {
+        storage.remove(path).ifPresent(previousData -> callListeners(l -> l.event(NODE_DELETED, previousData, null)));
+    }
+
+    private void callListeners(Consumer<CuratorCacheListener> proc)
+    {
+        if ( state.get() == State.STARTED )
+        {
+            executor.execute(() -> listenerManager.forEach(proc));
+        }
+    }
+
+    private void handleException(CuratorEvent event)
+    {
+        handleException(KeeperException.create(KeeperException.Code.get(event.getResultCode())));
+    }
+
+    private void handleException(Exception e)
+    {
+        ThreadUtils.checkInterrupted(e);
+        exceptionHandler.accept(e);
+    }
+
+    private void checkIncrementOutstandingOps()
+    {
+        AtomicLong localOutstandingOps = outstandingOps;
+        if ( localOutstandingOps != null )
+        {
+            localOutstandingOps.incrementAndGet();
+        }
+    }
+
+    private void checkDecrementOutstandingOps()
+    {
+        AtomicLong localOutstandingOps = outstandingOps;
+        if ( localOutstandingOps != null )
+        {
+            if ( localOutstandingOps.decrementAndGet() == 0 )
+            {
+                outstandingOps = null;
+                callListeners(CuratorCacheListener::initialized);
+            }
+        }
+    }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListener.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListener.java
new file mode 100644
index 0000000..620e471
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListener.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+/**
+ * Listener for {@link CuratorCache} events. The main functional interface is general purpose
+ * but you can build event specific listeners, etc. using the builder. Note: all listeners
+ * are wrapped in {@link org.apache.curator.framework.CuratorFramework#runSafe(Runnable)} when called.
+ */
+@FunctionalInterface
+public interface CuratorCacheListener
+{
+    /**
+     * An enumerated type that describes a change
+     */
+    enum Type
+    {
+        /**
+         * A new node was added to the cache
+         */
+        NODE_CREATED,
+
+        /**
+         * A node already in the cache has changed
+         */
+        NODE_CHANGED,
+
+        /**
+         * A node already in the cache was deleted
+         */
+        NODE_DELETED
+    }
+
+    /**
+     * Called when a data is created, changed or deleted.
+     *
+     * @param type the type of event
+     * @param oldData the old data or null
+     * @param data the new data or null
+     */
+    void event(Type type, ChildData oldData, ChildData data);
+
+    /**
+     * When the cache is started, the initial nodes are tracked and when they are finished loading
+     * into the cache this method is called.
+     */
+    default void initialized()
+    {
+        // NOP
+    }
+
+    /**
+     * Returns a builder allowing type specific, and special purpose listeners.
+     *
+     * @return builder
+     */
+    static CuratorCacheListenerBuilder builder()
+    {
+        return new CuratorCacheListenerBuilderImpl();
+    }
+}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilder.java
new file mode 100644
index 0000000..c57e881
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilder.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type;
+import java.util.function.Consumer;
+
+public interface CuratorCacheListenerBuilder
+{
+    /**
+     * Add a standard listener
+     *
+     * @param listener listener to add
+     * @return this
+     */
+    CuratorCacheListenerBuilder forAll(CuratorCacheListener listener);
+
+    /**
+     * Add a listener only for {@link Type#NODE_CREATED}
+     *
+     * @param listener listener to add
+     * @return this
+     */
+    CuratorCacheListenerBuilder forCreates(Consumer<ChildData> listener);
+
+    @FunctionalInterface
+    interface ChangeListener
+    {
+        void event(ChildData oldNode, ChildData node);
+    }
+
+    /**
+     * Add a listener only for {@link Type#NODE_CHANGED}
+     *
+     * @param listener listener to add
+     * @return this
+     */
+    CuratorCacheListenerBuilder forChanges(ChangeListener listener);
+
+    /**
+     * Add a listener only both {@link Type#NODE_CREATED} and {@link Type#NODE_CHANGED}
+     *
+     * @param listener listener to add
+     * @return this
+     */
+    CuratorCacheListenerBuilder forCreatesAndChanges(ChangeListener listener);
+
+    /**
+     * Add a listener only for {@link Type#NODE_DELETED}
+     *
+     * @param listener listener to add
+     * @return this
+     */
+    CuratorCacheListenerBuilder forDeletes(Consumer<ChildData> listener);
+
+    /**
+     * Add a listener only for {@link CuratorCacheListener#initialized()}
+     *
+     * @param listener listener to add
+     * @return this
+     */
+    CuratorCacheListenerBuilder forInitialized(Runnable listener);
+
+    /**
+     * Bridge listener. You can reuse old-style {@link org.apache.curator.framework.recipes.cache.PathChildrenCacheListener}s
+     * with CuratorCache. IMPORTANT: the connection state methods in the listener will never be called as CuratorCache
+     * does not register the listener with the connection state listener container. Also note that CuratorCache
+     * behaves differently than {@link org.apache.curator.framework.recipes.cache.PathChildrenCache} so
+     * things such as event ordering will likely be different.
+     *
+     * @param client the curator client
+     * @param listener the listener to wrap
+     * @return a CuratorCacheListener that forwards to the given listener
+     */
+    CuratorCacheListenerBuilder forPathChildrenCache(CuratorFramework client, PathChildrenCacheListener listener);
+
+    /**
+     * Bridge listener. You can reuse old-style {@link org.apache.curator.framework.recipes.cache.TreeCacheListener}s
+     * with CuratorCache. IMPORTANT: the connection state methods in the listener will never be called as CuratorCache
+     * does not register the listener with the connection state listener container. Also note that CuratorCache
+     * behaves differently than {@link org.apache.curator.framework.recipes.cache.TreeCache} so
+     * things such as event ordering will likely be different.
+     *
+     * @param client the curator client
+     * @param listener the listener to wrap
+     * @return a CuratorCacheListener that forwards to the given listener
+     */
+    CuratorCacheListenerBuilder forTreeCache(CuratorFramework client, TreeCacheListener listener);
+
+    /**
+     * Bridge listener. You can reuse old-style {@link org.apache.curator.framework.recipes.cache.NodeCacheListener}s
+     * with CuratorCache.
+     *
+     * @param listener the listener to wrap
+     * @return a CuratorCacheListener that forwards to the given listener
+     */
+    CuratorCacheListenerBuilder forNodeCache(NodeCacheListener listener);
+
+    /**
+     * Make the built listener so that it only becomes active once {@link CuratorCacheListener#initialized()} has been called.
+     * i.e. changes that occur as the cache is initializing are not sent to the listener
+     */
+    CuratorCacheListenerBuilder afterInitialized();
+
+    /**
+     * Build and return a new listener based on the methods that have been previously called
+     *
+     * @return new listener
+     */
+    CuratorCacheListener build();
+}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilderImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilderImpl.java
new file mode 100644
index 0000000..4873868
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilderImpl.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+class CuratorCacheListenerBuilderImpl implements CuratorCacheListenerBuilder
+{
+    private final List<CuratorCacheListener> listeners = new ArrayList<>();
+    private boolean afterInitializedOnly = false;
+
+    @Override
+    public CuratorCacheListenerBuilder forAll(CuratorCacheListener listener)
+    {
+        listeners.add(listener);
+        return this;
+    }
+
+    @Override
+    public CuratorCacheListenerBuilder forCreates(Consumer<ChildData> listener)
+    {
+        listeners.add((type, oldNode, node) -> {
+            if ( type == CuratorCacheListener.Type.NODE_CREATED )
+            {
+                listener.accept(node);
+            }
+        });
+        return this;
+    }
+
+    @Override
+    public CuratorCacheListenerBuilder forChanges(ChangeListener listener)
+    {
+        listeners.add((type, oldNode, node) -> {
+            if ( type == CuratorCacheListener.Type.NODE_CHANGED )
+            {
+                listener.event(oldNode, node);
+            }
+        });
+        return this;
+    }
+
+    @Override
+    public CuratorCacheListenerBuilder forCreatesAndChanges(ChangeListener listener)
+    {
+        listeners.add((type, oldNode, node) -> {
+            if ( (type == CuratorCacheListener.Type.NODE_CHANGED) || (type == CuratorCacheListener.Type.NODE_CREATED) )
+            {
+                listener.event(oldNode, node);
+            }
+        });
+        return this;
+    }
+
+    @Override
+    public CuratorCacheListenerBuilder forDeletes(Consumer<ChildData> listener)
+    {
+        listeners.add((type, oldNode, node) -> {
+            if ( type == CuratorCacheListener.Type.NODE_DELETED )
+            {
+                listener.accept(oldNode);
+            }
+        });
+        return this;
+    }
+
+    @Override
+    public CuratorCacheListenerBuilder forInitialized(Runnable listener)
+    {
+        CuratorCacheListener localListener = new CuratorCacheListener()
+        {
+            @Override
+            public void event(Type type, ChildData oldData, ChildData data)
+            {
+                // NOP
+            }
+
+            @Override
+            public void initialized()
+            {
+                listener.run();
+            }
+        };
+        listeners.add(localListener);
+        return this;
+    }
+
+    @Override
+    public CuratorCacheListenerBuilder forPathChildrenCache(CuratorFramework client, PathChildrenCacheListener listener)
+    {
+        listeners.add(new PathChildrenCacheListenerWrapper(client, listener));
+        return this;
+    }
+
+    @Override
+    public CuratorCacheListenerBuilder forTreeCache(CuratorFramework client, TreeCacheListener listener)
+    {
+        listeners.add(new TreeCacheListenerWrapper(client, listener));
+        return this;
+    }
+
+    @Override
+    public CuratorCacheListenerBuilder forNodeCache(NodeCacheListener listener)
+    {
+        listeners.add(new NodeCacheListenerWrapper(listener));
+        return this;
+    }
+
+    @Override
+    public CuratorCacheListenerBuilder afterInitialized()
+    {
+        afterInitializedOnly = true;
+        return this;
+    }
+
+    @Override
+    public CuratorCacheListener build()
+    {
+        List<CuratorCacheListener> copy = new ArrayList<>(listeners);
+        return new CuratorCacheListener()
+        {
+            private volatile boolean isInitialized = !afterInitializedOnly;
+
+            @Override
+            public void event(Type type, ChildData oldData, ChildData data)
+            {
+                if ( isInitialized )
+                {
+                    copy.forEach(l -> l.event(type, oldData, data));
+                }
+            }
+
+            @Override
+            public void initialized()
+            {
+                isInitialized = true;
+                copy.forEach(CuratorCacheListener::initialized);
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
new file mode 100644
index 0000000..f3d870d
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Interface for maintaining data in a {@link CuratorCache}
+ */
+public interface CuratorCacheStorage
+{
+    /**
+     * Return a new standard storage instance
+     *
+     * @return storage instance
+     */
+    static CuratorCacheStorage standard() {
+        return new StandardCuratorCacheStorage(true);
+    }
+
+    /**
+     * Return a new storage instance that does not retain the data bytes. i.e. ChildData objects
+     * returned by this storage will always return {@code null} for {@link ChildData#getData()}.
+     *
+     * @return storage instance that does not retain data bytes
+     */
+    static CuratorCacheStorage bytesNotCached() {
+        return new StandardCuratorCacheStorage(false);
+    }
+
+    /**
+     * Add an entry to storage and return any previous entry at that path
+     *
+     * @param data entry to add
+     * @return previous entry or {@code empty()}
+     */
+    Optional<ChildData> put(ChildData data);
+
+    /**
+     * Remove the entry from storage and return any previous entry at that path
+     *
+     * @param path path to remove
+     * @return previous entry or {@code empty()}
+     */
+    Optional<ChildData> remove(String path);
+
+    /**
+     * Return an entry from storage
+     *
+     * @param path path to get
+     * @return entry or {@code empty()}
+     */
+    Optional<ChildData> get(String path);
+
+    /**
+     * Return the current number of entries in storage
+     *
+     * @return number of entries
+     */
+    int size();
+
+    /**
+     * Return a stream over the storage entries. Note: for a standard storage instance, the stream
+     * behaves like a stream returned by {@link java.util.concurrent.ConcurrentHashMap#entrySet()}
+     *
+     * @return stream over entries
+     */
+    Stream<ChildData> stream();
+
+    /**
+     * Return a stream over the storage entries that are the immediate children of the given node.
+     *
+     * @return stream over entries
+     */
+    Stream<ChildData> streamImmediateChildren(String fromParent);
+
+    /**
+     * Utility - given a stream of child nodes, build a map. Note: it is assumed that each child
+     * data in the stream has a unique path
+     *
+     * @param stream stream of child nodes with unique paths
+     * @return map
+     */
+    static Map<String, ChildData> toMap(Stream<ChildData> stream)
+    {
+        return stream.map(data -> new AbstractMap.SimpleEntry<>(data.getPath(), data))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+
+    /**
+     * Reset the storage to zero entries
+     */
+    void clear();
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheListenerWrapper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheListenerWrapper.java
new file mode 100644
index 0000000..468049b
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheListenerWrapper.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.NodeCacheListener;
+
+class NodeCacheListenerWrapper implements CuratorCacheListener
+{
+    private final NodeCacheListener listener;
+
+    NodeCacheListenerWrapper(NodeCacheListener listener)
+    {
+        this.listener = listener;
+    }
+
+    @Override
+    public void event(Type type, ChildData oldData, ChildData data)
+    {
+        try
+        {
+            listener.nodeChanged();
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
new file mode 100644
index 0000000..a9123c1
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+
+class PathChildrenCacheListenerWrapper implements CuratorCacheListener
+{
+    private final PathChildrenCacheListener listener;
+    private final CuratorFramework client;
+
+    PathChildrenCacheListenerWrapper(CuratorFramework client, PathChildrenCacheListener listener)
+    {
+        this.listener = listener;
+        this.client = client;
+    }
+
+    @Override
+    public void event(Type type, ChildData oldData, ChildData data)
+    {
+        switch ( type )
+        {
+            case NODE_CREATED:
+            {
+                sendEvent(data, PathChildrenCacheEvent.Type.CHILD_ADDED);
+                break;
+            }
+
+            case NODE_CHANGED:
+            {
+                sendEvent(data, PathChildrenCacheEvent.Type.CHILD_UPDATED);
+                break;
+            }
+
+            case NODE_DELETED:
+            {
+                sendEvent(oldData, PathChildrenCacheEvent.Type.CHILD_REMOVED);
+                break;
+            }
+        }
+    }
+
+    @Override
+    public void initialized()
+    {
+        sendEvent(null, PathChildrenCacheEvent.Type.INITIALIZED);
+    }
+
+    private void sendEvent(ChildData node, PathChildrenCacheEvent.Type type)
+    {
+        PathChildrenCacheEvent event = new PathChildrenCacheEvent(type, node);
+        try
+        {
+            listener.childEvent(client, event);
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/StandardCuratorCacheStorage.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/StandardCuratorCacheStorage.java
new file mode 100644
index 0000000..bbdb21d
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/StandardCuratorCacheStorage.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.utils.ZKPaths;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Stream;
+
+class StandardCuratorCacheStorage implements CuratorCacheStorage
+{
+    private final Map<String, ChildData> dataMap;
+    private final boolean cacheBytes;
+
+    StandardCuratorCacheStorage(boolean cacheBytes)
+    {
+        this.dataMap = new ConcurrentHashMap<>();
+        this.cacheBytes = cacheBytes;
+    }
+
+    @Override
+    public Optional<ChildData> put(ChildData data)
+    {
+        ChildData localData = cacheBytes ? data : new ChildData(data.getPath(), data.getStat(), null);
+        return Optional.ofNullable(dataMap.put(data.getPath(), localData));
+    }
+
+    @Override
+    public Optional<ChildData> remove(String path)
+    {
+        return Optional.ofNullable(dataMap.remove(path));
+    }
+
+    @Override
+    public Optional<ChildData> get(String path)
+    {
+        return Optional.ofNullable(dataMap.get(path));
+    }
+
+    @Override
+    public int size()
+    {
+        return dataMap.size();
+    }
+
+    @Override
+    public Stream<ChildData> stream()
+    {
+        return dataMap.values().stream();
+    }
+
+    @Override
+    public Stream<ChildData> streamImmediateChildren(String fromParent)
+    {
+        return dataMap.entrySet()
+            .stream()
+            .filter(entry -> {
+                ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(entry.getKey());
+                return pathAndNode.getPath().equals(fromParent);
+            })
+            .map(Map.Entry::getValue);
+    }
+
+    @Override
+    public void clear()
+    {
+        dataMap.clear();
+    }
+}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java
new file mode 100644
index 0000000..570799b
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+
+class TreeCacheListenerWrapper implements CuratorCacheListener
+{
+    private final CuratorFramework client;
+    private final TreeCacheListener listener;
+
+    TreeCacheListenerWrapper(CuratorFramework client, TreeCacheListener listener)
+    {
+        this.client = client;
+        this.listener = listener;
+    }
+
+    @Override
+    public void event(Type type, ChildData oldData, ChildData data)
+    {
+        switch ( type )
+        {
+            case NODE_CREATED:
+            {
+                sendEvent(data, TreeCacheEvent.Type.NODE_ADDED);
+                break;
+            }
+
+            case NODE_CHANGED:
+            {
+                sendEvent(data, TreeCacheEvent.Type.NODE_UPDATED);
+                break;
+            }
+
+            case NODE_DELETED:
+            {
+                sendEvent(oldData, TreeCacheEvent.Type.NODE_REMOVED);
+                break;
+            }
+        }
+    }
+
+    @Override
+    public void initialized()
+    {
+        sendEvent(null, TreeCacheEvent.Type.INITIALIZED);
+    }
+
+    private void sendEvent(ChildData node, TreeCacheEvent.Type type)
+    {
+        TreeCacheEvent event = new TreeCacheEvent(type, node);
+        try
+        {
+            listener.childEvent(client, event);
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
new file mode 100644
index 0000000..87ecb6e
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import com.google.common.base.Preconditions;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.ThreadUtils;
+import org.apache.zookeeper.AddWatchMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Closeable;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A managed persistent watcher. The watch will be managed such that it stays set through
+ * connection lapses, etc.
+ */
+public class PersistentWatcher implements Closeable
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
+    private final StandardListenerManager<Watcher> listeners = StandardListenerManager.standard();
+    private final StandardListenerManager<Runnable> resetListeners = StandardListenerManager.standard();
+    private final ConnectionStateListener connectionStateListener = (client, newState) -> {
+        if ( newState.isConnected() )
+        {
+            reset();
+        }
+    };
+    private final Watcher watcher = event -> listeners.forEach(w -> w.process(event));
+    private final CuratorFramework client;
+    private final String basePath;
+    private final boolean recursive;
+
+    private enum State
+    {
+        LATENT,
+        STARTED,
+        CLOSED
+    }
+
+    /**
+     * @param client client
+     * @param basePath path to set the watch on
+     * @param recursive ZooKeeper persistent watches can optionally be recursive
+     */
+    public PersistentWatcher(CuratorFramework client, String basePath, boolean recursive)
+    {
+        this.client = Objects.requireNonNull(client, "client cannot be null");
+        this.basePath = Objects.requireNonNull(basePath, "basePath cannot be null");
+        this.recursive = recursive;
+    }
+
+    /**
+     * Start watching
+     */
+    public void start()
+    {
+        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
+        client.getConnectionStateListenable().addListener(connectionStateListener);
+        reset();
+    }
+
+    /**
+     * Remove the watcher
+     */
+    @Override
+    public void close()
+    {
+        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+        {
+            listeners.clear();
+            client.getConnectionStateListenable().removeListener(connectionStateListener);
+            try
+            {
+                client.watches().remove(watcher).guaranteed().inBackground().forPath(basePath);
+            }
+            catch ( Exception e )
+            {
+                ThreadUtils.checkInterrupted(e);
+                log.debug(String.format("Could not remove watcher for path: %s", basePath), e);
+            }
+        }
+    }
+
+    /**
+     * Container for setting listeners
+     *
+     * @return listener container
+     */
+    public Listenable<Watcher> getListenable()
+    {
+        return listeners;
+    }
+
+    /**
+     * Listeners are called when the persistent watcher has been successfully registered
+     * or re-registered after a connection disruption
+     *
+     * @return listener container
+     */
+    public StandardListenerManager<Runnable> getResetListenable()
+    {
+        return resetListeners;
+    }
+
+    private void reset()
+    {
+        try
+        {
+            BackgroundCallback callback = (__, event) -> {
+                if ( event.getResultCode() != KeeperException.Code.OK.intValue() )
+                {
+                    reset();
+                }
+                else
+                {
+                    resetListeners.forEach(Runnable::run);
+                }
+            };
+            client.watchers().add().withMode(recursive ? AddWatchMode.PERSISTENT_RECURSIVE : AddWatchMode.PERSISTENT).inBackground(callback).usingWatcher(watcher).forPath(basePath);
+        }
+        catch ( Exception e )
+        {
+            log.error("Could not reset persistent watch at path: " + basePath, e);
+        }
+    }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
new file mode 100644
index 0000000..8560f87
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.DO_NOT_CLEAR_ON_CLOSE;
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.builder;
+
+@Test(groups = CuratorTestBase.zk36Group)
+public class TestCuratorCache extends CuratorTestBase
+{
+    @Test
+    public void testServerLoss() throws Exception   // mostly copied from TestPathChildrenCacheInCluster
+    {
+        try (TestingCluster cluster = new TestingCluster(3))
+        {
+            cluster.start();
+
+            try (CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+            {
+                client.start();
+                client.create().creatingParentsIfNeeded().forPath("/test");
+
+                try (CuratorCache cache = CuratorCache.build(client, "/test"))
+                {
+                    cache.start();
+
+                    CountDownLatch reconnectLatch = new CountDownLatch(1);
+                    client.getConnectionStateListenable().addListener((__, newState) -> {
+                        if ( newState == ConnectionState.RECONNECTED )
+                        {
+                            reconnectLatch.countDown();
+                        }
+                    });
+                    CountDownLatch latch = new CountDownLatch(3);
+                    cache.listenable().addListener((__, ___, ____) -> latch.countDown());
+
+                    client.create().forPath("/test/one");
+                    client.create().forPath("/test/two");
+                    client.create().forPath("/test/three");
+
+                    Assert.assertTrue(timing.awaitLatch(latch));
+
+                    InstanceSpec connectionInstance = cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper());
+                    cluster.killServer(connectionInstance);
+
+                    Assert.assertTrue(timing.awaitLatch(reconnectLatch));
+
+                    timing.sleepABit();
+
+                    Assert.assertEquals(cache.storage().stream().count(), 4);
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testUpdateWhenNotCachingData() throws Exception // mostly copied from TestPathChildrenCache
+    {
+        CuratorCacheStorage storage = new StandardCuratorCacheStorage(false);
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+        {
+            client.start();
+            final CountDownLatch updatedLatch = new CountDownLatch(1);
+            final CountDownLatch addedLatch = new CountDownLatch(1);
+            client.create().creatingParentsIfNeeded().forPath("/test");
+            try (CuratorCache cache = CuratorCache.builder(client, "/test").withStorage(storage).build())
+            {
+                cache.listenable().addListener(builder().forChanges((__, ___) -> updatedLatch.countDown()).build());
+                cache.listenable().addListener(builder().forCreates(__ -> addedLatch.countDown()).build());
+                cache.start();
+
+                client.create().forPath("/test/foo", "first".getBytes());
+                Assert.assertTrue(timing.awaitLatch(addedLatch));
+
+                client.setData().forPath("/test/foo", "something new".getBytes());
+                Assert.assertTrue(timing.awaitLatch(updatedLatch));
+            }
+        }
+    }
+
+    @Test
+    public void testAfterInitialized() throws Exception
+    {
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+        {
+            client.start();
+            client.create().creatingParentsIfNeeded().forPath("/test");
+            client.create().creatingParentsIfNeeded().forPath("/test/one");
+            client.create().creatingParentsIfNeeded().forPath("/test/one/two");
+            client.create().creatingParentsIfNeeded().forPath("/test/one/two/three");
+            try (CuratorCache cache = CuratorCache.build(client, "/test"))
+            {
+                CountDownLatch initializedLatch = new CountDownLatch(1);
+                AtomicInteger eventCount = new AtomicInteger(0);
+                CuratorCacheListener listener = new CuratorCacheListener()
+                {
+                    @Override
+                    public void event(Type type, ChildData oldData, ChildData data)
+                    {
+                        eventCount.incrementAndGet();
+                    }
+
+                    @Override
+                    public void initialized()
+                    {
+                        initializedLatch.countDown();
+                    }
+                };
+                cache.listenable().addListener(builder().forAll(listener).afterInitialized().build());
+                cache.start();
+                Assert.assertTrue(timing.awaitLatch(initializedLatch));
+
+                Assert.assertEquals(initializedLatch.getCount(), 0);
+                Assert.assertEquals(cache.storage().size(), 4);
+                Assert.assertTrue(cache.storage().get("/test").isPresent());
+                Assert.assertTrue(cache.storage().get("/test/one").isPresent());
+                Assert.assertTrue(cache.storage().get("/test/one/two").isPresent());
+                Assert.assertTrue(cache.storage().get("/test/one/two/three").isPresent());
+            }
+        }
+    }
+
+    @Test
+    public void testListenerBuilder() throws Exception
+    {
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+        {
+            client.start();
+            try (CuratorCache cache = CuratorCache.build(client, "/test"))
+            {
+                Semaphore all = new Semaphore(0);
+                Semaphore deletes = new Semaphore(0);
+                Semaphore changes = new Semaphore(0);
+                Semaphore creates = new Semaphore(0);
+                Semaphore createsAndChanges = new Semaphore(0);
+
+                CuratorCacheListener listener = builder().forAll((__, ___, ____) -> all.release()).forDeletes(__ -> deletes.release()).forChanges((__, ___) -> changes.release()).forCreates(__ -> creates.release()).forCreatesAndChanges((__, ___) -> createsAndChanges.release()).build();
+                cache.listenable().addListener(listener);
+                cache.start();
+
+                client.create().forPath("/test");
+                Assert.assertTrue(timing.acquireSemaphore(all, 1));
+                Assert.assertTrue(timing.acquireSemaphore(creates, 1));
+                Assert.assertTrue(timing.acquireSemaphore(createsAndChanges, 1));
+                Assert.assertEquals(changes.availablePermits(), 0);
+                Assert.assertEquals(deletes.availablePermits(), 0);
+
+                client.setData().forPath("/test", "new".getBytes());
+                Assert.assertTrue(timing.acquireSemaphore(all, 1));
+                Assert.assertTrue(timing.acquireSemaphore(changes, 1));
+                Assert.assertTrue(timing.acquireSemaphore(createsAndChanges, 1));
+                Assert.assertEquals(creates.availablePermits(), 0);
+                Assert.assertEquals(deletes.availablePermits(), 0);
+
+                client.delete().forPath("/test");
+                Assert.assertTrue(timing.acquireSemaphore(all, 1));
+                Assert.assertTrue(timing.acquireSemaphore(deletes, 1));
+                Assert.assertEquals(creates.availablePermits(), 0);
+                Assert.assertEquals(changes.availablePermits(), 0);
+                Assert.assertEquals(createsAndChanges.availablePermits(), 0);
+            }
+        }
+    }
+
+    @Test
+    public void testOverrideExecutor() throws Exception
+    {
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+        {
+            client.start();
+            CountDownLatch latch = new CountDownLatch(2);
+            Executor executor = proc -> {
+                latch.countDown();
+                proc.run();
+            };
+            try ( CuratorCache cache = CuratorCache.builder(client, "/test").withExecutor(executor).build() )
+            {
+                cache.listenable().addListener((type, oldData, data) -> latch.countDown());
+                cache.start();
+
+                client.create().forPath("/test");
+
+                Assert.assertTrue(timing.awaitLatch(latch));
+            }
+        }
+    }
+
+    @Test
+    public void testClearOnClose() throws Exception
+    {
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+        {
+            CuratorCacheStorage storage;
+            client.start();
+
+            try ( CuratorCache cache = CuratorCache.builder(client, "/test").withOptions(DO_NOT_CLEAR_ON_CLOSE).build() )
+            {
+                cache.start();
+                storage = cache.storage();
+
+                client.create().forPath("/test", "foo".getBytes());
+                client.create().forPath("/test/bar", "bar".getBytes());
+                timing.sleepABit();
+            }
+            Assert.assertEquals(storage.size(), 2);
+
+            try ( CuratorCache cache = CuratorCache.build(client, "/test") )
+            {
+                cache.start();
+                storage = cache.storage();
+
+                timing.sleepABit();
+            }
+            Assert.assertEquals(storage.size(), 0);
+        }
+    }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheConsistency.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheConsistency.java
new file mode 100644
index 0000000..ab9dbe4
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheConsistency.java
@@ -0,0 +1,373 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.apache.curator.utils.ZKPaths;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.io.Closeable;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.DO_NOT_CLEAR_ON_CLOSE;
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.builder;
+
+/**
+ * Randomly create nodes in a tree while a set of CuratorCaches listens. Afterwards, validate
+ * that the caches contain the same values as ZK itself
+ */
+@Test(groups = CuratorTestBase.zk36Group)
+public class TestCuratorCacheConsistency extends CuratorTestBase
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final ThreadLocalRandom random = ThreadLocalRandom.current();
+
+    private static final Duration testLength = Duration.ofSeconds(30);
+    private static final Duration thirdOfTestLength = Duration.ofMillis(testLength.toMillis() / 3);
+    private static final Duration sleepLength = Duration.ofMillis(5);
+    private static final int nodesPerLevel = 10;
+    private static final int clusterSize = 5;
+    private static final int maxServerKills = 2;
+
+    private static final String BASE_PATH = "/test";
+
+    private class Client implements Closeable
+    {
+        private final CuratorFramework client;
+        private final CuratorCache cache;
+        private final int index;
+        private final Map<String, ChildData> listenerDataMap = new HashMap<>();
+
+        Client(int index, String connectionString, AtomicReference<Exception> errorSignal)
+        {
+            this.index = index;
+            client = buildClient(connectionString);
+            cache = CuratorCache.builder(client, BASE_PATH).withOptions(DO_NOT_CLEAR_ON_CLOSE).withExceptionHandler(errorSignal::set).build();
+
+            // listenerDataMap is a local data map that will hold values sent by listeners
+            // this way, the listener code can be tested for validity and consistency
+            CuratorCacheListener listener = builder().forCreates(node -> {
+                ChildData previous = listenerDataMap.put(node.getPath(), node);
+                if ( previous != null )
+                {
+                    errorSignal.set(new Exception(String.format("Client: %d - Create for existing node: %s", index, node.getPath())));
+                }
+            }).forChanges((oldNode, node) -> {
+                ChildData previous = listenerDataMap.put(node.getPath(), node);
+                if ( (previous == null) || !Arrays.equals(previous.getData(), oldNode.getData()) )
+                {
+                    errorSignal.set(new Exception(String.format("Client: %d - Bad old value for change node: %s", index, node.getPath())));
+                }
+            }).forDeletes(node -> {
+                ChildData previous = listenerDataMap.remove(node.getPath());
+                if ( previous == null )
+                {
+                    errorSignal.set(new Exception(String.format("Client: %d - Delete for non-existent node: %s", index, node.getPath())));
+                }
+            }).build();
+            cache.listenable().addListener(listener);
+        }
+
+        void start()
+        {
+            client.start();
+            cache.start();
+        }
+
+        @Override
+        public void close()
+        {
+            cache.close();
+            client.close();
+        }
+    }
+
+    @Test
+    public void testConsistencyAfterSimulation() throws Exception
+    {
+        int clientQty = random.nextInt(10, 20);
+        int maxDepth = random.nextInt(5, 10);
+
+        log.info("clientQty: {}, maxDepth: {}", clientQty, maxDepth);
+
+        List<Client> clients = Collections.emptyList();
+        Map<String, String> actualTree;
+
+        AtomicReference<Exception> errorSignal = new AtomicReference<>();
+        try (TestingCluster cluster = new TestingCluster(clusterSize))
+        {
+            cluster.start();
+
+            initializeBasePath(cluster);
+            try
+            {
+                clients = buildClients(cluster, clientQty, errorSignal);
+                workLoop(cluster, clients, maxDepth, errorSignal);
+
+                log.info("Test complete - sleeping to allow events to complete");
+                timing.sleepABit();
+            }
+            finally
+            {
+                clients.forEach(Client::close);
+            }
+
+            actualTree = buildActual(cluster);
+        }
+
+        log.info("client qty: {}", clientQty);
+
+        Map<Integer, List<String>> errorsList = clients.stream()
+            .map(client ->  findErrors(client, actualTree))
+            .filter(errorsEntry -> !errorsEntry.getValue().isEmpty())
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+        if ( !errorsList.isEmpty() )
+        {
+            log.error("{} clients had errors", errorsList.size());
+            errorsList.forEach((index, errorList) -> {
+                log.error("Client {}", index);
+                errorList.forEach(log::error);
+                log.error("");
+            });
+
+            Assert.fail("Errors found");
+        }
+    }
+
+    // build a data map recursively from the actual values in ZK
+    private Map<String, String> buildActual(TestingCluster cluster)
+    {
+        Map<String, String> actual = new HashMap<>();
+        try (CuratorFramework client = buildClient(cluster.getConnectString()))
+        {
+            client.start();
+            buildActual(client, actual, BASE_PATH);
+        }
+        return actual;
+    }
+
+    private void buildActual(CuratorFramework client, Map<String, String> actual, String fromPath)
+    {
+        try
+        {
+            byte[] bytes = client.getData().forPath(fromPath);
+            actual.put(fromPath, new String(bytes));
+            client.getChildren().forPath(fromPath).forEach(child -> buildActual(client, actual, ZKPaths.makePath(fromPath, child)));
+        }
+        catch ( Exception e )
+        {
+            Assert.fail("", e);
+        }
+    }
+
+    private List<Client> buildClients(TestingCluster cluster, int clientQty, AtomicReference<Exception> errorSignal)
+    {
+        return IntStream.range(0, clientQty)
+            .mapToObj(index -> new Client(index, cluster.getConnectString(), errorSignal))
+            .peek(Client::start)
+            .collect(Collectors.toList());
+    }
+
+    private void initializeBasePath(TestingCluster cluster) throws Exception
+    {
+        try (CuratorFramework client = buildClient(cluster.getConnectString()))
+        {
+            client.start();
+            client.create().forPath(BASE_PATH, "".getBytes());
+        }
+    }
+
+    private void workLoop(TestingCluster cluster, List<Client> clients, int maxDepth, AtomicReference<Exception> errorSignal) throws Exception
+    {
+        Instant start = Instant.now();
+        Instant lastServerKill = Instant.now();
+        int serverKillIndex = 0;
+        while ( true )
+        {
+            Duration elapsed = Duration.between(start, Instant.now());
+            if ( elapsed.compareTo(testLength) >= 0 )
+            {
+                break;
+            }
+
+            Exception errorSignalException = errorSignal.get();
+            if ( errorSignalException != null )
+            {
+                Assert.fail("A client's error handler was called", errorSignalException);
+            }
+
+            Duration elapsedFromLastServerKill = Duration.between(lastServerKill, Instant.now());
+            if ( elapsedFromLastServerKill.compareTo(thirdOfTestLength) >= 0 )
+            {
+                lastServerKill = Instant.now();
+                if ( serverKillIndex < maxServerKills )
+                {
+                    doKillServer(cluster, serverKillIndex++);
+                }
+            }
+
+            int thisDepth = random.nextInt(0, maxDepth);
+            String thisPath = randomPath(thisDepth);
+            CuratorFramework client = randomClient(clients);
+            if ( random.nextBoolean() )
+            {
+                doDelete(client, thisPath);
+            }
+            else
+            {
+                doChange(client, thisPath);
+            }
+
+            Thread.sleep(sleepLength.toMillis());
+        }
+    }
+
+    private void doChange(CuratorFramework client, String thisPath)
+    {
+        try
+        {
+            String thisData = Long.toString(random.nextLong());
+            client.create().orSetData().creatingParentsIfNeeded().forPath(thisPath, thisData.getBytes());
+        }
+        catch ( Exception e )
+        {
+            Assert.fail("Could not create/set: " + thisPath);
+        }
+    }
+
+    private void doDelete(CuratorFramework client, String thisPath)
+    {
+        if ( thisPath.equals(BASE_PATH) )
+        {
+            return;
+        }
+        try
+        {
+            client.delete().quietly().deletingChildrenIfNeeded().forPath(thisPath);
+        }
+        catch ( Exception e )
+        {
+            Assert.fail("Could not delete: " + thisPath);
+        }
+    }
+
+    private void doKillServer(TestingCluster cluster, int serverKillIndex) throws Exception
+    {
+        log.info("Killing server {}", serverKillIndex);
+        InstanceSpec killSpec = new ArrayList<>(cluster.getInstances()).get(serverKillIndex);
+        cluster.killServer(killSpec);
+    }
+
+    private CuratorFramework randomClient(List<Client> clients)
+    {
+        return clients.get(random.nextInt(clients.size())).client;
+    }
+
+    private Map.Entry<Integer, List<String>> findErrors(Client client, Map<String, String> tree)
+    {
+        CuratorCacheStorage storage = client.cache.storage();
+        List<String> errors = new ArrayList<>();
+        if ( tree.size() != storage.size() )
+        {
+            errors.add(String.format("Size mismatch. Expected: %d - Actual: %d", tree.size(), storage.size()));
+        }
+        tree.keySet().forEach(path -> {
+            if ( !storage.get(path).isPresent() )
+            {
+                errors.add(String.format("Path %s in master but not client", path));
+            }
+        });
+        storage.stream().forEach(data -> {
+            String treeValue = tree.get(data.getPath());
+            if ( treeValue != null )
+            {
+                if ( !treeValue.equals(new String(data.getData())) )
+                {
+                    errors.add(String.format("Data at %s is not the same", data.getPath()));
+                }
+
+                ChildData listenersMapData = client.listenerDataMap.get(data.getPath());
+                if ( listenersMapData == null )
+                {
+                    errors.add(String.format("listenersMap missing data at: %s", data.getPath()));
+                }
+                else if ( !treeValue.equals(new String(listenersMapData.getData())) )
+                {
+                    errors.add(String.format("Data at %s in listenersMap is not the same", data.getPath()));
+                }
+            }
+            else
+            {
+                errors.add(String.format("Path %s in client but not master", data.getPath()));
+            }
+        });
+
+        client.listenerDataMap.keySet().forEach(path -> {
+            if ( !storage.get(path).isPresent() )
+            {
+                errors.add(String.format("Path %s in listenersMap but not storage", path));
+            }
+        });
+
+        return new AbstractMap.SimpleEntry<>(client.index, errors);
+    }
+
+    private String randomPath(int depth)
+    {
+        StringBuilder str = new StringBuilder(BASE_PATH);
+        while ( depth-- > 0 )
+        {
+            int levelNodeName = random.nextInt(nodesPerLevel);
+            str.append("/").append(levelNodeName);
+        }
+        return str.toString();
+    }
+
+    @Override
+    protected void createServer()
+    {
+        // do nothing - we'll be using TestingCluster instead
+    }
+
+    private CuratorFramework buildClient(String connectionString)
+    {
+        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(100, 100);
+        return CuratorFrameworkFactory.newClient(connectionString, timing.session(), timing.connection(), retryPolicy);
+    }
+}
\ No newline at end of file
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEventOrdering.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEventOrdering.java
new file mode 100644
index 0000000..8baf2e2
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEventOrdering.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.testng.annotations.Test;
+import java.util.concurrent.BlockingQueue;
+
+@Test(groups = CuratorTestBase.zk36Group)
+public class TestCuratorCacheEventOrdering extends TestEventOrdering<CuratorCache>
+{
+    @Override
+    protected int getActualQty(CuratorCache cache)
+    {
+        return cache.storage().size();
+    }
+
+    @Override
+    protected CuratorCache newCache(CuratorFramework client, String path, BlockingQueue<Event> events)
+    {
+        CuratorCache cache = CuratorCache.build(client, path);
+        cache.listenable().addListener((type, oldNode, node) -> {
+            if ( type == CuratorCacheListener.Type.NODE_CREATED )
+            {
+                events.add(new Event(EventType.ADDED, node.getPath()));
+            }
+            else if ( type == CuratorCacheListener.Type.NODE_DELETED )
+            {
+                events.add(new Event(EventType.DELETED, oldNode.getPath()));
+            }
+        });
+        cache.start();
+        return cache;
+    }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java
new file mode 100644
index 0000000..4a75acf
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.SINGLE_NODE_CACHE;
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.builder;
+import static org.apache.curator.framework.recipes.cache.CuratorCacheStorage.toMap;
+
+@Test(groups = CuratorTestBase.zk36Group)
+public class TestCuratorCacheWrappers extends CuratorTestBase
+{
+    @Test
+    public void testPathChildrenCache() throws Exception    // copied from TestPathChildrenCache#testBasics()
+    {
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+        {
+            client.start();
+            client.create().forPath("/test");
+
+            final BlockingQueue<PathChildrenCacheEvent.Type> events = new LinkedBlockingQueue<>();
+            try (CuratorCache cache = CuratorCache.build(client, "/test"))
+            {
+                PathChildrenCacheListener listener = (__, event) -> {
+                    if ( event.getData().getPath().equals("/test/one") )
+                    {
+                        events.offer(event.getType());
+                    }
+                };
+                cache.listenable().addListener(builder().forPathChildrenCache(client, listener).build());
+                cache.start();
+
+                client.create().forPath("/test/one", "hey there".getBytes());
+                Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
+
+                client.setData().forPath("/test/one", "sup!".getBytes());
+                Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
+                Assert.assertEquals(new String(cache.storage().get("/test/one").orElseThrow(AssertionError::new).getData()), "sup!");
+
+                client.delete().forPath("/test/one");
+                Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
+            }
+        }
+    }
+
+    @Test
+    public void testTreeCache() throws Exception    // copied from TestTreeCache#testBasics()
+    {
+        BaseTestTreeCache treeCacheBase = new BaseTestTreeCache();
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+        {
+            client.start();
+            client.create().forPath("/test");
+
+            try (CuratorCache cache = CuratorCache.build(client, "/test"))
+            {
+                cache.listenable().addListener(builder().forTreeCache(client, treeCacheBase.eventListener).build());
+                cache.start();
+
+                treeCacheBase.assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test");
+                treeCacheBase.assertEvent(TreeCacheEvent.Type.INITIALIZED);
+                Assert.assertEquals(toMap(cache.storage().streamImmediateChildren("/test")).keySet(), ImmutableSet.of());
+                Assert.assertEquals(cache.storage().streamImmediateChildren("/t").count(), 0);
+                Assert.assertEquals(cache.storage().streamImmediateChildren("/testing").count(), 0);
+
+                client.create().forPath("/test/one", "hey there".getBytes());
+                treeCacheBase.assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/one");
+                Assert.assertEquals(toMap(cache.storage().streamImmediateChildren("/test")).keySet(), ImmutableSet.of("/test/one"));
+                Assert.assertEquals(new String(cache.storage().get("/test/one").orElseThrow(AssertionError::new).getData()), "hey there");
+                Assert.assertEquals(toMap(cache.storage().streamImmediateChildren("/test/one")).keySet(), ImmutableSet.of());
+                Assert.assertEquals(cache.storage().streamImmediateChildren("/test/o").count(), 0);
+                Assert.assertEquals(cache.storage().streamImmediateChildren("/test/onely").count(), 0);
+
+                client.setData().forPath("/test/one", "sup!".getBytes());
+                treeCacheBase.assertEvent(TreeCacheEvent.Type.NODE_UPDATED, "/test/one");
+                Assert.assertEquals(toMap(cache.storage().streamImmediateChildren("/test")).keySet(), ImmutableSet.of("/test/one"));
+                Assert.assertEquals(new String(cache.storage().get("/test/one").orElseThrow(AssertionError::new).getData()), "sup!");
+
+                client.delete().forPath("/test/one");
+                treeCacheBase.assertEvent(TreeCacheEvent.Type.NODE_REMOVED, "/test/one", "sup!".getBytes());
+                Assert.assertEquals(toMap(cache.storage().streamImmediateChildren("/test")).keySet(), ImmutableSet.of());
+            }
+        }
+    }
+
+    @Test
+    public void testNodeCache() throws Exception    // copied from TestNodeCache#testBasics()
+    {
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)) )
+        {
+            client.start();
+            client.create().forPath("/test");
+
+            try (CuratorCache cache = CuratorCache.build(client, "/test/node", SINGLE_NODE_CACHE))
+            {
+                Supplier<ChildData> getRootData = () -> cache.storage().get("/test/node").orElseThrow(() -> new AssertionError("is not present"));
+                cache.start();
+
+                final Semaphore semaphore = new Semaphore(0);
+                cache.listenable().addListener(builder().forNodeCache(semaphore::release).build());
+                try
+                {
+                    getRootData.get();
+                    Assert.fail("Should have thrown");
+                }
+                catch ( AssertionError expected )
+                {
+                    // expected
+                }
+
+                client.create().forPath("/test/node", "a".getBytes());
+                Assert.assertTrue(timing.acquireSemaphore(semaphore));
+                Assert.assertEquals(getRootData.get().getData(), "a".getBytes());
+
+                client.setData().forPath("/test/node", "b".getBytes());
+                Assert.assertTrue(timing.acquireSemaphore(semaphore));
+                Assert.assertEquals(getRootData.get().getData(), "b".getBytes());
+
+                client.delete().forPath("/test/node");
+                Assert.assertTrue(timing.acquireSemaphore(semaphore));
+                try
+                {
+                    getRootData.get();
+                    Assert.fail("Should have thrown");
+                }
+                catch ( AssertionError expected )
+                {
+                    // expected
+                }
+            }
+        }
+    }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestWrappedNodeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestWrappedNodeCache.java
new file mode 100644
index 0000000..3e81b63
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestWrappedNodeCache.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.Compatibility;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.function.Supplier;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.builder;
+
+@Test(groups = CuratorTestBase.zk36Group)
+public class TestWrappedNodeCache extends CuratorTestBase
+{
+    @Test
+    public void testDeleteThenCreate() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+            client.create().creatingParentsIfNeeded().forPath("/test/foo", "one".getBytes());
+
+            final Semaphore semaphore = new Semaphore(0);
+            cache = CuratorCache.build(client, "/test/foo");
+            NodeCacheListener listener = semaphore::release;
+            cache.listenable().addListener(builder().forNodeCache(listener).build());
+
+            Supplier<Optional<ChildData>> rootData = getRootDataProc(cache, "/test/foo");
+
+            cache.start();
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+
+            Assert.assertTrue(rootData.get().isPresent());
+            Assert.assertEquals(rootData.get().get().getData(), "one".getBytes());
+
+            client.delete().forPath("/test/foo");
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+            client.create().forPath("/test/foo", "two".getBytes());
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+
+            Assert.assertTrue(rootData.get().isPresent());
+            Assert.assertEquals(rootData.get().get().getData(), "two".getBytes());
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    @Test
+    public void testKilledSession() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = null;
+        try
+        {
+            client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+            client.start();
+            client.create().creatingParentsIfNeeded().forPath("/test/node", "start".getBytes());
+
+            CountDownLatch lostLatch = new CountDownLatch(1);
+            client.getConnectionStateListenable().addListener((__, newState) -> {
+                if ( newState == ConnectionState.LOST )
+                {
+                    lostLatch.countDown();
+                }
+            });
+
+            cache = CuratorCache.build(client,"/test/node");
+
+            Semaphore latch = new Semaphore(0);
+            NodeCacheListener listener = latch::release;
+            cache.listenable().addListener(builder().forNodeCache(listener).build());
+
+            Supplier<Optional<ChildData>> rootData = getRootDataProc(cache, "/test/node");
+
+            cache.start();
+            Assert.assertTrue(timing.acquireSemaphore(latch));
+
+            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+            Assert.assertTrue(timing.awaitLatch(lostLatch));
+
+            Assert.assertTrue(rootData.get().isPresent());
+            Assert.assertEquals(rootData.get().get().getData(), "start".getBytes());
+
+            client.setData().forPath("/test/node", "new data".getBytes());
+            Assert.assertTrue(timing.acquireSemaphore(latch));
+            Assert.assertTrue(rootData.get().isPresent());
+            Assert.assertEquals(rootData.get().get().getData(), "new data".getBytes());
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    @SuppressWarnings("ConstantConditions")
+    @Test
+    public void testBasics() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+            client.create().forPath("/test");
+
+            cache = CuratorCache.build(client, "/test/node");
+            cache.start();
+
+            Supplier<Optional<ChildData>> rootData = getRootDataProc(cache, "/test/node");
+
+            final Semaphore semaphore = new Semaphore(0);
+            NodeCacheListener listener = semaphore::release;
+            cache.listenable().addListener(builder().forNodeCache(listener).build());
+
+            Assert.assertNull(rootData.get().orElse(null));
+
+            client.create().forPath("/test/node", "a".getBytes());
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+            Assert.assertEquals(rootData.get().orElse(null).getData(), "a".getBytes());
+
+            client.setData().forPath("/test/node", "b".getBytes());
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+            Assert.assertEquals(rootData.get().orElse(null).getData(), "b".getBytes());
+
+            client.delete().forPath("/test/node");
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+            Assert.assertNull(rootData.get().orElse(null));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    private Supplier<Optional<ChildData>> getRootDataProc(CuratorCache cache, String rootPath)
+    {
+        return () -> cache.storage().get(rootPath);
+    }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
new file mode 100644
index 0000000..534c365
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.watch;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class TestPersistentWatcher extends CuratorTestBase
+{
+    @Test
+    public void testConnectionLostRecursive() throws Exception
+    {
+        internalTest(true);
+    }
+
+    @Test
+    public void testConnectionLost() throws Exception
+    {
+        internalTest(false);
+    }
+
+    private void internalTest(boolean recursive) throws Exception
+    {
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)) )
+        {
+            CountDownLatch lostLatch = new CountDownLatch(1);
+            CountDownLatch reconnectedLatch = new CountDownLatch(1);
+            client.start();
+            client.getConnectionStateListenable().addListener((__, newState) -> {
+                if ( newState == ConnectionState.LOST )
+                {
+                    lostLatch.countDown();
+                }
+                else if ( newState == ConnectionState.RECONNECTED )
+                {
+                    reconnectedLatch.countDown();
+                }
+            });
+
+            try ( PersistentWatcher persistentWatcher = new PersistentWatcher(client, "/top/main", recursive) )
+            {
+                persistentWatcher.start();
+
+                BlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>();
+                persistentWatcher.getListenable().addListener(events::add);
+
+                client.create().creatingParentsIfNeeded().forPath("/top/main/a");
+                Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main");
+                if ( recursive )
+                {
+                    Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main/a");
+                }
+                else
+                {
+                    Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main");   // child added
+                }
+
+                server.stop();
+                Assert.assertEquals(timing.takeFromQueue(events).getState(), Watcher.Event.KeeperState.Disconnected);
+                Assert.assertTrue(timing.awaitLatch(lostLatch));
+
+                server.restart();
+                Assert.assertTrue(timing.awaitLatch(reconnectedLatch));
+
+                timing.sleepABit();     // time to allow watcher to get reset
+                events.clear();
+
+                if ( recursive )
+                {
+                    client.setData().forPath("/top/main/a", "foo".getBytes());
+                    Assert.assertEquals(timing.takeFromQueue(events).getType(), Watcher.Event.EventType.NodeDataChanged);
+                }
+                client.setData().forPath("/top/main", "bar".getBytes());
+                Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main");
+            }
+        }
+    }
+}
\ No newline at end of file