You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/10/04 14:02:01 UTC

[1/8] curator git commit: 1. Persistent watches are now optionally recursive - support this. 2. Added bridge classes to help TreeCache users switch to CuratorCache

Repository: curator
Updated Branches:
  refs/heads/persistent-watch 9a05598c4 -> d7bf1a246


1. Persistent watches are now optionally recursive - support this. 2. Added bridge classes to help TreeCache users switch to CuratorCache


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/a83e3e0b
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/a83e3e0b
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/a83e3e0b

Branch: refs/heads/persistent-watch
Commit: a83e3e0b5f1a8ea031fcf2cb32f745880ecaa8b3
Parents: 9a05598
Author: randgalt <ra...@apache.org>
Authored: Wed Aug 23 08:04:01 2017 +0200
Committer: randgalt <ra...@apache.org>
Committed: Wed Aug 23 08:04:01 2017 +0200

----------------------------------------------------------------------
 .../api/AddPersistentWatchBuilder.java          |  11 +-
 .../api/AddPersistentWatchBuilder2.java         |  25 +
 .../imps/AddPersistentWatchBuilderImpl.java     |  12 +-
 .../framework/recipes/cache/ListenerBridge.java | 197 ++++++++
 .../framework/recipes/cache/SelectorBridge.java |  55 ++
 .../recipes/watch/InternalCuratorCache.java     |   2 +-
 .../recipes/watch/PersistentWatcher.java        |   9 +-
 .../recipes/cache/BaseTestTreeCache.java        |  28 +-
 .../framework/recipes/cache/TestTreeCache.java  |   2 +-
 .../recipes/cache/TestTreeCacheBridge.java      | 500 +++++++++++++++++++
 .../cache/TestTreeCacheBridgeRandomTree.java    | 224 +++++++++
 .../recipes/cache/TestTreeCacheRandomTree.java  |   2 +-
 .../org/apache/curator/test/WatchersDebug.java  |   9 +
 13 files changed, 1064 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder.java
index 4927afc..057919e 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder.java
@@ -18,8 +18,13 @@
  */
 package org.apache.curator.framework.api;
 
