Posted to commits@curator.apache.org by ra...@apache.org on 2019/11/10 14:22:49 UTC

[curator] branch CURATOR-549-zk36-persistent-watcher-recipes updated (67072a7 -> 9eb0f6f)

This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a change to branch CURATOR-549-zk36-persistent-watcher-recipes
in repository https://gitbox.apache.org/repos/asf/curator.git.


    omit 67072a7  CURATOR-549 - optimization: if a node that the cache knows about gets a change event, there's no need to query for children if the Cversion hasn't changed
    omit 260dedd  CURATOR-549
    omit 40478c5  CURATOR-549
    omit a8ba00d  CURATOR-549
     add 6ca75d3  Bump jackson-version from 2.9.8 to 2.10.0
     add 0d11fee  Merge pull request #332 from apache/dependabot/maven/jackson-version-2.10.0
     add 9129a76  CURATOR-547 change JAX-RS reader/writer to reuse Jackson ObjectMapper; also Jackson 2.9.8 -> 2.9.10
     add 05b23a6  CURATOR-549
     add 4eab363  CURATOR-549
     new ca5e3b2  CURATOR-549
     new b6ed705  CURATOR-549
     new 9eb0f6f  CURATOR-549 - optimization: if a node that the cache knows about gets a change event, there's no need to query for children if the Cversion hasn't changed

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (67072a7)
            \
             N -- N -- N   refs/heads/CURATOR-549-zk36-persistent-watcher-recipes (9eb0f6f)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
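
As a rough illustration of how the labels above relate to one another, the
sketch below classifies the revisions from this email using plain sets of
hashes. The class name RevisionLabels, the classify method, and the idea of
passing in precomputed sets ("still referenced elsewhere", "known before the
push") are assumptions made for illustration only; this is not how the ASF
notification hook is implemented.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RevisionLabels {
    /**
     * Hypothetical helper (not part of git or the ASF tooling).
     *
     * oldSide: revisions after the common base B on the old branch tip (the "O" revisions)
     * newSide: revisions after the common base B on the new branch tip (the "N" revisions)
     * referencedElsewhere: old revisions that some other reference still points to
     * knownBeforePush: revisions that already existed in the repository before the push
     */
    static Map<String, String> classify(List<String> oldSide, List<String> newSide,
                                        Set<String> referencedElsewhere, Set<String> knownBeforePush) {
        Map<String, String> labels = new LinkedHashMap<>();
        for (String rev : oldSide) {
            if (newSide.contains(rev)) {
                continue; // still on the branch, so it was not dropped
            }
            // Dropped from this branch: "omit" if still reachable elsewhere, otherwise "discard".
            labels.put(rev, referencedElsewhere.contains(rev) ? "omit" : "discard");
        }
        for (String rev : newSide) {
            // On the new branch: "add" if the repository already had it, otherwise "new".
            labels.put(rev, knownBeforePush.contains(rev) ? "add" : "new");
        }
        return labels;
    }

    public static void main(String[] args) {
        // Hashes taken from the revision list in this email.
        List<String> oldSide = Arrays.asList("a8ba00d", "40478c5", "260dedd", "67072a7");
        List<String> newSide = Arrays.asList("6ca75d3", "0d11fee", "9129a76", "05b23a6",
                                             "4eab363", "ca5e3b2", "b6ed705", "9eb0f6f");
        Set<String> referencedElsewhere = new HashSet<>(oldSide);
        Set<String> knownBeforePush = new HashSet<>(
            Arrays.asList("6ca75d3", "0d11fee", "9129a76", "05b23a6", "4eab363"));

        classify(oldSide, newSide, referencedElsewhere, knownBeforePush)
            .forEach((rev, label) -> System.out.printf("%-7s %s%n", label, rev));
    }
}
```

With the sets above, the four old revisions come out as "omit", the five
revisions the repository already had come out as "add", and the remaining
three come out as "new", matching the list at the top of this email.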


Summary of changes:
 .../curator/framework/imps/TestWatchesBuilder.java | 431 +++++++++++----------
 .../recipes/leader/TestLeaderSelectorEdges.java    |   2 +
 .../recipes/watch/TestPersistentWatcher.java       |   1 +
 .../entity/JsonServiceInstanceMarshaller.java      |   8 +-
 pom.xml                                            |   2 +-
 5 files changed, 230 insertions(+), 214 deletions(-)


[curator] 01/03: CURATOR-549

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch CURATOR-549-zk36-persistent-watcher-recipes
in repository https://gitbox.apache.org/repos/asf/curator.git

commit ca5e3b2b953dc0c0fd111317b7c206449da3ac69
Author: randgalt <ra...@apache.org>
AuthorDate: Sat Nov 2 11:40:44 2019 -0500

    CURATOR-549
    
    Bring Curator up to ZooKeeper 3.5.6 in preparation for supporting persistent recursive watchers while maintaining backward compatibility with previous versions of ZK. Added a new module to make sure we maintain compatibility with ZK 3.5.x. ZooKeeper 3.6.0 has some significant changes from previous versions. The reconfig APIs have moved into a new class, ZooKeeperAdmin. This class existed in 3.5.x but wasn't required. Now it is. A bunch of little things changed in the ZK server code  [...]
    
    There is a new module, curator-test-zk35. It forces ZooKeeper 3.5.6 and performs selected tests from the other modules to ensure compatibility. Tests annotated with TestNG groups zk35 and zk35Compatibility are tested. Group zk36 is excluded. Note: these tests will only run from Maven. I don't think IntelliJ/Eclipse support the Maven syntax I used.
    Support persistent watchers in ZK 3.6+ while maintaining backward compatibility with previous versions of ZK. Added a new module to make sure we maintain compatibility with ZK 3.5.x.
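
The group selection described in the commit message can be sketched in plain
Java. This is only an illustration of include/exclude filtering, assuming the
usual semantics in which an excluded group takes precedence over an included
one; the GroupFilter class and shouldRun method are hypothetical and are not
part of TestNG, Maven, or Curator. The group names are the ones named in the
commit message.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class GroupFilter {
    /**
     * Hypothetical helper: decides whether a test with the given groups would be
     * selected, assuming exclusion wins over inclusion.
     */
    static boolean shouldRun(Set<String> testGroups, Set<String> included, Set<String> excluded) {
        for (String group : testGroups) {
            if (excluded.contains(group)) {
                return false; // any excluded group removes the test
            }
        }
        for (String group : testGroups) {
            if (included.contains(group)) {
                return true; // at least one included group selects the test
            }
        }
        return false; // tests with no matching group are not selected
    }

    private static Set<String> groups(String... names) {
        return new HashSet<>(Arrays.asList(names));
    }

    public static void main(String[] args) {
        Set<String> included = groups("zk35", "zk35Compatibility");
        Set<String> excluded = groups("zk36");

        System.out.println(shouldRun(groups("zk35"), included, excluded));
        System.out.println(shouldRun(groups("zk35Compatibility", "zk36"), included, excluded));
        System.out.println(shouldRun(groups(), included, excluded));
    }
}
```

Under these assumptions, a test tagged only zk35 is selected, a test that also
carries zk36 is skipped, and an untagged test is not picked up at all.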
---
 .../curator/framework/imps/TestWatchesBuilder.java | 431 +++++++++++----------
 1 file changed, 222 insertions(+), 209 deletions(-)

diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatchesBuilder.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatchesBuilder.java
index 26c41f1..b65e85c 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatchesBuilder.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatchesBuilder.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.imps;
 
 import org.apache.curator.framework.CuratorFramework;
@@ -55,7 +56,7 @@ public class TestWatchesBuilder extends CuratorTestBase
         final AtomicReference<ConnectionState> state = new AtomicReference<ConnectionState>();
         client.getConnectionStateListenable().addListener(new ConnectionStateListener()
         {
-            
+
             @Override
             public void stateChanged(CuratorFramework client, ConnectionState newState)
             {
@@ -66,13 +67,13 @@ public class TestWatchesBuilder extends CuratorTestBase
                 }
             }
         });
-        
+
         return state;
     }
-    
+
     private boolean blockUntilDesiredConnectionState(AtomicReference<ConnectionState> stateRef, Timing timing, final ConnectionState desiredState)
     {
-        if(stateRef.get() == desiredState)
+        if ( stateRef.get() == desiredState )
         {
             return true;
         }
@@ -80,55 +81,55 @@ public class TestWatchesBuilder extends CuratorTestBase
         //noinspection SynchronizationOnLocalVariableOrMethodParameter
         synchronized(stateRef)
         {
-            if(stateRef.get() == desiredState)
+            if ( stateRef.get() == desiredState )
             {
                 return true;
             }
-            
+
             try
             {
                 stateRef.wait(timing.milliseconds());
                 return stateRef.get() == desiredState;
             }
-            catch(InterruptedException e)
+            catch ( InterruptedException e )
             {
                 Thread.currentThread().interrupt();
                 return false;
             }
         }
     }
-    
+
     @Test
     public void testRemoveCuratorDefaultWatcher() throws Exception
     {
         Timing timing = new Timing();
         CuratorFramework client = CuratorFrameworkFactory.builder().
-                connectString(server.getConnectString()).
-                retryPolicy(new RetryOneTime(1)).
-                build();
+            connectString(server.getConnectString()).
+            retryPolicy(new RetryOneTime(1)).
+            build();
         try
         {
             client.start();
-            
+
             final CountDownLatch removedLatch = new CountDownLatch(1);
-            
-            final String path = "/";            
+
+            final String path = "/";
             client.getCuratorListenable().addListener(new CuratorListener()
-            {                
+            {
                 @Override
-                public void eventReceived(CuratorFramework client, CuratorEvent event)
-                        throws Exception
+                public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception
                 {
-                    if(event.getType() == CuratorEventType.WATCHED && event.getWatchedEvent().getType() == EventType.DataWatchRemoved) {                        
+                    if ( event.getType() == CuratorEventType.WATCHED && event.getWatchedEvent().getType() == EventType.DataWatchRemoved )
+                    {
                         removedLatch.countDown();
-                    }        
+                    }
                 }
             });
-                        
+
             client.checkExists().watched().forPath(path);
-            
+
             client.watches().removeAll().forPath(path);
-            
+
             Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
         }
         finally
@@ -136,34 +137,35 @@ public class TestWatchesBuilder extends CuratorTestBase
             CloseableUtils.closeQuietly(client);
         }
     }
-    
+
     @Test
     public void testRemoveCuratorWatch() throws Exception
-    {       
+    {
         Timing timing = new Timing();
         CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.builder().
-                connectString(server.getConnectString()).
-                retryPolicy(new RetryOneTime(1)).
-                build();
+            connectString(server.getConnectString()).
+            retryPolicy(new RetryOneTime(1)).
+            build();
         try
         {
             client.start();
-            
+
             final CountDownLatch removedLatch = new CountDownLatch(1);
-            
-            final String path = "/";            
+
+            final String path = "/";
             CuratorWatcher watcher = new CuratorWatcher()
             {
-                
+
                 @Override
                 public void process(WatchedEvent event) throws Exception
                 {
-                    if(event.getPath().equals(path) && event.getType() == EventType.DataWatchRemoved) {
+                    if ( event.getPath().equals(path) && event.getType() == EventType.DataWatchRemoved )
+                    {
                         removedLatch.countDown();
                     }
                 }
             };
-                        
+
             client.checkExists().usingWatcher(watcher).forPath(path);
 
             client.watches().remove(watcher).forPath(path);
@@ -174,25 +176,25 @@ public class TestWatchesBuilder extends CuratorTestBase
         {
             CloseableUtils.closeQuietly(client);
         }
-    }    
-    
+    }
+
     @Test
     public void testRemoveWatch() throws Exception
-    {       
+    {
         Timing timing = new Timing();
         CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.builder().
-                connectString(server.getConnectString()).
-                retryPolicy(new RetryOneTime(1)).
-                build();
+            connectString(server.getConnectString()).
+            retryPolicy(new RetryOneTime(1)).
+            build();
         try
         {
             client.start();
-            
+
             final CountDownLatch removedLatch = new CountDownLatch(1);
-            
-            final String path = "/";    
+
+            final String path = "/";
             Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);
-            
+
             client.checkExists().usingWatcher(watcher).forPath(path);
 
             client.watches().remove(watcher).forPath(path);
@@ -204,97 +206,97 @@ public class TestWatchesBuilder extends CuratorTestBase
             CloseableUtils.closeQuietly(client);
         }
     }
-    
+
     @Test
     public void testRemoveWatchInBackgroundWithCallback() throws Exception
-    {       
+    {
         Timing timing = new Timing();
         CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.builder().
-                connectString(server.getConnectString()).
-                retryPolicy(new RetryOneTime(1)).
-                build();
+            connectString(server.getConnectString()).
+            retryPolicy(new RetryOneTime(1)).
+            build();
         try
-        {            
+        {
             client.start();
-         
+
             //Make sure that the event fires on both the watcher and the callback.
             final CountDownLatch removedLatch = new CountDownLatch(2);
             final String path = "/";
             Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);
-            
+
             BackgroundCallback callback = new BackgroundCallback()
             {
-                
+
                 @Override
-                public void processResult(CuratorFramework client, CuratorEvent event)
-                        throws Exception
+                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
                 {
-                    if(event.getType() == CuratorEventType.REMOVE_WATCHES && event.getPath().equals(path)) {
+                    if ( event.getType() == CuratorEventType.REMOVE_WATCHES && event.getPath().equals(path) )
+                    {
                         removedLatch.countDown();
                     }
                 }
             };
-            
+
             client.checkExists().usingWatcher(watcher).forPath(path);
 
             client.watches().remove(watcher).ofType(WatcherType.Any).inBackground(callback).forPath(path);
 
             Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
-            
+
         }
         finally
         {
             CloseableUtils.closeQuietly(client);
         }
     }
-    
+
     @Test
     public void testRemoveWatchInBackgroundWithNoCallback() throws Exception
-    {       
+    {
         Timing timing = new Timing();
         CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.builder().
-                connectString(server.getConnectString()).
-                retryPolicy(new RetryOneTime(1)).
-                build();
+            connectString(server.getConnectString()).
+            retryPolicy(new RetryOneTime(1)).
+            build();
         try
         {
             client.start();
-            
+
             final String path = "/";
             final CountDownLatch removedLatch = new CountDownLatch(1);
             Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);
-            
+
             client.checkExists().usingWatcher(watcher).forPath(path);
 
             client.watches().remove(watcher).inBackground().forPath(path);
 
             Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
-            
+
         }
         finally
         {
             CloseableUtils.closeQuietly(client);
         }
-    }        
-    
+    }
+
     @Test
     public void testRemoveAllWatches() throws Exception
-    {       
+    {
         Timing timing = new Timing();
         CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.builder().
-                connectString(server.getConnectString()).
-                retryPolicy(new RetryOneTime(1)).
-                build();
+            connectString(server.getConnectString()).
+            retryPolicy(new RetryOneTime(1)).
+            build();
         try
         {
             client.start();
-            
+
             final String path = "/";
             final CountDownLatch removedLatch = new CountDownLatch(2);
-            
-            Watcher watcher1 = new CountDownWatcher(path, removedLatch, EventType.ChildWatchRemoved);            
-            Watcher watcher2 = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);                        
-            
+
+            Watcher watcher1 = new CountDownWatcher(path, removedLatch, EventType.ChildWatchRemoved);
+            Watcher watcher2 = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);
+
             client.getChildren().usingWatcher(watcher1).forPath(path);
             client.checkExists().usingWatcher(watcher2).forPath(path);
 
@@ -306,32 +308,32 @@ public class TestWatchesBuilder extends CuratorTestBase
         {
             CloseableUtils.closeQuietly(client);
         }
-    }  
-    
+    }
+
     @Test
     public void testRemoveAllDataWatches() throws Exception
-    {       
+    {
         Timing timing = new Timing();
         CuratorFramework client = CuratorFrameworkFactory.builder().
-                connectString(server.getConnectString()).
-                retryPolicy(new RetryOneTime(1)).
-                build();
+            connectString(server.getConnectString()).
+            retryPolicy(new RetryOneTime(1)).
+            build();
         try
         {
             client.start();
-            
+
             final String path = "/";
             final AtomicBoolean removedFlag = new AtomicBoolean(false);
             final CountDownLatch removedLatch = new CountDownLatch(1);
-            
-            Watcher watcher1 = new BooleanWatcher(path, removedFlag, EventType.ChildWatchRemoved);            
-            Watcher watcher2 = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);                        
-            
+
+            Watcher watcher1 = new BooleanWatcher(path, removedFlag, EventType.ChildWatchRemoved);
+            Watcher watcher2 = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);
+
             client.getChildren().usingWatcher(watcher1).forPath(path);
             client.checkExists().usingWatcher(watcher2).forPath(path);
-            
+
             client.watches().removeAll().ofType(WatcherType.Data).forPath(path);
-            
+
             Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
             Assert.assertEquals(removedFlag.get(), false);
         }
@@ -340,31 +342,31 @@ public class TestWatchesBuilder extends CuratorTestBase
             CloseableUtils.closeQuietly(client);
         }
     }
-    
+
     @Test
     public void testRemoveAllChildWatches() throws Exception
-    {       
+    {
         Timing timing = new Timing();
         CuratorFramework client = CuratorFrameworkFactory.builder().
-                connectString(server.getConnectString()).
-                retryPolicy(new RetryOneTime(1)).
-                build();
+            connectString(server.getConnectString()).
+            retryPolicy(new RetryOneTime(1)).
+            build();
         try
         {
             client.start();
-            
+
             final String path = "/";
             final AtomicBoolean removedFlag = new AtomicBoolean(false);
             final CountDownLatch removedLatch = new CountDownLatch(1);
-            
-            Watcher watcher1 = new BooleanWatcher(path, removedFlag, EventType.DataWatchRemoved);            
-            Watcher watcher2 = new CountDownWatcher(path, removedLatch, EventType.ChildWatchRemoved);                        
-                        
+
+            Watcher watcher1 = new BooleanWatcher(path, removedFlag, EventType.DataWatchRemoved);
+            Watcher watcher2 = new CountDownWatcher(path, removedLatch, EventType.ChildWatchRemoved);
+
             client.checkExists().usingWatcher(watcher1).forPath(path);
             client.getChildren().usingWatcher(watcher2).forPath(path);
-            
+
             client.watches().removeAll().ofType(WatcherType.Children).forPath(path);
-            
+
             Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
             Assert.assertEquals(removedFlag.get(), false);
         }
@@ -372,34 +374,35 @@ public class TestWatchesBuilder extends CuratorTestBase
         {
             CloseableUtils.closeQuietly(client);
         }
-    }     
-    
+    }
+
     @Test
-    public void testRemoveLocalWatch() throws Exception {
+    public void testRemoveLocalWatch() throws Exception
+    {
         Timing timing = new Timing();
         CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.builder().
-                connectString(server.getConnectString()).
-                retryPolicy(new RetryOneTime(1)).
-                build();
+            connectString(server.getConnectString()).
+            retryPolicy(new RetryOneTime(1)).
+            build();
         try
         {
             client.start();
-            
+
             AtomicReference<ConnectionState> stateRef = registerConnectionStateListener(client);
-            
+
             final String path = "/";
-            
+
             final CountDownLatch removedLatch = new CountDownLatch(1);
-            
-            Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);        
-            
+
+            Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);
+
             client.checkExists().usingWatcher(watcher).forPath(path);
 
             //Stop the server so we can check if we can remove watches locally when offline
             server.stop();
-            
+
             Assert.assertTrue(blockUntilDesiredConnectionState(stateRef, timing, ConnectionState.SUSPENDED));
-                       
+
             client.watches().removeAll().locally().forPath(path);
 
             Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
@@ -409,33 +412,34 @@ public class TestWatchesBuilder extends CuratorTestBase
             CloseableUtils.closeQuietly(client);
         }
     }
-    
+
     @Test
-    public void testRemoveLocalWatchInBackground() throws Exception {
+    public void testRemoveLocalWatchInBackground() throws Exception
+    {
         Timing timing = new Timing();
         CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.builder().
-                connectString(server.getConnectString()).
-                retryPolicy(new RetryOneTime(1)).
-                build();
+            connectString(server.getConnectString()).
+            retryPolicy(new RetryOneTime(1)).
+            build();
         try
         {
             client.start();
-            
+
             AtomicReference<ConnectionState> stateRef = registerConnectionStateListener(client);
-            
+
             final String path = "/";
-            
+
             final CountDownLatch removedLatch = new CountDownLatch(1);
-            
-            Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);        
-            
+
+            Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);
+
             client.checkExists().usingWatcher(watcher).forPath(path);
 
             //Stop the server so we can check if we can remove watches locally when offline
             server.stop();
-            
+
             Assert.assertTrue(blockUntilDesiredConnectionState(stateRef, timing, ConnectionState.SUSPENDED));
-                       
+
             client.watches().removeAll().locally().inBackground().forPath(path);
 
             Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
@@ -444,8 +448,8 @@ public class TestWatchesBuilder extends CuratorTestBase
         {
             CloseableUtils.closeQuietly(client);
         }
-    }    
-    
+    }
+
     /**
      * Test the case where we try and remove an unregistered watcher. In this case we expect a NoWatcherException to
      * be thrown. 
@@ -455,21 +459,22 @@ public class TestWatchesBuilder extends CuratorTestBase
     public void testRemoveUnregisteredWatcher() throws Exception
     {
         CuratorFramework client = CuratorFrameworkFactory.builder().
-                connectString(server.getConnectString()).
-                retryPolicy(new RetryOneTime(1)).
-                build();
+            connectString(server.getConnectString()).
+            retryPolicy(new RetryOneTime(1)).
+            build();
         try
         {
             client.start();
-            
-            final String path = "/";            
-            Watcher watcher = new Watcher() {
+
+            final String path = "/";
+            Watcher watcher = new Watcher()
+            {
                 @Override
                 public void process(WatchedEvent event)
                 {
-                }                
+                }
             };
-            
+
             try
             {
                 client.watches().remove(watcher).forPath(path);
@@ -485,7 +490,7 @@ public class TestWatchesBuilder extends CuratorTestBase
             CloseableUtils.closeQuietly(client);
         }
     }
-    
+
     /**
      * Test the case where we try and remove an unregistered watcher but have the quietly flag set. In this case we expect success. 
      * @throws Exception
@@ -495,22 +500,22 @@ public class TestWatchesBuilder extends CuratorTestBase
     {
         Timing timing = new Timing();
         CuratorFramework client = CuratorFrameworkFactory.builder().
-                connectString(server.getConnectString()).
-                retryPolicy(new RetryOneTime(1)).
-                build();
+            connectString(server.getConnectString()).
+            retryPolicy(new RetryOneTime(1)).
+            build();
         try
         {
             client.start();
-            
+
             final AtomicBoolean watcherRemoved = new AtomicBoolean(false);
-            
-            final String path = "/";            
+
+            final String path = "/";
             Watcher watcher = new BooleanWatcher(path, watcherRemoved, EventType.DataWatchRemoved);
-            
+
             client.watches().remove(watcher).quietly().forPath(path);
-            
+
             timing.sleepABit();
-            
+
             //There should be no watcher removed as none were registered.
             Assert.assertEquals(watcherRemoved.get(), false);
         }
@@ -519,94 +524,94 @@ public class TestWatchesBuilder extends CuratorTestBase
             CloseableUtils.closeQuietly(client);
         }
     }
-    
+
     @Test
-    public void testGuaranteedRemoveWatch() throws Exception {
+    public void testGuaranteedRemoveWatch() throws Exception
+    {
         Timing timing = new Timing();
         CuratorFramework client = CuratorFrameworkFactory.builder().
-                connectString(server.getConnectString()).
-                retryPolicy(new RetryOneTime(1)).
-                build();
+            connectString(server.getConnectString()).
+            retryPolicy(new RetryOneTime(1)).
+            build();
         try
         {
             client.start();
-            
+
             AtomicReference<ConnectionState> stateRef = registerConnectionStateListener(client);
-                       
+
             String path = "/";
-            
+
             CountDownLatch removeLatch = new CountDownLatch(1);
-            
-            Watcher watcher = new CountDownWatcher(path, removeLatch, EventType.DataWatchRemoved);            
+
+            Watcher watcher = new CountDownWatcher(path, removeLatch, EventType.DataWatchRemoved);
             client.checkExists().usingWatcher(watcher).forPath(path);
-            
-            server.stop();           
-            
+
+            server.stop();
+
             Assert.assertTrue(blockUntilDesiredConnectionState(stateRef, timing, ConnectionState.SUSPENDED));
-            
+
             //Remove the watch while we're not connected
-            try 
+            try
             {
                 client.watches().remove(watcher).guaranteed().forPath(path);
                 Assert.fail();
             }
-            catch(KeeperException.ConnectionLossException e)
+            catch ( KeeperException.ConnectionLossException e )
             {
                 //Expected
             }
-            
+
             server.restart();
-            
-            timing.awaitLatch(removeLatch);            
+
+            timing.awaitLatch(removeLatch);
         }
         finally
         {
             CloseableUtils.closeQuietly(client);
         }
     }
-    
+
     @Test
-    public void testGuaranteedRemoveWatchInBackground() throws Exception {
+    public void testGuaranteedRemoveWatchInBackground() throws Exception
+    {
         Timing timing = new Timing();
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(),
-                                                                    new ExponentialBackoffRetry(100, 3));
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new ExponentialBackoffRetry(100, 3));
         try
         {
             client.start();
-            
+
             AtomicReference<ConnectionState> stateRef = registerConnectionStateListener(client);
-                        
+
             final CountDownLatch guaranteeAddedLatch = new CountDownLatch(1);
-            
+
             ((CuratorFrameworkImpl)client).getFailedRemoveWatcherManager().debugListener = new FailedOperationManager.FailedOperationManagerListener<FailedRemoveWatchManager.FailedRemoveWatchDetails>()
             {
 
                 @Override
-                public void pathAddedForGuaranteedOperation(
-                        FailedRemoveWatchDetails detail)
+                public void pathAddedForGuaranteedOperation(FailedRemoveWatchDetails detail)
                 {
                     guaranteeAddedLatch.countDown();
                 }
             };
-            
+
             String path = "/";
-            
+
             CountDownLatch removeLatch = new CountDownLatch(1);
-            
-            Watcher watcher = new CountDownWatcher(path, removeLatch, EventType.DataWatchRemoved);            
+
+            Watcher watcher = new CountDownWatcher(path, removeLatch, EventType.DataWatchRemoved);
             client.checkExists().usingWatcher(watcher).forPath(path);
-            
-            server.stop();           
+
+            server.stop();
             Assert.assertTrue(blockUntilDesiredConnectionState(stateRef, timing, ConnectionState.SUSPENDED));
-            
+
             //Remove the watch while we're not connected
             client.watches().remove(watcher).guaranteed().inBackground().forPath(path);
-            
+
             timing.awaitLatch(guaranteeAddedLatch);
-            
+
             server.restart();
-            
-            timing.awaitLatch(removeLatch);            
+
+            timing.awaitLatch(removeLatch);
         }
         finally
         {
@@ -617,7 +622,7 @@ public class TestWatchesBuilder extends CuratorTestBase
     @Test(groups = CuratorTestBase.zk36Group)
     public void testPersistentRecursiveWatch() throws Exception
     {
-        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)))
         {
             client.start();
             client.blockUntilConnected();
@@ -647,7 +652,7 @@ public class TestWatchesBuilder extends CuratorTestBase
             };
             return new ZooKeeper(connectString, sessionTimeout, actualWatcher);
         };
-        try (CuratorFramework client = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).zookeeperFactory(zookeeperFactory).build() )
+        try (CuratorFramework client = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).zookeeperFactory(zookeeperFactory).build())
         {
             client.start();
             client.blockUntilConnected();
@@ -664,51 +669,59 @@ public class TestWatchesBuilder extends CuratorTestBase
         }
     }
 
-    private static class CountDownWatcher implements Watcher {
+    private static class CountDownWatcher implements Watcher
+    {
         private String path;
         private EventType eventType;
         private CountDownLatch removeLatch;
-        
-        public CountDownWatcher(String path, CountDownLatch removeLatch, EventType eventType) {
+
+        public CountDownWatcher(String path, CountDownLatch removeLatch, EventType eventType)
+        {
             this.path = path;
             this.eventType = eventType;
-            this.removeLatch = removeLatch;            
+            this.removeLatch = removeLatch;
         }
-        
+
         @Override
         public void process(WatchedEvent event)
         {
-            if(event.getPath() == null || event.getType() == null) {
+            if ( event.getPath() == null || event.getType() == null )
+            {
                 return;
             }
-            
-            if(event.getPath().equals(path) && event.getType() == eventType) {
+
+            if ( event.getPath().equals(path) && event.getType() == eventType )
+            {
                 removeLatch.countDown();
             }
-        }  
+        }
     }
-    
-    private static class BooleanWatcher implements Watcher {
+
+    private static class BooleanWatcher implements Watcher
+    {
         private String path;
         private EventType eventType;
         private AtomicBoolean removedFlag;
-        
-        public BooleanWatcher(String path, AtomicBoolean removedFlag, EventType eventType) {
+
+        public BooleanWatcher(String path, AtomicBoolean removedFlag, EventType eventType)
+        {
             this.path = path;
             this.eventType = eventType;
-            this.removedFlag = removedFlag;            
+            this.removedFlag = removedFlag;
         }
-        
+
         @Override
         public void process(WatchedEvent event)
         {
-            if(event.getPath() == null || event.getType() == null) {
+            if ( event.getPath() == null || event.getType() == null )
+            {
                 return;
             }
-            
-            if(event.getPath().equals(path) && event.getType() == eventType) {
+
+            if ( event.getPath().equals(path) && event.getType() == eventType )
+            {
                 removedFlag.set(true);
             }
-        }  
-    }    
+        }
+    }
 }


[curator] 03/03: CURATOR-549 - optimization: if a node that the cache knows about gets a change event, there's no need to query for children if the Cversion hasn't changed

Posted by ra...@apache.org.

randgalt pushed a commit to branch CURATOR-549-zk36-persistent-watcher-recipes
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 9eb0f6fe773189747620eb87f55d2709be4d82fc
Author: randgalt <ra...@apache.org>
AuthorDate: Fri Nov 8 10:53:38 2019 -0500

    CURATOR-549 - optimization: if a node that the cache knows about gets a change event, there's no need to query for children if the Cversion hasn't changed
---
 .../framework/recipes/cache/CuratorCacheImpl.java  |  17 ++-
 .../framework/recipes/watch/PersistentWatcher.java |  10 +-
 .../src/site/confluence/index.confluence           |   3 +-
 .../site/confluence/persistent-watcher.confluence  |  35 +++++
 .../framework/recipes/cache/TestCuratorCache.java  |  45 ------
 .../recipes/cache/TestCuratorCacheEdges.java       | 153 +++++++++++++++++++++
 .../recipes/watch/TestPersistentWatcher.java       |   1 +
 7 files changed, 208 insertions(+), 56 deletions(-)

diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
index ee95570..e6be71c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
@@ -30,6 +30,7 @@ import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.Collections;
@@ -157,13 +158,18 @@ class CuratorCacheImpl implements CuratorCache
         }
     }
 
-    private void nodeChildrenChanged(String fromPath)
+    private void checkChildrenChanged(String fromPath, Stat oldStat, Stat newStat)
     {
         if ( (state.get() != State.STARTED) || !recursive )
         {
             return;
         }
 
+        if ( (oldStat != null) && (oldStat.getCversion() == newStat.getCversion()) )
+        {
+            return; // children haven't changed
+        }
+
         try
         {
             BackgroundCallback callback = (__, event) -> {
@@ -203,8 +209,8 @@ class CuratorCacheImpl implements CuratorCache
             BackgroundCallback callback = (__, event) -> {
                 if ( event.getResultCode() == OK.intValue() )
                 {
-                    putStorage(new ChildData(event.getPath(), event.getStat(), event.getData()));
-                    nodeChildrenChanged(event.getPath());
+                    Optional<ChildData> childData = putStorage(new ChildData(event.getPath(), event.getStat(), event.getData()));
+                    checkChildrenChanged(event.getPath(), childData.map(ChildData::getStat).orElse(null), event.getStat());
                 }
                 else if ( event.getResultCode() == NONODE.intValue() )
                 {
@@ -233,12 +239,12 @@ class CuratorCacheImpl implements CuratorCache
         }
     }
 
-    private void putStorage(ChildData data)
+    private Optional<ChildData> putStorage(ChildData data)
     {
         Optional<ChildData> previousData = storage.put(data);
         if ( previousData.isPresent() )
         {
-            if ( previousData.get().getStat().getMzxid() != data.getStat().getMzxid() )
+            if ( previousData.get().getStat().getVersion() != data.getStat().getVersion() )
             {
                 callListeners(l -> l.event(NODE_CHANGED, previousData.get(), data));
             }
@@ -247,6 +253,7 @@ class CuratorCacheImpl implements CuratorCache
         {
             callListeners(l -> l.event(NODE_CREATED, null, data));
         }
+        return previousData;
     }
 
     private void removeStorage(String path)
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
index 87ecb6e..187343a 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
@@ -114,7 +114,7 @@ public class PersistentWatcher implements Closeable
             client.getConnectionStateListenable().removeListener(connectionStateListener);
             try
             {
-                client.watches().remove(watcher).guaranteed().inBackground().forPath(basePath);
+                client.watchers().remove(watcher).guaranteed().inBackground().forPath(basePath);
             }
             catch ( Exception e )
             {
@@ -140,7 +140,7 @@ public class PersistentWatcher implements Closeable
      *
      * @return listener container
      */
-    public StandardListenerManager<Runnable> getResetListenable()
+    public Listenable<Runnable> getResetListenable()
     {
         return resetListeners;
     }
@@ -150,13 +150,13 @@ public class PersistentWatcher implements Closeable
         try
         {
             BackgroundCallback callback = (__, event) -> {
-                if ( event.getResultCode() != KeeperException.Code.OK.intValue() )
+                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                 {
-                    reset();
+                    resetListeners.forEach(Runnable::run);
                 }
                 else
                 {
-                    resetListeners.forEach(Runnable::run);
+                    reset();
                 }
             };
             client.watchers().add().withMode(recursive ? AddWatchMode.PERSISTENT_RECURSIVE : AddWatchMode.PERSISTENT).inBackground(callback).usingWatcher(watcher).forPath(basePath);
diff --git a/curator-recipes/src/site/confluence/index.confluence b/curator-recipes/src/site/confluence/index.confluence
index d96b5ce..ab8dc53 100644
--- a/curator-recipes/src/site/confluence/index.confluence
+++ b/curator-recipes/src/site/confluence/index.confluence
@@ -29,7 +29,8 @@ regarding "Curator Recipes Own Their ZNode/Paths".
 |[[Node Cache|node-cache.html]] \- (For pre-ZooKeeper 3.6.x) A utility that attempts to keep the data from a node locally cached. This class will watch the node, respond to update/create/delete events, pull down the data, etc. You can register a listener that will get notified when changes occur.|
 |[[Tree Cache|tree-cache.html]] \- (For pre-ZooKeeper 3.6.x) A utility that attempts to keep all data from all children of a ZK path locally cached. This class will watch the ZK path, respond to update/create/delete events, pull down the data, etc. You can register a listener that will get notified when changes occur.|
 
-||Nodes||
+||Nodes/Watches||
+|[[Persistent Recursive Watcher|persistent-watcher.html]] \- A managed persistent recursive watcher. The watch will be managed such that it stays set through connection lapses, etc.|
 |[[Persistent Node|persistent-node.html]] \- A node that attempts to stay present in ZooKeeper, even through connection and session interruptions.|
 |[[Persistent TTL Node|persistent-ttl-node.html]] \- Useful when you need to create a TTL node but don't want to keep it alive manually by periodically setting data.|
 |[Group Member|group-member.html]] \- Group membership management. Adds this instance into a group and keeps a cache of members in the group.|
diff --git a/curator-recipes/src/site/confluence/persistent-watcher.confluence b/curator-recipes/src/site/confluence/persistent-watcher.confluence
new file mode 100644
index 0000000..4551669
--- /dev/null
+++ b/curator-recipes/src/site/confluence/persistent-watcher.confluence
@@ -0,0 +1,35 @@
+h1. Persistent Recursive Watcher
+
+*Note: * PersistentWatcher requires ZooKeeper 3.6\+.
+
+h2. Description
+A managed persistent watcher. The watch will be managed such that it stays set through connection lapses, etc.
+
+h2. Participating Classes
+* PersistentWatcher
+
+h2. Usage
+h3. Creating a PersistentWatcher
+{code}
+public PersistentWatcher(CuratorFramework client,
+                               String basePath,
+                               boolean recursive)
+
+Parameters:
+client - the client
+basePath - path to set the watch on
+recursive - ZooKeeper persistent watches can optionally be recursive
+{code}
+
+h2. General Usage
+The instance must be started by calling {{start()}}. Call {{close()}} when you want to remove the watch.
+
+PersistentWatcher presents two listener types:
+
+* {{Listenable<Watcher> getListenable()}} \- Use this to add watchers. These will behave in the same manner that watchers added
+via {{ZooKeeper.addWatch()}} behave.
+* {{Listenable<Runnable> getResetListenable()}} \- The Runnables added with this get called once the Persistent Watcher has been successfully set
+(or reset after a connection partition).
+
+h2. Error Handling
+PersistentWatcher instances internally monitor connection losses, etc. and automatically reset the watch on reconnection.
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
index 8560f87..2d6fb0d 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
@@ -40,51 +40,6 @@ import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.bu
 public class TestCuratorCache extends CuratorTestBase
 {
     @Test
-    public void testServerLoss() throws Exception   // mostly copied from TestPathChildrenCacheInCluster
-    {
-        try (TestingCluster cluster = new TestingCluster(3))
-        {
-            cluster.start();
-
-            try (CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
-            {
-                client.start();
-                client.create().creatingParentsIfNeeded().forPath("/test");
-
-                try (CuratorCache cache = CuratorCache.build(client, "/test"))
-                {
-                    cache.start();
-
-                    CountDownLatch reconnectLatch = new CountDownLatch(1);
-                    client.getConnectionStateListenable().addListener((__, newState) -> {
-                        if ( newState == ConnectionState.RECONNECTED )
-                        {
-                            reconnectLatch.countDown();
-                        }
-                    });
-                    CountDownLatch latch = new CountDownLatch(3);
-                    cache.listenable().addListener((__, ___, ____) -> latch.countDown());
-
-                    client.create().forPath("/test/one");
-                    client.create().forPath("/test/two");
-                    client.create().forPath("/test/three");
-
-                    Assert.assertTrue(timing.awaitLatch(latch));
-
-                    InstanceSpec connectionInstance = cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper());
-                    cluster.killServer(connectionInstance);
-
-                    Assert.assertTrue(timing.awaitLatch(reconnectLatch));
-
-                    timing.sleepABit();
-
-                    Assert.assertEquals(cache.storage().stream().count(), 4);
-                }
-            }
-        }
-    }
-
-    @Test
     public void testUpdateWhenNotCachingData() throws Exception // mostly copied from TestPathChildrenCache
     {
         CuratorCacheStorage storage = new StandardCuratorCacheStorage(false);
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEdges.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEdges.java
new file mode 100644
index 0000000..f20f775
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEdges.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.DO_NOT_CLEAR_ON_CLOSE;
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.builder;
+
+@Test(groups = CuratorTestBase.zk36Group)
+public class TestCuratorCacheEdges extends CuratorTestBase
+{
+    @Test
+    public void testReconnectConsistency() throws Exception
+    {
+        final byte[] first = "one".getBytes();
+        final byte[] second = "two".getBytes();
+
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+        {
+            client.start();
+            client.create().forPath("/root", first);
+            client.create().forPath("/root/1", first);
+            client.create().forPath("/root/2", first);
+            client.create().forPath("/root/1/11", first);
+            client.create().forPath("/root/1/12", first);
+            client.create().forPath("/root/1/13", first);
+            client.create().forPath("/root/2/21", first);
+            client.create().forPath("/root/2/22", first);
+
+            CuratorCacheStorage storage = CuratorCacheStorage.standard();
+            try (CuratorCache cache = CuratorCache.builder(client, "/root").withStorage(storage).withOptions(DO_NOT_CLEAR_ON_CLOSE).build())
+            {
+                CountDownLatch latch = new CountDownLatch(1);
+                cache.listenable().addListener(CuratorCacheListener.builder().forInitialized(latch::countDown).build());
+                cache.start();
+                Assert.assertTrue(timing.awaitLatch(latch));
+            }
+
+            // we now have a storage loaded with the initial nodes created
+
+            // simulate nodes changing during a partition
+
+            client.delete().forPath("/root/2/21");
+            client.delete().forPath("/root/2/22");
+            client.delete().forPath("/root/2");
+
+            client.setData().forPath("/root", second);
+            client.create().forPath("/root/1/11/111", second);
+            client.create().forPath("/root/1/11/111/1111", second);
+            client.create().forPath("/root/1/11/111/1112", second);
+            client.create().forPath("/root/1/13/131", second);
+            client.create().forPath("/root/1/13/132", second);
+            client.create().forPath("/root/1/13/132/1321", second);
+
+            try (CuratorCache cache = CuratorCache.builder(client, "/root").withStorage(storage).withOptions(DO_NOT_CLEAR_ON_CLOSE).build())
+            {
+                CountDownLatch latch = new CountDownLatch(1);
+                cache.listenable().addListener(CuratorCacheListener.builder().forInitialized(latch::countDown).build());
+                cache.start();
+                Assert.assertTrue(timing.awaitLatch(latch));
+            }
+
+            Assert.assertEquals(storage.size(), 11);
+            Assert.assertEquals(storage.get("/root").map(ChildData::getData).orElse(null), second);
+            Assert.assertEquals(storage.get("/root/1").map(ChildData::getData).orElse(null), first);
+            Assert.assertEquals(storage.get("/root/1/11").map(ChildData::getData).orElse(null), first);
+            Assert.assertEquals(storage.get("/root/1/11/111").map(ChildData::getData).orElse(null), second);
+            Assert.assertEquals(storage.get("/root/1/11/111/1111").map(ChildData::getData).orElse(null), second);
+            Assert.assertEquals(storage.get("/root/1/11/111/1112").map(ChildData::getData).orElse(null), second);
+            Assert.assertEquals(storage.get("/root/1/12").map(ChildData::getData).orElse(null), first);
+            Assert.assertEquals(storage.get("/root/1/13").map(ChildData::getData).orElse(null), first);
+            Assert.assertEquals(storage.get("/root/1/13/131").map(ChildData::getData).orElse(null), second);
+            Assert.assertEquals(storage.get("/root/1/13/132").map(ChildData::getData).orElse(null), second);
+            Assert.assertEquals(storage.get("/root/1/13/132/1321").map(ChildData::getData).orElse(null), second);
+        }
+    }
+
+    @Test
+    public void testServerLoss() throws Exception   // mostly copied from TestPathChildrenCacheInCluster
+    {
+        try (TestingCluster cluster = new TestingCluster(3))
+        {
+            cluster.start();
+
+            try (CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+            {
+                client.start();
+                client.create().creatingParentsIfNeeded().forPath("/test");
+
+                try (CuratorCache cache = CuratorCache.build(client, "/test"))
+                {
+                    cache.start();
+
+                    CountDownLatch reconnectLatch = new CountDownLatch(1);
+                    client.getConnectionStateListenable().addListener((__, newState) -> {
+                        if ( newState == ConnectionState.RECONNECTED )
+                        {
+                            reconnectLatch.countDown();
+                        }
+                    });
+                    CountDownLatch latch = new CountDownLatch(3);
+                    cache.listenable().addListener((__, ___, ____) -> latch.countDown());
+
+                    client.create().forPath("/test/one");
+                    client.create().forPath("/test/two");
+                    client.create().forPath("/test/three");
+
+                    Assert.assertTrue(timing.awaitLatch(latch));
+
+                    InstanceSpec connectionInstance = cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper());
+                    cluster.killServer(connectionInstance);
+
+                    Assert.assertTrue(timing.awaitLatch(reconnectLatch));
+
+                    timing.sleepABit();
+
+                    Assert.assertEquals(cache.storage().stream().count(), 4);
+                }
+            }
+        }
+    }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
index 534c365..1cf7eb0 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
@@ -32,6 +32,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 
+@Test(groups = CuratorTestBase.zk36Group)
 public class TestPersistentWatcher extends CuratorTestBase
 {
     @Test
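
Editor's illustration for the 03/03 commit above (not part of the patch): the optimization in CuratorCacheImpl can be sketched without a ZooKeeper server. This is a minimal sketch under stated assumptions. The class CversionSketch, its NodeStat type, and the childQueries counter are hypothetical stand-ins invented for this example; they are not Curator or ZooKeeper classes. Only the comparison itself (skip the children query when the previously cached Cversion equals the new one) is taken from the diff.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-ins for illustration only; not the Curator or ZooKeeper classes.
class CversionSketch
{
    // Minimal substitute for org.apache.zookeeper.data.Stat holding only the two counters used here
    static final class NodeStat
    {
        final int version;   // incremented when the node's data changes
        final int cversion;  // incremented when the node's children change

        NodeStat(int version, int cversion)
        {
            this.version = version;
            this.cversion = cversion;
        }
    }

    private final Map<String, NodeStat> storage = new HashMap<>();
    private int childQueries = 0;

    // Stores the new stat and returns the previous entry, if any
    // (modeled on putStorage() now returning the previous ChildData)
    Optional<NodeStat> put(String path, NodeStat newStat)
    {
        return Optional.ofNullable(storage.put(path, newStat));
    }

    // True when a children query is still needed for this node
    static boolean childrenMayHaveChanged(NodeStat oldStat, NodeStat newStat)
    {
        // an unknown node has nothing to compare against, so it must be queried
        return (oldStat == null) || (oldStat.cversion != newStat.cversion);
    }

    // Simulates handling one change event for a path
    void onNodeChanged(String path, NodeStat newStat)
    {
        NodeStat oldStat = put(path, newStat).orElse(null);
        if ( childrenMayHaveChanged(oldStat, newStat) )
        {
            ++childQueries;  // a real cache would issue a background getChildren() here
        }
    }

    int childQueries()
    {
        return childQueries;
    }

    public static void main(String[] args)
    {
        CversionSketch cache = new CversionSketch();
        cache.onNodeChanged("/root", new NodeStat(0, 0));  // first sighting: query children
        cache.onNodeChanged("/root", new NodeStat(1, 0));  // data-only change: skip the query
        cache.onNodeChanged("/root", new NodeStat(1, 1));  // child created: query children
        System.out.println("child queries issued: " + cache.childQueries());  // prints 2
    }
}
```

In this sketch the data-only update is the case that benefits: the stored Cversion is unchanged, so no children query is counted for it.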


[curator] 02/03: CURATOR-549

Posted by ra...@apache.org.

randgalt pushed a commit to branch CURATOR-549-zk36-persistent-watcher-recipes
in repository https://gitbox.apache.org/repos/asf/curator.git

commit b6ed705f3978d287c92f826317039297a3b43547
Author: randgalt <ra...@apache.org>
AuthorDate: Sun Nov 3 12:45:21 2019 -0500

    CURATOR-549
    
    Adds several recipes that use the new ZOOKEEPER-1416 Persistent Recursive watches from ZooKeeper 3.6.0. PersistentWatcher - A wrapper recipe that keeps a persistent (single or recursive) watch set and active through disconnections, etc. CuratorCache - Completely re-written cache recipe that will replace TreeCache, NodeCache and PathChildrenCache. With the benefit of persistent recursive watchers, the implementation is far simpler, will use significantly less resources and network call [...]
---
 .../src/main/java/cache/CuratorCacheExample.java   | 102 ++++++
 .../src/site/confluence/index.confluence           |   2 +-
 .../framework/recipes/cache/CuratorCache.java      | 114 +++++++
 .../recipes/cache/CuratorCacheBuilder.java         |  63 ++++
 .../recipes/cache/CuratorCacheBuilderImpl.java     |  74 ++++
 .../framework/recipes/cache/CuratorCacheImpl.java  | 297 ++++++++++++++++
 .../recipes/cache/CuratorCacheListener.java        |  78 +++++
 .../recipes/cache/CuratorCacheListenerBuilder.java | 129 +++++++
 .../cache/CuratorCacheListenerBuilderImpl.java     | 161 +++++++++
 .../recipes/cache/CuratorCacheStorage.java         | 116 +++++++
 .../curator/framework/recipes/cache/NodeCache.java |   3 +
 .../recipes/cache/NodeCacheListenerWrapper.java    |  46 +++
 .../framework/recipes/cache/PathChildrenCache.java |   4 +-
 .../cache/PathChildrenCacheListenerWrapper.java    |  78 +++++
 .../recipes/cache/StandardCuratorCacheStorage.java |  88 +++++
 .../curator/framework/recipes/cache/TreeCache.java |   3 +
 .../recipes/cache/TreeCacheListenerWrapper.java    |  81 +++++
 .../framework/recipes/watch/PersistentWatcher.java | 169 ++++++++++
 .../src/site/confluence/curator-cache.confluence   |  36 ++
 .../src/site/confluence/index.confluence           |   7 +-
 .../framework/recipes/cache/TestCuratorCache.java  | 248 ++++++++++++++
 .../recipes/cache/TestCuratorCacheConsistency.java | 373 +++++++++++++++++++++
 .../cache/TestCuratorCacheEventOrdering.java       |  52 +++
 .../recipes/cache/TestCuratorCacheWrappers.java    | 162 +++++++++
 .../recipes/cache/TestWrappedNodeCache.java        | 172 ++++++++++
 .../recipes/watch/TestPersistentWatcher.java       | 105 ++++++
 src/site/confluence/index.confluence               |   2 +-
 src/site/confluence/zk-compatibility.confluence    |   2 +-
 28 files changed, 2760 insertions(+), 7 deletions(-)

diff --git a/curator-examples/src/main/java/cache/CuratorCacheExample.java b/curator-examples/src/main/java/cache/CuratorCacheExample.java
new file mode 100644
index 0000000..5070efd
--- /dev/null
+++ b/curator-examples/src/main/java/cache/CuratorCacheExample.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.TestingServer;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Very simple example of creating a CuratorCache that listens to events and logs the changes
+ * to standard out. A loop of random changes is run to exercise the cache.
+ */
+public class CuratorCacheExample
+{
+    private static final String PATH = "/example/cache";
+
+    public static void main(String[] args) throws Exception
+    {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        try (TestingServer server = new TestingServer())
+        {
+            try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3)))
+            {
+                client.start();
+                try (CuratorCache cache = CuratorCache.build(client, PATH))
+                {
+                    // there are several ways to set a listener on a CuratorCache. You can watch for individual events
+                    // or for all events. Here, we'll watch all events and merely log that they occurred
+                    CuratorCacheListener listener = (type, oldNode, node) -> {
+                        switch ( type )
+                        {
+                            case NODE_CREATED:
+                                System.out.println(String.format("Node created: [%s]", node));
+                                break;
+
+                            case NODE_CHANGED:
+                                System.out.println(String.format("Node changed. Old: [%s] New: [%s]", oldNode, node));
+                                break;
+
+                            case NODE_DELETED:
+                                System.out.println(String.format("Node deleted. Old value: [%s]", oldNode));
+                                break;
+                        }
+                    };
+
+                    // register the listener
+                    cache.listenable().addListener(listener);
+
+                    // the cache must be started
+                    cache.start();
+
+                    // now randomly create/change/delete nodes
+                    for ( int i = 0; i < 1000; ++i )
+                    {
+                        int depth = random.nextInt(1, 4);
+                        String path = makeRandomPath(random, depth);
+                        if ( random.nextBoolean() )
+                        {
+                            client.create().orSetData().creatingParentsIfNeeded().forPath(path, Long.toString(random.nextLong()).getBytes());
+                        }
+                        else
+                        {
+                            client.delete().quietly().deletingChildrenIfNeeded().forPath(path);
+                        }
+
+                        Thread.sleep(5);
+                    }
+                }
+            }
+        }
+    }
+
+    private static String makeRandomPath(ThreadLocalRandom random, int depth)
+    {
+        if ( depth == 0 )
+        {
+            return PATH;
+        }
+        return makeRandomPath(random, depth - 1) + "/" + random.nextInt(3);
+    }
+}
\ No newline at end of file
diff --git a/curator-examples/src/site/confluence/index.confluence b/curator-examples/src/site/confluence/index.confluence
index f9be506..6c5a30d 100644
--- a/curator-examples/src/site/confluence/index.confluence
+++ b/curator-examples/src/site/confluence/index.confluence
@@ -3,7 +3,7 @@ h1. Examples
 This module contains example usages of various Curator features. Each directory in the module is a separate example.
 
 |/leader|Example leader selector code|
-|/cache|Example PathChildrenCache usage|
+|/cache|Example CuratorCache usage|
 |/locking|Example of using InterProcessMutex|
 |/discovery|Example usage of the Curator's ServiceDiscovery|
 |/framework|A few examples of how to use the CuratorFramework class|
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java
new file mode 100644
index 0000000..ebe06e0
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.listen.Listenable;
+import java.io.Closeable;
+
+/**
+ * <p>
+ *     A utility that attempts to keep the data from a node locally cached. Optionally the entire
+ *     tree of children below the node can also be cached. Will respond to update/create/delete events, pull
+ *     down the data, etc. You can register listeners that will get notified when changes occur.
+ * </p>
+ *
+ * <p>
+ *     <b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must
+ *     be prepared for false-positives and false-negatives. Additionally, always use the version number
+ *     when updating data to avoid overwriting another process' change.
+ * </p>
+ */
+public interface CuratorCache extends Closeable
+{
+    /**
+     * cache build options
+     */
+    enum Options
+    {
+        /**
+         * Normally the entire tree of nodes starting at the given node is cached. This option
+         * causes only the given node to be cached (i.e. a single node cache)
+         */
+        SINGLE_NODE_CACHE,
+
+        /**
+         * Decompress data via {@link org.apache.curator.framework.api.GetDataBuilder#decompressed()}
+         */
+        COMPRESSED_DATA,
+
+        /**
+         * Normally, when the cache is closed via {@link CuratorCache#close()}, the storage is cleared
+         * via {@link CuratorCacheStorage#clear()}. This option prevents the storage from being cleared.
+         */
+        DO_NOT_CLEAR_ON_CLOSE
+    }
+
+    /**
+     * Return a Curator Cache for the given path with the given options using a standard storage instance
+     *
+     * @param client Curator client
+     * @param path path to cache
+     * @param options any options
+     * @return cache (note it must be started via {@link #start()})
+     */
+    static CuratorCache build(CuratorFramework client, String path, Options... options)
+    {
+        return builder(client, path).withOptions(options).build();
+    }
+
+    /**
+     * Start a Curator Cache builder
+     *
+     * @param client Curator client
+     * @param path path to cache
+     * @return builder
+     */
+    static CuratorCacheBuilder builder(CuratorFramework client, String path)
+    {
+        return new CuratorCacheBuilderImpl(client, path);
+    }
+
+    /**
+     * Start the cache. This will cause a complete refresh from the cache's root node and generate
+     * events for all nodes found, etc.
+     */
+    void start();
+
+    /**
+     * Close the cache, stop responding to events, etc.
+     */
+    @Override
+    void close();
+
+    /**
+     * Return the storage instance being used
+     *
+     * @return storage
+     */
+    CuratorCacheStorage storage();
+
+    /**
+     * Return the listener container so that listeners can be registered to be notified of changes to the cache
+     *
+     * @return listener container
+     */
+    Listenable<CuratorCacheListener> listenable();
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java
new file mode 100644
index 0000000..35a5f26
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+public interface CuratorCacheBuilder
+{
+    /**
+     * @param options any options
+     * @return this
+     */
+    CuratorCacheBuilder withOptions(CuratorCache.Options... options);
+
+    /**
+     * Alternate storage to use. If not specified, {@link CuratorCacheStorage#standard()} is used
+     *
+     * @param storage storage instance to use
+     * @return this
+     */
+    CuratorCacheBuilder withStorage(CuratorCacheStorage storage);
+
+    /**
+     * By default, any unexpected exception is handled by logging the exception. You can change
+     * this so that a handler is called instead. Under normal circumstances, this shouldn't be necessary.
+     *
+     * @param exceptionHandler exception handler to use
+     */
+    CuratorCacheBuilder withExceptionHandler(Consumer<Exception> exceptionHandler);
+
+    /**
+     * Normally, listeners are wrapped in {@link org.apache.curator.framework.CuratorFramework#runSafe(Runnable)}. Use this
+     * method to set a different executor.
+     *
+     * @param executor executor to use
+     */
+    CuratorCacheBuilder withExecutor(Executor executor);
+
+    /**
+     * Return a new Curator Cache based on the builder methods that have been called
+     *
+     * @return new Curator Cache
+     */
+    CuratorCache build();
+}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java
new file mode 100644
index 0000000..9f9e03d
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+class CuratorCacheBuilderImpl implements CuratorCacheBuilder
+{
+    private final CuratorFramework client;
+    private final String path;
+    private CuratorCacheStorage storage;
+    private Consumer<Exception> exceptionHandler;
+    private Executor executor;
+    private CuratorCache.Options[] options;
+
+    CuratorCacheBuilderImpl(CuratorFramework client, String path)
+    {
+        this.client = client;
+        this.path = path;
+    }
+
+    @Override
+    public CuratorCacheBuilder withOptions(CuratorCache.Options... options)
+    {
+        this.options = options;
+        return this;
+    }
+
+    @Override
+    public CuratorCacheBuilder withStorage(CuratorCacheStorage storage)
+    {
+        this.storage = storage;
+        return this;
+    }
+
+    @Override
+    public CuratorCacheBuilder withExceptionHandler(Consumer<Exception> exceptionHandler)
+    {
+        this.exceptionHandler = exceptionHandler;
+        return this;
+    }
+
+    @Override
+    public CuratorCacheBuilder withExecutor(Executor executor)
+    {
+        this.executor = executor;
+        return this;
+    }
+
+    @Override
+    public CuratorCache build()
+    {
+        return new CuratorCacheImpl(client, storage, path, options, executor, exceptionHandler);
+    }
+}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
new file mode 100644
index 0000000..ee95570
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
+import org.apache.curator.framework.recipes.watch.PersistentWatcher;
+import org.apache.curator.utils.ThreadUtils;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.*;
+import static org.apache.zookeeper.KeeperException.Code.NONODE;
+import static org.apache.zookeeper.KeeperException.Code.OK;
+
+class CuratorCacheImpl implements CuratorCache
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
+    private final PersistentWatcher persistentWatcher;
+    private final CuratorFramework client;
+    private final CuratorCacheStorage storage;
+    private final String path;
+    private final boolean recursive;
+    private final boolean compressedData;
+    private final boolean clearOnClose;
+    private final StandardListenerManager<CuratorCacheListener> listenerManager = StandardListenerManager.standard();
+    private final Consumer<Exception> exceptionHandler;
+    private final Executor executor;
+
+    private volatile AtomicLong outstandingOps;
+
+    private enum State
+    {
+        LATENT,
+        STARTED,
+        CLOSED
+    }
+
+    CuratorCacheImpl(CuratorFramework client, CuratorCacheStorage storage, String path, Options[] optionsArg, Executor executor, Consumer<Exception> exceptionHandler)
+    {
+        Set<Options> options = (optionsArg != null) ? Sets.newHashSet(optionsArg) : Collections.emptySet();
+        this.client = client;
+        this.storage = (storage != null) ? storage : CuratorCacheStorage.standard();
+        this.path = path;
+        recursive = !options.contains(Options.SINGLE_NODE_CACHE);
+        compressedData = options.contains(Options.COMPRESSED_DATA);
+        clearOnClose = !options.contains(Options.DO_NOT_CLEAR_ON_CLOSE);
+        persistentWatcher = new PersistentWatcher(client, path, recursive);
+        persistentWatcher.getListenable().addListener(this::processEvent);
+        persistentWatcher.getResetListenable().addListener(this::rebuild);
+        this.exceptionHandler = (exceptionHandler != null) ? exceptionHandler : e -> log.error("CuratorCache error", e);
+        this.executor = (executor != null) ? executor : client::runSafe;
+    }
+
+    @Override
+    public void start()
+    {
+        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
+        outstandingOps = new AtomicLong(0);
+        persistentWatcher.start();
+    }
+
+    @Override
+    public void close()
+    {
+        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+        {
+            persistentWatcher.close();
+            if ( clearOnClose )
+            {
+                storage.clear();
+            }
+        }
+    }
+
+    @Override
+    public CuratorCacheStorage storage()
+    {
+        return storage;
+    }
+
+    @Override
+    public Listenable<CuratorCacheListener> listenable()
+    {
+        return listenerManager;
+    }
+
+    private void rebuild()
+    {
+        if ( state.get() != State.STARTED )
+        {
+            return;
+        }
+
+        nodeChanged(path);
+        storage.stream()
+            .map(ChildData::getPath)
+            .filter(p -> !p.equals(path))
+            .forEach(this::nodeChanged);
+    }
+
+    private void processEvent(WatchedEvent event)
+    {
+        if ( state.get() != State.STARTED )
+        {
+            return;
+        }
+
+        // NOTE: Persistent/Recursive watchers never trigger NodeChildrenChanged
+
+        switch ( event.getType() )
+        {
+        case NodeDataChanged:
+        case NodeCreated:
+        {
+            nodeChanged(event.getPath());
+            break;
+        }
+
+        case NodeDeleted:
+        {
+            removeStorage(event.getPath());
+            break;
+        }
+        }
+    }
+
+    private void nodeChildrenChanged(String fromPath)
+    {
+        if ( (state.get() != State.STARTED) || !recursive )
+        {
+            return;
+        }
+
+        try
+        {
+            BackgroundCallback callback = (__, event) -> {
+                if ( event.getResultCode() == OK.intValue() )
+                {
+                    event.getChildren().forEach(child -> nodeChanged(ZKPaths.makePath(fromPath, child)));
+                }
+                else if ( event.getResultCode() == NONODE.intValue() )
+                {
+                    removeStorage(event.getPath());
+                }
+                else
+                {
+                    handleException(event);
+                }
+                checkDecrementOutstandingOps();
+            };
+
+            checkIncrementOutstandingOps();
+            client.getChildren().inBackground(callback).forPath(fromPath);
+        }
+        catch ( Exception e )
+        {
+            handleException(e);
+        }
+    }
+
+    private void nodeChanged(String fromPath)
+    {
+        if ( state.get() != State.STARTED )
+        {
+            return;
+        }
+
+        try
+        {
+            BackgroundCallback callback = (__, event) -> {
+                if ( event.getResultCode() == OK.intValue() )
+                {
+                    putStorage(new ChildData(event.getPath(), event.getStat(), event.getData()));
+                    nodeChildrenChanged(event.getPath());
+                }
+                else if ( event.getResultCode() == NONODE.intValue() )
+                {
+                    removeStorage(event.getPath());
+                }
+                else
+                {
+                    handleException(event);
+                }
+                checkDecrementOutstandingOps();
+            };
+
+            checkIncrementOutstandingOps();
+            if ( compressedData )
+            {
+                client.getData().decompressed().inBackground(callback).forPath(fromPath);
+            }
+            else
+            {
+                client.getData().inBackground(callback).forPath(fromPath);
+            }
+        }
+        catch ( Exception e )
+        {
+            handleException(e);
+        }
+    }
+
+    private void putStorage(ChildData data)
+    {
+        Optional<ChildData> previousData = storage.put(data);
+        if ( previousData.isPresent() )
+        {
+            if ( previousData.get().getStat().getMzxid() != data.getStat().getMzxid() )
+            {
+                callListeners(l -> l.event(NODE_CHANGED, previousData.get(), data));
+            }
+        }
+        else
+        {
+            callListeners(l -> l.event(NODE_CREATED, null, data));
+        }
+    }
+
+    private void removeStorage(String path)
+    {
+        storage.remove(path).ifPresent(previousData -> callListeners(l -> l.event(NODE_DELETED, previousData, null)));
+    }
+
+    private void callListeners(Consumer<CuratorCacheListener> proc)
+    {
+        if ( state.get() == State.STARTED )
+        {
+            executor.execute(() -> listenerManager.forEach(proc));
+        }
+    }
+
+    private void handleException(CuratorEvent event)
+    {
+        handleException(KeeperException.create(KeeperException.Code.get(event.getResultCode())));
+    }
+
+    private void handleException(Exception e)
+    {
+        ThreadUtils.checkInterrupted(e);
+        exceptionHandler.accept(e);
+    }
+
+    private void checkIncrementOutstandingOps()
+    {
+        AtomicLong localOutstandingOps = outstandingOps;
+        if ( localOutstandingOps != null )
+        {
+            localOutstandingOps.incrementAndGet();
+        }
+    }
+
+    private void checkDecrementOutstandingOps()
+    {
+        AtomicLong localOutstandingOps = outstandingOps;
+        if ( localOutstandingOps != null )
+        {
+            if ( localOutstandingOps.decrementAndGet() == 0 )
+            {
+                outstandingOps = null;
+                callListeners(CuratorCacheListener::initialized);
+            }
+        }
+    }
+}
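
Note on the outstandingOps counter above: the initialized() callback is driven by counting in-flight background operations and firing once when the count returns to zero. The following is a minimal standalone sketch of that pattern only, not the Curator implementation. The class name OutstandingOpsSketch and the methods begin() and end() are hypothetical names chosen for illustration, and the sketch assumes callers always pair each begin() with one end().

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration of the "count outstanding operations" pattern.
// It is not part of Curator; names are assumptions made for this sketch.
final class OutstandingOpsSketch
{
    // Non-null only while the initial load is still being tracked
    private volatile AtomicLong outstandingOps = new AtomicLong(0);
    private final Runnable onInitialized;

    OutstandingOpsSketch(Runnable onInitialized)
    {
        this.onInitialized = onInitialized;
    }

    // Call before issuing a background operation
    void begin()
    {
        AtomicLong local = outstandingOps;
        if ( local != null )
        {
            local.incrementAndGet();
        }
    }

    // Call when a background operation completes
    void end()
    {
        AtomicLong local = outstandingOps;
        if ( (local != null) && (local.decrementAndGet() == 0) )
        {
            // Stop tracking so that later operations never fire the callback again
            outstandingOps = null;
            onInitialized.run();
        }
    }

    public static void main(String[] args)
    {
        AtomicInteger fired = new AtomicInteger();
        OutstandingOpsSketch tracker = new OutstandingOpsSketch(fired::incrementAndGet);
        tracker.begin();
        tracker.begin();
        tracker.end();
        tracker.end();   // count returns to zero here, callback runs
        tracker.begin();
        tracker.end();   // tracking already stopped, callback does not run
        System.out.println("callback invocations: " + fired.get());
    }
}
```

Copying the volatile field into a local before use mirrors the diff above: it avoids a null dereference if another thread clears the field between the check and the update.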
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListener.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListener.java
new file mode 100644
index 0000000..620e471
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListener.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+/**
+ * Listener for {@link CuratorCache} events. The main functional interface is general purpose
+ * but you can build event specific listeners, etc. using the builder. Note: all listeners
+ * are by default wrapped in {@link org.apache.curator.framework.CuratorFramework#runSafe(Runnable)} when called.
+ */
+@FunctionalInterface
+public interface CuratorCacheListener
+{
+    /**
+     * An enumerated type that describes a change
+     */
+    enum Type
+    {
+        /**
+         * A new node was added to the cache
+         */
+        NODE_CREATED,
+
+        /**
+         * A node already in the cache has changed
+         */
+        NODE_CHANGED,
+
+        /**
+         * A node already in the cache was deleted
+         */
+        NODE_DELETED
+    }
+
+    /**
+     * Called when a node is created, changed or deleted.
+     *
+     * @param type the type of event
+     * @param oldData the old data or null
+     * @param data the new data or null
+     */
+    void event(Type type, ChildData oldData, ChildData data);
+
+    /**
+     * When the cache is started, the initial nodes are tracked and when they are finished loading
+     * into the cache this method is called.
+     */
+    default void initialized()
+    {
+        // NOP
+    }
+
+    /**
+     * Returns a builder allowing type-specific and special-purpose listeners.
+     *
+     * @return builder
+     */
+    static CuratorCacheListenerBuilder builder()
+    {
+        return new CuratorCacheListenerBuilderImpl();
+    }
+}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilder.java
new file mode 100644
index 0000000..c57e881
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilder.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type;
+import java.util.function.Consumer;
+
+public interface CuratorCacheListenerBuilder
+{
+    /**
+     * Add a standard listener
+     *
+     * @param listener listener to add
+     * @return this
+     */
+    CuratorCacheListenerBuilder forAll(CuratorCacheListener listener);
+
+    /**
+     * Add a listener only for {@link Type#NODE_CREATED}
+     *
+     * @param listener listener to add
+     * @return this
+     */
+    CuratorCacheListenerBuilder forCreates(Consumer<ChildData> listener);
+
+    @FunctionalInterface
+    interface ChangeListener
+    {
+        void event(ChildData oldNode, ChildData node);
+    }
+
+    /**
+     * Add a listener only for {@link Type#NODE_CHANGED}
+     *
+     * @param listener listener to add
+     * @return this
+     */
+    CuratorCacheListenerBuilder forChanges(ChangeListener listener);
+
+    /**
+     * Add a listener only for both {@link Type#NODE_CREATED} and {@link Type#NODE_CHANGED}
+     *
+     * @param listener listener to add
+     * @return this
+     */
+    CuratorCacheListenerBuilder forCreatesAndChanges(ChangeListener listener);
+
+    /**
+     * Add a listener only for {@link Type#NODE_DELETED}
+     *
+     * @param listener listener to add
+     * @return this
+     */
+    CuratorCacheListenerBuilder forDeletes(Consumer<ChildData> listener);
+
+    /**
+     * Add a listener only for {@link CuratorCacheListener#initialized()}
+     *
+     * @param listener listener to add
+     * @return this
+     */
+    CuratorCacheListenerBuilder forInitialized(Runnable listener);
+
+    /**
+     * Bridge listener. You can reuse old-style {@link org.apache.curator.framework.recipes.cache.PathChildrenCacheListener}s
+     * with CuratorCache. IMPORTANT: the connection state methods in the listener will never be called as CuratorCache
+     * does not register the listener with the connection state listener container. Also note that CuratorCache
+     * behaves differently than {@link org.apache.curator.framework.recipes.cache.PathChildrenCache} so
+     * things such as event ordering will likely be different.
+     *
+     * @param client the curator client
+     * @param listener the listener to wrap
+     * @return a CuratorCacheListener that forwards to the given listener
+     */
+    CuratorCacheListenerBuilder forPathChildrenCache(CuratorFramework client, PathChildrenCacheListener listener);
+
+    /**
+     * Bridge listener. You can reuse old-style {@link org.apache.curator.framework.recipes.cache.TreeCacheListener}s
+     * with CuratorCache. IMPORTANT: the connection state methods in the listener will never be called as CuratorCache
+     * does not register the listener with the connection state listener container. Also note that CuratorCache
+     * behaves differently than {@link org.apache.curator.framework.recipes.cache.TreeCache} so
+     * things such as event ordering will likely be different.
+     *
+     * @param client the curator client
+     * @param listener the listener to wrap
+     * @return a CuratorCacheListener that forwards to the given listener
+     */
+    CuratorCacheListenerBuilder forTreeCache(CuratorFramework client, TreeCacheListener listener);
+
+    /**
+     * Bridge listener. You can reuse old-style {@link org.apache.curator.framework.recipes.cache.NodeCacheListener}s
+     * with CuratorCache.
+     *
+     * @param listener the listener to wrap
+     * @return a CuratorCacheListener that forwards to the given listener
+     */
+    CuratorCacheListenerBuilder forNodeCache(NodeCacheListener listener);
+
+    /**
+     * Make the built listener so that it only becomes active once {@link CuratorCacheListener#initialized()} has been called.
+     * i.e. changes that occur as the cache is initializing are not sent to the listener
+     */
+    CuratorCacheListenerBuilder afterInitialized();
+
+    /**
+     * Build and return a new listener based on the methods that have been previously called
+     *
+     * @return new listener
+     */
+    CuratorCacheListener build();
+}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilderImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilderImpl.java
new file mode 100644
index 0000000..4873868
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilderImpl.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+class CuratorCacheListenerBuilderImpl implements CuratorCacheListenerBuilder
+{
+    private final List<CuratorCacheListener> listeners = new ArrayList<>();
+    private boolean afterInitializedOnly = false;
+
+    @Override
+    public CuratorCacheListenerBuilder forAll(CuratorCacheListener listener)
+    {
+        listeners.add(listener);
+        return this;
+    }
+
+    @Override
+    public CuratorCacheListenerBuilder forCreates(Consumer<ChildData> listener)
+    {
+        listeners.add((type, oldNode, node) -> {
+            if ( type == CuratorCacheListener.Type.NODE_CREATED )
+            {
+                listener.accept(node);
+            }
+        });
+        return this;
+    }
+
+    @Override
+    public CuratorCacheListenerBuilder forChanges(ChangeListener listener)
+    {
+        listeners.add((type, oldNode, node) -> {
+            if ( type == CuratorCacheListener.Type.NODE_CHANGED )
+            {
+                listener.event(oldNode, node);
+            }
+        });
+        return this;
+    }
+
+    @Override
+    public CuratorCacheListenerBuilder forCreatesAndChanges(ChangeListener listener)
+    {
+        listeners.add((type, oldNode, node) -> {
+            if ( (type == CuratorCacheListener.Type.NODE_CHANGED) || (type == CuratorCacheListener.Type.NODE_CREATED) )
+            {
+                listener.event(oldNode, node);
+            }
+        });
+        return this;
+    }
+
+    @Override
+    public CuratorCacheListenerBuilder forDeletes(Consumer<ChildData> listener)
+    {
+        listeners.add((type, oldNode, node) -> {
+            if ( type == CuratorCacheListener.Type.NODE_DELETED )
+            {
+                listener.accept(oldNode);
+            }
+        });
+        return this;
+    }
+
+    @Override
+    public CuratorCacheListenerBuilder forInitialized(Runnable listener)
+    {
+        CuratorCacheListener localListener = new CuratorCacheListener()
+        {
+            @Override
+            public void event(Type type, ChildData oldData, ChildData data)
+            {
+                // NOP
+            }
+
+            @Override
+            public void initialized()
+            {
+                listener.run();
+            }
+        };
+        listeners.add(localListener);
+        return this;
+    }
+
+    @Override
+    public CuratorCacheListenerBuilder forPathChildrenCache(CuratorFramework client, PathChildrenCacheListener listener)
+    {
+        listeners.add(new PathChildrenCacheListenerWrapper(client, listener));
+        return this;
+    }
+
+    @Override
+    public CuratorCacheListenerBuilder forTreeCache(CuratorFramework client, TreeCacheListener listener)
+    {
+        listeners.add(new TreeCacheListenerWrapper(client, listener));
+        return this;
+    }
+
+    @Override
+    public CuratorCacheListenerBuilder forNodeCache(NodeCacheListener listener)
+    {
+        listeners.add(new NodeCacheListenerWrapper(listener));
+        return this;
+    }
+
+    @Override
+    public CuratorCacheListenerBuilder afterInitialized()
+    {
+        afterInitializedOnly = true;
+        return this;
+    }
+
+    @Override
+    public CuratorCacheListener build()
+    {
+        List<CuratorCacheListener> copy = new ArrayList<>(listeners);
+        return new CuratorCacheListener()
+        {
+            private volatile boolean isInitialized = !afterInitializedOnly;
+
+            @Override
+            public void event(Type type, ChildData oldData, ChildData data)
+            {
+                if ( isInitialized )
+                {
+                    copy.forEach(l -> l.event(type, oldData, data));
+                }
+            }
+
+            @Override
+            public void initialized()
+            {
+                isInitialized = true;
+                copy.forEach(CuratorCacheListener::initialized);
+            }
+        };
+    }
+}
\ No newline at end of file
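Note on build() above: it composes the registered listeners into one and, when afterInitialized() was called, drops events until initialized() fires. The following is a minimal, self-contained sketch of that gating only. The Listener interface and the GatedListeners class are hypothetical stand-ins, not the real CuratorCacheListener API, and events are reduced to two strings for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for CuratorCacheListener; only the shape matters here.
interface Listener
{
    void event(String type, String path);

    default void initialized()
    {
        // NOP by default
    }
}

final class GatedListeners
{
    private GatedListeners()
    {
    }

    // Compose the delegates into one listener. When afterInitializedOnly is true,
    // events are dropped until initialized() has been called.
    static Listener compose(List<Listener> delegates, boolean afterInitializedOnly)
    {
        final List<Listener> copy = new ArrayList<>(delegates);   // defensive copy, as in build()
        return new Listener()
        {
            private volatile boolean isInitialized = !afterInitializedOnly;

            @Override
            public void event(String type, String path)
            {
                if ( isInitialized )
                {
                    copy.forEach(l -> l.event(type, path));
                }
            }

            @Override
            public void initialized()
            {
                isInitialized = true;
                copy.forEach(Listener::initialized);
            }
        };
    }

    // Small demo: returns the events that actually reached the delegate.
    static List<String> demo()
    {
        List<String> seen = new ArrayList<>();
        Listener recorder = (type, path) -> seen.add(type + " " + path);
        Listener gated = compose(Arrays.asList(recorder), true);
        gated.event("NODE_CREATED", "/test/one");   // dropped: not yet initialized
        gated.initialized();
        gated.event("NODE_CHANGED", "/test/one");   // delivered
        return seen;
    }
}
```

In this sketch the gate is a single volatile flag, so events that arrive before initialization are discarded rather than queued; the code above appears to make the same trade-off.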
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
new file mode 100644
index 0000000..e809263
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Interface for maintaining data in a {@link CuratorCache}
+ */
+public interface CuratorCacheStorage
+{
+    /**
+     * Return a new standard storage instance
+     *
+     * @return storage instance
+     */
+    static CuratorCacheStorage standard()
+    {
+        return new StandardCuratorCacheStorage(true);
+    }
+
+    /**
+     * Return a new storage instance that does not retain the data bytes, i.e. ChildData objects
+     * returned by this storage will always return {@code null} for {@link ChildData#getData()}.
+     *
+     * @return storage instance that does not retain data bytes
+     */
+    static CuratorCacheStorage bytesNotCached()
+    {
+        return new StandardCuratorCacheStorage(false);
+    }
+
+    /**
+     * Add an entry to storage and return any previous entry at that path
+     *
+     * @param data entry to add
+     * @return previous entry or {@code empty()}
+     */
+    Optional<ChildData> put(ChildData data);
+
+    /**
+     * Remove the entry from storage and return any previous entry at that path
+     *
+     * @param path path to remove
+     * @return previous entry or {@code empty()}
+     */
+    Optional<ChildData> remove(String path);
+
+    /**
+     * Return an entry from storage
+     *
+     * @param path path to get
+     * @return entry or {@code empty()}
+     */
+    Optional<ChildData> get(String path);
+
+    /**
+     * Return the current number of entries in storage
+     *
+     * @return number of entries
+     */
+    int size();
+
+    /**
+     * Return a stream over the storage entries. Note: for a standard storage instance, the stream is weakly
+     * consistent, like a stream over {@link java.util.concurrent.ConcurrentHashMap#values()}
+     *
+     * @return stream over entries
+     */
+    Stream<ChildData> stream();
+
+    /**
+     * Return a stream over the storage entries that are the immediate children of the given node.
+     *
+     * @return stream over entries
+     */
+    Stream<ChildData> streamImmediateChildren(String fromParent);
+
+    /**
+     * Utility - given a stream of child nodes, build a map. Note: it is assumed that each child
+     * data in the stream has a unique path
+     *
+     * @param stream stream of child nodes with unique paths
+     * @return map
+     */
+    static Map<String, ChildData> toMap(Stream<ChildData> stream)
+    {
+        return stream.map(data -> new AbstractMap.SimpleEntry<>(data.getPath(), data))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+
+    /**
+     * Reset the storage to zero entries
+     */
+    void clear();
+}
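Note on toMap() above: the two-argument Collectors.toMap throws IllegalStateException when two elements map to the same key, which is why the Javadoc requires unique paths. The sketch below illustrates that behavior with a hypothetical PathData class standing in for ChildData. It also maps directly by path instead of going through an intermediate SimpleEntry, which is an alternative formulation and not what the commit does.

```java
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical stand-in for ChildData: just a path and a payload.
final class PathData
{
    final String path;
    final String payload;

    PathData(String path, String payload)
    {
        this.path = path;
        this.payload = payload;
    }
}

final class ToMapSketch
{
    private ToMapSketch()
    {
    }

    // Build a map keyed by path; assumes every element has a unique path.
    static Map<String, PathData> toMap(Stream<PathData> stream)
    {
        return stream.collect(Collectors.toMap(d -> d.path, Function.identity()));
    }

    // Returns true if a duplicate path is rejected with IllegalStateException.
    static boolean rejectsDuplicates()
    {
        try
        {
            toMap(Stream.of(new PathData("/a", "1"), new PathData("/a", "2")));
            return false;
        }
        catch ( IllegalStateException e )
        {
            return true;
        }
    }
}
```

Callers that cannot guarantee unique paths would need the three-argument Collectors.toMap with an explicit merge function.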
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
index 9687e1b..8cbfcf4 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
@@ -53,7 +53,10 @@ import java.util.concurrent.atomic.AtomicReference;
  * <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must
  * be prepared for false-positives and false-negatives. Additionally, always use the version number
  * when updating data to avoid overwriting another process' change.</p>
+ *
+ * @deprecated replaced by {@link org.apache.curator.framework.recipes.cache.CuratorCache}
  */
+@Deprecated
 public class NodeCache implements Closeable
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheListenerWrapper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheListenerWrapper.java
new file mode 100644
index 0000000..468049b
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheListenerWrapper.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.NodeCacheListener;
+
+class NodeCacheListenerWrapper implements CuratorCacheListener
+{
+    private final NodeCacheListener listener;
+
+    NodeCacheListenerWrapper(NodeCacheListener listener)
+    {
+        this.listener = listener;
+    }
+
+    @Override
+    public void event(Type type, ChildData oldData, ChildData data)
+    {
+        try
+        {
+            listener.nodeChanged();
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index bdc73cc..b85285e 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -64,8 +64,10 @@ import java.util.concurrent.atomic.AtomicReference;
  * <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must
  * be prepared for false-positives and false-negatives. Additionally, always use the version number
  * when updating data to avoid overwriting another process' change.</p>
+ *
+ * @deprecated replaced by {@link org.apache.curator.framework.recipes.cache.CuratorCache}
  */
-@SuppressWarnings("NullableProblems")
+@Deprecated
 public class PathChildrenCache implements Closeable
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
new file mode 100644
index 0000000..a9123c1
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+
+class PathChildrenCacheListenerWrapper implements CuratorCacheListener
+{
+    private final PathChildrenCacheListener listener;
+    private final CuratorFramework client;
+
+    PathChildrenCacheListenerWrapper(CuratorFramework client, PathChildrenCacheListener listener)
+    {
+        this.listener = listener;
+        this.client = client;
+    }
+
+    @Override
+    public void event(Type type, ChildData oldData, ChildData data)
+    {
+        switch ( type )
+        {
+            case NODE_CREATED:
+            {
+                sendEvent(data, PathChildrenCacheEvent.Type.CHILD_ADDED);
+                break;
+            }
+
+            case NODE_CHANGED:
+            {
+                sendEvent(data, PathChildrenCacheEvent.Type.CHILD_UPDATED);
+                break;
+            }
+
+            case NODE_DELETED:
+            {
+                sendEvent(oldData, PathChildrenCacheEvent.Type.CHILD_REMOVED);
+                break;
+            }
+        }
+    }
+
+    @Override
+    public void initialized()
+    {
+        sendEvent(null, PathChildrenCacheEvent.Type.INITIALIZED);
+    }
+
+    private void sendEvent(ChildData node, PathChildrenCacheEvent.Type type)
+    {
+        PathChildrenCacheEvent event = new PathChildrenCacheEvent(type, node);
+        try
+        {
+            listener.childEvent(client, event);
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/StandardCuratorCacheStorage.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/StandardCuratorCacheStorage.java
new file mode 100644
index 0000000..bbdb21d
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/StandardCuratorCacheStorage.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.utils.ZKPaths;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Stream;
+
+class StandardCuratorCacheStorage implements CuratorCacheStorage
+{
+    private final Map<String, ChildData> dataMap;
+    private final boolean cacheBytes;
+
+    StandardCuratorCacheStorage(boolean cacheBytes)
+    {
+        this.dataMap = new ConcurrentHashMap<>();
+        this.cacheBytes = cacheBytes;
+    }
+
+    @Override
+    public Optional<ChildData> put(ChildData data)
+    {
+        ChildData localData = cacheBytes ? data : new ChildData(data.getPath(), data.getStat(), null);
+        return Optional.ofNullable(dataMap.put(data.getPath(), localData));
+    }
+
+    @Override
+    public Optional<ChildData> remove(String path)
+    {
+        return Optional.ofNullable(dataMap.remove(path));
+    }
+
+    @Override
+    public Optional<ChildData> get(String path)
+    {
+        return Optional.ofNullable(dataMap.get(path));
+    }
+
+    @Override
+    public int size()
+    {
+        return dataMap.size();
+    }
+
+    @Override
+    public Stream<ChildData> stream()
+    {
+        return dataMap.values().stream();
+    }
+
+    @Override
+    public Stream<ChildData> streamImmediateChildren(String fromParent)
+    {
+        return dataMap.entrySet()
+            .stream()
+            .filter(entry -> {
+                ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(entry.getKey());
+                return pathAndNode.getPath().equals(fromParent);
+            })
+            .map(Map.Entry::getValue);
+    }
+
+    @Override
+    public void clear()
+    {
+        dataMap.clear();
+    }
+}
\ No newline at end of file
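Note on streamImmediateChildren() above: it scans every entry in the map and keeps those whose parent path equals fromParent, so each call is linear in the size of the cache. A minimal sketch of that filter over plain string paths follows. The parentOf helper is a hypothetical stand-in for ZKPaths.getPathAndNode(...).getPath() and assumes absolute, normalized paths with no trailing slash; the explicit exclusion of the root path is a choice made for this sketch.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

final class ImmediateChildrenSketch
{
    private ImmediateChildrenSketch()
    {
    }

    // Hypothetical helper: parent of an absolute, normalized path ("/a/b" -> "/a", "/a" -> "/").
    static String parentOf(String path)
    {
        int i = path.lastIndexOf('/');
        return (i > 0) ? path.substring(0, i) : "/";
    }

    // Return the paths whose parent is exactly fromParent, sorted for stable output.
    static List<String> immediateChildren(Map<String, String> dataMap, String fromParent)
    {
        return dataMap.keySet()
            .stream()
            .filter(path -> !path.equals("/") && parentOf(path).equals(fromParent))
            .sorted()
            .collect(Collectors.toList());
    }

    // Demo: only the direct children of /test are returned, not /test itself or deeper nodes.
    static List<String> demo()
    {
        Map<String, String> dataMap = new ConcurrentHashMap<>();
        dataMap.put("/test", "root");
        dataMap.put("/test/one", "1");
        dataMap.put("/test/two", "2");
        dataMap.put("/test/one/deep", "3");
        return immediateChildren(dataMap, "/test");
    }
}
```

If child lookups turn out to be frequent on large trees, an index keyed by parent path would avoid the full scan, at the cost of extra bookkeeping in put() and remove().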
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
index f42c1d5..e2f3a8b 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -71,7 +71,10 @@ import static org.apache.curator.utils.PathUtils.validatePath;
  * <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must
  * be prepared for false-positives and false-negatives. Additionally, always use the version number
  * when updating data to avoid overwriting another process' change.</p>
+ *
+ * @deprecated replaced by {@link org.apache.curator.framework.recipes.cache.CuratorCache}
  */
+@Deprecated
 public class TreeCache implements Closeable
 {
     private static final Logger LOG = LoggerFactory.getLogger(TreeCache.class);
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java
new file mode 100644
index 0000000..570799b
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+
+class TreeCacheListenerWrapper implements CuratorCacheListener
+{
+    private final CuratorFramework client;
+    private final TreeCacheListener listener;
+
+    TreeCacheListenerWrapper(CuratorFramework client, TreeCacheListener listener)
+    {
+        this.client = client;
+        this.listener = listener;
+    }
+
+    @Override
+    public void event(Type type, ChildData oldData, ChildData data)
+    {
+        switch ( type )
+        {
+            case NODE_CREATED:
+            {
+                sendEvent(data, TreeCacheEvent.Type.NODE_ADDED);
+                break;
+            }
+
+            case NODE_CHANGED:
+            {
+                sendEvent(data, TreeCacheEvent.Type.NODE_UPDATED);
+                break;
+            }
+
+            case NODE_DELETED:
+            {
+                sendEvent(oldData, TreeCacheEvent.Type.NODE_REMOVED);
+                break;
+            }
+        }
+    }
+
+    @Override
+    public void initialized()
+    {
+        sendEvent(null, TreeCacheEvent.Type.INITIALIZED);
+    }
+
+    private void sendEvent(ChildData node, TreeCacheEvent.Type type)
+    {
+        TreeCacheEvent event = new TreeCacheEvent(type, node);
+        try
+        {
+            listener.childEvent(client, event);
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
new file mode 100644
index 0000000..87ecb6e
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import com.google.common.base.Preconditions;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.ThreadUtils;
+import org.apache.zookeeper.AddWatchMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Closeable;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A managed persistent watcher. The watch will be managed such that it stays set through
+ * connection lapses, etc.
+ */
+public class PersistentWatcher implements Closeable
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
+    private final StandardListenerManager<Watcher> listeners = StandardListenerManager.standard();
+    private final StandardListenerManager<Runnable> resetListeners = StandardListenerManager.standard();
+    private final ConnectionStateListener connectionStateListener = (client, newState) -> {
+        if ( newState.isConnected() )
+        {
+            reset();
+        }
+    };
+    private final Watcher watcher = event -> listeners.forEach(w -> w.process(event));
+    private final CuratorFramework client;
+    private final String basePath;
+    private final boolean recursive;
+
+    private enum State
+    {
+        LATENT,
+        STARTED,
+        CLOSED
+    }
+
+    /**
+     * @param client client
+     * @param basePath path to set the watch on
+     * @param recursive ZooKeeper persistent watches can optionally be recursive
+     */
+    public PersistentWatcher(CuratorFramework client, String basePath, boolean recursive)
+    {
+        this.client = Objects.requireNonNull(client, "client cannot be null");
+        this.basePath = Objects.requireNonNull(basePath, "basePath cannot be null");
+        this.recursive = recursive;
+    }
+
+    /**
+     * Start watching
+     */
+    public void start()
+    {
+        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
+        client.getConnectionStateListenable().addListener(connectionStateListener);
+        reset();
+    }
+
+    /**
+     * Remove the watcher
+     */
+    @Override
+    public void close()
+    {
+        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+        {
+            listeners.clear();
+            client.getConnectionStateListenable().removeListener(connectionStateListener);
+            try
+            {
+                client.watches().remove(watcher).guaranteed().inBackground().forPath(basePath);
+            }
+            catch ( Exception e )
+            {
+                ThreadUtils.checkInterrupted(e);
+                log.debug(String.format("Could not remove watcher for path: %s", basePath), e);
+            }
+        }
+    }
+
+    /**
+     * Container for setting listeners
+     *
+     * @return listener container
+     */
+    public Listenable<Watcher> getListenable()
+    {
+        return listeners;
+    }
+
+    /**
+     * Listeners are called when the persistent watcher has been successfully registered
+     * or re-registered after a connection disruption
+     *
+     * @return listener container
+     */
+    public StandardListenerManager<Runnable> getResetListenable()
+    {
+        return resetListeners;
+    }
+
+    private void reset()
+    {
+        try
+        {
+            BackgroundCallback callback = (__, event) -> {
+                if ( event.getResultCode() != KeeperException.Code.OK.intValue() )
+                {
+                    reset();
+                }
+                else
+                {
+                    resetListeners.forEach(Runnable::run);
+                }
+            };
+            client.watchers().add().withMode(recursive ? AddWatchMode.PERSISTENT_RECURSIVE : AddWatchMode.PERSISTENT).inBackground(callback).usingWatcher(watcher).forPath(basePath);
+        }
+        catch ( Exception e )
+        {
+            log.error("Could not reset persistent watch at path: " + basePath, e);
+        }
+    }
+}
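Note on the PersistentWatcher lifecycle above: start() and close() guard their work with compareAndSet on an AtomicReference, so a second start() fails fast and a second close() is a no-op. The sketch below isolates that LATENT/STARTED/CLOSED pattern. LifecycleSketch and its cleanup counter are hypothetical, and a plain IllegalStateException stands in for the one thrown by Guava's Preconditions.checkState.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

final class LifecycleSketch implements AutoCloseable
{
    private enum State
    {
        LATENT,
        STARTED,
        CLOSED
    }

    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
    private final AtomicInteger cleanups = new AtomicInteger();

    // Only the LATENT -> STARTED transition is allowed; anything else is an error.
    void start()
    {
        if ( !state.compareAndSet(State.LATENT, State.STARTED) )
        {
            throw new IllegalStateException("Already started");
        }
    }

    // Cleanup runs at most once, and only if start() succeeded earlier.
    @Override
    public void close()
    {
        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
        {
            cleanups.incrementAndGet();   // stands in for removing listeners and the watch
        }
    }

    int cleanups()
    {
        return cleanups.get();
    }
}
```

One consequence of this pattern, visible in the sketch, is that an instance cannot be restarted: once CLOSED, start() throws because the state is no longer LATENT.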
diff --git a/curator-recipes/src/site/confluence/curator-cache.confluence b/curator-recipes/src/site/confluence/curator-cache.confluence
new file mode 100644
index 0000000..2eeb500
--- /dev/null
+++ b/curator-recipes/src/site/confluence/curator-cache.confluence
@@ -0,0 +1,36 @@
+h1. Curator Cache
+
+*Note:* CuratorCache requires ZooKeeper 3.6\+.
+
+h2. Description
+A utility that attempts to keep the data from a node locally cached. Optionally, the entire tree of children below the node can also be cached.
+The cache responds to update/create/delete events, pulls down the data, etc. You can register listeners that are notified when changes occur.
+
+h2. Participating Classes
+* CuratorCache
+* CuratorCacheListener
+* ChildData
+
+h2. Usage
+h3. Creating a CuratorCache
+{code}
+CuratorCache.build(CuratorFramework client, String path, Options... options)
+
+Parameters:
+client - the client
+path - path to watch
+options - empty or one or more options
+{code}
+
+Note: there is a builder factory available for additional options when building the cache instance. See {{CuratorCacheBuilder}} for details.
+
+h2. General Usage
+The cache must be started by calling {{start()}}. Call {{close()}} when you are through with the cache.
+
+At any time, call {{storage()}} to get the current state of the cache. You can also register to be notified when a change occurs by calling {{listenable()}} and then registering
+a listener for events.
+
+See the examples for sample usage.
+
+h2. Error Handling
+CuratorCache instances internally monitor connection losses, etc., and automatically rebuild the cache on reconnection.
diff --git a/curator-recipes/src/site/confluence/index.confluence b/curator-recipes/src/site/confluence/index.confluence
index 08ef762..d96b5ce 100644
--- a/curator-recipes/src/site/confluence/index.confluence
+++ b/curator-recipes/src/site/confluence/index.confluence
@@ -24,9 +24,10 @@ regarding "Curator Recipes Own Their ZNode/Paths".
 |[[Distributed Atomic Long|distributed-atomic-long.html]] \- A counter that attempts atomic increments. It first tries using optimistic locking. If that fails, an optional InterProcessMutex is taken. For both optimistic and mutex, a retry policy is used to retry the increment.|
 
 ||Caches||
-|[[Path Cache|path-cache.html]] \- A Path Cache is used to watch a ZNode. Whenever a child is added, updated or removed, the Path Cache will change its state to contain the current set of children, the children's data and the children's state. Path caches in the Curator Framework are provided by the PathChildrenCache class. Changes to the path are passed to registered PathChildrenCacheListener instances.|
-|[[Node Cache|node-cache.html]] \- A utility that attempts to keep the data from a node locally cached. This class will watch the node, respond to update/create/delete events, pull down the data, etc. You can register a listener that will get notified when changes occur.|
-|[[Tree Cache|tree-cache.html]] \- A utility that attempts to keep all data from all children of a ZK path locally cached. This class will watch the ZK path, respond to update/create/delete events, pull down the data, etc. You can register a listener that will get notified when changes occur.|
+|[[Curator Cache|curator-cache.html]] \- A utility that attempts to keep the data from a node locally cached. Optionally, the entire tree of children below the node can also be cached. The cache responds to update/create/delete events, pulls down the data, etc. You can register listeners that are notified when changes occur.|
+|[[Path Cache|path-cache.html]] \- (For pre-ZooKeeper 3.6.x) A Path Cache is used to watch a ZNode. Whenever a child is added, updated or removed, the Path Cache will change its state to contain the current set of children, the children's data and the children's state. Path caches in the Curator Framework are provided by the PathChildrenCache class. Changes to the path are passed to registered PathChildrenCacheListener instances.|
+|[[Node Cache|node-cache.html]] \- (For pre-ZooKeeper 3.6.x) A utility that attempts to keep the data from a node locally cached. This class will watch the node, respond to update/create/delete events, pull down the data, etc. You can register a listener that will get notified when changes occur.|
+|[[Tree Cache|tree-cache.html]] \- (For pre-ZooKeeper 3.6.x) A utility that attempts to keep all data from all children of a ZK path locally cached. This class will watch the ZK path, respond to update/create/delete events, pull down the data, etc. You can register a listener that will get notified when changes occur.|
 
 ||Nodes||
 |[[Persistent Node|persistent-node.html]] \- A node that attempts to stay present in ZooKeeper, even through connection and session interruptions.|
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
new file mode 100644
index 0000000..8560f87
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.DO_NOT_CLEAR_ON_CLOSE;
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.builder;
+
+@Test(groups = CuratorTestBase.zk36Group)
+public class TestCuratorCache extends CuratorTestBase
+{
+    @Test
+    public void testServerLoss() throws Exception   // mostly copied from TestPathChildrenCacheInCluster
+    {
+        try (TestingCluster cluster = new TestingCluster(3))
+        {
+            cluster.start();
+
+            try (CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+            {
+                client.start();
+                client.create().creatingParentsIfNeeded().forPath("/test");
+
+                try (CuratorCache cache = CuratorCache.build(client, "/test"))
+                {
+                    cache.start();
+
+                    CountDownLatch reconnectLatch = new CountDownLatch(1);
+                    client.getConnectionStateListenable().addListener((__, newState) -> {
+                        if ( newState == ConnectionState.RECONNECTED )
+                        {
+                            reconnectLatch.countDown();
+                        }
+                    });
+                    CountDownLatch latch = new CountDownLatch(3);
+                    cache.listenable().addListener((__, ___, ____) -> latch.countDown());
+
+                    client.create().forPath("/test/one");
+                    client.create().forPath("/test/two");
+                    client.create().forPath("/test/three");
+
+                    Assert.assertTrue(timing.awaitLatch(latch));
+
+                    InstanceSpec connectionInstance = cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper());
+                    cluster.killServer(connectionInstance);
+
+                    Assert.assertTrue(timing.awaitLatch(reconnectLatch));
+
+                    timing.sleepABit();
+
+                    Assert.assertEquals(cache.storage().stream().count(), 4);
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testUpdateWhenNotCachingData() throws Exception // mostly copied from TestPathChildrenCache
+    {
+        CuratorCacheStorage storage = new StandardCuratorCacheStorage(false);
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+        {
+            client.start();
+            final CountDownLatch updatedLatch = new CountDownLatch(1);
+            final CountDownLatch addedLatch = new CountDownLatch(1);
+            client.create().creatingParentsIfNeeded().forPath("/test");
+            try (CuratorCache cache = CuratorCache.builder(client, "/test").withStorage(storage).build())
+            {
+                cache.listenable().addListener(builder().forChanges((__, ___) -> updatedLatch.countDown()).build());
+                cache.listenable().addListener(builder().forCreates(__ -> addedLatch.countDown()).build());
+                cache.start();
+
+                client.create().forPath("/test/foo", "first".getBytes());
+                Assert.assertTrue(timing.awaitLatch(addedLatch));
+
+                client.setData().forPath("/test/foo", "something new".getBytes());
+                Assert.assertTrue(timing.awaitLatch(updatedLatch));
+            }
+        }
+    }
+
+    @Test
+    public void testAfterInitialized() throws Exception
+    {
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+        {
+            client.start();
+            client.create().creatingParentsIfNeeded().forPath("/test");
+            client.create().creatingParentsIfNeeded().forPath("/test/one");
+            client.create().creatingParentsIfNeeded().forPath("/test/one/two");
+            client.create().creatingParentsIfNeeded().forPath("/test/one/two/three");
+            try (CuratorCache cache = CuratorCache.build(client, "/test"))
+            {
+                CountDownLatch initializedLatch = new CountDownLatch(1);
+                AtomicInteger eventCount = new AtomicInteger(0);
+                CuratorCacheListener listener = new CuratorCacheListener()
+                {
+                    @Override
+                    public void event(Type type, ChildData oldData, ChildData data)
+                    {
+                        eventCount.incrementAndGet();
+                    }
+
+                    @Override
+                    public void initialized()
+                    {
+                        initializedLatch.countDown();
+                    }
+                };
+                cache.listenable().addListener(builder().forAll(listener).afterInitialized().build());
+                cache.start();
+                Assert.assertTrue(timing.awaitLatch(initializedLatch));
+
+                Assert.assertEquals(initializedLatch.getCount(), 0);
+                Assert.assertEquals(cache.storage().size(), 4);
+                Assert.assertTrue(cache.storage().get("/test").isPresent());
+                Assert.assertTrue(cache.storage().get("/test/one").isPresent());
+                Assert.assertTrue(cache.storage().get("/test/one/two").isPresent());
+                Assert.assertTrue(cache.storage().get("/test/one/two/three").isPresent());
+            }
+        }
+    }
+
+    @Test
+    public void testListenerBuilder() throws Exception
+    {
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+        {
+            client.start();
+            try (CuratorCache cache = CuratorCache.build(client, "/test"))
+            {
+                Semaphore all = new Semaphore(0);
+                Semaphore deletes = new Semaphore(0);
+                Semaphore changes = new Semaphore(0);
+                Semaphore creates = new Semaphore(0);
+                Semaphore createsAndChanges = new Semaphore(0);
+
+                CuratorCacheListener listener = builder().forAll((__, ___, ____) -> all.release()).forDeletes(__ -> deletes.release()).forChanges((__, ___) -> changes.release()).forCreates(__ -> creates.release()).forCreatesAndChanges((__, ___) -> createsAndChanges.release()).build();
+                cache.listenable().addListener(listener);
+                cache.start();
+
+                client.create().forPath("/test");
+                Assert.assertTrue(timing.acquireSemaphore(all, 1));
+                Assert.assertTrue(timing.acquireSemaphore(creates, 1));
+                Assert.assertTrue(timing.acquireSemaphore(createsAndChanges, 1));
+                Assert.assertEquals(changes.availablePermits(), 0);
+                Assert.assertEquals(deletes.availablePermits(), 0);
+
+                client.setData().forPath("/test", "new".getBytes());
+                Assert.assertTrue(timing.acquireSemaphore(all, 1));
+                Assert.assertTrue(timing.acquireSemaphore(changes, 1));
+                Assert.assertTrue(timing.acquireSemaphore(createsAndChanges, 1));
+                Assert.assertEquals(creates.availablePermits(), 0);
+                Assert.assertEquals(deletes.availablePermits(), 0);
+
+                client.delete().forPath("/test");
+                Assert.assertTrue(timing.acquireSemaphore(all, 1));
+                Assert.assertTrue(timing.acquireSemaphore(deletes, 1));
+                Assert.assertEquals(creates.availablePermits(), 0);
+                Assert.assertEquals(changes.availablePermits(), 0);
+                Assert.assertEquals(createsAndChanges.availablePermits(), 0);
+            }
+        }
+    }
+
+    @Test
+    public void testOverrideExecutor() throws Exception
+    {
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+        {
+            client.start();
+            CountDownLatch latch = new CountDownLatch(2);
+            Executor executor = proc -> {
+                latch.countDown();
+                proc.run();
+            };
+            try ( CuratorCache cache = CuratorCache.builder(client, "/test").withExecutor(executor).build() )
+            {
+                cache.listenable().addListener((type, oldData, data) -> latch.countDown());
+                cache.start();
+
+                client.create().forPath("/test");
+
+                Assert.assertTrue(timing.awaitLatch(latch));
+            }
+        }
+    }
+
+    @Test
+    public void testClearOnClose() throws Exception
+    {
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+        {
+            CuratorCacheStorage storage;
+            client.start();
+
+            try ( CuratorCache cache = CuratorCache.builder(client, "/test").withOptions(DO_NOT_CLEAR_ON_CLOSE).build() )
+            {
+                cache.start();
+                storage = cache.storage();
+
+                client.create().forPath("/test", "foo".getBytes());
+                client.create().forPath("/test/bar", "bar".getBytes());
+                timing.sleepABit();
+            }
+            Assert.assertEquals(storage.size(), 2);
+
+            try ( CuratorCache cache = CuratorCache.build(client, "/test") )
+            {
+                cache.start();
+                storage = cache.storage();
+
+                timing.sleepABit();
+            }
+            Assert.assertEquals(storage.size(), 0);
+        }
+    }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheConsistency.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheConsistency.java
new file mode 100644
index 0000000..ab9dbe4
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheConsistency.java
@@ -0,0 +1,373 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.apache.curator.utils.ZKPaths;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.io.Closeable;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.DO_NOT_CLEAR_ON_CLOSE;
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.builder;
+
+/**
+ * Randomly create, change, and delete nodes in a tree while a set of CuratorCaches listens.
+ * Afterwards, validate that the caches contain the same values as ZK itself.
+ */
+@Test(groups = CuratorTestBase.zk36Group)
+public class TestCuratorCacheConsistency extends CuratorTestBase
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final ThreadLocalRandom random = ThreadLocalRandom.current();
+
+    private static final Duration testLength = Duration.ofSeconds(30);
+    private static final Duration thirdOfTestLength = Duration.ofMillis(testLength.toMillis() / 3);
+    private static final Duration sleepLength = Duration.ofMillis(5);
+    private static final int nodesPerLevel = 10;
+    private static final int clusterSize = 5;
+    private static final int maxServerKills = 2;
+
+    private static final String BASE_PATH = "/test";
+
+    private class Client implements Closeable
+    {
+        private final CuratorFramework client;
+        private final CuratorCache cache;
+        private final int index;
+        private final Map<String, ChildData> listenerDataMap = new HashMap<>();
+
+        Client(int index, String connectionString, AtomicReference<Exception> errorSignal)
+        {
+            this.index = index;
+            client = buildClient(connectionString);
+            cache = CuratorCache.builder(client, BASE_PATH).withOptions(DO_NOT_CLEAR_ON_CLOSE).withExceptionHandler(errorSignal::set).build();
+
+            // listenerDataMap is a local map that records the values delivered to the listener,
+            // so that the listener events can be checked for validity and consistency
+            CuratorCacheListener listener = builder().forCreates(node -> {
+                ChildData previous = listenerDataMap.put(node.getPath(), node);
+                if ( previous != null )
+                {
+                    errorSignal.set(new Exception(String.format("Client: %d - Create for existing node: %s", index, node.getPath())));
+                }
+            }).forChanges((oldNode, node) -> {
+                ChildData previous = listenerDataMap.put(node.getPath(), node);
+                if ( (previous == null) || !Arrays.equals(previous.getData(), oldNode.getData()) )
+                {
+                    errorSignal.set(new Exception(String.format("Client: %d - Bad old value for change node: %s", index, node.getPath())));
+                }
+            }).forDeletes(node -> {
+                ChildData previous = listenerDataMap.remove(node.getPath());
+                if ( previous == null )
+                {
+                    errorSignal.set(new Exception(String.format("Client: %d - Delete for non-existent node: %s", index, node.getPath())));
+                }
+            }).build();
+            cache.listenable().addListener(listener);
+        }
+
+        void start()
+        {
+            client.start();
+            cache.start();
+        }
+
+        @Override
+        public void close()
+        {
+            cache.close();
+            client.close();
+        }
+    }
+
+    @Test
+    public void testConsistencyAfterSimulation() throws Exception
+    {
+        int clientQty = random.nextInt(10, 20);
+        int maxDepth = random.nextInt(5, 10);
+
+        log.info("clientQty: {}, maxDepth: {}", clientQty, maxDepth);
+
+        List<Client> clients = Collections.emptyList();
+        Map<String, String> actualTree;
+
+        AtomicReference<Exception> errorSignal = new AtomicReference<>();
+        try (TestingCluster cluster = new TestingCluster(clusterSize))
+        {
+            cluster.start();
+
+            initializeBasePath(cluster);
+            try
+            {
+                clients = buildClients(cluster, clientQty, errorSignal);
+                workLoop(cluster, clients, maxDepth, errorSignal);
+
+                log.info("Test complete - sleeping to allow events to complete");
+                timing.sleepABit();
+            }
+            finally
+            {
+                clients.forEach(Client::close);
+            }
+
+            actualTree = buildActual(cluster);
+        }
+
+        log.info("client qty: {}", clientQty);
+
+        Map<Integer, List<String>> errorsList = clients.stream()
+            .map(client -> findErrors(client, actualTree))
+            .filter(errorsEntry -> !errorsEntry.getValue().isEmpty())
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+        if ( !errorsList.isEmpty() )
+        {
+            log.error("{} clients had errors", errorsList.size());
+            errorsList.forEach((index, errorList) -> {
+                log.error("Client {}", index);
+                errorList.forEach(log::error);
+                log.error("");
+            });
+
+            Assert.fail("Errors found");
+        }
+    }
+
+    // build a data map recursively from the actual values in ZK
+    private Map<String, String> buildActual(TestingCluster cluster)
+    {
+        Map<String, String> actual = new HashMap<>();
+        try (CuratorFramework client = buildClient(cluster.getConnectString()))
+        {
+            client.start();
+            buildActual(client, actual, BASE_PATH);
+        }
+        return actual;
+    }
+
+    private void buildActual(CuratorFramework client, Map<String, String> actual, String fromPath)
+    {
+        try
+        {
+            byte[] bytes = client.getData().forPath(fromPath);
+            actual.put(fromPath, new String(bytes));
+            client.getChildren().forPath(fromPath).forEach(child -> buildActual(client, actual, ZKPaths.makePath(fromPath, child)));
+        }
+        catch ( Exception e )
+        {
+            Assert.fail("Could not read: " + fromPath, e);
+        }
+    }
+
+    private List<Client> buildClients(TestingCluster cluster, int clientQty, AtomicReference<Exception> errorSignal)
+    {
+        return IntStream.range(0, clientQty)
+            .mapToObj(index -> new Client(index, cluster.getConnectString(), errorSignal))
+            .peek(Client::start)
+            .collect(Collectors.toList());
+    }
+
+    private void initializeBasePath(TestingCluster cluster) throws Exception
+    {
+        try (CuratorFramework client = buildClient(cluster.getConnectString()))
+        {
+            client.start();
+            client.create().forPath(BASE_PATH, "".getBytes());
+        }
+    }
+
+    private void workLoop(TestingCluster cluster, List<Client> clients, int maxDepth, AtomicReference<Exception> errorSignal) throws Exception
+    {
+        Instant start = Instant.now();
+        Instant lastServerKill = Instant.now();
+        int serverKillIndex = 0;
+        while ( true )
+        {
+            Duration elapsed = Duration.between(start, Instant.now());
+            if ( elapsed.compareTo(testLength) >= 0 )
+            {
+                break;
+            }
+
+            Exception errorSignalException = errorSignal.get();
+            if ( errorSignalException != null )
+            {
+                Assert.fail("A client's error handler was called", errorSignalException);
+            }
+
+            Duration elapsedFromLastServerKill = Duration.between(lastServerKill, Instant.now());
+            if ( elapsedFromLastServerKill.compareTo(thirdOfTestLength) >= 0 )
+            {
+                lastServerKill = Instant.now();
+                if ( serverKillIndex < maxServerKills )
+                {
+                    doKillServer(cluster, serverKillIndex++);
+                }
+            }
+
+            int thisDepth = random.nextInt(0, maxDepth);
+            String thisPath = randomPath(thisDepth);
+            CuratorFramework client = randomClient(clients);
+            if ( random.nextBoolean() )
+            {
+                doDelete(client, thisPath);
+            }
+            else
+            {
+                doChange(client, thisPath);
+            }
+
+            Thread.sleep(sleepLength.toMillis());
+        }
+    }
+
+    private void doChange(CuratorFramework client, String thisPath)
+    {
+        try
+        {
+            String thisData = Long.toString(random.nextLong());
+            client.create().orSetData().creatingParentsIfNeeded().forPath(thisPath, thisData.getBytes());
+        }
+        catch ( Exception e )
+        {
+            Assert.fail("Could not create/set: " + thisPath, e);
+        }
+    }
+
+    private void doDelete(CuratorFramework client, String thisPath)
+    {
+        if ( thisPath.equals(BASE_PATH) )
+        {
+            return;
+        }
+        try
+        {
+            client.delete().quietly().deletingChildrenIfNeeded().forPath(thisPath);
+        }
+        catch ( Exception e )
+        {
+            Assert.fail("Could not delete: " + thisPath, e);
+        }
+    }
+
+    private void doKillServer(TestingCluster cluster, int serverKillIndex) throws Exception
+    {
+        log.info("Killing server {}", serverKillIndex);
+        InstanceSpec killSpec = new ArrayList<>(cluster.getInstances()).get(serverKillIndex);
+        cluster.killServer(killSpec);
+    }
+
+    private CuratorFramework randomClient(List<Client> clients)
+    {
+        return clients.get(random.nextInt(clients.size())).client;
+    }
+
+    private Map.Entry<Integer, List<String>> findErrors(Client client, Map<String, String> tree)
+    {
+        CuratorCacheStorage storage = client.cache.storage();
+        List<String> errors = new ArrayList<>();
+        if ( tree.size() != storage.size() )
+        {
+            errors.add(String.format("Size mismatch. Expected: %d - Actual: %d", tree.size(), storage.size()));
+        }
+        tree.keySet().forEach(path -> {
+            if ( !storage.get(path).isPresent() )
+            {
+                errors.add(String.format("Path %s in master but not client", path));
+            }
+        });
+        storage.stream().forEach(data -> {
+            String treeValue = tree.get(data.getPath());
+            if ( treeValue != null )
+            {
+                if ( !treeValue.equals(new String(data.getData())) )
+                {
+                    errors.add(String.format("Data at %s is not the same", data.getPath()));
+                }
+
+                ChildData listenersMapData = client.listenerDataMap.get(data.getPath());
+                if ( listenersMapData == null )
+                {
+                    errors.add(String.format("listenersMap missing data at: %s", data.getPath()));
+                }
+                else if ( !treeValue.equals(new String(listenersMapData.getData())) )
+                {
+                    errors.add(String.format("Data at %s in listenersMap is not the same", data.getPath()));
+                }
+            }
+            else
+            {
+                errors.add(String.format("Path %s in client but not master", data.getPath()));
+            }
+        });
+
+        client.listenerDataMap.keySet().forEach(path -> {
+            if ( !storage.get(path).isPresent() )
+            {
+                errors.add(String.format("Path %s in listenersMap but not storage", path));
+            }
+        });
+
+        return new AbstractMap.SimpleEntry<>(client.index, errors);
+    }
+
+    private String randomPath(int depth)
+    {
+        StringBuilder str = new StringBuilder(BASE_PATH);
+        while ( depth-- > 0 )
+        {
+            int levelNodeName = random.nextInt(nodesPerLevel);
+            str.append("/").append(levelNodeName);
+        }
+        return str.toString();
+    }
+
+    @Override
+    protected void createServer()
+    {
+        // do nothing - we'll be using TestingCluster instead
+    }
+
+    private CuratorFramework buildClient(String connectionString)
+    {
+        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(100, 100);
+        return CuratorFrameworkFactory.newClient(connectionString, timing.session(), timing.connection(), retryPolicy);
+    }
+}
\ No newline at end of file
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEventOrdering.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEventOrdering.java
new file mode 100644
index 0000000..8baf2e2
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEventOrdering.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.testng.annotations.Test;
+import java.util.concurrent.BlockingQueue;
+
+@Test(groups = CuratorTestBase.zk36Group)
+public class TestCuratorCacheEventOrdering extends TestEventOrdering<CuratorCache>
+{
+    @Override
+    protected int getActualQty(CuratorCache cache)
+    {
+        return cache.storage().size();
+    }
+
+    @Override
+    protected CuratorCache newCache(CuratorFramework client, String path, BlockingQueue<Event> events)
+    {
+        CuratorCache cache = CuratorCache.build(client, path);
+        cache.listenable().addListener((type, oldNode, node) -> {
+            if ( type == CuratorCacheListener.Type.NODE_CREATED )
+            {
+                events.add(new Event(EventType.ADDED, node.getPath()));
+            }
+            else if ( type == CuratorCacheListener.Type.NODE_DELETED )
+            {
+                events.add(new Event(EventType.DELETED, oldNode.getPath()));
+            }
+        });
+        cache.start();
+        return cache;
+    }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java
new file mode 100644
index 0000000..4a75acf
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.SINGLE_NODE_CACHE;
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.builder;
+import static org.apache.curator.framework.recipes.cache.CuratorCacheStorage.toMap;
+
+@Test(groups = CuratorTestBase.zk36Group)
+public class TestCuratorCacheWrappers extends CuratorTestBase
+{
+    @Test
+    public void testPathChildrenCache() throws Exception    // copied from TestPathChildrenCache#testBasics()
+    {
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+        {
+            client.start();
+            client.create().forPath("/test");
+
+            final BlockingQueue<PathChildrenCacheEvent.Type> events = new LinkedBlockingQueue<>();
+            try (CuratorCache cache = CuratorCache.build(client, "/test"))
+            {
+                PathChildrenCacheListener listener = (__, event) -> {
+                    if ( event.getData().getPath().equals("/test/one") )
+                    {
+                        events.offer(event.getType());
+                    }
+                };
+                cache.listenable().addListener(builder().forPathChildrenCache(client, listener).build());
+                cache.start();
+
+                client.create().forPath("/test/one", "hey there".getBytes());
+                Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
+
+                client.setData().forPath("/test/one", "sup!".getBytes());
+                Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
+                Assert.assertEquals(new String(cache.storage().get("/test/one").orElseThrow(AssertionError::new).getData()), "sup!");
+
+                client.delete().forPath("/test/one");
+                Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
+            }
+        }
+    }
+
+    @Test
+    public void testTreeCache() throws Exception    // copied from TestTreeCache#testBasics()
+    {
+        BaseTestTreeCache treeCacheBase = new BaseTestTreeCache();
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+        {
+            client.start();
+            client.create().forPath("/test");
+
+            try (CuratorCache cache = CuratorCache.build(client, "/test"))
+            {
+                cache.listenable().addListener(builder().forTreeCache(client, treeCacheBase.eventListener).build());
+                cache.start();
+
+                treeCacheBase.assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test");
+                treeCacheBase.assertEvent(TreeCacheEvent.Type.INITIALIZED);
+                Assert.assertEquals(toMap(cache.storage().streamImmediateChildren("/test")).keySet(), ImmutableSet.of());
+                Assert.assertEquals(cache.storage().streamImmediateChildren("/t").count(), 0);
+                Assert.assertEquals(cache.storage().streamImmediateChildren("/testing").count(), 0);
+
+                client.create().forPath("/test/one", "hey there".getBytes());
+                treeCacheBase.assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/one");
+                Assert.assertEquals(toMap(cache.storage().streamImmediateChildren("/test")).keySet(), ImmutableSet.of("/test/one"));
+                Assert.assertEquals(new String(cache.storage().get("/test/one").orElseThrow(AssertionError::new).getData()), "hey there");
+                Assert.assertEquals(toMap(cache.storage().streamImmediateChildren("/test/one")).keySet(), ImmutableSet.of());
+                Assert.assertEquals(cache.storage().streamImmediateChildren("/test/o").count(), 0);
+                Assert.assertEquals(cache.storage().streamImmediateChildren("/test/onely").count(), 0);
+
+                client.setData().forPath("/test/one", "sup!".getBytes());
+                treeCacheBase.assertEvent(TreeCacheEvent.Type.NODE_UPDATED, "/test/one");
+                Assert.assertEquals(toMap(cache.storage().streamImmediateChildren("/test")).keySet(), ImmutableSet.of("/test/one"));
+                Assert.assertEquals(new String(cache.storage().get("/test/one").orElseThrow(AssertionError::new).getData()), "sup!");
+
+                client.delete().forPath("/test/one");
+                treeCacheBase.assertEvent(TreeCacheEvent.Type.NODE_REMOVED, "/test/one", "sup!".getBytes());
+                Assert.assertEquals(toMap(cache.storage().streamImmediateChildren("/test")).keySet(), ImmutableSet.of());
+            }
+        }
+    }
+
+    @Test
+    public void testNodeCache() throws Exception    // copied from TestNodeCache#testBasics()
+    {
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)) )
+        {
+            client.start();
+            client.create().forPath("/test");
+
+            try (CuratorCache cache = CuratorCache.build(client, "/test/node", SINGLE_NODE_CACHE))
+            {
+                Supplier<ChildData> getRootData = () -> cache.storage().get("/test/node").orElseThrow(() -> new AssertionError("is not present"));
+                cache.start();
+
+                final Semaphore semaphore = new Semaphore(0);
+                cache.listenable().addListener(builder().forNodeCache(semaphore::release).build());
+                try
+                {
+                    getRootData.get();
+                    Assert.fail("Should have thrown");
+                }
+                catch ( AssertionError expected )
+                {
+                    // expected
+                }
+
+                client.create().forPath("/test/node", "a".getBytes());
+                Assert.assertTrue(timing.acquireSemaphore(semaphore));
+                Assert.assertEquals(getRootData.get().getData(), "a".getBytes());
+
+                client.setData().forPath("/test/node", "b".getBytes());
+                Assert.assertTrue(timing.acquireSemaphore(semaphore));
+                Assert.assertEquals(getRootData.get().getData(), "b".getBytes());
+
+                client.delete().forPath("/test/node");
+                Assert.assertTrue(timing.acquireSemaphore(semaphore));
+                try
+                {
+                    getRootData.get();
+                    Assert.fail("Should have thrown");
+                }
+                catch ( AssertionError expected )
+                {
+                    // expected
+                }
+            }
+        }
+    }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestWrappedNodeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestWrappedNodeCache.java
new file mode 100644
index 0000000..3e81b63
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestWrappedNodeCache.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.Compatibility;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.function.Supplier;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.builder;
+
+@Test(groups = CuratorTestBase.zk36Group)
+public class TestWrappedNodeCache extends CuratorTestBase
+{
+    @Test
+    public void testDeleteThenCreate() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+            client.create().creatingParentsIfNeeded().forPath("/test/foo", "one".getBytes());
+
+            final Semaphore semaphore = new Semaphore(0);
+            cache = CuratorCache.build(client, "/test/foo");
+            NodeCacheListener listener = semaphore::release;
+            cache.listenable().addListener(builder().forNodeCache(listener).build());
+
+            Supplier<Optional<ChildData>> rootData = getRootDataProc(cache, "/test/foo");
+
+            cache.start();
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+
+            Assert.assertTrue(rootData.get().isPresent());
+            Assert.assertEquals(rootData.get().get().getData(), "one".getBytes());
+
+            client.delete().forPath("/test/foo");
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+            client.create().forPath("/test/foo", "two".getBytes());
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+
+            Assert.assertTrue(rootData.get().isPresent());
+            Assert.assertEquals(rootData.get().get().getData(), "two".getBytes());
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    @Test
+    public void testKilledSession() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = null;
+        try
+        {
+            client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+            client.start();
+            client.create().creatingParentsIfNeeded().forPath("/test/node", "start".getBytes());
+
+            CountDownLatch lostLatch = new CountDownLatch(1);
+            client.getConnectionStateListenable().addListener((__, newState) -> {
+                if ( newState == ConnectionState.LOST )
+                {
+                    lostLatch.countDown();
+                }
+            });
+
+            cache = CuratorCache.build(client,"/test/node");
+
+            Semaphore latch = new Semaphore(0);
+            NodeCacheListener listener = latch::release;
+            cache.listenable().addListener(builder().forNodeCache(listener).build());
+
+            Supplier<Optional<ChildData>> rootData = getRootDataProc(cache, "/test/node");
+
+            cache.start();
+            Assert.assertTrue(timing.acquireSemaphore(latch));
+
+            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+            Assert.assertTrue(timing.awaitLatch(lostLatch));
+
+            Assert.assertTrue(rootData.get().isPresent());
+            Assert.assertEquals(rootData.get().get().getData(), "start".getBytes());
+
+            client.setData().forPath("/test/node", "new data".getBytes());
+            Assert.assertTrue(timing.acquireSemaphore(latch));
+            Assert.assertTrue(rootData.get().isPresent());
+            Assert.assertEquals(rootData.get().get().getData(), "new data".getBytes());
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    @SuppressWarnings("ConstantConditions")
+    @Test
+    public void testBasics() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+            client.create().forPath("/test");
+
+            cache = CuratorCache.build(client, "/test/node");
+            cache.start();
+
+            Supplier<Optional<ChildData>> rootData = getRootDataProc(cache, "/test/node");
+
+            final Semaphore semaphore = new Semaphore(0);
+            NodeCacheListener listener = semaphore::release;
+            cache.listenable().addListener(builder().forNodeCache(listener).build());
+
+            Assert.assertNull(rootData.get().orElse(null));
+
+            client.create().forPath("/test/node", "a".getBytes());
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+            Assert.assertEquals(rootData.get().orElse(null).getData(), "a".getBytes());
+
+            client.setData().forPath("/test/node", "b".getBytes());
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+            Assert.assertEquals(rootData.get().orElse(null).getData(), "b".getBytes());
+
+            client.delete().forPath("/test/node");
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+            Assert.assertNull(rootData.get().orElse(null));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    private Supplier<Optional<ChildData>> getRootDataProc(CuratorCache cache, String rootPath)
+    {
+        return () -> cache.storage().get(rootPath);
+    }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
new file mode 100644
index 0000000..534c365
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.watch;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class TestPersistentWatcher extends CuratorTestBase
+{
+    @Test
+    public void testConnectionLostRecursive() throws Exception
+    {
+        internalTest(true);
+    }
+
+    @Test
+    public void testConnectionLost() throws Exception
+    {
+        internalTest(false);
+    }
+
+    private void internalTest(boolean recursive) throws Exception
+    {
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)) )
+        {
+            CountDownLatch lostLatch = new CountDownLatch(1);
+            CountDownLatch reconnectedLatch = new CountDownLatch(1);
+            client.start();
+            client.getConnectionStateListenable().addListener((__, newState) -> {
+                if ( newState == ConnectionState.LOST )
+                {
+                    lostLatch.countDown();
+                }
+                else if ( newState == ConnectionState.RECONNECTED )
+                {
+                    reconnectedLatch.countDown();
+                }
+            });
+
+            try ( PersistentWatcher persistentWatcher = new PersistentWatcher(client, "/top/main", recursive) )
+            {
+                persistentWatcher.start();
+
+                BlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>();
+                persistentWatcher.getListenable().addListener(events::add);
+
+                client.create().creatingParentsIfNeeded().forPath("/top/main/a");
+                Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main");
+                if ( recursive )
+                {
+                    Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main/a");
+                }
+                else
+                {
+                    Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main");   // child added: a non-recursive watch reports it on the parent path
+                }
+
+                server.stop();
+                Assert.assertEquals(timing.takeFromQueue(events).getState(), Watcher.Event.KeeperState.Disconnected);
+                Assert.assertTrue(timing.awaitLatch(lostLatch));
+
+                server.restart();
+                Assert.assertTrue(timing.awaitLatch(reconnectedLatch));
+
+                timing.sleepABit();     // give the watcher time to be reset after the reconnect
+                events.clear();
+
+                if ( recursive )
+                {
+                    client.setData().forPath("/top/main/a", "foo".getBytes());
+                    Assert.assertEquals(timing.takeFromQueue(events).getType(), Watcher.Event.EventType.NodeDataChanged);
+                }
+                client.setData().forPath("/top/main", "bar".getBytes());
+                Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main");
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/site/confluence/index.confluence b/src/site/confluence/index.confluence
index af43300..f2237e1 100644
--- a/src/site/confluence/index.confluence
+++ b/src/site/confluence/index.confluence
@@ -41,6 +41,6 @@ most users, the only artifact you need is curator\-recipes.
 
 h2. ZooKeeper Compatibility
 
-Apache Curator is meant to be used with ZooKeeper 3.5\+. However, it is also compatible with ZooKeeper 3.4.x.
+Apache Curator is meant to be used with the latest version of ZooKeeper. However, it is also compatible with earlier versions of ZooKeeper.
 See [[Compatibility|zk-compatibility.html]] for details.
 
diff --git a/src/site/confluence/zk-compatibility.confluence b/src/site/confluence/zk-compatibility.confluence
index 240734a..03a3ebe 100644
--- a/src/site/confluence/zk-compatibility.confluence
+++ b/src/site/confluence/zk-compatibility.confluence
@@ -13,7 +13,7 @@ Persistent/Recursive watchers.
 
 h2. ZooKeeper 3.5.x and ZooKeeper 3.4.x
 
-Curator 4.0 supports earlier versions of ZooKeeper in a soft\-compatibility mode. To use this mode
+Curator supports earlier versions of ZooKeeper in a soft\-compatibility mode. To use this mode
 you must exclude ZooKeeper when adding Curator to your dependency management tool.
 
 _Maven_