-public interface AddPersistentWatchBuilder extends
-    Backgroundable<AddPersistentWatchable<Pathable<Void>>>,
-    AddPersistentWatchable<Pathable<Void>>
+public interface AddPersistentWatchBuilder extends AddPersistentWatchBuilder2
 {
+    /**
+     * ZooKeeper persistent watches can optionally be recursive. See
+     * {@link org.apache.zookeeper.ZooKeeper#addPersistentWatch(String, org.apache.zookeeper.Watcher, boolean)}
+     *
+     * @return this
+     */
+    AddPersistentWatchBuilder2 recursive();
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder2.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder2.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder2.java
new file mode 100644
index 0000000..ce1ffed
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder2.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+public interface AddPersistentWatchBuilder2 extends
+    Backgroundable<AddPersistentWatchable<Pathable<Void>>>,
+    AddPersistentWatchable<Pathable<Void>>
+{
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
index bf4dfb6..56f8f79 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
@@ -21,6 +21,7 @@ package org.apache.curator.framework.imps;
 import org.apache.curator.RetryLoop;
 import org.apache.curator.drivers.OperationTrace;
 import org.apache.curator.framework.api.AddPersistentWatchBuilder;
+import org.apache.curator.framework.api.AddPersistentWatchBuilder2;
 import org.apache.curator.framework.api.AddPersistentWatchable;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
@@ -37,6 +38,7 @@ class AddPersistentWatchBuilderImpl implements AddPersistentWatchBuilder, Pathab
     private final CuratorFrameworkImpl client;
     private Watching watching = null;
     private Backgrounding backgrounding = new Backgrounding();
+    private boolean recursive = false;
 
     AddPersistentWatchBuilderImpl(CuratorFrameworkImpl client)
     {
@@ -51,6 +53,13 @@ class AddPersistentWatchBuilderImpl implements AddPersistentWatchBuilder, Pathab
     }
 
     @Override
+    public AddPersistentWatchBuilder2 recursive()
+    {
+        recursive = true;
+        return this;
+    }
+
+    @Override
     public Pathable<Void> usingWatcher(Watcher watcher)
     {
         watching = new Watching(client, watcher);
@@ -125,6 +134,7 @@ class AddPersistentWatchBuilderImpl implements AddPersistentWatchBuilder, Pathab
                 (
                     fixedPath,
                     watching.getWatcher(path),
+                    recursive,
                     new AsyncCallback.VoidCallback()
                     {
                         @Override
@@ -156,7 +166,7 @@ class AddPersistentWatchBuilderImpl implements AddPersistentWatchBuilder, Pathab
                 @Override
                 public Void call() throws Exception
                 {
-                    client.getZooKeeper().addPersistentWatch(fixedPath, watching.getWatcher(path));
+                    client.getZooKeeper().addPersistentWatch(fixedPath, watching.getWatcher(path), recursive);
                     return null;
                 }
             }

http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java
new file mode 100644
index 0000000..8a2d665
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java
@@ -0,0 +1,197 @@
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.base.Preconditions;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.recipes.watch.CacheEvent;
+import org.apache.curator.framework.recipes.watch.CacheListener;
+import org.apache.curator.framework.recipes.watch.CachedNode;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * <p>
+ * Utility to bridge old TreeCache {@link org.apache.curator.framework.recipes.cache.TreeCacheListener}
+ * instances with new {@link org.apache.curator.framework.recipes.watch.CacheListener} so that you can
+ * use existing listeners without rewriting them.
+ * </p>
+ *
+ * <p>
+ * Create a ListenerBridge from your existing TreeCacheListener. You can then call {@link #add()}
+ * to add the bridge listener to a CuratorCache.
+ * </p>
+ */
+public class ListenerBridge implements CacheListener, ConnectionStateListener
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final CuratorFramework client;
+    private final Listenable<CacheListener> listenable;
+    private final TreeCacheListener listener;
+    private final AtomicBoolean added = new AtomicBoolean(false);
+
+    /**
+     * Static factory - equivalent to calling the constructor
+     *
+     * @param client the client
+     * @param listenable CuratorCache listener container
+     * @param listener the old TreeCacheListener
+     * @return listener bridge
+     */
+    public static ListenerBridge wrap(CuratorFramework client, Listenable<CacheListener> listenable, TreeCacheListener listener)
+    {
+        return new ListenerBridge(client, listenable, listener);
+    }
+
+    /**
+     * @param client the client
+     * @param listenable CuratorCache listener container
+     * @param listener the old TreeCacheListener
+     */
+    public ListenerBridge(CuratorFramework client, Listenable<CacheListener> listenable, TreeCacheListener listener)
+    {
+        this.client = Objects.requireNonNull(client, "client cannot be null");
+        this.listenable = Objects.requireNonNull(listenable, "listenable cannot be null");
+        this.listener = Objects.requireNonNull(listener, "listener cannot be null");
+    }
+
+    @Override
+    public void process(CacheEvent event, String path, CachedNode affectedNode)
+    {
+        try
+        {
+            listener.childEvent(client, toEvent(event, path, affectedNode));
+        }
+        catch ( Exception e )
+        {
+            handleException(e);
+        }
+    }
+
+    @Override
+    public void stateChanged(CuratorFramework client, ConnectionState newState)
+    {
+        TreeCacheEvent.Type type = toType(newState);
+        if ( type != null )
+        {
+            try
+            {
+                listener.childEvent(client, new TreeCacheEvent(type, null));
+            }
+            catch ( Exception e )
+            {
+                handleException(e);
+            }
+        }
+    }
+
+    /**
+     * Add this listener to the listener container. Note: this method is not idempotent - calling it again without an intervening {@link #remove()} throws IllegalStateException
+     */
+    public void add()
+    {
+        Preconditions.checkState(added.compareAndSet(false, true), "Already added");
+        client.getConnectionStateListenable().addListener(this);
+        listenable.addListener(this);
+    }
+
+    /**
+     * Remove this listener from the listener container
+     */
+    public void remove()
+    {
+        if ( added.compareAndSet(true, false) )
+        {
+            client.getConnectionStateListenable().removeListener(this);
+            listenable.removeListener(this);
+        }
+    }
+
+    /**
+     * Utility - convert a new CuratorCache event to an old TreeCache event
+     *
+     * @param event event to convert
+     * @return the equivalent TreeCache event type
+     */
+    public static TreeCacheEvent.Type toType(CacheEvent event)
+    {
+        switch ( event )
+        {
+        case NODE_CREATED:
+            return TreeCacheEvent.Type.NODE_ADDED;
+
+        case NODE_DELETED:
+            return TreeCacheEvent.Type.NODE_REMOVED;
+
+        case NODE_CHANGED:
+            return TreeCacheEvent.Type.NODE_UPDATED;
+
+        case CACHE_REFRESHED:
+            return TreeCacheEvent.Type.INITIALIZED;
+        }
+
+        throw new IllegalStateException("Unknown event: " + event);
+    }
+
+    /**
+     * Utility - convert a connection state event to an old TreeCache event
+     *
+     * @param state event to convert
+     * @return the equivalent TreeCache event type, or null if there is no corresponding TreeCache value
+     */
+    public static TreeCacheEvent.Type toType(ConnectionState state)
+    {
+        switch ( state )
+        {
+        case RECONNECTED:
+            return TreeCacheEvent.Type.CONNECTION_RECONNECTED;
+
+        case SUSPENDED:
+            return TreeCacheEvent.Type.CONNECTION_SUSPENDED;
+
+        case LOST:
+            return TreeCacheEvent.Type.CONNECTION_LOST;
+        }
+
+        return null;
+    }
+
+    /**
+     * Convert Curator Cache listener values to TreeCache data
+     *
+     * @param path the affected path (can be null)
+     * @param affectedNode the node (can be null)
+     * @return TreeCache data, or null if the path, the node or the node's data is null
+     */
+    public static ChildData toData(String path, CachedNode affectedNode)
+    {
+        if ( (path != null) && (affectedNode != null) && (affectedNode.getData() != null) )
+        {
+            return new ChildData(path, affectedNode.getStat(), affectedNode.getData());
+        }
+        return null;
+    }
+
+    /**
+     * Generate a TreeCacheEvent from Curator cache event data
+     *
+     * @param event event type
+     * @param path affected path (can be null)
+     * @param affectedNode affected data (can be null)
+     * @return event
+     */
+    public static TreeCacheEvent toEvent(CacheEvent event, String path, CachedNode affectedNode)
+    {
+        TreeCacheEvent.Type type = toType(event);
+        ChildData data = (event == CacheEvent.CACHE_REFRESHED) ? null : toData(path, affectedNode);
+        return new TreeCacheEvent(type, data);
+    }
+
+    protected void handleException(Exception e)
+    {
+        log.error("Unhandled exception in listener", e);
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/SelectorBridge.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/SelectorBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/SelectorBridge.java
new file mode 100644
index 0000000..0c6af08
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/SelectorBridge.java
@@ -0,0 +1,55 @@
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.recipes.watch.CacheAction;
+import org.apache.curator.framework.recipes.watch.CacheSelector;
+import java.util.Objects;
+
+/**
+ * Utility to bridge an old TreeCacheSelector to a new CuratorCache selector
+ */
+public class SelectorBridge implements CacheSelector
+{
+    private final TreeCacheSelector selector;
+    private final CacheAction action;
+
+    /**
+     * Static factory - equivalent to calling the single-argument constructor
+     *
+     * @param selector the old TreeCacheSelector to bridge
+     * @return bridged selector
+     */
+    public static SelectorBridge wrap(TreeCacheSelector selector)
+    {
+        return new SelectorBridge(selector);
+    }
+
+    /**
+     * @param selector the old TreeCacheSelector to bridge
+     */
+    public SelectorBridge(TreeCacheSelector selector)
+    {
+        this(selector, CacheAction.STAT_AND_DATA);
+    }
+
+    /**
+     * @param selector the old TreeCacheSelector to bridge
+     * @param action value to return for paths accepted by the selector
+     */
+    public SelectorBridge(TreeCacheSelector selector, CacheAction action)
+    {
+        this.selector = Objects.requireNonNull(selector, "selector cannot be null");
+        this.action = Objects.requireNonNull(action, "action cannot be null");
+    }
+
+    @Override
+    public boolean traverseChildren(String basePath, String fullPath)
+    {
+        return selector.traverseChildren(fullPath);
+    }
+
+    @Override
+    public CacheAction actionForPath(String basePath, String fullPath)
+    {
+        return selector.acceptChild(fullPath) ? action : CacheAction.NOT_STORED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
index e2b1bf3..edd08b5 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
@@ -71,7 +71,7 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
         this.basePath = Objects.requireNonNull(path, "path cannot be null");
         this.cacheSelector = Objects.requireNonNull(cacheSelector, "cacheSelector cannot be null");
         this.sortChildren = sortChildren;
-        watcher = new PersistentWatcher(client, path)
+        watcher = new PersistentWatcher(client, path, true)
         {
             @Override
             protected void noteWatcherReset()

http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
index 310478a..3884a69 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
@@ -21,6 +21,7 @@ package org.apache.curator.framework.recipes.watch;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.AddPersistentWatchBuilder2;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.CuratorEventType;
@@ -83,6 +84,7 @@ public class PersistentWatcher implements Closeable
     };
     private final CuratorFramework client;
     private final String basePath;
+    private final boolean recursive;
     private final BackgroundCallback backgroundCallback = new BackgroundCallback()
     {
         @Override
@@ -115,11 +117,13 @@ public class PersistentWatcher implements Closeable
     /**
      * @param client client
      * @param basePath path to set the watch on
+     * @param recursive ZooKeeper persistent watches can optionally be recursive
      */
-    public PersistentWatcher(CuratorFramework client, String basePath)
+    public PersistentWatcher(CuratorFramework client, String basePath, boolean recursive)
     {
         this.client = Objects.requireNonNull(client, "client cannot be null");
         this.basePath = Objects.requireNonNull(basePath, "basePath cannot be null");
+        this.recursive = recursive;
     }
 
     /**
@@ -176,7 +180,8 @@ public class PersistentWatcher implements Closeable
     {
         try
         {
-            client.addPersistentWatch().inBackground(backgroundCallback).usingWatcher(watcher).forPath(basePath);
+            AddPersistentWatchBuilder2 builder = recursive ? client.addPersistentWatch().recursive() : client.addPersistentWatch();
+            builder.inBackground(backgroundCallback).usingWatcher(watcher).forPath(basePath);
         }
         catch ( Exception e )
         {

http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
index b984624..9cbec98 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
@@ -23,23 +23,25 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.imps.TestCleanState;
+import org.apache.curator.framework.recipes.watch.CuratorCache;
+import org.apache.curator.framework.recipes.watch.CuratorCacheBuilder;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
-import org.apache.curator.utils.CloseableExecutorService;
 import org.apache.curator.utils.CloseableUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import java.io.Closeable;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-public class BaseTestTreeCache extends BaseClassForTests
+public class BaseTestTreeCache<T extends Closeable> extends BaseClassForTests
 {
     CuratorFramework client;
-    TreeCache cache;
+    T cache;
     protected final AtomicBoolean hadBackgroundException = new AtomicBoolean(false);
     private final BlockingQueue<TreeCacheEvent> events = new LinkedBlockingQueue<TreeCacheEvent>();
     private final Timing timing = new Timing();
@@ -86,6 +88,26 @@ public class BaseTestTreeCache extends BaseClassForTests
     }
 
     /**
+     * Construct a CuratorCache that records exceptions and automatically listens using the bridge.
+     */
+    protected CuratorCache newCacheWithListeners(CuratorFramework client, String path)
+    {
+        CuratorCache result = CuratorCacheBuilder.builder(client, path).build();
+        ListenerBridge.wrap(client, result.getListenable(), eventListener).add();
+        return result;
+    }
+
+    /**
+     * Finish constructing a CuratorCache that records exceptions and automatically listens.
+     */
+    protected CuratorCache buildCacheWithListeners(CuratorCacheBuilder builder)
+    {
+        CuratorCache result = builder.build();
+        ListenerBridge.wrap(client, result.getListenable(), eventListener).add();
+        return result;
+    }
+
+    /**
      * Finish constructing a TreeCache that records exceptions and automatically listens.
      */
     protected TreeCache buildWithListeners(TreeCache.Builder builder)

http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
index ebaf43e..ee5e918 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
@@ -33,7 +33,7 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-public class TestTreeCache extends BaseTestTreeCache
+public class TestTreeCache extends BaseTestTreeCache<TreeCache>
 {
     @Test
     public void testSelector() throws Exception

http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridge.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridge.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridge.java
new file mode 100644
index 0000000..049daa5
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridge.java
@@ -0,0 +1,500 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
+import org.apache.curator.framework.recipes.watch.CacheSelectors;
+import org.apache.curator.framework.recipes.watch.CuratorCache;
+import org.apache.curator.framework.recipes.watch.CuratorCacheBuilder;
+import org.apache.curator.test.compatibility.KillSession2;
+import org.apache.zookeeper.CreateMode;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
+{
+    @Test
+    public void testSelector() throws Exception
+    {
+        client.create().forPath("/root");
+        client.create().forPath("/root/n1-a");
+        client.create().forPath("/root/n1-b");
+        client.create().forPath("/root/n1-b/n2-a");
+        client.create().forPath("/root/n1-b/n2-b");
+        client.create().forPath("/root/n1-b/n2-b/n3-a");
+        client.create().forPath("/root/n1-c");
+        client.create().forPath("/root/n1-d");
+
+        TreeCacheSelector selector = new TreeCacheSelector()
+        {
+            @Override
+            public boolean traverseChildren(String fullPath)
+            {
+                return !fullPath.equals("/root/n1-b/n2-b");
+            }
+
+            @Override
+            public boolean acceptChild(String fullPath)
+            {
+                return !fullPath.equals("/root/n1-c");
+            }
+        };
+        cache = buildCacheWithListeners(CuratorCacheBuilder.builder(client, "/root").withCacheSelector(SelectorBridge.wrap(selector)));
+        cache.start();
+
+        assertEvent(Type.NODE_ADDED, "/root");
+        assertEvent(Type.NODE_ADDED, "/root/n1-a");
+        assertEvent(Type.NODE_ADDED, "/root/n1-b");
+        assertEvent(Type.NODE_ADDED, "/root/n1-d");
+        assertEvent(Type.NODE_ADDED, "/root/n1-b/n2-a");
+        assertEvent(Type.NODE_ADDED, "/root/n1-b/n2-b");
+        assertEvent(Type.INITIALIZED);
+        assertNoMoreEvents();
+    }
+
+    @Test
+    public void testStartup() throws Exception
+    {
+        client.create().forPath("/test");
+        client.create().forPath("/test/1", "one".getBytes());
+        client.create().forPath("/test/2", "two".getBytes());
+        client.create().forPath("/test/3", "three".getBytes());
+        client.create().forPath("/test/2/sub", "two-sub".getBytes());
+
+        cache = newCacheWithListeners(client, "/test");
+        cache.start();
+        assertEvent(Type.NODE_ADDED, "/test");
+        assertEvent(Type.NODE_ADDED, "/test/1", "one".getBytes());
+        assertEvent(Type.NODE_ADDED, "/test/2", "two".getBytes());
+        assertEvent(Type.NODE_ADDED, "/test/3", "three".getBytes());
+        assertEvent(Type.NODE_ADDED, "/test/2/sub", "two-sub".getBytes());
+        assertEvent(Type.INITIALIZED);
+        assertNoMoreEvents();
+
+        Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("1", "2", "3"));
+        Assert.assertEquals(cache.childrenAtPath("/test/1").keySet(), ImmutableSet.of());
+        Assert.assertEquals(cache.childrenAtPath("/test/2").keySet(), ImmutableSet.of("sub"));
+        Assert.assertNull(cache.get("/test/non_exist"));
+    }
+
+    @Test
+    public void testStartEmpty() throws Exception
+    {
+        cache = newCacheWithListeners(client, "/test");
+        cache.start();
+        assertEvent(Type.INITIALIZED);
+
+        client.create().forPath("/test");
+        assertEvent(Type.NODE_ADDED, "/test");
+        assertNoMoreEvents();
+    }
+
+    @Test
+    public void testStartEmptyDeeper() throws Exception
+    {
+        cache = newCacheWithListeners(client, "/test/foo/bar");
+        cache.start();
+        assertEvent(Type.INITIALIZED);
+
+        client.create().creatingParentsIfNeeded().forPath("/test/foo");
+        assertNoMoreEvents();
+        client.create().forPath("/test/foo/bar");
+        assertEvent(Type.NODE_ADDED, "/test/foo/bar");
+        assertNoMoreEvents();
+    }
+
+    @Test
+    public void testDepth0() throws Exception
+    {
+        client.create().forPath("/test");
+        client.create().forPath("/test/1", "one".getBytes());
+        client.create().forPath("/test/2", "two".getBytes());
+        client.create().forPath("/test/3", "three".getBytes());
+        client.create().forPath("/test/2/sub", "two-sub".getBytes());
+
+        CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, "/test").withCacheSelector(CacheSelectors.maxDepth(0));
+        cache = buildCacheWithListeners(builder);
+        cache.start();
+        assertEvent(Type.NODE_ADDED, "/test");
+        assertEvent(Type.INITIALIZED);
+        assertNoMoreEvents();
+
+        Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of());
+        Assert.assertNull(cache.get("/test/1"));
+        Assert.assertNull(cache.get("/test/1"));
+        Assert.assertNull(cache.get("/test/non_exist"));
+    }
+
+    @Test
+    public void testDepth1() throws Exception
+    {
+        client.create().forPath("/test");
+        client.create().forPath("/test/1", "one".getBytes());
+        client.create().forPath("/test/2", "two".getBytes());
+        client.create().forPath("/test/3", "three".getBytes());
+        client.create().forPath("/test/2/sub", "two-sub".getBytes());
+
+        CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, "/test").withCacheSelector(CacheSelectors.maxDepth(1));
+        cache = buildCacheWithListeners(builder);
+        cache.start();
+        assertEvent(Type.NODE_ADDED, "/test");
+        assertEvent(Type.NODE_ADDED, "/test/1", "one".getBytes());
+        assertEvent(Type.NODE_ADDED, "/test/2", "two".getBytes());
+        assertEvent(Type.NODE_ADDED, "/test/3", "three".getBytes());
+        assertEvent(Type.INITIALIZED);
+        assertNoMoreEvents();
+
+        Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("1", "2", "3"));
+        Assert.assertEquals(cache.childrenAtPath("/test/1").keySet(), ImmutableSet.of());
+        Assert.assertEquals(cache.childrenAtPath("/test/2").keySet(), ImmutableSet.of());
+        Assert.assertNull(cache.get("/test/1/sub"));
+        Assert.assertNull(cache.get("/test/2/sub"));
+        Assert.assertNull(cache.get("/test/non_exist"));
+    }
+
+    @Test
+    public void testDepth1Deeper() throws Exception
+    {
+        client.create().forPath("/test");
+        client.create().forPath("/test/foo");
+        client.create().forPath("/test/foo/bar");
+        client.create().forPath("/test/foo/bar/1", "one".getBytes());
+        client.create().forPath("/test/foo/bar/2", "two".getBytes());
+        client.create().forPath("/test/foo/bar/3", "three".getBytes());
+        client.create().forPath("/test/foo/bar/2/sub", "two-sub".getBytes());
+
+        CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, "/test/foo/bar").withCacheSelector(CacheSelectors.maxDepth(1));
+        cache = buildCacheWithListeners(builder);
+        cache.start();
+        assertEvent(Type.NODE_ADDED, "/test/foo/bar");
+        assertEvent(Type.NODE_ADDED, "/test/foo/bar/1", "one".getBytes());
+        assertEvent(Type.NODE_ADDED, "/test/foo/bar/2", "two".getBytes());
+        assertEvent(Type.NODE_ADDED, "/test/foo/bar/3", "three".getBytes());
+        assertEvent(Type.INITIALIZED);
+        assertNoMoreEvents();
+    }
+
+    @Test
+    public void testAsyncInitialPopulation() throws Exception
+    {
+        client.create().forPath("/test");
+        client.create().forPath("/test/one", "hey there".getBytes());
+
+        cache = newCacheWithListeners(client, "/test");
+        cache.start();
+        assertEvent(Type.NODE_ADDED, "/test");
+        assertEvent(Type.NODE_ADDED, "/test/one");
+        assertEvent(Type.INITIALIZED);
+        assertNoMoreEvents();
+    }
+
+    @Test
+    public void testFromRoot() throws Exception
+    {
+        client.create().forPath("/test");
+        client.create().forPath("/test/one", "hey there".getBytes());
+
+        cache = newCacheWithListeners(client, "/");
+        cache.start();
+        assertEvent(Type.NODE_ADDED, "/");
+        assertEvent(Type.NODE_ADDED, "/test");
+        assertEvent(Type.NODE_ADDED, "/test/one");
+        assertEvent(Type.INITIALIZED);
+        assertNoMoreEvents();
+
+        Assert.assertTrue(cache.childrenAtPath("/").keySet().contains("test"));
+        Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one"));
+        Assert.assertEquals(cache.childrenAtPath("/test/one").keySet(), ImmutableSet.of());
+        Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there");
+    }
+
+    @Test
+    public void testFromRootWithDepth() throws Exception
+    {
+        client.create().forPath("/test");
+        client.create().forPath("/test/one", "hey there".getBytes());
+
+        CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, "/").withCacheSelector(CacheSelectors.maxDepth(1));
+        cache = buildCacheWithListeners(builder);
+        cache.start();
+        assertEvent(Type.NODE_ADDED, "/");
+        assertEvent(Type.NODE_ADDED, "/test");
+        assertEvent(Type.INITIALIZED);
+        assertNoMoreEvents();
+
+        Assert.assertTrue(cache.childrenAtPath("/").keySet().contains("test"));
+        Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of());
+        Assert.assertNull(cache.get("/test/one"));
+        Assert.assertNull(cache.get("/test/one"));
+    }
+
+    @Test
+    public void testWithNamespace() throws Exception
+    {
+        client.create().forPath("/outer");
+        client.create().forPath("/outer/foo");
+        client.create().forPath("/outer/test");
+        client.create().forPath("/outer/test/one", "hey there".getBytes());
+
+        cache = newCacheWithListeners(client.usingNamespace("outer"), "/test");
+        cache.start();
+        assertEvent(Type.NODE_ADDED, "/test");
+        assertEvent(Type.NODE_ADDED, "/test/one");
+        assertEvent(Type.INITIALIZED);
+        assertNoMoreEvents();
+
+        Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one"));
+        Assert.assertEquals(cache.childrenAtPath("/test/one").keySet(), ImmutableSet.of());
+        Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there");
+    }
+
+    @Test
+    public void testWithNamespaceAtRoot() throws Exception
+    {
+        client.create().forPath("/outer");
+        client.create().forPath("/outer/foo");
+        client.create().forPath("/outer/test");
+        client.create().forPath("/outer/test/one", "hey there".getBytes());
+
+        cache = newCacheWithListeners(client.usingNamespace("outer"), "/");
+        cache.start();
+        assertEvent(Type.NODE_ADDED, "/");
+        assertEvent(Type.NODE_ADDED, "/foo");
+        assertEvent(Type.NODE_ADDED, "/test");
+        assertEvent(Type.NODE_ADDED, "/test/one");
+        assertEvent(Type.INITIALIZED);
+        assertNoMoreEvents();
+        Assert.assertEquals(cache.childrenAtPath("/").keySet(), ImmutableSet.of("foo", "test"));
+        Assert.assertEquals(cache.childrenAtPath("/foo").keySet(), ImmutableSet.of());
+        Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one"));
+        Assert.assertEquals(cache.childrenAtPath("/test/one").keySet(), ImmutableSet.of());
+        Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there");
+    }
+
+    @Test
+    public void testSyncInitialPopulation() throws Exception
+    {
+        cache = newCacheWithListeners(client, "/test");
+        cache.start();
+        assertEvent(Type.INITIALIZED);
+
+        client.create().forPath("/test");
+        client.create().forPath("/test/one", "hey there".getBytes());
+        assertEvent(Type.NODE_ADDED, "/test");
+        assertEvent(Type.NODE_ADDED, "/test/one");
+        assertNoMoreEvents();
+    }
+
+    @Test
+    public void testChildrenInitialized() throws Exception
+    {
+        client.create().forPath("/test", "".getBytes());
+        client.create().forPath("/test/1", "1".getBytes());
+        client.create().forPath("/test/2", "2".getBytes());
+        client.create().forPath("/test/3", "3".getBytes());
+
+        cache = newCacheWithListeners(client, "/test");
+        cache.start();
+        assertEvent(Type.NODE_ADDED, "/test");
+        assertEvent(Type.NODE_ADDED, "/test/1");
+        assertEvent(Type.NODE_ADDED, "/test/2");
+        assertEvent(Type.NODE_ADDED, "/test/3");
+        assertEvent(Type.INITIALIZED);
+        assertNoMoreEvents();
+    }
+
+    @Test
+    public void testUpdateWhenNotCachingData() throws Exception
+    {
+        client.create().forPath("/test");
+
+        cache = buildCacheWithListeners(CuratorCacheBuilder.builder(client, "/test").withCacheSelector(CacheSelectors.statOnly()));
+        cache.start();
+        assertEvent(Type.NODE_ADDED, "/test");
+        assertEvent(Type.INITIALIZED);
+
+        client.create().forPath("/test/foo", "first".getBytes());
+        assertEvent(Type.NODE_ADDED, "/test/foo");
+
+        client.setData().forPath("/test/foo", "something new".getBytes());
+        assertEvent(Type.NODE_UPDATED, "/test/foo");
+        assertNoMoreEvents();
+
+        Assert.assertNotNull(cache.get("/test/foo"));
+        // No byte data querying the tree because we're not caching data.
+        Assert.assertEquals(cache.get("/test/foo").getData().length, 0);
+    }
+
+    @Test
+    public void testDeleteThenCreate() throws Exception
+    {
+        client.create().forPath("/test");
+        client.create().forPath("/test/foo", "one".getBytes());
+
+        cache = newCacheWithListeners(client, "/test");
+        cache.start();
+        assertEvent(Type.NODE_ADDED, "/test");
+        assertEvent(Type.NODE_ADDED, "/test/foo");
+        assertEvent(Type.INITIALIZED);
+
+        client.delete().forPath("/test/foo");
+        assertEvent(Type.NODE_REMOVED, "/test/foo", "one".getBytes());
+        client.create().forPath("/test/foo", "two".getBytes());
+        assertEvent(Type.NODE_ADDED, "/test/foo");
+
+        client.delete().forPath("/test/foo");
+        assertEvent(Type.NODE_REMOVED, "/test/foo", "two".getBytes());
+        client.create().forPath("/test/foo", "two".getBytes());
+        assertEvent(Type.NODE_ADDED, "/test/foo");
+
+        assertNoMoreEvents();
+    }
+
+    @Test
+    public void testDeleteThenCreateRoot() throws Exception
+    {
+        client.create().forPath("/test");
+        client.create().forPath("/test/foo", "one".getBytes());
+
+        cache = newCacheWithListeners(client, "/test/foo");
+        cache.start();
+        assertEvent(Type.NODE_ADDED, "/test/foo");
+        assertEvent(Type.INITIALIZED);
+
+        client.delete().forPath("/test/foo");
+        assertEvent(Type.NODE_REMOVED, "/test/foo");
+        client.create().forPath("/test/foo", "two".getBytes());
+        assertEvent(Type.NODE_ADDED, "/test/foo");
+
+        client.delete().forPath("/test/foo");
+        assertEvent(Type.NODE_REMOVED, "/test/foo");
+        client.create().forPath("/test/foo", "two".getBytes());
+        assertEvent(Type.NODE_ADDED, "/test/foo");
+
+        assertNoMoreEvents();
+    }
+
+    @Test
+    public void testKilledSession() throws Exception
+    {
+        client.create().forPath("/test");
+
+        cache = newCacheWithListeners(client, "/test");
+        cache.start();
+        assertEvent(Type.NODE_ADDED, "/test");
+        assertEvent(Type.INITIALIZED);
+
+        client.create().forPath("/test/foo", "foo".getBytes());
+        assertEvent(Type.NODE_ADDED, "/test/foo");
+        client.create().withMode(CreateMode.EPHEMERAL).forPath("/test/me", "data".getBytes());
+        assertEvent(Type.NODE_ADDED, "/test/me");
+
+        KillSession2.kill(client.getZookeeperClient().getZooKeeper());
+        assertEvent(Type.CONNECTION_LOST);
+        assertEvent(Type.CONNECTION_RECONNECTED);
+        assertEvent(Type.INITIALIZED);
+        assertEvent(Type.NODE_REMOVED, "/test/me", "data".getBytes());
+
+        assertNoMoreEvents();
+    }
+
+    @Test
+    public void testBasics() throws Exception
+    {
+        client.create().forPath("/test");
+
+        cache = newCacheWithListeners(client, "/test");
+        cache.start();
+        assertEvent(Type.NODE_ADDED, "/test");
+        assertEvent(Type.INITIALIZED);
+        Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of());
+        Assert.assertNull(cache.get("/t"));
+        Assert.assertNull(cache.get("/testing"));
+
+        client.create().forPath("/test/one", "hey there".getBytes());
+        assertEvent(Type.NODE_ADDED, "/test/one");
+        Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one"));
+        Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there");
+        Assert.assertEquals(cache.childrenAtPath("/test/one").keySet(), ImmutableSet.of());
+        Assert.assertNull(cache.get("/test/o"));
+        Assert.assertNull(cache.get("/test/onely"));
+
+        client.setData().forPath("/test/one", "sup!".getBytes());
+        assertEvent(Type.NODE_UPDATED, "/test/one");
+        Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one"));
+        Assert.assertEquals(new String(cache.get("/test/one").getData()), "sup!");
+
+        client.delete().forPath("/test/one");
+        assertEvent(Type.NODE_REMOVED, "/test/one", "sup!".getBytes());
+        Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of());
+
+        assertNoMoreEvents();
+    }
+
+    @Test
+    public void testDeleteNodeAfterCloseDoesntCallExecutor() throws Exception
+    {
+        client.create().forPath("/test");
+
+        cache = newCacheWithListeners(client, "/test");
+        cache.start();
+        assertEvent(Type.NODE_ADDED, "/test");
+        assertEvent(Type.INITIALIZED);
+
+        client.create().forPath("/test/one", "hey there".getBytes());
+        assertEvent(Type.NODE_ADDED, "/test/one");
+        Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there");
+
+        cache.close();
+        assertNoMoreEvents();
+
+        client.delete().forPath("/test/one");
+        assertNoMoreEvents();
+    }
+
+    /**
+     * Make sure TreeCache gets to a sane state when we can't initially connect to server.
+     */
+    @Test
+    public void testServerNotStartedYet() throws Exception
+    {
+        // Stop the existing server.
+        server.stop();
+
+        // Shutdown the existing client and re-create it started.
+        client.close();
+        initCuratorFramework();
+
+        // Start the cache while the client is disconnected.
+        cache = newCacheWithListeners(client, "/test");
+        cache.start();
+        assertNoMoreEvents();
+
+        // Now restart the server.
+        server.restart();
+        assertEvent(Type.INITIALIZED);
+
+        client.create().forPath("/test");
+
+        assertEvent(Type.NODE_ADDED, "/test");
+        assertNoMoreEvents();
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeRandomTree.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeRandomTree.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeRandomTree.java
new file mode 100644
index 0000000..f304c24
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeRandomTree.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.collect.Iterables;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.watch.CachedNode;
+import org.apache.curator.framework.recipes.watch.CuratorCache;
+import org.apache.curator.utils.ZKPaths;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+public class TestTreeCacheBridgeRandomTree extends BaseTestTreeCache<CuratorCache>
+{
+    /**
+     * A randomly generated source-of-truth node for {@link #testGiantRandomDeepTree()}
+     */
+    private static final class TestNode
+    {
+        String fullPath;
+        byte[] data;
+        Map<String, TestNode> children = new HashMap<String, TestNode>();
+
+        TestNode(String fullPath, byte[] data)
+        {
+            this.fullPath = fullPath;
+            this.data = data;
+        }
+    }
+
+    // These constants will produce a tree about 10 levels deep.
+    private static final int ITERATIONS = 1000;
+    private static final double DIVE_CHANCE = 0.9;
+    private static final int TEST_DEPTH = 5;
+
+    private final Random random = new Random();
+    private boolean withDepth = false;
+
+    /**
+     * Randomly construct a large tree of test data in memory, mirror it into ZK, and then use
+     * a TreeCache to follow the changes.  At each step, assert that TreeCache matches our
+     * source-of-truth test data, and that we see exactly the set of events we expect to see.
+     */
+
+    @Test
+    public void testGiantRandomDeepTree() throws Exception {
+        client.create().forPath("/tree", null);
+        CuratorFramework cl = client.usingNamespace("tree");
+        cache = newCacheWithListeners(cl, "/");
+        cache.start();
+        assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/");
+        assertEvent(TreeCacheEvent.Type.INITIALIZED);
+
+        TestNode root = new TestNode("/", new byte[0]);
+        int maxDepth = 0;
+        int adds = 0;
+        int removals = 0;
+        int updates = 0;
+
+        for ( int i = 0; i < ITERATIONS; ++i )
+        {
+            // Select a node to update, randomly navigate down through the tree
+            int depth = 0;
+            TestNode last = null;
+            TestNode node = root;
+            while ( !node.children.isEmpty() && random.nextDouble() < DIVE_CHANCE )
+            {
+                // Go down a level in the tree.  Select a random child for the next iteration.
+                last = node;
+                node = Iterables.get(node.children.values(), random.nextInt(node.children.size()));
+                ++depth;
+            }
+            maxDepth = Math.max(depth, maxDepth);
+
+            // Okay we found a node, let's do something interesting with it.
+            switch ( random.nextInt(3) )
+            {
+            case 0:
+                // Try a removal if we have no children and we're not the root node.
+                if ( node != root && node.children.isEmpty() )
+                {
+                    // Delete myself from parent.
+                    TestNode removed = last.children.remove(ZKPaths.getNodeFromPath(node.fullPath));
+                    Assert.assertSame(node, removed);
+
+                    // Delete from ZK
+                    cl.delete().forPath(node.fullPath);
+
+                    // TreeCache should see the delete.
+                    if (shouldSeeEventAt(node.fullPath))
+                    {
+                        assertEvent(TreeCacheEvent.Type.NODE_REMOVED, node.fullPath);
+                    }
+                    ++removals;
+                }
+                break;
+            case 1:
+                // Do an update.
+                byte[] newData = new byte[10];
+                random.nextBytes(newData);
+
+                if ( Arrays.equals(node.data, newData) )
+                {
+                    // Randomly generated the same data! Very small chance, just skip.
+                    continue;
+                }
+
+                // Update source-of-truth.
+                node.data = newData;
+
+                // Update in ZK.
+                cl.setData().forPath(node.fullPath, node.data);
+
+                // TreeCache should see the update.
+                if (shouldSeeEventAt(node.fullPath))
+                {
+                    assertEvent(TreeCacheEvent.Type.NODE_UPDATED, node.fullPath, node.data);
+                }
+
+                ++updates;
+                break;
+            case 2:
+                // Add a new child.
+                String name = Long.toHexString(random.nextLong());
+                if ( node.children.containsKey(name) )
+                {
+                    // Randomly generated the same name! Very small chance, just skip.
+                    continue;
+                }
+
+                // Add a new child to our test tree.
+                byte[] data = new byte[10];
+                random.nextBytes(data);
+                TestNode child = new TestNode(ZKPaths.makePath(node.fullPath, name), data);
+                node.children.put(name, child);
+
+                // Add to ZK.
+                cl.create().forPath(child.fullPath, child.data);
+
+                // TreeCache should see the add.
+                if (shouldSeeEventAt(child.fullPath))
+                {
+                    assertEvent(TreeCacheEvent.Type.NODE_ADDED, child.fullPath, child.data);
+                }
+
+                ++adds;
+                break;
+            }
+
+            // Each iteration, ensure the cached state matches our source-of-truth tree.
+            assertNodeEquals(ListenerBridge.toData("/", cache.get("/")), root);
+            assertTreeEquals(cache, root, 0);
+        }
+
+        // Typical stats for this test: maxDepth: 10, adds: 349, removals: 198, updates: 320
+        // We get more adds than removals because removals only happen if we're at a leaf.
+        System.out.println(String.format("maxDepth: %s, adds: %s, removals: %s, updates: %s", maxDepth, adds, removals, updates));
+        assertNoMoreEvents();
+    }
+
+    /**
+     * Returns true if we should see an event at this path based on maxDepth, false otherwise.
+     */
+    private boolean shouldSeeEventAt(String fullPath)
+    {
+        return !withDepth || ZKPaths.split(fullPath).size() <= TEST_DEPTH;
+    }
+
+    /**
+     * Recursively assert that current children equal expected children.
+     */
+    private void assertTreeEquals(CuratorCache cache, TestNode expectedNode, int depth)
+    {
+        String path = expectedNode.fullPath;
+        Map<String, CachedNode> cacheChildren = cache.childrenAtPath(path);
+        Assert.assertNotNull(cacheChildren, path);
+
+        if (withDepth && depth == TEST_DEPTH) {
+            return;
+        }
+
+        Assert.assertEquals(cacheChildren.keySet(), expectedNode.children.keySet(), path);
+
+        for ( Map.Entry<String, TestNode> entry : expectedNode.children.entrySet() )
+        {
+            String nodeName = entry.getKey();
+            CachedNode childData = cacheChildren.get(nodeName);
+            TestNode expectedChild = entry.getValue();
+            assertNodeEquals(ListenerBridge.toData(expectedChild.fullPath, childData), expectedChild);
+            assertTreeEquals(cache, expectedChild, depth + 1);
+        }
+    }
+
+    /**
+     * Assert that the given node data matches expected test node data.
+     */
+    private static void assertNodeEquals(ChildData actualChild, TestNode expectedNode)
+    {
+        String path = expectedNode.fullPath;
+        Assert.assertNotNull(actualChild, path);
+        Assert.assertEquals(actualChild.getData(), expectedNode.data, path);
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheRandomTree.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheRandomTree.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheRandomTree.java
index 96ce75c..1a9e366 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheRandomTree.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheRandomTree.java
@@ -29,7 +29,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 
-public class TestTreeCacheRandomTree extends BaseTestTreeCache
+public class TestTreeCacheRandomTree extends BaseTestTreeCache<TreeCache>
 {
     /**
      * A randomly generated source-of-truth node for {@link #testGiantRandomDeepTree()}

http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java
----------------------------------------------------------------------
diff --git a/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java b/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java
index e4c3b7e..e884b8c 100644
--- a/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java
+++ b/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java
@@ -27,16 +27,19 @@ public class WatchersDebug
     private static final Method getDataWatches;
     private static final Method getExistWatches;
     private static final Method getChildWatches;
+    private static final Method getPersistentWatches;
     static
     {
         Method localGetDataWatches = null;
         Method localGetExistWatches = null;
         Method localGetChildWatches = null;
+        Method localGetPersistentWatches = null;
         try
         {
             localGetDataWatches = getMethod("getDataWatches");
             localGetExistWatches = getMethod("getExistWatches");
             localGetChildWatches = getMethod("getChildWatches");
+            localGetPersistentWatches = getMethod("getPersistentWatches");
         }
         catch ( NoSuchMethodException e )
         {
@@ -45,6 +48,7 @@ public class WatchersDebug
         getDataWatches = localGetDataWatches;
         getExistWatches = localGetExistWatches;
         getChildWatches = localGetChildWatches;
+        getPersistentWatches = localGetPersistentWatches;
     }
 
     public static List<String> getDataWatches(ZooKeeper zooKeeper)
@@ -62,6 +66,11 @@ public class WatchersDebug
         return callMethod(zooKeeper, getChildWatches);
     }
 
+    public static List<String> getPersistentWatches(ZooKeeper zooKeeper)
+    {
+        return callMethod(zooKeeper, getPersistentWatches);
+    }
+
     private WatchersDebug()
     {
     }


[7/8] curator git commit: 1. Updates from latest ZK changes. 2. Added async version for creating persistent watch

Posted by ra...@apache.org.
1. Updates from latest ZK changes. 2. Added async version for creating persistent watch


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/a27f8768
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/a27f8768
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/a27f8768

Branch: refs/heads/persistent-watch
Commit: a27f87688c4a3a655d3970cbb5166c6286b5e147
Parents: 9c1186b
Author: randgalt <ra...@apache.org>
Authored: Wed Oct 4 16:01:42 2017 +0200
Committer: randgalt <ra...@apache.org>
Committed: Wed Oct 4 16:01:42 2017 +0200

----------------------------------------------------------------------
 .../async/api/AsyncPersistentWatchBuilder.java  | 33 +++++++++
 .../AsyncPersistentWatchBuilderImpl.java        | 75 ++++++++++++++++++++
 2 files changed, 108 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/a27f8768/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
new file mode 100644
index 0000000..b794d88
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.api;
+
+import org.apache.curator.framework.api.AddPersistentWatchable;
+import org.apache.curator.x.async.AsyncStage;
+
+public interface AsyncPersistentWatchBuilder extends AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>>
+{
+    /**
+     * ZooKeeper persistent watches can optionally be recursive. See
+     * {@link org.apache.zookeeper.ZooKeeper#addPersistentWatch(String, org.apache.zookeeper.Watcher, boolean)}
+     *
+     * @return this
+     */
+    AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>> recursive();
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/a27f8768/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java
new file mode 100644
index 0000000..4f9fd87
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.details;
+
+import org.apache.curator.framework.api.AddPersistentWatchable;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.imps.AddPersistentWatchBuilderImpl;
+import org.apache.curator.framework.imps.CuratorFrameworkImpl;
+import org.apache.curator.framework.imps.Watching;
+import org.apache.curator.x.async.AsyncStage;
+import org.apache.curator.x.async.api.AsyncPathable;
+import org.apache.curator.x.async.api.AsyncPersistentWatchBuilder;
+import org.apache.zookeeper.Watcher;
+
+import static org.apache.curator.x.async.details.BackgroundProcs.ignoredProc;
+import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
+
+class AsyncPersistentWatchBuilderImpl implements AsyncPersistentWatchBuilder, AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>>, AsyncPathable<AsyncStage<Void>>
+{
+    private final CuratorFrameworkImpl client;
+    private final Filters filters;
+    private Watching watching = null;
+    private boolean recursive = false;
+
+    AsyncPersistentWatchBuilderImpl(CuratorFrameworkImpl client, Filters filters)
+    {
+        this.client = client;
+        this.filters = filters;
+    }
+
+    @Override
+    public AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>> recursive()
+    {
+        recursive = true;
+        return this;
+    }
+
+    @Override
+    public AsyncPathable<AsyncStage<Void>> usingWatcher(Watcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    @Override
+    public AsyncPathable<AsyncStage<Void>> usingWatcher(CuratorWatcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    @Override
+    public AsyncStage<Void> forPath(String path)
+    {
+        BuilderCommon<Void> common = new BuilderCommon<>(filters, ignoredProc);
+        AddPersistentWatchBuilderImpl builder = new AddPersistentWatchBuilderImpl(client, watching, common.backgrounding, recursive);
+        return safeCall(common.internalCallback, () -> builder.forPath(path));
+    }
+}


[2/8] curator git commit: licenses

Posted by ra...@apache.org.
licenses


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/bf7c5ecf
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/bf7c5ecf
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/bf7c5ecf

Branch: refs/heads/persistent-watch
Commit: bf7c5ecf548e18ead3e5bd9b0be05df42367035a
Parents: a83e3e0
Author: randgalt <ra...@apache.org>
Authored: Wed Aug 23 08:18:51 2017 +0200
Committer: randgalt <ra...@apache.org>
Committed: Wed Aug 23 08:18:51 2017 +0200

----------------------------------------------------------------------
 .../framework/recipes/cache/ListenerBridge.java   | 18 ++++++++++++++++++
 .../framework/recipes/cache/SelectorBridge.java   | 18 ++++++++++++++++++
 2 files changed, 36 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/bf7c5ecf/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java
index 8a2d665..29e7b6c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.curator.framework.recipes.cache;
 
 import com.google.common.base.Preconditions;

http://git-wip-us.apache.org/repos/asf/curator/blob/bf7c5ecf/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/SelectorBridge.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/SelectorBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/SelectorBridge.java
index 0c6af08..815345c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/SelectorBridge.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/SelectorBridge.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.curator.framework.recipes.cache;
 
 import org.apache.curator.framework.recipes.watch.CacheAction;


[8/8] curator git commit: 1. Updates from latest ZK changes. 2. Added async version for creating persistent watch

Posted by ra...@apache.org.
1. Updates from latest ZK changes. 2. Added async version for creating persistent watch


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/d7bf1a24
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/d7bf1a24
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/d7bf1a24

Branch: refs/heads/persistent-watch
Commit: d7bf1a2461ecfa0bf708b3890dc4b3a019cacdb0
Parents: a27f876
Author: randgalt <ra...@apache.org>
Authored: Wed Oct 4 16:01:51 2017 +0200
Committer: randgalt <ra...@apache.org>
Committed: Wed Oct 4 16:01:51 2017 +0200

----------------------------------------------------------------------
 .../curator/framework/CuratorFramework.java       |  5 +++++
 .../imps/AddPersistentWatchBuilderImpl.java       | 10 +++++++++-
 .../apache/curator/framework/imps/Watching.java   | 10 +++++-----
 .../framework/recipes/cache/TreeCacheBridge.java  | 18 ++++++++++++++++++
 .../recipes/cache/TreeCacheBridgeImpl.java        | 18 ++++++++++++++++++
 .../framework/recipes/watch/CacheSelectors.java   |  6 ++----
 .../framework/recipes/watch/CachedNode.java       | 18 ++++++++++++++++++
 .../x/async/api/AsyncCuratorFrameworkDsl.java     |  7 +++++++
 .../async/details/AsyncCuratorFrameworkImpl.java  |  6 ++++++
 .../curator/x/async/TestBasicOperations.java      | 14 ++++++++++++++
 10 files changed, 102 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index ce31d08..f075daa 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -193,6 +193,11 @@ public interface CuratorFramework extends Closeable
      */
     public SyncBuilder sync();
 
+    /**
+     * Start a persistent watch builder
+     *
+     * @return builder object
+     */
     public AddPersistentWatchBuilder addPersistentWatch();
 
     /**

http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
index 56f8f79..4f51f39 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
@@ -33,7 +33,7 @@ import org.apache.zookeeper.Watcher;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 
-class AddPersistentWatchBuilderImpl implements AddPersistentWatchBuilder, Pathable<Void>, BackgroundOperation<String>
+public class AddPersistentWatchBuilderImpl implements AddPersistentWatchBuilder, Pathable<Void>, BackgroundOperation<String>
 {
     private final CuratorFrameworkImpl client;
     private Watching watching = null;
@@ -45,6 +45,14 @@ class AddPersistentWatchBuilderImpl implements AddPersistentWatchBuilder, Pathab
         this.client = client;
     }
 
+    public AddPersistentWatchBuilderImpl(CuratorFrameworkImpl client, Watching watching, Backgrounding backgrounding, boolean recursive)
+    {
+        this.client = client;
+        this.watching = watching;
+        this.backgrounding = backgrounding;
+        this.recursive = recursive;
+    }
+
     @Override
     public AddPersistentWatchable<Pathable<Void>> inBackground()
     {

http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
index daa5dd3..5bad7e7 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
@@ -23,7 +23,7 @@ import org.apache.curator.framework.api.CuratorWatcher;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
 
-class Watching
+public class Watching
 {
     private final Watcher watcher;
     private final CuratorWatcher curatorWatcher;
@@ -31,7 +31,7 @@ class Watching
     private final CuratorFrameworkImpl client;
     private NamespaceWatcher namespaceWatcher;
 
-    Watching(CuratorFrameworkImpl client, boolean watched)
+    public Watching(CuratorFrameworkImpl client, boolean watched)
     {
         this.client = client;
         this.watcher = null;
@@ -39,7 +39,7 @@ class Watching
         this.watched = watched;
     }
 
-    Watching(CuratorFrameworkImpl client, Watcher watcher)
+    public Watching(CuratorFrameworkImpl client, Watcher watcher)
     {
         this.client = client;
         this.watcher = watcher;
@@ -47,7 +47,7 @@ class Watching
         this.watched = false;
     }
 
-    Watching(CuratorFrameworkImpl client, CuratorWatcher watcher)
+    public Watching(CuratorFrameworkImpl client, CuratorWatcher watcher)
     {
         this.client = client;
         this.watcher = null;
@@ -55,7 +55,7 @@ class Watching
         this.watched = false;
     }
 
-    Watching(CuratorFrameworkImpl client)
+    public Watching(CuratorFrameworkImpl client)
     {
         this.client = client;
         watcher = null;

http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java
index 8b6f37a..4a0eed9 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.curator.framework.recipes.cache;
 
 import org.apache.curator.framework.listen.Listenable;

http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridgeImpl.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridgeImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridgeImpl.java
index 0198aa4..35fcac4 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridgeImpl.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridgeImpl.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.curator.framework.recipes.cache;
 
 import com.google.common.util.concurrent.MoreExecutors;

http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java
index 8814e57..ed6c6fa 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java
@@ -21,9 +21,8 @@ package org.apache.curator.framework.recipes.watch;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.common.PathUtils;
-import org.apache.zookeeper.server.PathIterator;
+import org.apache.zookeeper.server.PathParentIterator;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -315,10 +314,9 @@ public class CacheSelectors
 
             private CacheSelector getSelector(String fullPath)
             {
-                String parent = ZKPaths.getPathAndNode(fullPath).getPath();
                 for ( CompositeEntry entry : entries )
                 {
-                    PathIterator pathIterator = new PathIterator(fullPath);
+                    PathParentIterator pathIterator = PathParentIterator.forAll(fullPath);
                     while ( pathIterator.hasNext() )
                     {
                         if ( pathIterator.next().equals(entry.path) )

http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java
index b07993f..18131cf 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.curator.framework.recipes.watch;
 
 import org.apache.zookeeper.data.Stat;

http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
index bc66bb6..c1748d0 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
@@ -84,6 +84,13 @@ public interface AsyncCuratorFrameworkDsl extends WatchableAsyncCuratorFramework
     AsyncReconfigBuilder reconfig();
 
     /**
+     * Start a persistent watch builder
+     *
+     * @return builder object
+     */
+    AsyncPersistentWatchBuilder addPersistentWatch();
+
+    /**
      * Start a transaction builder
      *
      * @return builder object

http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
index 167cf50..afa1de0 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
@@ -124,6 +124,12 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework
     }
 
     @Override
+    public AsyncPersistentWatchBuilderImpl addPersistentWatch()
+    {
+        return new AsyncPersistentWatchBuilderImpl(client, filters);
+    }
+
+    @Override
     public AsyncMultiTransaction transaction()
     {
         return operations -> {

http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java b/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
index f814146..aed1385 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
@@ -32,8 +32,10 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import static java.util.EnumSet.of;
 import static org.apache.curator.x.async.api.CreateOption.compress;
@@ -199,4 +201,16 @@ public class TestBasicOperations extends CompletableBaseClassForTests
         complete(client.getData().storingStatIn(stat).forPath("/test"));
         Assert.assertEquals(stat.getDataLength(), "hey".length());
     }
+
+    @Test
+    public void testPersistentRecursiveWatch() throws Exception
+    {
+        BlockingQueue<Watcher.Event.EventType> events = new LinkedBlockingQueue<>();
+        Watcher watcher = event -> events.add(event.getType());
+        complete(client.addPersistentWatch().recursive().usingWatcher(watcher).forPath("/a/b"));
+        client.unwrap().create().creatingParentContainersIfNeeded().forPath("/a/b/c");
+        client.unwrap().create().creatingParentContainersIfNeeded().forPath("/a/b/d");
+        Assert.assertEquals(timing.takeFromQueue(events), Watcher.Event.EventType.NodeCreated);
+        Assert.assertEquals(timing.takeFromQueue(events), Watcher.Event.EventType.NodeCreated);
+    }
 }


[5/8] curator git commit: Abstracted the TreeCache public API and then an alternate implementation that uses the new CuratorCache instead of TreeCache. This should make porting older code much easier

Posted by ra...@apache.org.
Abstracted the TreeCache public API and then added an alternate implementation that uses the new CuratorCache instead of TreeCache. This should make porting older code much easier


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/02073a71
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/02073a71
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/02073a71

Branch: refs/heads/persistent-watch
Commit: 02073a71a3a165babba9a7db84449ef9e7439a19
Parents: 570023d
Author: randgalt <ra...@apache.org>
Authored: Sun Aug 27 19:03:28 2017 +0200
Committer: randgalt <ra...@apache.org>
Committed: Sun Aug 27 19:03:28 2017 +0200

----------------------------------------------------------------------
 .../framework/recipes/cache/ListenerBridge.java |  26 +-
 .../framework/recipes/cache/TreeCache.java      |  88 ++--
 .../recipes/cache/TreeCacheBridge.java          |  49 ++
 .../recipes/cache/TreeCacheBridgeImpl.java      |  78 +++
 .../framework/recipes/watch/CacheSelectors.java |   1 +
 .../framework/recipes/watch/CachedNode.java     | 104 +---
 .../framework/recipes/watch/CachedNodeImpl.java |  90 ++++
 .../framework/recipes/watch/CachedNodeMap.java  |   2 +-
 .../framework/recipes/watch/CuratorCache.java   |   4 +-
 .../recipes/watch/CuratorCacheBase.java         |  10 +-
 .../recipes/watch/CuratorCacheBuilder.java      |  20 +-
 .../recipes/watch/InternalCuratorCache.java     |  23 +-
 .../recipes/watch/InternalNodeCache.java        |  10 +-
 .../recipes/cache/BaseTestTreeCache.java        |  20 +
 .../recipes/cache/TestTreeCacheBridge.java      | 199 +++++---
 .../cache/TestTreeCacheBridgeRandomTree.java    |  42 +-
 .../cache/TestTreeCacheBridgeWrapper.java       | 500 +++++++++++++++++++
 .../TestTreeCacheBridgeWrapperRandomTree.java   | 224 +++++++++
 18 files changed, 1242 insertions(+), 248 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java
index 29e7b6c..693e1da 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java
@@ -26,8 +26,12 @@ import org.apache.curator.framework.recipes.watch.CacheListener;
 import org.apache.curator.framework.recipes.watch.CachedNode;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.ZKPaths;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -186,7 +190,7 @@ public class ListenerBridge implements CacheListener, ConnectionStateListener
      */
     public static ChildData toData(String path, CachedNode affectedNode)
     {
-        if ( (path != null) && (affectedNode != null) && (affectedNode.getData() != null) )
+        if ( (path != null) && (affectedNode != null) )
         {
             return new ChildData(path, affectedNode.getStat(), affectedNode.getData());
         }
@@ -208,6 +212,26 @@ public class ListenerBridge implements CacheListener, ConnectionStateListener
         return new TreeCacheEvent(type, data);
     }
 
+    public static Map<String, ChildData> toData(String basePath, Map<String, CachedNode> from)
+    {
+        if ( from.isEmpty() )
+        {
+            return Collections.emptyMap();
+        }
+
+        Map<String, ChildData> mapped = new HashMap<>();
+        for ( Map.Entry<String, CachedNode> entry : from.entrySet() )
+        {
+            String path = entry.getKey();
+            ChildData childData = toData(ZKPaths.makePath(basePath, path), entry.getValue());
+            if ( childData != null )
+            {
+                mapped.put(path, childData);
+            }
+        }
+        return mapped;
+    }
+
     protected void handleException(Exception e)
     {
         log.error("Unhandled exception in listener", e);

http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
index 94e6774..4e663cd 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -31,7 +31,11 @@ import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.recipes.watch.CacheAction;
+import org.apache.curator.framework.recipes.watch.CacheSelector;
+import org.apache.curator.framework.recipes.watch.CacheSelectors;
 import org.apache.curator.framework.recipes.watch.CuratorCache;
+import org.apache.curator.framework.recipes.watch.CuratorCacheBuilder;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.PathUtils;
@@ -43,7 +47,6 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -73,7 +76,7 @@ import static org.apache.curator.utils.PathUtils.validatePath;
  * @deprecated use {@link CuratorCache}
  */
 @Deprecated
-public class TreeCache implements Closeable
+public class TreeCache implements TreeCacheBridge
 {
     private static final Logger LOG = LoggerFactory.getLogger(TreeCache.class);
     private final boolean createParentNodes;
@@ -109,6 +112,53 @@ public class TreeCache implements Closeable
             return new TreeCache(client, path, cacheData, dataIsCompressed, maxDepth, executor, createParentNodes, selector);
         }
 
+        public TreeCacheBridge buildBridge()
+        {
+            CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, path);
+            CacheAction cacheAction;
+            if ( cacheData )
+            {
+                cacheAction = dataIsCompressed ? CacheAction.STAT_AND_UNCOMPRESSED_DATA : CacheAction.STAT_AND_DATA;
+            }
+            else
+            {
+                cacheAction = dataIsCompressed ? CacheAction.UNCOMPRESSED_STAT_ONLY : CacheAction.STAT_ONLY;
+            }
+            final CacheSelector maxDepthSelector = (maxDepth != Integer.MAX_VALUE) ? CacheSelectors.maxDepth(maxDepth) : null;
+            final CacheAction sealedCacheAction = cacheAction;
+            CacheSelector cacheSelector = new CacheSelector()
+            {
+                @Override
+                public boolean traverseChildren(String basePath, String fullPath)
+                {
+                    //noinspection SimplifiableIfStatement
+                    if ( (maxDepthSelector != null) && !maxDepthSelector.traverseChildren(basePath, fullPath) )
+                    {
+                        return false;
+                    }
+                    return selector.traverseChildren(fullPath);
+                }
+
+                @Override
+                public CacheAction actionForPath(String basePath, String fullPath)
+                {
+                    if ( (maxDepthSelector != null) && !maxDepthSelector.traverseChildren(basePath, fullPath) )
+                    {
+                        return CacheAction.NOT_STORED;
+                    }
+                    return selector.acceptChild(fullPath) ? sealedCacheAction : CacheAction.NOT_STORED;
+                }
+            };
+            builder = builder.withCacheSelector(cacheSelector).withDefaultData(null);
+
+            if ( createParentNodes )
+            {
+                LOG.warn("setCreateParentNodes(true) is not supported by TreeCacheBridge. Use EnsureContainers or MigrationManager instead");
+            }
+
+            return new TreeCacheBridgeImpl(client, builder.build());
+        }
+
         /**
          * Sets whether or not to cache byte data per node; default {@code true}.
          */
@@ -579,12 +629,7 @@ public class TreeCache implements Closeable
         this.executorService = Preconditions.checkNotNull(executorService, "executorService cannot be null");
     }
 
-    /**
-     * Start the cache. The cache is not started automatically. You must call this method.
-     *
-     * @return this
-     * @throws Exception errors
-     */
+    @Override
     public TreeCache start() throws Exception
     {
         Preconditions.checkState(treeState.compareAndSet(TreeState.LATENT, TreeState.STARTED), "already started");
@@ -600,9 +645,6 @@ public class TreeCache implements Closeable
         return this;
     }
 
-    /**
-     * Close/end the cache.
-     */
     @Override
     public void close()
     {
@@ -624,11 +666,7 @@ public class TreeCache implements Closeable
         }
     }
 
-    /**
-     * Return the cache listenable
-     *
-     * @return listenable
-     */
+    @Override
     public Listenable<TreeCacheListener> getListenable()
     {
         return listeners;
@@ -680,14 +718,7 @@ public class TreeCache implements Closeable
         return current;
     }
 
-    /**
-     * Return the current set of children at the given path, mapped by child name. There are no
-     * guarantees of accuracy; this is merely the most recent view of the data.  If there is no
-     * node at this path, {@code null} is returned.
-     *
-     * @param fullPath full path to the node to check
-     * @return a possibly-empty list of children if the node is alive, or null
-     */
+    @Override
     public Map<String, ChildData> getCurrentChildren(String fullPath)
     {
         TreeNode node = find(fullPath);
@@ -721,14 +752,7 @@ public class TreeCache implements Closeable
         return node.nodeState == NodeState.LIVE ? result : null;
     }
 
-    /**
-     * Return the current data for the given path. There are no guarantees of accuracy. This is
-     * merely the most recent view of the data. If there is no node at the given path,
-     * {@code null} is returned.
-     *
-     * @param fullPath full path to the node to check
-     * @return data if the node is alive, or null
-     */
+    @Override
     public ChildData getCurrentData(String fullPath)
     {
         TreeNode node = find(fullPath);

http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java
new file mode 100644
index 0000000..9c383af
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java
@@ -0,0 +1,49 @@
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.listen.Listenable;
+import java.io.Closeable;
+import java.util.Map;
+
+public interface TreeCacheBridge extends Closeable
+{
+    /**
+     * Start the cache. The cache is not started automatically. You must call this method.
+     *
+     * @return this
+     * @throws Exception errors
+     */
+    TreeCacheBridge start() throws Exception;
+
+    /**
+     * Close/end the cache.
+     */
+    @Override
+    void close();
+
+    /**
+     * Return the cache listenable
+     *
+     * @return listenable
+     */
+    Listenable<TreeCacheListener> getListenable();
+
+    /**
+     * Return the current set of children at the given path, mapped by child name. There are no
+     * guarantees of accuracy; this is merely the most recent view of the data.  If there is no
+     * node at this path, {@code null} is returned.
+     *
+     * @param fullPath full path to the node to check
+     * @return a possibly-empty map of children, keyed by child name, if the node is alive, or null
+     */
+    Map<String, ChildData> getCurrentChildren(String fullPath);
+
+    /**
+     * Return the current data for the given path. There are no guarantees of accuracy. This is
+     * merely the most recent view of the data. If there is no node at the given path,
+     * {@code null} is returned.
+     *
+     * @param fullPath full path to the node to check
+     * @return data if the node is alive, or null
+     */
+    ChildData getCurrentData(String fullPath);
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridgeImpl.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridgeImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridgeImpl.java
new file mode 100644
index 0000000..0198aa4
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridgeImpl.java
@@ -0,0 +1,78 @@
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.recipes.watch.CuratorCache;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+public class TreeCacheBridgeImpl implements TreeCacheBridge, Listenable<TreeCacheListener>
+{
+    private final CuratorFramework client;
+    private final CuratorCache cache;
+    private final Map<TreeCacheListener, ListenerBridge> listenerMap = new ConcurrentHashMap<>();
+
+    public TreeCacheBridgeImpl(CuratorFramework client, CuratorCache cache)
+    {
+        this.client = Objects.requireNonNull(client, "client cannot be null");
+        this.cache = Objects.requireNonNull(cache, "cache cannot be null");
+    }
+
+    @Override
+    public TreeCacheBridge start()
+    {
+        cache.start();
+        return this;
+    }
+
+    @Override
+    public void close()
+    {
+        cache.close();
+    }
+
+    @Override
+    public Listenable<TreeCacheListener> getListenable()
+    {
+        return this;
+    }
+
+    @Override
+    public Map<String, ChildData> getCurrentChildren(String fullPath)
+    {
+        return ListenerBridge.toData(fullPath, cache.childrenAtPath(fullPath));
+    }
+
+    @Override
+    public ChildData getCurrentData(String fullPath)
+    {
+        return ListenerBridge.toData(fullPath, cache.get(fullPath));
+    }
+
+    @Override
+    public void addListener(TreeCacheListener listener)
+    {
+        addListener(listener, MoreExecutors.directExecutor());
+    }
+
+    @Override
+    public void addListener(TreeCacheListener listener, Executor executor)
+    {
+        ListenerBridge listenerBridge = ListenerBridge.wrap(client, cache.getListenable(), listener);
+        listenerBridge.add();
+        listenerMap.put(listener, listenerBridge);
+    }
+
+    @Override
+    public void removeListener(TreeCacheListener listener)
+    {
+        ListenerBridge listenerBridge = listenerMap.remove(listener);
+        if ( listenerBridge != null )
+        {
+            listenerBridge.remove();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java
index eaf1145..8814e57 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java
@@ -23,6 +23,7 @@ import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.common.PathUtils;
 import org.apache.zookeeper.server.PathIterator;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java
index f3cd18a..b07993f 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java
@@ -1,108 +1,10 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
 package org.apache.curator.framework.recipes.watch;
 
 import org.apache.zookeeper.data.Stat;
-import java.util.Arrays;
-import java.util.Objects;
 
-/**
- * Represents the data for a cached node
- */
-public class CachedNode
+public interface CachedNode
 {
-    private final Stat stat;
-    private final byte[] data;
+    Stat getStat();
 
-    private static final byte[] defaultData = new byte[0];
-
-    /**
-     * Creates an empty node
-     */
-    public CachedNode()
-    {
-        this(new Stat(), defaultData);
-    }
-
-    /**
-     * A node with a stat but empty data
-     *
-     * @param stat the stat
-     */
-    public CachedNode(Stat stat)
-    {
-        this(stat, defaultData);
-    }
-
-    /**
-     * @param stat the stat
-     * @param data uncompressed data. If <code>null</code> an empty array is substituted.
-     */
-    public CachedNode(Stat stat, byte[] data)
-    {
-        this.stat = Objects.requireNonNull(stat, "stat cannot be null");
-        this.data = (data != null) ? data : defaultData;
-    }
-
-    public Stat getStat()
-    {
-        return stat;
-    }
-
-    public byte[] getData()
-    {
-        return data;
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        if ( this == o )
-        {
-            return true;
-        }
-        if ( o == null || getClass() != o.getClass() )
-        {
-            return false;
-        }
-
-        CachedNode that = (CachedNode)o;
-
-        //noinspection SimplifiableIfStatement
-        if ( !stat.equals(that.stat) )
-        {
-            return false;
-        }
-        return Arrays.equals(data, that.data);
-    }
-
-    @Override
-    public int hashCode()
-    {
-        int result = stat.hashCode();
-        result = 31 * result + Arrays.hashCode(data);
-        return result;
-    }
-
-    @Override
-    public String toString()
-    {
-        return "CachedNode{" + "stat=" + stat + ", data=" + Arrays.toString(data) + '}';
-    }
+    byte[] getData();
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNodeImpl.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNodeImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNodeImpl.java
new file mode 100644
index 0000000..91b6a5e
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNodeImpl.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import org.apache.zookeeper.data.Stat;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * Represents the data for a cached node
+ */
+public class CachedNodeImpl implements CachedNode
+{
+    private final Stat stat;
+    private final byte[] data;
+
+    /**
+     * @param stat the stat
+     * @param data uncompressed data or null - NOTE: ownership is taken of the given data object
+     */
+    public CachedNodeImpl(Stat stat, byte[] data)
+    {
+        this.stat = Objects.requireNonNull(stat, "stat cannot be null");
+        this.data = data;
+    }
+
+    @Override
+    public Stat getStat()
+    {
+        return stat;
+    }
+
+    @Override
+    public byte[] getData()
+    {
+        return data;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if ( this == o )
+        {
+            return true;
+        }
+        if ( o == null || getClass() != o.getClass() )
+        {
+            return false;
+        }
+
+        CachedNodeImpl that = (CachedNodeImpl)o;
+
+        //noinspection SimplifiableIfStatement
+        if ( !stat.equals(that.stat) )
+        {
+            return false;
+        }
+        return Arrays.equals(data, that.data);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        int result = stat.hashCode();
+        result = 31 * result + Arrays.hashCode(data);
+        return result;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "CachedNodeImpl{" + "stat=" + stat + ", data=" + Arrays.toString(data) + '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNodeMap.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNodeMap.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNodeMap.java
index d59ffae..adcd458 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNodeMap.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNodeMap.java
@@ -22,7 +22,7 @@ import java.util.Collection;
 import java.util.Map;
 
 /**
- * Interface for a container of path to {@link org.apache.curator.framework.recipes.watch.CachedNode}
+ * Interface for a container that maps a path to its {@link CachedNode}
  */
 public interface CachedNodeMap
 {

http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
index e97157e..2742000 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
@@ -150,7 +150,7 @@ public interface CuratorCache extends Closeable
 
     /**
      * As a memory optimization, you can clear the cached data bytes for a node. Subsequent
-     * calls to {@link CachedNode#getData()} for this node will return <code>null</code>.
+     * calls to {@link CachedNode#getData()} for this node will return the cache's configured default data.
      *
      * @param path the path of the node to clear
      */
@@ -158,7 +158,7 @@ public interface CuratorCache extends Closeable
 
     /**
      * As a memory optimization, you can clear the cached data bytes for a node. Subsequent
-     * calls to {@link CachedNode#getData()} for this node will return <code>null</code>.
+     * calls to {@link CachedNode#getData()} for this node will return the cache's configured default data.
      *
      * @param path  the path of the node to clear
      * @param ifVersion if non-negative, only clear the data if the data's version matches this version

http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
index 1aa5b5d..c072f7c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicReference;
 abstract class CuratorCacheBase implements CuratorCache
 {
     protected final CachedNodeMap cache;
+    protected final byte[] defaultData;
     private final String mainPath;
     private final ListenerContainer<CacheListener> listeners = new ListenerContainer<>();
     private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
@@ -54,11 +55,12 @@ abstract class CuratorCacheBase implements CuratorCache
         CLOSED
     }
 
-    protected CuratorCacheBase(String mainPath, CachedNodeMap cache, boolean sendRefreshEvents)
+    protected CuratorCacheBase(String mainPath, CachedNodeMap cache, boolean sendRefreshEvents, byte[] defaultData)
     {
         this.mainPath = Objects.requireNonNull(mainPath, "mainPath cannot be null");
         this.cache = Objects.requireNonNull(cache, "cache cannot be null");
         this.sendRefreshEvents = sendRefreshEvents;
+        this.defaultData = defaultData;
     }
 
     @Override
@@ -140,7 +142,7 @@ abstract class CuratorCacheBase implements CuratorCache
 
     /**
      * As a memory optimization, you can clear the cached data bytes for a node. Subsequent
-     * calls to {@link CachedNode#getData()} for this node will return <code>null</code>.
+     * calls to {@link CachedNode#getData()} for this node will return the cache's configured default data.
      *
      * @param path the path of the node to clear
      */
@@ -152,7 +154,7 @@ abstract class CuratorCacheBase implements CuratorCache
 
     /**
      * As a memory optimization, you can clear the cached data bytes for a node. Subsequent
-     * calls to {@link CachedNode#getData()} for this node will return <code>null</code>.
+     * calls to {@link CachedNode#getData()} for this node will return the cache's configured default data.
      *
      * @param path  the path of the node to clear
      * @param ifVersion if non-negative, only clear the data if the data's version matches this version
@@ -166,7 +168,7 @@ abstract class CuratorCacheBase implements CuratorCache
         {
             if ( (ifVersion < 0) || ((data.getStat() != null) && (ifVersion == data.getStat().getVersion())) )
             {
-                return cache.replace(path, data, new CachedNode(data.getStat()));
+                return cache.replace(path, data, new CachedNodeImpl(data.getStat(), defaultData));
             }
         }
         return false;

http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
index 10f8bc2..eb8d0df 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
@@ -20,6 +20,7 @@ package org.apache.curator.framework.recipes.watch;
 
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
+import java.util.Arrays;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
@@ -40,6 +41,7 @@ public class CuratorCacheBuilder
     private boolean usingSoftValues = false;
     private Long expiresAfterWriteMs = null;
     private Long expiresAfterAccessMs = null;
+    private byte[] defaultData = new byte[0];
 
     /**
      * Start a new builder for the given client and main path
@@ -66,10 +68,10 @@ public class CuratorCacheBuilder
         {
             Preconditions.checkState(cacheSelector == null, "Single node mode does not support CacheSelectors");
             Preconditions.checkState(singleNodeCacheAction != CacheAction.UNCOMPRESSED_STAT_ONLY, "Single node mode does not support UNCOMPRESSED_STAT_ONLY");
-            return new InternalNodeCache(client, path, singleNodeCacheAction, cachedNodeMap, sendRefreshEvents, refreshOnStart);
+            return new InternalNodeCache(client, path, singleNodeCacheAction, cachedNodeMap, sendRefreshEvents, refreshOnStart, defaultData);
         }
 
-        return new InternalCuratorCache(client, path, cacheSelector, cachedNodeMap, sendRefreshEvents, refreshOnStart, sortChildren);
+        return new InternalCuratorCache(client, path, cacheSelector, cachedNodeMap, sendRefreshEvents, refreshOnStart, sortChildren, defaultData);
     }
 
     /**
@@ -215,6 +217,20 @@ public class CuratorCacheBuilder
         return this;
     }
 
+    /**
+     * If a node does not contain data (due to the CacheSelector or other reason), the default is
+     * for {@link CachedNode#getData()} to return an empty byte array (<code>byte[0]</code>). Use this
+     * method to change to another value or pass <code>null</code>
+     *
+     * @param data data bytes or null
+     * @return this
+     */
+    public CuratorCacheBuilder withDefaultData(byte[] data)
+    {
+        this.defaultData = (data != null) ? Arrays.copyOf(data, data.length) : null;
+        return this;
+    }
+
     private CuratorCacheBuilder(CuratorFramework client, String path)
     {
         this.client = Objects.requireNonNull(client, "client cannot be null");

http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
index edd08b5..4b88bf4 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
@@ -20,7 +20,6 @@ package org.apache.curator.framework.recipes.watch;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.SettableFuture;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
@@ -30,6 +29,7 @@ import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.Collection;
@@ -38,11 +38,10 @@ import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Exchanger;
-import java.util.concurrent.atomic.AtomicInteger;
 
 class InternalCuratorCache extends CuratorCacheBase implements Watcher
 {
-    private static final CachedNode nullNode = new CachedNode();
+    private final CachedNode nullNode;
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final PersistentWatcher watcher;
     private final CuratorFramework client;
@@ -64,9 +63,9 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
         }
     };
 
-    InternalCuratorCache(CuratorFramework client, String path, final CacheSelector cacheSelector, CachedNodeMap cache, boolean sendRefreshEvents, final boolean refreshOnStart, boolean sortChildren)
+    InternalCuratorCache(CuratorFramework client, String path, final CacheSelector cacheSelector, CachedNodeMap cache, boolean sendRefreshEvents, final boolean refreshOnStart, boolean sortChildren, byte[] defaultData)
     {
-        super(path, cache, sendRefreshEvents);
+        super(path, cache, sendRefreshEvents, defaultData);
         this.client = Objects.requireNonNull(client, "client cannot be null");
         this.basePath = Objects.requireNonNull(path, "path cannot be null");
         this.cacheSelector = Objects.requireNonNull(cacheSelector, "cacheSelector cannot be null");
@@ -84,6 +83,8 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
             }
         };
         watcher.getListenable().addListener(this);
+
+        nullNode = new CachedNodeImpl(new Stat(), defaultData);
     }
 
     @Override
@@ -163,7 +164,7 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
                     if ( event.getType() == CuratorEventType.GET_DATA )
                     {
                         CacheAction cacheAction = (CacheAction)event.getContext();
-                        CachedNode newNode = new CachedNode(event.getStat(), event.getData());
+                        CachedNode newNode = new CachedNodeImpl(event.getStat(), event.getData());
                         CachedNode oldNode = putNewNode(path, cacheAction, newNode);
                         if ( oldNode == null )
                         {
@@ -292,7 +293,7 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
             case STAT_ONLY:
             case UNCOMPRESSED_STAT_ONLY:
             {
-                putNode = new CachedNode(newNode.getStat());
+                putNode = new CachedNodeImpl(newNode.getStat(), defaultData);
                 break;
             }
 
@@ -306,14 +307,6 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
         return cache.put(path, putNode);
     }
 
-    private void decrementOutstanding(SettableFuture<Boolean> task, AtomicInteger outstandingCount)
-    {
-        if ( outstandingCount.decrementAndGet() <= 0 )
-        {
-            task.set(true);
-        }
-    }
-
     private void remove(String path)
     {
         CachedNode removed = cache.remove(path);

http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java
index dd880f6..5f8d750 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java
@@ -31,6 +31,7 @@ import org.apache.curator.utils.ThreadUtils;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.concurrent.CountDownLatch;
@@ -44,7 +45,7 @@ class InternalNodeCache extends CuratorCacheBase
     private final String path;
     private final CacheAction cacheAction;
     private final AtomicBoolean isConnected = new AtomicBoolean(true);
-    private static final CachedNode nullNode = new CachedNode();
+    private final CachedNode nullNode;
     private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
     {
         @Override
@@ -90,12 +91,13 @@ class InternalNodeCache extends CuratorCacheBase
         }
     };
 
-    InternalNodeCache(CuratorFramework client, String path, CacheAction cacheAction, CachedNodeMap cache, boolean sendRefreshEvents, boolean refreshOnStart)
+    InternalNodeCache(CuratorFramework client, String path, CacheAction cacheAction, CachedNodeMap cache, boolean sendRefreshEvents, boolean refreshOnStart, byte[] defaultData)
     {
-        super(path, cache, sendRefreshEvents);
+        super(path, cache, sendRefreshEvents, defaultData);
         this.client = client.newWatcherRemoveCuratorFramework();
         this.path = PathUtils.validatePath(path);
         this.cacheAction = cacheAction;
+        nullNode = new CachedNodeImpl(new Stat(), defaultData);
         Preconditions.checkArgument(refreshOnStart, "refreshingWhenStarted() must be true when forSingleNode() is used");
     }
 
@@ -156,7 +158,7 @@ class InternalNodeCache extends CuratorCacheBase
             {
                 if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                 {
-                    CachedNode cachedNode = new CachedNode(event.getStat(), event.getData());
+                    CachedNode cachedNode = new CachedNodeImpl(event.getStat(), event.getData());
                     setNewData(cachedNode);
                 }
                 break;

http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
index 9cbec98..7f1f547 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
@@ -88,6 +88,16 @@ public class BaseTestTreeCache<T extends Closeable> extends BaseClassForTests
     }
 
     /**
+     * Construct a TreeCacheBridge that records exceptions and automatically listens.
+     */
+    protected TreeCacheBridge newTreeCacheBridgeWithListeners(CuratorFramework client, String path)
+    {
+        TreeCacheBridge result = TreeCache.newBuilder(client, path).buildBridge();
+        result.getListenable().addListener(eventListener);
+        return result;
+    }
+
+    /**
      * Construct a CuratorCache that records exceptions and automatically listens using the bridge.
      */
     protected CuratorCache newCacheWithListeners(CuratorFramework client, String path)
@@ -118,6 +128,16 @@ public class BaseTestTreeCache<T extends Closeable> extends BaseClassForTests
         return result;
     }
 
+    /**
+     * Finish constructing a TreeCacheBridge that records exceptions and automatically listens.
+     */
+    protected TreeCacheBridge buildBridgeWithListeners(TreeCache.Builder builder)
+    {
+        TreeCacheBridge result = builder.buildBridge();
+        result.getListenable().addListener(eventListener);
+        return result;
+    }
+
     @Override
     @BeforeMethod
     public void setup() throws Exception

http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridge.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridge.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridge.java
index 049daa5..614fa56 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridge.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridge.java
@@ -20,16 +20,17 @@
 package org.apache.curator.framework.recipes.cache;
 
 import com.google.common.collect.ImmutableSet;
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
-import org.apache.curator.framework.recipes.watch.CacheSelectors;
-import org.apache.curator.framework.recipes.watch.CuratorCache;
-import org.apache.curator.framework.recipes.watch.CuratorCacheBuilder;
 import org.apache.curator.test.compatibility.KillSession2;
+import org.apache.curator.utils.CloseableUtils;
 import org.apache.zookeeper.CreateMode;
 import org.testng.Assert;
 import org.testng.annotations.Test;
+import java.util.concurrent.Semaphore;
 
-public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
+@SuppressWarnings("deprecation")
+public class TestTreeCacheBridge extends BaseTestTreeCache<TreeCacheBridge>
 {
     @Test
     public void testSelector() throws Exception
@@ -57,7 +58,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
                 return !fullPath.equals("/root/n1-c");
             }
         };
-        cache = buildCacheWithListeners(CuratorCacheBuilder.builder(client, "/root").withCacheSelector(SelectorBridge.wrap(selector)));
+        cache = buildBridgeWithListeners(TreeCache.newBuilder(client, "/root").setSelector(selector));
         cache.start();
 
         assertEvent(Type.NODE_ADDED, "/root");
@@ -79,7 +80,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
         client.create().forPath("/test/3", "three".getBytes());
         client.create().forPath("/test/2/sub", "two-sub".getBytes());
 
-        cache = newCacheWithListeners(client, "/test");
+        cache = newTreeCacheBridgeWithListeners(client, "/test");
         cache.start();
         assertEvent(Type.NODE_ADDED, "/test");
         assertEvent(Type.NODE_ADDED, "/test/1", "one".getBytes());
@@ -89,16 +90,16 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
         assertEvent(Type.INITIALIZED);
         assertNoMoreEvents();
 
-        Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("1", "2", "3"));
-        Assert.assertEquals(cache.childrenAtPath("/test/1").keySet(), ImmutableSet.of());
-        Assert.assertEquals(cache.childrenAtPath("/test/2").keySet(), ImmutableSet.of("sub"));
-        Assert.assertNull(cache.get("/test/non_exist"));
+        Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of("1", "2", "3"));
+        Assert.assertEquals(cache.getCurrentChildren("/test/1").keySet(), ImmutableSet.of());
+        Assert.assertEquals(cache.getCurrentChildren("/test/2").keySet(), ImmutableSet.of("sub"));
+        Assert.assertTrue(cache.getCurrentChildren("/test/non_exist").isEmpty());
     }
 
     @Test
     public void testStartEmpty() throws Exception
     {
-        cache = newCacheWithListeners(client, "/test");
+        cache = newTreeCacheBridgeWithListeners(client, "/test");
         cache.start();
         assertEvent(Type.INITIALIZED);
 
@@ -110,7 +111,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
     @Test
     public void testStartEmptyDeeper() throws Exception
     {
-        cache = newCacheWithListeners(client, "/test/foo/bar");
+        cache = newTreeCacheBridgeWithListeners(client, "/test/foo/bar");
         cache.start();
         assertEvent(Type.INITIALIZED);
 
@@ -130,17 +131,16 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
         client.create().forPath("/test/3", "three".getBytes());
         client.create().forPath("/test/2/sub", "two-sub".getBytes());
 
-        CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, "/test").withCacheSelector(CacheSelectors.maxDepth(0));
-        cache = buildCacheWithListeners(builder);
+        cache = buildBridgeWithListeners(TreeCache.newBuilder(client, "/test").setMaxDepth(0));
         cache.start();
         assertEvent(Type.NODE_ADDED, "/test");
         assertEvent(Type.INITIALIZED);
         assertNoMoreEvents();
 
-        Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of());
-        Assert.assertNull(cache.get("/test/1"));
-        Assert.assertNull(cache.get("/test/1"));
-        Assert.assertNull(cache.get("/test/non_exist"));
+        Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of());
+        Assert.assertNull(cache.getCurrentData("/test/1"));
+        Assert.assertTrue(cache.getCurrentChildren("/test/1").isEmpty());
+        Assert.assertNull(cache.getCurrentData("/test/non_exist"));
     }
 
     @Test
@@ -152,8 +152,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
         client.create().forPath("/test/3", "three".getBytes());
         client.create().forPath("/test/2/sub", "two-sub".getBytes());
 
-        CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, "/test").withCacheSelector(CacheSelectors.maxDepth(1));
-        cache = buildCacheWithListeners(builder);
+        cache = buildBridgeWithListeners(TreeCache.newBuilder(client, "/test").setMaxDepth(1));
         cache.start();
         assertEvent(Type.NODE_ADDED, "/test");
         assertEvent(Type.NODE_ADDED, "/test/1", "one".getBytes());
@@ -162,12 +161,12 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
         assertEvent(Type.INITIALIZED);
         assertNoMoreEvents();
 
-        Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("1", "2", "3"));
-        Assert.assertEquals(cache.childrenAtPath("/test/1").keySet(), ImmutableSet.of());
-        Assert.assertEquals(cache.childrenAtPath("/test/2").keySet(), ImmutableSet.of());
-        Assert.assertNull(cache.get("/test/1/sub"));
-        Assert.assertNull(cache.get("/test/2/sub"));
-        Assert.assertNull(cache.get("/test/non_exist"));
+        Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of("1", "2", "3"));
+        Assert.assertEquals(cache.getCurrentChildren("/test/1").keySet(), ImmutableSet.of());
+        Assert.assertEquals(cache.getCurrentChildren("/test/2").keySet(), ImmutableSet.of());
+        Assert.assertNull(cache.getCurrentData("/test/2/sub"));
+        Assert.assertTrue(cache.getCurrentChildren("/test/2/sub").isEmpty());
+        Assert.assertTrue(cache.getCurrentChildren("/test/non_exist").isEmpty());
     }
 
     @Test
@@ -181,8 +180,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
         client.create().forPath("/test/foo/bar/3", "three".getBytes());
         client.create().forPath("/test/foo/bar/2/sub", "two-sub".getBytes());
 
-        CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, "/test/foo/bar").withCacheSelector(CacheSelectors.maxDepth(1));
-        cache = buildCacheWithListeners(builder);
+        cache = buildBridgeWithListeners(TreeCache.newBuilder(client, "/test/foo/bar").setMaxDepth(1));
         cache.start();
         assertEvent(Type.NODE_ADDED, "/test/foo/bar");
         assertEvent(Type.NODE_ADDED, "/test/foo/bar/1", "one".getBytes());
@@ -198,7 +196,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
         client.create().forPath("/test");
         client.create().forPath("/test/one", "hey there".getBytes());
 
-        cache = newCacheWithListeners(client, "/test");
+        cache = newTreeCacheBridgeWithListeners(client, "/test");
         cache.start();
         assertEvent(Type.NODE_ADDED, "/test");
         assertEvent(Type.NODE_ADDED, "/test/one");
@@ -212,7 +210,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
         client.create().forPath("/test");
         client.create().forPath("/test/one", "hey there".getBytes());
 
-        cache = newCacheWithListeners(client, "/");
+        cache = newTreeCacheBridgeWithListeners(client, "/");
         cache.start();
         assertEvent(Type.NODE_ADDED, "/");
         assertEvent(Type.NODE_ADDED, "/test");
@@ -220,10 +218,10 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
         assertEvent(Type.INITIALIZED);
         assertNoMoreEvents();
 
-        Assert.assertTrue(cache.childrenAtPath("/").keySet().contains("test"));
-        Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one"));
-        Assert.assertEquals(cache.childrenAtPath("/test/one").keySet(), ImmutableSet.of());
-        Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there");
+        Assert.assertTrue(cache.getCurrentChildren("/").keySet().contains("test"));
+        Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of("one"));
+        Assert.assertEquals(cache.getCurrentChildren("/test/one").keySet(), ImmutableSet.of());
+        Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there");
     }
 
     @Test
@@ -232,18 +230,17 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
         client.create().forPath("/test");
         client.create().forPath("/test/one", "hey there".getBytes());
 
-        CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, "/").withCacheSelector(CacheSelectors.maxDepth(1));
-        cache = buildCacheWithListeners(builder);
+        cache = buildBridgeWithListeners(TreeCache.newBuilder(client, "/").setMaxDepth(1));
         cache.start();
         assertEvent(Type.NODE_ADDED, "/");
         assertEvent(Type.NODE_ADDED, "/test");
         assertEvent(Type.INITIALIZED);
         assertNoMoreEvents();
 
-        Assert.assertTrue(cache.childrenAtPath("/").keySet().contains("test"));
-        Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of());
-        Assert.assertNull(cache.get("/test/one"));
-        Assert.assertNull(cache.get("/test/one"));
+        Assert.assertTrue(cache.getCurrentChildren("/").keySet().contains("test"));
+        Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of());
+        Assert.assertNull(cache.getCurrentData("/test/one"));
+        Assert.assertTrue(cache.getCurrentChildren("/test/one").isEmpty());
     }
 
     @Test
@@ -254,16 +251,16 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
         client.create().forPath("/outer/test");
         client.create().forPath("/outer/test/one", "hey there".getBytes());
 
-        cache = newCacheWithListeners(client.usingNamespace("outer"), "/test");
+        cache = newTreeCacheBridgeWithListeners(client.usingNamespace("outer"), "/test");
         cache.start();
         assertEvent(Type.NODE_ADDED, "/test");
         assertEvent(Type.NODE_ADDED, "/test/one");
         assertEvent(Type.INITIALIZED);
         assertNoMoreEvents();
 
-        Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one"));
-        Assert.assertEquals(cache.childrenAtPath("/test/one").keySet(), ImmutableSet.of());
-        Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there");
+        Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of("one"));
+        Assert.assertEquals(cache.getCurrentChildren("/test/one").keySet(), ImmutableSet.of());
+        Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there");
     }
 
     @Test
@@ -274,7 +271,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
         client.create().forPath("/outer/test");
         client.create().forPath("/outer/test/one", "hey there".getBytes());
 
-        cache = newCacheWithListeners(client.usingNamespace("outer"), "/");
+        cache = newTreeCacheBridgeWithListeners(client.usingNamespace("outer"), "/");
         cache.start();
         assertEvent(Type.NODE_ADDED, "/");
         assertEvent(Type.NODE_ADDED, "/foo");
@@ -282,17 +279,17 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
         assertEvent(Type.NODE_ADDED, "/test/one");
         assertEvent(Type.INITIALIZED);
         assertNoMoreEvents();
-        Assert.assertEquals(cache.childrenAtPath("/").keySet(), ImmutableSet.of("foo", "test"));
-        Assert.assertEquals(cache.childrenAtPath("/foo").keySet(), ImmutableSet.of());
-        Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one"));
-        Assert.assertEquals(cache.childrenAtPath("/test/one").keySet(), ImmutableSet.of());
-        Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there");
+        Assert.assertEquals(cache.getCurrentChildren("/").keySet(), ImmutableSet.of("foo", "test"));
+        Assert.assertEquals(cache.getCurrentChildren("/foo").keySet(), ImmutableSet.of());
+        Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of("one"));
+        Assert.assertEquals(cache.getCurrentChildren("/test/one").keySet(), ImmutableSet.of());
+        Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there");
     }
 
     @Test
     public void testSyncInitialPopulation() throws Exception
     {
-        cache = newCacheWithListeners(client, "/test");
+        cache = newTreeCacheBridgeWithListeners(client, "/test");
         cache.start();
         assertEvent(Type.INITIALIZED);
 
@@ -311,7 +308,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
         client.create().forPath("/test/2", "2".getBytes());
         client.create().forPath("/test/3", "3".getBytes());
 
-        cache = newCacheWithListeners(client, "/test");
+        cache = newTreeCacheBridgeWithListeners(client, "/test");
         cache.start();
         assertEvent(Type.NODE_ADDED, "/test");
         assertEvent(Type.NODE_ADDED, "/test/1");
@@ -326,7 +323,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
     {
         client.create().forPath("/test");
 
-        cache = buildCacheWithListeners(CuratorCacheBuilder.builder(client, "/test").withCacheSelector(CacheSelectors.statOnly()));
+        cache = buildBridgeWithListeners(TreeCache.newBuilder(client, "/test").setCacheData(false));
         cache.start();
         assertEvent(Type.NODE_ADDED, "/test");
         assertEvent(Type.INITIALIZED);
@@ -338,9 +335,9 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
         assertEvent(Type.NODE_UPDATED, "/test/foo");
         assertNoMoreEvents();
 
-        Assert.assertNotNull(cache.get("/test/foo"));
+        Assert.assertNotNull(cache.getCurrentData("/test/foo"));
         // No byte data querying the tree because we're not caching data.
-        Assert.assertEquals(cache.get("/test/foo").getData().length, 0);
+        Assert.assertNull(cache.getCurrentData("/test/foo").getData());
     }
 
     @Test
@@ -349,7 +346,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
         client.create().forPath("/test");
         client.create().forPath("/test/foo", "one".getBytes());
 
-        cache = newCacheWithListeners(client, "/test");
+        cache = newTreeCacheBridgeWithListeners(client, "/test");
         cache.start();
         assertEvent(Type.NODE_ADDED, "/test");
         assertEvent(Type.NODE_ADDED, "/test/foo");
@@ -374,7 +371,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
         client.create().forPath("/test");
         client.create().forPath("/test/foo", "one".getBytes());
 
-        cache = newCacheWithListeners(client, "/test/foo");
+        cache = newTreeCacheBridgeWithListeners(client, "/test/foo");
         cache.start();
         assertEvent(Type.NODE_ADDED, "/test/foo");
         assertEvent(Type.INITIALIZED);
@@ -397,7 +394,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
     {
         client.create().forPath("/test");
 
-        cache = newCacheWithListeners(client, "/test");
+        cache = newTreeCacheBridgeWithListeners(client, "/test");
         cache.start();
         assertEvent(Type.NODE_ADDED, "/test");
         assertEvent(Type.INITIALIZED);
@@ -421,47 +418,103 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
     {
         client.create().forPath("/test");
 
-        cache = newCacheWithListeners(client, "/test");
+        cache = newTreeCacheBridgeWithListeners(client, "/test");
         cache.start();
         assertEvent(Type.NODE_ADDED, "/test");
         assertEvent(Type.INITIALIZED);
-        Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of());
-        Assert.assertNull(cache.get("/t"));
-        Assert.assertNull(cache.get("/testing"));
+        Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of());
+        Assert.assertTrue(cache.getCurrentChildren("/t").isEmpty());
+        Assert.assertTrue(cache.getCurrentChildren("/testing").isEmpty());
 
         client.create().forPath("/test/one", "hey there".getBytes());
         assertEvent(Type.NODE_ADDED, "/test/one");
-        Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one"));
-        Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there");
-        Assert.assertEquals(cache.childrenAtPath("/test/one").keySet(), ImmutableSet.of());
-        Assert.assertNull(cache.get("/test/o"));
-        Assert.assertNull(cache.get("/test/onely"));
+        Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of("one"));
+        Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there");
+        Assert.assertEquals(cache.getCurrentChildren("/test/one").keySet(), ImmutableSet.of());
+        Assert.assertTrue(cache.getCurrentChildren("/test/o").isEmpty());
+        Assert.assertTrue(cache.getCurrentChildren("/test/onely").isEmpty());
 
         client.setData().forPath("/test/one", "sup!".getBytes());
         assertEvent(Type.NODE_UPDATED, "/test/one");
-        Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one"));
-        Assert.assertEquals(new String(cache.get("/test/one").getData()), "sup!");
+        Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of("one"));
+        Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "sup!");
 
         client.delete().forPath("/test/one");
         assertEvent(Type.NODE_REMOVED, "/test/one", "sup!".getBytes());
-        Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of());
+        Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of());
 
         assertNoMoreEvents();
     }
 
     @Test
+    public void testBasicsOnTwoCaches() throws Exception
+    {
+        TreeCacheBridge cache2 = newTreeCacheBridgeWithListeners(client, "/test");
+        cache2.getListenable().removeListener(eventListener);  // Don't listen on the second cache.
+
+        // Just ensures the same event count; enables test flow control on cache2.
+        final Semaphore semaphore = new Semaphore(0);
+        cache2.getListenable().addListener(new TreeCacheListener()
+        {
+            @Override
+            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception
+            {
+                semaphore.release();
+            }
+        });
+
+        try
+        {
+            client.create().forPath("/test");
+
+            cache = newTreeCacheBridgeWithListeners(client, "/test");
+            cache.start();
+            cache2.start();
+
+            assertEvent(Type.NODE_ADDED, "/test");
+            assertEvent(Type.INITIALIZED);
+            semaphore.acquire(2);
+
+            client.create().forPath("/test/one", "hey there".getBytes());
+            assertEvent(Type.NODE_ADDED, "/test/one");
+            Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there");
+            semaphore.acquire();
+            Assert.assertEquals(new String(cache2.getCurrentData("/test/one").getData()), "hey there");
+
+            client.setData().forPath("/test/one", "sup!".getBytes());
+            assertEvent(Type.NODE_UPDATED, "/test/one");
+            Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "sup!");
+            semaphore.acquire();
+            Assert.assertEquals(new String(cache2.getCurrentData("/test/one").getData()), "sup!");
+
+            client.delete().forPath("/test/one");
+            assertEvent(Type.NODE_REMOVED, "/test/one", "sup!".getBytes());
+            Assert.assertNull(cache.getCurrentData("/test/one"));
+            semaphore.acquire();
+            Assert.assertNull(cache2.getCurrentData("/test/one"));
+
+            assertNoMoreEvents();
+            Assert.assertEquals(semaphore.availablePermits(), 0);
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache2);
+        }
+    }
+
+    @Test
     public void testDeleteNodeAfterCloseDoesntCallExecutor() throws Exception
     {
         client.create().forPath("/test");
 
-        cache = newCacheWithListeners(client, "/test");
+        cache = newTreeCacheBridgeWithListeners(client, "/test");
         cache.start();
         assertEvent(Type.NODE_ADDED, "/test");
         assertEvent(Type.INITIALIZED);
 
         client.create().forPath("/test/one", "hey there".getBytes());
         assertEvent(Type.NODE_ADDED, "/test/one");
-        Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there");
+        Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there");
 
         cache.close();
         assertNoMoreEvents();
@@ -484,7 +537,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
         initCuratorFramework();
 
         // Start the client disconnected.
-        cache = newCacheWithListeners(client, "/test");
+        cache = newTreeCacheBridgeWithListeners(client, "/test");
         cache.start();
         assertNoMoreEvents();
 

http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeRandomTree.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeRandomTree.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeRandomTree.java
index f304c24..140ee64 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeRandomTree.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeRandomTree.java
@@ -21,8 +21,6 @@ package org.apache.curator.framework.recipes.cache;
 
 import com.google.common.collect.Iterables;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.watch.CachedNode;
-import org.apache.curator.framework.recipes.watch.CuratorCache;
 import org.apache.curator.utils.ZKPaths;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -31,7 +29,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 
-public class TestTreeCacheBridgeRandomTree extends BaseTestTreeCache<CuratorCache>
+@SuppressWarnings("deprecation")
+public class TestTreeCacheBridgeRandomTree extends BaseTestTreeCache<TreeCacheBridge>
 {
     /**
      * A randomly generated source-of-truth node for {@link #testGiantRandomDeepTree()}
@@ -57,22 +56,39 @@ public class TestTreeCacheBridgeRandomTree extends BaseTestTreeCache<CuratorCach
     private final Random random = new Random();
     private boolean withDepth = false;
 
+    @Test
+    public void testGiantRandomDeepTree() throws Exception {
+        doTestGiantRandomDeepTree();
+    }
+
+    @Test
+    public void testGiantRandomDeepTreeWithDepth() throws Exception {
+        withDepth = true;
+        doTestGiantRandomDeepTree();
+    }
+
     /**
      * Randomly construct a large tree of test data in memory, mirror it into ZK, and then use
      * a TreeCache to follow the changes.  At each step, assert that TreeCache matches our
      * source-of-truth test data, and that we see exactly the set of events we expect to see.
      */
-
-    @Test
-    public void testGiantRandomDeepTree() throws Exception {
+    private void doTestGiantRandomDeepTree() throws Exception
+    {
         client.create().forPath("/tree", null);
         CuratorFramework cl = client.usingNamespace("tree");
-        cache = newCacheWithListeners(cl, "/");
+        if ( withDepth )
+        {
+            cache = buildBridgeWithListeners(TreeCache.newBuilder(cl, "/").setMaxDepth(TEST_DEPTH));
+        }
+        else
+        {
+            cache = newTreeCacheBridgeWithListeners(cl, "/");
+        }
         cache.start();
         assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/");
         assertEvent(TreeCacheEvent.Type.INITIALIZED);
 
-        TestNode root = new TestNode("/", new byte[0]);
+        TestNode root = new TestNode("/", null);
         int maxDepth = 0;
         int adds = 0;
         int removals = 0;
@@ -169,7 +185,7 @@ public class TestTreeCacheBridgeRandomTree extends BaseTestTreeCache<CuratorCach
             }
 
             // Each iteration, ensure the cached state matches our source-of-truth tree.
-            assertNodeEquals(ListenerBridge.toData("/", cache.get("/")), root);
+            assertNodeEquals(cache.getCurrentData("/"), root);
             assertTreeEquals(cache, root, 0);
         }
 
@@ -190,10 +206,10 @@ public class TestTreeCacheBridgeRandomTree extends BaseTestTreeCache<CuratorCach
     /**
      * Recursively assert that current children equal expected children.
      */
-    private void assertTreeEquals(CuratorCache cache, TestNode expectedNode, int depth)
+    private void assertTreeEquals(TreeCacheBridge cache, TestNode expectedNode, int depth)
     {
         String path = expectedNode.fullPath;
-        Map<String, CachedNode> cacheChildren = cache.childrenAtPath(path);
+        Map<String, ChildData> cacheChildren = cache.getCurrentChildren(path);
         Assert.assertNotNull(cacheChildren, path);
 
         if (withDepth && depth == TEST_DEPTH) {
@@ -205,9 +221,9 @@ public class TestTreeCacheBridgeRandomTree extends BaseTestTreeCache<CuratorCach
         for ( Map.Entry<String, TestNode> entry : expectedNode.children.entrySet() )
         {
             String nodeName = entry.getKey();
-            CachedNode childData = cacheChildren.get(nodeName);
+            ChildData childData = cacheChildren.get(nodeName);
             TestNode expectedChild = entry.getValue();
-            assertNodeEquals(ListenerBridge.toData(expectedChild.fullPath, childData), expectedChild);
+            assertNodeEquals(childData, expectedChild);
             assertTreeEquals(cache, expectedChild, depth + 1);
         }
     }


[3/8] curator git commit: typo - both cache selectors were compressed

Posted by ra...@apache.org.
typo - both cache selectors were compressed


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/570023df
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/570023df
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/570023df

Branch: refs/heads/persistent-watch
Commit: 570023df19106f5ce6895490fe32cbf8be38ac5e
Parents: bf7c5ec
Author: randgalt <ra...@apache.org>
Authored: Thu Aug 24 06:22:07 2017 +0200
Committer: randgalt <ra...@apache.org>
Committed: Thu Aug 24 06:22:07 2017 +0200

----------------------------------------------------------------------
 .../apache/curator/x/async/modeled/details/ModeledCacheImpl.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/570023df/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
index ce73a9b..2684633 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
@@ -74,7 +74,7 @@ class ModeledCacheImpl<T> implements CacheListener, ModeledCache<T>
         this.serializer = modelSpec.serializer();
         boolean dataIsCompressed = modelSpec.createOptions().contains(CreateOption.compress);
         cache = CuratorCacheBuilder.builder(client, basePath.fullPath())
-            .withCacheSelector(dataIsCompressed ? CacheSelectors.uncompressedStatAndData() : CacheSelectors.getUncompressedStatOnly())
+            .withCacheSelector(dataIsCompressed ? CacheSelectors.uncompressedStatAndData() : CacheSelectors.statAndData())
             .sendingRefreshEvents(true)
             .build();
     }


[4/8] curator git commit: Abstracted the TreeCache public API and then added an alternate implementation that uses the new CuratorCache instead of TreeCache. This should make porting older code much easier

Posted by ra...@apache.org.
http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeWrapper.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeWrapper.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeWrapper.java
new file mode 100644
index 0000000..cfb897d
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeWrapper.java
@@ -0,0 +1,500 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
+import org.apache.curator.framework.recipes.watch.CacheSelectors;
+import org.apache.curator.framework.recipes.watch.CuratorCache;
+import org.apache.curator.framework.recipes.watch.CuratorCacheBuilder;
+import org.apache.curator.test.compatibility.KillSession2;
+import org.apache.zookeeper.CreateMode;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestTreeCacheBridgeWrapper extends BaseTestTreeCache<CuratorCache>
+{
+    @Test
+    public void testSelector() throws Exception
+    {
+        client.create().forPath("/root");
+        client.create().forPath("/root/n1-a");
+        client.create().forPath("/root/n1-b");
+        client.create().forPath("/root/n1-b/n2-a");
+        client.create().forPath("/root/n1-b/n2-b");
+        client.create().forPath("/root/n1-b/n2-b/n3-a");
+        client.create().forPath("/root/n1-c");
+        client.create().forPath("/root/n1-d");
+
+        TreeCacheSelector selector = new TreeCacheSelector()
+        {
+            @Override
+            public boolean traverseChildren(String fullPath)
+            {
+                return !fullPath.equals("/root/n1-b/n2-b");
+            }
+
+            @Override
+            public boolean acceptChild(String fullPath)
+            {
+                return !fullPath.equals("/root/n1-c");
+            }
+        };
+        cache = buildCacheWithListeners(CuratorCacheBuilder.builder(client, "/root").withCacheSelector(SelectorBridge.wrap(selector)));
+        cache.start();
+
+        assertEvent(Type.NODE_ADDED, "/root");
+        assertEvent(Type.NODE_ADDED, "/root/n1-a");
+        assertEvent(Type.NODE_ADDED, "/root/n1-b");
+        assertEvent(Type.NODE_ADDED, "/root/n1-d");
+        assertEvent(Type.NODE_ADDED, "/root/n1-b/n2-a");
+        assertEvent(Type.NODE_ADDED, "/root/n1-b/n2-b");
+        assertEvent(Type.INITIALIZED);
+        assertNoMoreEvents();
+    }
+
+    @Test
+    public void testStartup() throws Exception
+    {
+        client.create().forPath("/test");
+        client.create().forPath("/test/1", "one".getBytes());
+        client.create().forPath("/test/2", "two".getBytes());
+        client.create().forPath("/test/3", "three".getBytes());
+        client.create().forPath("/test/2/sub", "two-sub".getBytes());
+
+        cache = newCacheWithListeners(client, "/test");
+        cache.start();
+        assertEvent(Type.NODE_ADDED, "/test");
+        assertEvent(Type.NODE_ADDED, "/test/1", "one".getBytes());
+        assertEvent(Type.NODE_ADDED, "/test/2", "two".getBytes());
+        assertEvent(Type.NODE_ADDED, "/test/3", "three".getBytes());
+        assertEvent(Type.NODE_ADDED, "/test/2/sub", "two-sub".getBytes());
+        assertEvent(Type.INITIALIZED);
+        assertNoMoreEvents();
+
+        Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("1", "2", "3"));
+        Assert.assertEquals(cache.childrenAtPath("/test/1").keySet(), ImmutableSet.of());
+        Assert.assertEquals(cache.childrenAtPath("/test/2").keySet(), ImmutableSet.of("sub"));
+        Assert.assertNull(cache.get("/test/non_exist"));
+    }
+
+    @Test
+    public void testStartEmpty() throws Exception
+    {
+        cache = newCacheWithListeners(client, "/test");
+        cache.start();
+        assertEvent(Type.INITIALIZED);
+
+        client.create().forPath("/test");
+        assertEvent(Type.NODE_ADDED, "/test");
+        assertNoMoreEvents();
+    }
+
+    @Test
+    public void testStartEmptyDeeper() throws Exception
+    {
+        cache = newCacheWithListeners(client, "/test/foo/bar");
+        cache.start();
+        assertEvent(Type.INITIALIZED);
+
+        client.create().creatingParentsIfNeeded().forPath("/test/foo");
+        assertNoMoreEvents();
+        client.create().forPath("/test/foo/bar");
+        assertEvent(Type.NODE_ADDED, "/test/foo/bar");
+        assertNoMoreEvents();
+    }
+
+    @Test
+    public void testDepth0() throws Exception
+    {
+        client.create().forPath("/test");
+        client.create().forPath("/test/1", "one".getBytes());
+        client.create().forPath("/test/2", "two".getBytes());
+        client.create().forPath("/test/3", "three".getBytes());
+        client.create().forPath("/test/2/sub", "two-sub".getBytes());
+
+        CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, "/test").withCacheSelector(CacheSelectors.maxDepth(0));
+        cache = buildCacheWithListeners(builder);
+        cache.start();
+        assertEvent(Type.NODE_ADDED, "/test");
+        assertEvent(Type.INITIALIZED);
+        assertNoMoreEvents();
+
+        Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of());
+        Assert.assertNull(cache.get("/test/1"));
+        Assert.assertNull(cache.get("/test/1"));
+        Assert.assertNull(cache.get("/test/non_exist"));
+    }
+
+    @Test
+    public void testDepth1() throws Exception
+    {
+        client.create().forPath("/test");
+        client.create().forPath("/test/1", "one".getBytes());
+        client.create().forPath("/test/2", "two".getBytes());
+        client.create().forPath("/test/3", "three".getBytes());
+        client.create().forPath("/test/2/sub", "two-sub".getBytes());
+
+        CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, "/test").withCacheSelector(CacheSelectors.maxDepth(1));
+        cache = buildCacheWithListeners(builder);
+        cache.start();
+        assertEvent(Type.NODE_ADDED, "/test");
+        assertEvent(Type.NODE_ADDED, "/test/1", "one".getBytes());
+        assertEvent(Type.NODE_ADDED, "/test/2", "two".getBytes());
+        assertEvent(Type.NODE_ADDED, "/test/3", "three".getBytes());
+        assertEvent(Type.INITIALIZED);
+        assertNoMoreEvents();
+
+        Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("1", "2", "3"));
+        Assert.assertEquals(cache.childrenAtPath("/test/1").keySet(), ImmutableSet.of());
+        Assert.assertEquals(cache.childrenAtPath("/test/2").keySet(), ImmutableSet.of());
+        Assert.assertNull(cache.get("/test/1/sub"));
+        Assert.assertNull(cache.get("/test/2/sub"));
+        Assert.assertNull(cache.get("/test/non_exist"));
+    }
+
+    @Test
+    public void testDepth1Deeper() throws Exception
+    {
+        client.create().forPath("/test");
+        client.create().forPath("/test/foo");
+        client.create().forPath("/test/foo/bar");
+        client.create().forPath("/test/foo/bar/1", "one".getBytes());
+        client.create().forPath("/test/foo/bar/2", "two".getBytes());
+        client.create().forPath("/test/foo/bar/3", "three".getBytes());
+        client.create().forPath("/test/foo/bar/2/sub", "two-sub".getBytes());
+
+        CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, "/test/foo/bar").withCacheSelector(CacheSelectors.maxDepth(1));
+        cache = buildCacheWithListeners(builder);
+        cache.start();
+        assertEvent(Type.NODE_ADDED, "/test/foo/bar");
+        assertEvent(Type.NODE_ADDED, "/test/foo/bar/1", "one".getBytes());
+        assertEvent(Type.NODE_ADDED, "/test/foo/bar/2", "two".getBytes());
+        assertEvent(Type.NODE_ADDED, "/test/foo/bar/3", "three".getBytes());
+        assertEvent(Type.INITIALIZED);
+        assertNoMoreEvents();
+    }
+
+    @Test
+    public void testAsyncInitialPopulation() throws Exception
+    {
+        client.create().forPath("/test");
+        client.create().forPath("/test/one", "hey there".getBytes());
+
+        cache = newCacheWithListeners(client, "/test");
+        cache.start();
+        assertEvent(Type.NODE_ADDED, "/test");
+        assertEvent(Type.NODE_ADDED, "/test/one");
+        assertEvent(Type.INITIALIZED);
+        assertNoMoreEvents();
+    }
+
+    @Test
+    public void testFromRoot() throws Exception
+    {
+        client.create().forPath("/test");
+        client.create().forPath("/test/one", "hey there".getBytes());
+
+        cache = newCacheWithListeners(client, "/");
+        cache.start();
+        assertEvent(Type.NODE_ADDED, "/");
+        assertEvent(Type.NODE_ADDED, "/test");
+        assertEvent(Type.NODE_ADDED, "/test/one");
+        assertEvent(Type.INITIALIZED);
+        assertNoMoreEvents();
+
+        Assert.assertTrue(cache.childrenAtPath("/").keySet().contains("test"));
+        Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one"));
+        Assert.assertEquals(cache.childrenAtPath("/test/one").keySet(), ImmutableSet.of());
+        Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there");
+    }
+
+    @Test
+    public void testFromRootWithDepth() throws Exception
+    {
+        client.create().forPath("/test");
+        client.create().forPath("/test/one", "hey there".getBytes());
+
+        CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, "/").withCacheSelector(CacheSelectors.maxDepth(1));
+        cache = buildCacheWithListeners(builder);
+        cache.start();
+        assertEvent(Type.NODE_ADDED, "/");
+        assertEvent(Type.NODE_ADDED, "/test");
+        assertEvent(Type.INITIALIZED);
+        assertNoMoreEvents();
+
+        Assert.assertTrue(cache.childrenAtPath("/").keySet().contains("test"));
+        Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of());
+        Assert.assertNull(cache.get("/test/one"));
+        Assert.assertNull(cache.get("/test/one"));
+    }
+
+    @Test
+    public void testWithNamespace() throws Exception
+    {
+        client.create().forPath("/outer");
+        client.create().forPath("/outer/foo");
+        client.create().forPath("/outer/test");
+        client.create().forPath("/outer/test/one", "hey there".getBytes());
+
+        cache = newCacheWithListeners(client.usingNamespace("outer"), "/test");
+        cache.start();
+        assertEvent(Type.NODE_ADDED, "/test");
+        assertEvent(Type.NODE_ADDED, "/test/one");
+        assertEvent(Type.INITIALIZED);
+        assertNoMoreEvents();
+
+        Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one"));
+        Assert.assertEquals(cache.childrenAtPath("/test/one").keySet(), ImmutableSet.of());
+        Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there");
+    }
+
+    @Test
+    public void testWithNamespaceAtRoot() throws Exception
+    {
+        client.create().forPath("/outer");
+        client.create().forPath("/outer/foo");
+        client.create().forPath("/outer/test");
+        client.create().forPath("/outer/test/one", "hey there".getBytes());
+
+        cache = newCacheWithListeners(client.usingNamespace("outer"), "/");
+        cache.start();
+        assertEvent(Type.NODE_ADDED, "/");
+        assertEvent(Type.NODE_ADDED, "/foo");
+        assertEvent(Type.NODE_ADDED, "/test");
+        assertEvent(Type.NODE_ADDED, "/test/one");
+        assertEvent(Type.INITIALIZED);
+        assertNoMoreEvents();
+        Assert.assertEquals(cache.childrenAtPath("/").keySet(), ImmutableSet.of("foo", "test"));
+        Assert.assertEquals(cache.childrenAtPath("/foo").keySet(), ImmutableSet.of());
+        Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one"));
+        Assert.assertEquals(cache.childrenAtPath("/test/one").keySet(), ImmutableSet.of());
+        Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there");
+    }
+
+    @Test
+    public void testSyncInitialPopulation() throws Exception
+    {
+        cache = newCacheWithListeners(client, "/test");
+        cache.start();
+        assertEvent(Type.INITIALIZED);
+
+        client.create().forPath("/test");
+        client.create().forPath("/test/one", "hey there".getBytes());
+        assertEvent(Type.NODE_ADDED, "/test");
+        assertEvent(Type.NODE_ADDED, "/test/one");
+        assertNoMoreEvents();
+    }
+
+    @Test
+    public void testChildrenInitialized() throws Exception
+    {
+        client.create().forPath("/test", "".getBytes());
+        client.create().forPath("/test/1", "1".getBytes());
+        client.create().forPath("/test/2", "2".getBytes());
+        client.create().forPath("/test/3", "3".getBytes());
+
+        cache = newCacheWithListeners(client, "/test");
+        cache.start();
+        assertEvent(Type.NODE_ADDED, "/test");
+        assertEvent(Type.NODE_ADDED, "/test/1");
+        assertEvent(Type.NODE_ADDED, "/test/2");
+        assertEvent(Type.NODE_ADDED, "/test/3");
+        assertEvent(Type.INITIALIZED);
+        assertNoMoreEvents();
+    }
+
+    @Test
+    public void testUpdateWhenNotCachingData() throws Exception
+    {
+        client.create().forPath("/test");
+
+        cache = buildCacheWithListeners(CuratorCacheBuilder.builder(client, "/test").withCacheSelector(CacheSelectors.statOnly()));
+        cache.start();
+        assertEvent(Type.NODE_ADDED, "/test");
+        assertEvent(Type.INITIALIZED);
+
+        client.create().forPath("/test/foo", "first".getBytes());
+        assertEvent(Type.NODE_ADDED, "/test/foo");
+
+        client.setData().forPath("/test/foo", "something new".getBytes());
+        assertEvent(Type.NODE_UPDATED, "/test/foo");
+        assertNoMoreEvents();
+
+        Assert.assertNotNull(cache.get("/test/foo"));
+        // No byte data querying the tree because we're not caching data.
+        Assert.assertEquals(cache.get("/test/foo").getData().length, 0);
+    }
+
+    @Test
+    public void testDeleteThenCreate() throws Exception
+    {
+        client.create().forPath("/test");
+        client.create().forPath("/test/foo", "one".getBytes());
+
+        cache = newCacheWithListeners(client, "/test");
+        cache.start();
+        assertEvent(Type.NODE_ADDED, "/test");
+        assertEvent(Type.NODE_ADDED, "/test/foo");
+        assertEvent(Type.INITIALIZED);
+
+        client.delete().forPath("/test/foo");
+        assertEvent(Type.NODE_REMOVED, "/test/foo", "one".getBytes());
+        client.create().forPath("/test/foo", "two".getBytes());
+        assertEvent(Type.NODE_ADDED, "/test/foo");
+
+        client.delete().forPath("/test/foo");
+        assertEvent(Type.NODE_REMOVED, "/test/foo", "two".getBytes());
+        client.create().forPath("/test/foo", "two".getBytes());
+        assertEvent(Type.NODE_ADDED, "/test/foo");
+
+        assertNoMoreEvents();
+    }
+
+    @Test
+    public void testDeleteThenCreateRoot() throws Exception
+    {
+        client.create().forPath("/test");
+        client.create().forPath("/test/foo", "one".getBytes());
+
+        cache = newCacheWithListeners(client, "/test/foo");
+        cache.start();
+        assertEvent(Type.NODE_ADDED, "/test/foo");
+        assertEvent(Type.INITIALIZED);
+
+        client.delete().forPath("/test/foo");
+        assertEvent(Type.NODE_REMOVED, "/test/foo");
+        client.create().forPath("/test/foo", "two".getBytes());
+        assertEvent(Type.NODE_ADDED, "/test/foo");
+
+        client.delete().forPath("/test/foo");
+        assertEvent(Type.NODE_REMOVED, "/test/foo");
+        client.create().forPath("/test/foo", "two".getBytes());
+        assertEvent(Type.NODE_ADDED, "/test/foo");
+
+        assertNoMoreEvents();
+    }
+
+    @Test
+    public void testKilledSession() throws Exception
+    {
+        client.create().forPath("/test");
+
+        cache = newCacheWithListeners(client, "/test");
+        cache.start();
+        assertEvent(Type.NODE_ADDED, "/test");
+        assertEvent(Type.INITIALIZED);
+
+        client.create().forPath("/test/foo", "foo".getBytes());
+        assertEvent(Type.NODE_ADDED, "/test/foo");
+        client.create().withMode(CreateMode.EPHEMERAL).forPath("/test/me", "data".getBytes());
+        assertEvent(Type.NODE_ADDED, "/test/me");
+
+        KillSession2.kill(client.getZookeeperClient().getZooKeeper());
+        assertEvent(Type.CONNECTION_LOST);
+        assertEvent(Type.CONNECTION_RECONNECTED);
+        assertEvent(Type.INITIALIZED);
+        assertEvent(Type.NODE_REMOVED, "/test/me", "data".getBytes());
+
+        assertNoMoreEvents();
+    }
+
+    @Test
+    public void testBasics() throws Exception
+    {
+        client.create().forPath("/test");
+
+        cache = newCacheWithListeners(client, "/test");
+        cache.start();
+        assertEvent(Type.NODE_ADDED, "/test");
+        assertEvent(Type.INITIALIZED);
+        Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of());
+        Assert.assertNull(cache.get("/t"));
+        Assert.assertNull(cache.get("/testing"));
+
+        client.create().forPath("/test/one", "hey there".getBytes());
+        assertEvent(Type.NODE_ADDED, "/test/one");
+        Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one"));
+        Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there");
+        Assert.assertEquals(cache.childrenAtPath("/test/one").keySet(), ImmutableSet.of());
+        Assert.assertNull(cache.get("/test/o"));
+        Assert.assertNull(cache.get("/test/onely"));
+
+        client.setData().forPath("/test/one", "sup!".getBytes());
+        assertEvent(Type.NODE_UPDATED, "/test/one");
+        Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one"));
+        Assert.assertEquals(new String(cache.get("/test/one").getData()), "sup!");
+
+        client.delete().forPath("/test/one");
+        assertEvent(Type.NODE_REMOVED, "/test/one", "sup!".getBytes());
+        Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of());
+
+        assertNoMoreEvents();
+    }
+
+    @Test
+    public void testDeleteNodeAfterCloseDoesntCallExecutor() throws Exception
+    {
+        client.create().forPath("/test");
+
+        cache = newCacheWithListeners(client, "/test");
+        cache.start();
+        assertEvent(Type.NODE_ADDED, "/test");
+        assertEvent(Type.INITIALIZED);
+
+        client.create().forPath("/test/one", "hey there".getBytes());
+        assertEvent(Type.NODE_ADDED, "/test/one");
+        Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there");
+
+        cache.close();
+        assertNoMoreEvents();
+
+        client.delete().forPath("/test/one");
+        assertNoMoreEvents();
+    }
+
+    /**
+     * Make sure TreeCache gets to a sane state when we can't initially connect to server.
+     */
+    @Test
+    public void testServerNotStartedYet() throws Exception
+    {
+        // Stop the existing server.
+        server.stop();
+
+        // Shutdown the existing client and re-create it started.
+        client.close();
+        initCuratorFramework();
+
+        // Start the client disconnected.
+        cache = newCacheWithListeners(client, "/test");
+        cache.start();
+        assertNoMoreEvents();
+
+        // Now restart the server.
+        server.restart();
+        assertEvent(Type.INITIALIZED);
+
+        client.create().forPath("/test");
+
+        assertEvent(Type.NODE_ADDED, "/test");
+        assertNoMoreEvents();
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeWrapperRandomTree.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeWrapperRandomTree.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeWrapperRandomTree.java
new file mode 100644
index 0000000..03d0999
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeWrapperRandomTree.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.collect.Iterables;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.watch.CachedNode;
+import org.apache.curator.framework.recipes.watch.CuratorCache;
+import org.apache.curator.utils.ZKPaths;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+public class TestTreeCacheBridgeWrapperRandomTree extends BaseTestTreeCache<CuratorCache>
+{
+    /**
+     * A randomly generated source-of-truth node for {@link #testGiantRandomDeepTree()}
+     */
+    private static final class TestNode
+    {
+        String fullPath;
+        byte[] data;
+        Map<String, TestNode> children = new HashMap<String, TestNode>();
+
+        TestNode(String fullPath, byte[] data)
+        {
+            this.fullPath = fullPath;
+            this.data = data;
+        }
+    }
+
+    // These constants will produce a tree about 10 levels deep.
+    private static final int ITERATIONS = 1000;
+    private static final double DIVE_CHANCE = 0.9;
+    private static final int TEST_DEPTH = 5;
+
+    private final Random random = new Random();
+    private boolean withDepth = false;
+
+    /**
+     * Randomly construct a large tree of test data in memory, mirror it into ZK, and then use
+     * a TreeCache to follow the changes.  At each step, assert that TreeCache matches our
+     * source-of-truth test data, and that we see exactly the set of events we expect to see.
+     */
+
+    @Test
+    public void testGiantRandomDeepTree() throws Exception {
+        client.create().forPath("/tree", null);
+        CuratorFramework cl = client.usingNamespace("tree");
+        cache = newCacheWithListeners(cl, "/");
+        cache.start();
+        assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/");
+        assertEvent(TreeCacheEvent.Type.INITIALIZED);
+
+        TestNode root = new TestNode("/", new byte[0]);
+        int maxDepth = 0;
+        int adds = 0;
+        int removals = 0;
+        int updates = 0;
+
+        for ( int i = 0; i < ITERATIONS; ++i )
+        {
+            // Select a node to update, randomly navigate down through the tree
+            int depth = 0;
+            TestNode last = null;
+            TestNode node = root;
+            while ( !node.children.isEmpty() && random.nextDouble() < DIVE_CHANCE )
+            {
+                // Go down a level in the tree.  Select a random child for the next iteration.
+                last = node;
+                node = Iterables.get(node.children.values(), random.nextInt(node.children.size()));
+                ++depth;
+            }
+            maxDepth = Math.max(depth, maxDepth);
+
+            // Okay we found a node, let's do something interesting with it.
+            switch ( random.nextInt(3) )
+            {
+            case 0:
+                // Try a removal if we have no children and we're not the root node.
+                if ( node != root && node.children.isEmpty() )
+                {
+                    // Delete myself from parent.
+                    TestNode removed = last.children.remove(ZKPaths.getNodeFromPath(node.fullPath));
+                    Assert.assertSame(node, removed);
+
+                    // Delete from ZK
+                    cl.delete().forPath(node.fullPath);
+
+                    // TreeCache should see the delete.
+                    if (shouldSeeEventAt(node.fullPath))
+                    {
+                        assertEvent(TreeCacheEvent.Type.NODE_REMOVED, node.fullPath);
+                    }
+                    ++removals;
+                }
+                break;
+            case 1:
+                // Do an update.
+                byte[] newData = new byte[10];
+                random.nextBytes(newData);
+
+                if ( Arrays.equals(node.data, newData) )
+                {
+                    // Randomly generated the same data! Very small chance, just skip.
+                    continue;
+                }
+
+                // Update source-of-truth.
+                node.data = newData;
+
+                // Update in ZK.
+                cl.setData().forPath(node.fullPath, node.data);
+
+                // TreeCache should see the update.
+                if (shouldSeeEventAt(node.fullPath))
+                {
+                    assertEvent(TreeCacheEvent.Type.NODE_UPDATED, node.fullPath, node.data);
+                }
+
+                ++updates;
+                break;
+            case 2:
+                // Add a new child.
+                String name = Long.toHexString(random.nextLong());
+                if ( node.children.containsKey(name) )
+                {
+                    // Randomly generated the same name! Very small chance, just skip.
+                    continue;
+                }
+
+                // Add a new child to our test tree.
+                byte[] data = new byte[10];
+                random.nextBytes(data);
+                TestNode child = new TestNode(ZKPaths.makePath(node.fullPath, name), data);
+                node.children.put(name, child);
+
+                // Add to ZK.
+                cl.create().forPath(child.fullPath, child.data);
+
+                // TreeCache should see the add.
+                if (shouldSeeEventAt(child.fullPath))
+                {
+                    assertEvent(TreeCacheEvent.Type.NODE_ADDED, child.fullPath, child.data);
+                }
+
+                ++adds;
+                break;
+            }
+
+            // Each iteration, ensure the cached state matches our source-of-truth tree.
+            assertNodeEquals(ListenerBridge.toData("/", cache.get("/")), root);
+            assertTreeEquals(cache, root, 0);
+        }
+
+        // Typical stats for this test: maxDepth: 10, adds: 349, removals: 198, updates: 320
+        // We get more adds than removals because removals only happen if we're at a leaf.
+        System.out.println(String.format("maxDepth: %s, adds: %s, removals: %s, updates: %s", maxDepth, adds, removals, updates));
+        assertNoMoreEvents();
+    }
+
+    /**
+     * Returns true if we should see an event at this path based on maxDepth, false otherwise.
+     */
+    private boolean shouldSeeEventAt(String fullPath)
+    {
+        return !withDepth || ZKPaths.split(fullPath).size() <= TEST_DEPTH;
+    }
+
+    /**
+     * Recursively assert that current children equal expected children.
+     */
+    private void assertTreeEquals(CuratorCache cache, TestNode expectedNode, int depth)
+    {
+        String path = expectedNode.fullPath;
+        Map<String, CachedNode> cacheChildren = cache.childrenAtPath(path);
+        Assert.assertNotNull(cacheChildren, path);
+
+        if (withDepth && depth == TEST_DEPTH) {
+            return;
+        }
+
+        Assert.assertEquals(cacheChildren.keySet(), expectedNode.children.keySet(), path);
+
+        for ( Map.Entry<String, TestNode> entry : expectedNode.children.entrySet() )
+        {
+            String nodeName = entry.getKey();
+            CachedNode childData = cacheChildren.get(nodeName);
+            TestNode expectedChild = entry.getValue();
+            assertNodeEquals(ListenerBridge.toData(expectedChild.fullPath, childData), expectedChild);
+            assertTreeEquals(cache, expectedChild, depth + 1);
+        }
+    }
+
+    /**
+     * Assert that the given node data matches expected test node data.
+     */
+    private static void assertNodeEquals(ChildData actualChild, TestNode expectedNode)
+    {
+        String path = expectedNode.fullPath;
+        Assert.assertNotNull(actualChild, path);
+        Assert.assertEquals(actualChild.getData(), expectedNode.data, path);
+    }
+}


[6/8] curator git commit: doc updates

Posted by ra...@apache.org.
doc updates


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/9c1186be
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/9c1186be
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/9c1186be

Branch: refs/heads/persistent-watch
Commit: 9c1186bebbbb5f65f18c942824e85051bddf8c19
Parents: 02073a7
Author: randgalt <ra...@apache.org>
Authored: Sun Aug 27 19:21:16 2017 +0200
Committer: randgalt <ra...@apache.org>
Committed: Sun Aug 27 19:21:16 2017 +0200

----------------------------------------------------------------------
 .../apache/curator/framework/recipes/cache/TreeCache.java   | 9 ++++++++-
 .../curator/framework/recipes/cache/TreeCacheBridge.java    | 4 ++--
 2 files changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/9c1186be/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
index 4e663cd..b411220 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -73,7 +73,9 @@ import static org.apache.curator.utils.PathUtils.validatePath;
  * be prepared for false-positives and false-negatives. Additionally, always use the version number
  * when updating data to avoid overwriting another process' change.</p>
  *
- * @deprecated use {@link CuratorCache}
+ * @deprecated use {@link CuratorCache} - to help in upgrading from old code, you can
+ * use {@link org.apache.curator.framework.recipes.cache.TreeCache.Builder#buildBridge()} to build an instance
+ * that roughly matches the TreeCache API but is backed by a CuratorCache
  */
 @Deprecated
 public class TreeCache implements TreeCacheBridge
@@ -112,6 +114,11 @@ public class TreeCache implements TreeCacheBridge
             return new TreeCache(client, path, cacheData, dataIsCompressed, maxDepth, executor, createParentNodes, selector);
         }
 
+        /**
+         * Builds a TreeCacheBridge instance backed by {@link org.apache.curator.framework.recipes.watch.CuratorCache}
+         *
+         * @return instance
+         */
         public TreeCacheBridge buildBridge()
         {
             CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, path);

http://git-wip-us.apache.org/repos/asf/curator/blob/9c1186be/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java
index 9c383af..8b6f37a 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java
@@ -30,10 +30,10 @@ public interface TreeCacheBridge extends Closeable
     /**
      * Return the current set of children at the given path, mapped by child name. There are no
      * guarantees of accuracy; this is merely the most recent view of the data.  If there is no
-     * node at this path, {@code null} is returned.
+     * node at this path, an empty map or {@code null} is returned (depending on implementation).
      *
      * @param fullPath full path to the node to check
-     * @return a possibly-empty list of children if the node is alive, or null
+     * @return a possibly-empty map of children if the node is alive, or null (depending on implementation)
      */
     Map<String, ChildData> getCurrentChildren(String fullPath);