You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2016/12/29 04:10:23 UTC

curator git commit: Finished addPersistentWatcher DSL, re-wrote new version of cache code to handle all cases and deprecated other versions

Repository: curator
Updated Branches:
  refs/heads/persistent-watch 32a2fb759 -> 94a0205d4


Finished addPersistentWatcher DSL, re-wrote new version of cache code to handle all cases and deprecated other versions


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/94a0205d
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/94a0205d
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/94a0205d

Branch: refs/heads/persistent-watch
Commit: 94a0205d4c3d34b1e1384ab5af1b997f74d2a912
Parents: 32a2fb7
Author: randgalt <ra...@apache.org>
Authored: Wed Dec 28 23:10:15 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Wed Dec 28 23:10:15 2016 -0500

----------------------------------------------------------------------
 .../curator/framework/api/CuratorEventType.java |   7 +-
 .../imps/AddPersistentWatchBuilderImpl.java     | 166 ++++++++++
 .../framework/imps/CuratorFrameworkImpl.java    |   2 +-
 .../framework/recipes/cache/NodeCache.java      |   4 +
 .../recipes/cache/PathChildrenCache.java        |   4 +
 .../recipes/cache/PersistentWatcherCache.java   | 317 -------------------
 .../cache/PersistentWatcherCacheFilter.java     |  14 -
 .../cache/PersistentWatcherCacheListener.java   |  12 -
 .../framework/recipes/cache/TreeCache.java      |   4 +
 .../recipes/nodes/PersistentWatcher.java        | 104 ------
 .../framework/recipes/watch/CacheAction.java    |  27 ++
 .../framework/recipes/watch/CacheEventType.java |  27 ++
 .../framework/recipes/watch/CacheFilter.java    |  24 ++
 .../framework/recipes/watch/CacheListener.java  |  24 ++
 .../framework/recipes/watch/CachedNode.java     |  93 ++++++
 .../framework/recipes/watch/CuratorCache.java   | 123 +++++++
 .../recipes/watch/CuratorCacheBase.java         | 105 ++++++
 .../recipes/watch/CuratorCacheBuilder.java      |  98 ++++++
 .../recipes/watch/InternalCuratorCache.java     | 232 ++++++++++++++
 .../recipes/watch/InternalNodeCache.java        | 302 ++++++++++++++++++
 .../recipes/watch/PersistentWatcher.java        | 140 ++++++++
 .../recipes/watch/SingleLevelCacheFilter.java   |  51 +++
 .../recipes/watch/StatsOnlyCacheFilter.java     |  28 ++
 23 files changed, 1459 insertions(+), 449 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
index 5dea211..4766ca5 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
@@ -96,5 +96,10 @@ public enum CuratorEventType
     /**
      * Event sent when client is being closed
      */
-    CLOSING
+    CLOSING,
+
+    /**
+     * Corresponds to {@link CuratorFramework#addPersistentWatch()}
+     */
+    ADD_PERSISTENT_WATCH
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
new file mode 100644
index 0000000..bf4dfb6
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.RetryLoop;
+import org.apache.curator.drivers.OperationTrace;
+import org.apache.curator.framework.api.AddPersistentWatchBuilder;
+import org.apache.curator.framework.api.AddPersistentWatchable;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.api.Pathable;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.Watcher;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+
+class AddPersistentWatchBuilderImpl implements AddPersistentWatchBuilder, Pathable<Void>, BackgroundOperation<String>
+{
+    private final CuratorFrameworkImpl client;
+    private Watching watching = null;
+    private Backgrounding backgrounding = new Backgrounding();
+
+    AddPersistentWatchBuilderImpl(CuratorFrameworkImpl client)
+    {
+        this.client = client;
+    }
+
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> inBackground()
+    {
+        backgrounding = new Backgrounding();
+        return this;
+    }
+
+    @Override
+    public Pathable<Void> usingWatcher(Watcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    @Override
+    public Pathable<Void> usingWatcher(CuratorWatcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> inBackground(Object context)
+    {
+        backgrounding = new Backgrounding(context);
+        return this;
+    }
+
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback)
+    {
+        backgrounding = new Backgrounding(callback);
+        return this;
+    }
+
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context)
+    {
+        backgrounding = new Backgrounding(callback, context);
+        return this;
+    }
+
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Executor executor)
+    {
+        backgrounding = new Backgrounding(callback, executor);
+        return this;
+    }
+
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context, Executor executor)
+    {
+        backgrounding = new Backgrounding(client, callback, context, executor);
+        return this;
+    }
+
+    @Override
+    public Void forPath(String path) throws Exception
+    {
+        if ( backgrounding.inBackground() )
+        {
+            client.processBackgroundOperation(new OperationAndData<>(this, path, backgrounding.getCallback(), null, backgrounding.getContext(), watching), null);
+        }
+        else
+        {
+            pathInForeground(path);
+        }
+        return null;
+    }
+
+    @Override
+    public void performBackgroundOperation(final OperationAndData<String> data) throws Exception
+    {
+        String path = data.getData();
+        String fixedPath = client.fixForNamespace(path);
+        try
+        {
+            final OperationTrace   trace = client.getZookeeperClient().startAdvancedTracer("AddPersistentWatchBuilderImpl-Background");
+            client.getZooKeeper().addPersistentWatch
+                (
+                    fixedPath,
+                    watching.getWatcher(path),
+                    new AsyncCallback.VoidCallback()
+                    {
+                        @Override
+                        public void processResult(int rc, String path, Object ctx)
+                        {
+                            trace.setReturnCode(rc).setWithWatcher(true).setPath(path).commit();
+                            CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.ADD_PERSISTENT_WATCH, rc, path, null, ctx, null, null, null, null, null, null);
+                            client.processBackgroundOperation(data, event);
+                        }
+                    },
+                    backgrounding.getContext()
+                );
+        }
+        catch ( Throwable e )
+        {
+            backgrounding.checkError(e, watching);
+        }
+    }
+
+    private void pathInForeground(final String path) throws Exception
+    {
+        final String fixedPath = client.fixForNamespace(path);
+        OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("AddPersistentWatchBuilderImpl-Foreground");
+        RetryLoop.callWithRetry
+        (
+            client.getZookeeperClient(),
+            new Callable<Void>()
+            {
+                @Override
+                public Void call() throws Exception
+                {
+                    client.getZooKeeper().addPersistentWatch(fixedPath, watching.getWatcher(path));
+                    return null;
+                }
+            }
+        );
+        trace.setPath(fixedPath).setWithWatcher(true).commit();
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 04113fd..dd995e5 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -563,7 +563,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
     @Override
     public AddPersistentWatchBuilder addPersistentWatch()
     {
-        return null;    // TODO
+        return new AddPersistentWatchBuilderImpl(this);
     }
 
     ACLProvider getAclProvider()

http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
index 9a6eaa7..b288f2a 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
@@ -28,6 +28,7 @@ import org.apache.curator.framework.WatcherRemoveCuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.recipes.watch.CuratorCache;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.PathUtils;
@@ -53,7 +54,10 @@ import java.util.concurrent.atomic.AtomicReference;
  * <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must
  * be prepared for false-positives and false-negatives. Additionally, always use the version number
  * when updating data to avoid overwriting another process' change.</p>
+ *
+ * @deprecated use {@link CuratorCache}
  */
+@Deprecated
 public class NodeCache implements Closeable
 {
     private final Logger log = LoggerFactory.getLogger(getClass());

http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index 91a3a98..c5b9eba 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -32,6 +32,7 @@ import org.apache.curator.framework.EnsureContainers;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.recipes.watch.CuratorCache;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.CloseableExecutorService;
@@ -61,7 +62,10 @@ import java.util.concurrent.atomic.AtomicReference;
  * <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must
  * be prepared for false-positives and false-negatives. Additionally, always use the version number
  * when updating data to avoid overwriting another process' change.</p>
+ *
+ * @deprecated use {@link CuratorCache}
  */
+@Deprecated
 @SuppressWarnings("NullableProblems")
 public class PathChildrenCache implements Closeable
 {

http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCache.java
deleted file mode 100644
index 0e7a448..0000000
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCache.java
+++ /dev/null
@@ -1,317 +0,0 @@
-package org.apache.curator.framework.recipes.cache;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.CuratorEventType;
-import org.apache.curator.framework.listen.Listenable;
-import org.apache.curator.framework.listen.ListenerContainer;
-import org.apache.curator.framework.recipes.nodes.PersistentWatcher;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import java.io.Closeable;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicReference;
-
-public class PersistentWatcherCache implements Closeable
-{
-    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
-    private final PersistentWatcher persistentWatcher;
-    private final ListenerContainer<PersistentWatcherCacheListener> listeners = new ListenerContainer<>();
-    private final ConcurrentMap<String, ChildData> cache = new ConcurrentHashMap<>();
-    private final AtomicReference<PersistentWatcherCacheFilter> cacheFilter = new AtomicReference<>(defaultCacheFilter);
-
-    private static final PersistentWatcherCacheFilter defaultCacheFilter = new PersistentWatcherCacheFilter()
-    {
-        @Override
-        public Action actionForPath(String path)
-        {
-            return Action.IGNORE;
-        }
-    };
-    private final CuratorFramework client;
-
-    private enum State
-    {
-        LATENT,
-        STARTED,
-        CLOSED
-    }
-
-    private final Watcher watcher = new Watcher()
-    {
-        @Override
-        public void process(final WatchedEvent event)
-        {
-            processEvent(event);
-        }
-    };
-
-    public PersistentWatcherCache(CuratorFramework client, String basePath)
-    {
-        this.client = Objects.requireNonNull(client, "client cannot be null");
-        persistentWatcher = new PersistentWatcher(client, basePath)
-        {
-            @Override
-            protected void reset()
-            {
-                super.reset();
-                refreshData();
-            }
-        };
-    }
-
-    public void start()
-    {
-        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
-        persistentWatcher.getListenable().addListener(watcher);
-        persistentWatcher.start();
-    }
-
-    @Override
-    public void close()
-    {
-        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
-        {
-            persistentWatcher.close();
-        }
-    }
-
-    public Listenable<PersistentWatcherCacheListener> getListenable()
-    {
-        return listeners;
-    }
-
-    public void setCacheFilter(PersistentWatcherCacheFilter cacheFilter)
-    {
-        this.cacheFilter.set(Objects.requireNonNull(cacheFilter, "cacheFilter cannot be null"));
-    }
-
-    /**
-     * Return the current data. There are no guarantees of accuracy. This is
-     * merely the most recent view of the data.
-     *
-     * @return list of children and data
-     */
-    public Map<String, ChildData> getCurrentData()
-    {
-        return ImmutableMap.copyOf(cache);
-    }
-
-    /**
-     * Return the current data for the given path. There are no guarantees of accuracy. This is
-     * merely the most recent view of the data. If there is no child with that path, <code>null</code>
-     * is returned.
-     *
-     * @param fullPath full path to the node to check
-     * @return data or null
-     */
-    public ChildData getCurrentData(String fullPath)
-    {
-        return cache.get(fullPath);
-    }
-
-    /**
-     * As a memory optimization, you can clear the cached data bytes for a node. Subsequent
-     * calls to {@link ChildData#getData()} for this node will return <code>null</code>.
-     *
-     * @param fullPath the path of the node to clear
-     */
-    public void clearDataBytes(String fullPath)
-    {
-        clearDataBytes(fullPath, -1);
-    }
-
-    /**
-     * As a memory optimization, you can clear the cached data bytes for a node. Subsequent
-     * calls to {@link ChildData#getData()} for this node will return <code>null</code>.
-     *
-     * @param fullPath  the path of the node to clear
-     * @param ifVersion if non-negative, only clear the data if the data's version matches this version
-     * @return true if the data was cleared
-     */
-    public boolean clearDataBytes(String fullPath, int ifVersion)
-    {
-        ChildData data = cache.get(fullPath);
-        if ( data != null )
-        {
-            if ( (ifVersion < 0) || ((data.getStat() != null) && (ifVersion == data.getStat().getVersion())) )
-            {
-                if ( data.getData() != null )
-                {
-                    cache.replace(fullPath, data, new ChildData(data.getPath(), data.getStat(), null));
-                }
-                return true;
-            }
-        }
-        return false;
-    }
-
-    /**
-     * Clears the current data without beginning a new query and without generating any events
-     * for listeners.
-     */
-    public void clear()
-    {
-        cache.clear();
-    }
-
-    public void refreshData()
-    {
-        for ( String path : cache.keySet() )
-        {
-            PersistentWatcherCacheFilter.Action action = cacheFilter.get().actionForPath(path);
-            if ( (action == PersistentWatcherCacheFilter.Action.GET_DATA_THEN_SAVE) || (action == PersistentWatcherCacheFilter.Action.SAVE_THEN_GET_DATA) )
-            {
-                getData(path);
-            }
-        }
-    }
-
-    private void processEvent(final WatchedEvent event)
-    {
-        switch ( event.getType() )
-        {
-            default:
-            {
-                // ignore
-                break;
-            }
-
-            case NodeCreated:
-            case NodeDataChanged:
-            {
-                updateNode(event.getPath());
-                break;
-            }
-
-            case NodeDeleted:
-            {
-                deleteNode(event.getPath());
-                break;
-            }
-        }
-    }
-
-    private void deleteNode(final String path)
-    {
-        if ( cache.remove(path) != null )
-        {
-            Function<PersistentWatcherCacheListener, Void> proc = new Function<PersistentWatcherCacheListener, Void>()
-            {
-                @Override
-                public Void apply(PersistentWatcherCacheListener listener)
-                {
-                    listener.nodeDeleted(path);
-                    return null;
-                }
-            };
-            listeners.forEach(proc);
-        }
-    }
-
-    private void updateNode(final String path)
-    {
-        boolean putAndCallListeners;
-        boolean doGetData;
-        switch ( cacheFilter.get().actionForPath(path) )
-        {
-            default:
-            case IGNORE:
-            {
-                putAndCallListeners = doGetData = false;
-                break;
-            }
-
-            case SAVE_ONLY:
-            {
-                putAndCallListeners = true;
-                doGetData = false;
-                break;
-            }
-
-            case SAVE_THEN_GET_DATA:
-            {
-                putAndCallListeners = doGetData = true;
-                break;
-            }
-
-            case GET_DATA_THEN_SAVE:
-            {
-                doGetData = true;
-                putAndCallListeners = false;
-                break;
-            }
-        }
-
-        if ( putAndCallListeners )
-        {
-            final ChildData oldData = cache.put(path, new ChildData(path, null, null));
-            Function<PersistentWatcherCacheListener, Void> proc = new Function<PersistentWatcherCacheListener, Void>()
-            {
-                @Override
-                public Void apply(PersistentWatcherCacheListener listener)
-                {
-                    if ( oldData == null )
-                    {
-                        listener.nodeCreated(path);
-                    }
-                    else
-                    {
-                        listener.nodeDataChanged(path);
-                    }
-                    return null;
-                }
-            };
-            listeners.forEach(proc);
-        }
-
-        if ( doGetData )
-        {
-            getData(path);
-        }
-    }
-
-    private void getData(String path)
-    {
-        try
-        {
-            BackgroundCallback callback = new BackgroundCallback()
-            {
-                @Override
-                public void processResult(CuratorFramework client, final CuratorEvent event) throws Exception
-                {
-                    if ( event.getType() == CuratorEventType.GET_DATA )
-                    {
-                        ChildData newData = new ChildData(event.getPath(), event.getStat(), event.getData());
-                        ChildData oldData = cache.put(event.getPath(), newData);
-                        if ( !newData.equals(oldData) )
-                        {
-                            Function<PersistentWatcherCacheListener, Void> proc = new Function<PersistentWatcherCacheListener, Void>()
-                            {
-                                @Override
-                                public Void apply(PersistentWatcherCacheListener listener)
-                                {
-                                    listener.nodeDataAvailable(event.getPath());
-                                    return null;
-                                }
-                            };
-                            listeners.forEach(proc);
-                        }
-                    }
-                }
-            };
-            client.getData().inBackground().forPath(path);
-        }
-        catch ( Exception e )
-        {
-            // TODO
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCacheFilter.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCacheFilter.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCacheFilter.java
deleted file mode 100644
index b6dc694..0000000
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCacheFilter.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package org.apache.curator.framework.recipes.cache;
-
-public interface PersistentWatcherCacheFilter
-{
-    enum Action
-    {
-        IGNORE,
-        SAVE_ONLY,
-        SAVE_THEN_GET_DATA,
-        GET_DATA_THEN_SAVE
-    }
-
-    Action actionForPath(String path);
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCacheListener.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCacheListener.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCacheListener.java
deleted file mode 100644
index 7a01fd4..0000000
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCacheListener.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package org.apache.curator.framework.recipes.cache;
-
-public interface PersistentWatcherCacheListener
-{
-    void nodeCreated(String path);
-
-    void nodeDeleted(String path);
-
-    void nodeDataChanged(String path);
-
-    void nodeDataAvailable(String path);
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
index ed32223..50e3fd6 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -31,6 +31,7 @@ import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.recipes.watch.CuratorCache;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.PathUtils;
@@ -67,7 +68,10 @@ import static org.apache.curator.utils.PathUtils.validatePath;
  * <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must
  * be prepared for false-positives and false-negatives. Additionally, always use the version number
  * when updating data to avoid overwriting another process' change.</p>
+ *
+ * @deprecated use {@link CuratorCache}
  */
+@Deprecated
 public class TreeCache implements Closeable
 {
     private static final Logger LOG = LoggerFactory.getLogger(TreeCache.class);

http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentWatcher.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentWatcher.java
deleted file mode 100644
index 9324d55..0000000
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentWatcher.java
+++ /dev/null
@@ -1,104 +0,0 @@
-package org.apache.curator.framework.recipes.nodes;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.listen.Listenable;
-import org.apache.curator.framework.listen.ListenerContainer;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import java.io.Closeable;
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicReference;
-
-public class PersistentWatcher implements Closeable
-{
-    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
-    private final ListenerContainer<Watcher> listeners = new ListenerContainer<>();
-    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
-    {
-        @Override
-        public void stateChanged(CuratorFramework client, ConnectionState newState)
-        {
-            if ( newState.isConnected() )
-            {
-                reset();
-            }
-        }
-    };
-    private final Watcher watcher = new Watcher()
-    {
-        @Override
-        public void process(final WatchedEvent event)
-        {
-            Function<Watcher, Void> function = new Function<Watcher, Void>()
-            {
-                @Override
-                public Void apply(Watcher watcher)
-                {
-                    watcher.process(event);
-                    return null;
-                }
-            };
-            listeners.forEach(function);
-        }
-    };
-    private final CuratorFramework client;
-    private final String basePath;
-
-    private enum State
-    {
-        LATENT,
-        STARTED,
-        CLOSED
-    }
-
-    public PersistentWatcher(CuratorFramework client, String basePath)
-    {
-        this.client = Objects.requireNonNull(client, "client cannot be null");
-        this.basePath = Objects.requireNonNull(basePath, "basePath cannot be null");
-    }
-
-    public void start()
-    {
-        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
-        client.getConnectionStateListenable().addListener(connectionStateListener);
-        reset();
-    }
-
-    @Override
-    public void close()
-    {
-        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
-        {
-            client.getConnectionStateListenable().removeListener(connectionStateListener);
-            try
-            {
-                client.watches().remove(watcher).inBackground().forPath(basePath);
-            }
-            catch ( Exception e )
-            {
-                // TODO
-            }
-        }
-    }
-
-    public Listenable<Watcher> getListenable()
-    {
-        return listeners;
-    }
-
-    protected void reset()
-    {
-        try
-        {
-            client.addPersistentWatch().inBackground().usingWatcher(watcher).forPath(basePath);
-        }
-        catch ( Exception e )
-        {
-            // TODO
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java
new file mode 100644
index 0000000..77508c1
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+public enum CacheAction
+{
+    IGNORE,
+    DO_NOT_GET_DATA,
+    GET_DATA,
+    GET_COMPRESSED
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheEventType.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheEventType.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheEventType.java
new file mode 100644
index 0000000..8094df2
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheEventType.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+public enum CacheEventType
+{
+    NODE_CREATED,
+    NODE_DELETED,
+    NODE_CHANGED,
+    REFRESHED
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilter.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilter.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilter.java
new file mode 100644
index 0000000..9923174
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilter.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+public interface CacheFilter
+{
+    CacheAction actionForPath(String path);
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheListener.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheListener.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheListener.java
new file mode 100644
index 0000000..9f36042
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheListener.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+public interface CacheListener
+{
+    void process(CacheEventType eventType, String path);
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java
new file mode 100644
index 0000000..0f438a2
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import org.apache.zookeeper.data.Stat;
+import java.util.Arrays;
+import java.util.Objects;
+
+public class CachedNode
+{
+    private final Stat stat;
+    private final byte[] data;
+
+    private static final byte[] defaultData = new byte[0];
+
+    public CachedNode()
+    {
+        this(new Stat(), defaultData);
+    }
+
+    public CachedNode(Stat stat)
+    {
+        this(stat, defaultData);
+    }
+
+    public CachedNode(Stat stat, byte[] data)
+    {
+        this.stat = Objects.requireNonNull(stat, "stat cannot be null");
+        this.data = Objects.requireNonNull(data, "data cannot be null");
+    }
+
+    public Stat getStat()
+    {
+        return stat;
+    }
+
+    public byte[] getData()
+    {
+        return data;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if ( this == o )
+        {
+            return true;
+        }
+        if ( o == null || getClass() != o.getClass() )
+        {
+            return false;
+        }
+
+        CachedNode that = (CachedNode)o;
+
+        //noinspection SimplifiableIfStatement
+        if ( !stat.equals(that.stat) )
+        {
+            return false;
+        }
+        return Arrays.equals(data, that.data);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        int result = stat.hashCode();
+        result = 31 * result + Arrays.hashCode(data);
+        return result;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "CachedNode{" + "stat=" + stat + ", data=" + Arrays.toString(data) + '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
new file mode 100644
index 0000000..131cc2e
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import org.apache.curator.framework.listen.Listenable;
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * General interface for client-cached nodes. Create instances
+ * using {@link CuratorCacheBuilder}
+ */
+public interface CuratorCache extends Closeable
+{
+    /**
+     * Start the cache
+     */
+    void start();
+
+    @Override
+    void close();
+
+    /**
+     * Get listenable container used to add/remove listeners
+     *
+     * @return listener container
+     */
+    Listenable<CacheListener> getListenable();
+
+    /**
+     * Refresh all cached nodes and send {@link CacheEventType#REFRESHED} when completed
+     */
+    void refreshAll();
+
+    /**
+     * Refresh the given cached node
+     *
+     * @param path node full path
+     */
+    void refresh(String path);
+
+    /**
+     * Remove the given path from the cache.
+     *
+     * @param path node full path
+     * @return true if the node was in the cache
+     */
+    boolean clear(String path);
+
+    /**
+     * Remove all nodes from the cache
+     */
+    void clearAll();
+
+    /**
+     * Return true if there is a cached node at the given path
+     *
+     * @param path node full path
+     * @return true/false
+     */
+    boolean exists(String path);
+
+    /**
+     * Returns the set of paths in the cache. The returned set behaves in the same manner
+     * as {@link ConcurrentHashMap#keySet()}
+     *
+     * @return set of paths
+     */
+    Set<String> paths();
+
+    /**
+     * Returns the collection of node values in the cache. The returned set behaves in the same manner
+     * as {@link ConcurrentHashMap#values()}
+     *
+     * @return node values
+     */
+    Collection<CachedNode> nodes();
+
+    /**
+     * Returns the collection of node entries in the cache. The returned set behaves in the same manner
+     * as {@link ConcurrentHashMap#entrySet()}
+     *
+     * @return node entries
+     */
+    Set<Map.Entry<String, CachedNode>> entries();
+
+    /**
+     * As a memory optimization, you can clear the cached data bytes for a node. Subsequent
+     * calls to {@link CachedNode#getData()} for this node will return <code>null</code>.
+     *
+     * @param path the path of the node to clear
+     */
+    void clearDataBytes(String path);
+
+    /**
+     * As a memory optimization, you can clear the cached data bytes for a node. Subsequent
+     * calls to {@link CachedNode#getData()} for this node will return <code>null</code>.
+     *
+     * @param path  the path of the node to clear
+     * @param ifVersion if non-negative, only clear the data if the data's version matches this version
+     * @return true if the data was cleared
+     */
+    boolean clearDataBytes(String path, int ifVersion);
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
new file mode 100644
index 0000000..0affa18
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import com.google.common.cache.Cache;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+abstract class CuratorCacheBase implements CuratorCache
+{
+    protected final Cache<String, CachedNode> cache;
+
+    protected CuratorCacheBase(Cache<String, CachedNode> cache)
+    {
+        this.cache = Objects.requireNonNull(cache, "cache cannot be null");
+    }
+
+    @Override
+    public final boolean clear(String path)
+    {
+        return cache.asMap().remove(path) != null;
+    }
+
+    @Override
+    public final void clearAll()
+    {
+        cache.invalidateAll();
+    }
+
+    @Override
+    public final boolean exists(String path)
+    {
+        return cache.asMap().containsKey(path);
+    }
+
+    @Override
+    public final Set<String> paths()
+    {
+        return cache.asMap().keySet();
+    }
+
+    @Override
+    public final Collection<CachedNode> nodes()
+    {
+        return cache.asMap().values();
+    }
+
+    @Override
+    public final Set<Map.Entry<String, CachedNode>> entries()
+    {
+        return cache.asMap().entrySet();
+    }
+
+    /**
+     * As a memory optimization, you can clear the cached data bytes for a node. Subsequent
+     * calls to {@link CachedNode#getData()} for this node will return <code>null</code>.
+     *
+     * @param path the path of the node to clear
+     */
+    @Override
+    public final void clearDataBytes(String path)
+    {
+        clearDataBytes(path, -1);
+    }
+
+    /**
+     * As a memory optimization, you can clear the cached data bytes for a node. Subsequent
+     * calls to {@link CachedNode#getData()} for this node will return <code>null</code>.
+     *
+     * @param path  the path of the node to clear
+     * @param ifVersion if non-negative, only clear the data if the data's version matches this version
+     * @return true if the data was cleared
+     */
+    @Override
+    public final boolean clearDataBytes(String path, int ifVersion)
+    {
+        CachedNode data = cache.asMap().get(path);
+        if ( data != null )
+        {
+            if ( (ifVersion < 0) || ((data.getStat() != null) && (ifVersion == data.getStat().getVersion())) )
+            {
+                return cache.asMap().replace(path, data, new CachedNode(data.getStat()));
+            }
+        }
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
new file mode 100644
index 0000000..8ee707b
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import com.google.common.cache.CacheBuilder;
+import org.apache.curator.framework.CuratorFramework;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+public class CuratorCacheBuilder
+{
+    private CacheFilter cacheFilter = new CacheFilter()
+    {
+        @Override
+        public CacheAction actionForPath(String path)
+        {
+            return CacheAction.DO_NOT_GET_DATA;
+        }
+    };
+    private CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder();
+    private String path;
+    private boolean singleNode;
+
+    public static CuratorCacheBuilder forPath(String path)
+    {
+        CuratorCacheBuilder builder = new CuratorCacheBuilder();
+        builder.path = Objects.requireNonNull(path, "path cannot be null");
+        builder.singleNode = false;
+        return builder;
+    }
+
+    public static CuratorCacheBuilder forNode(String path)
+    {
+        CuratorCacheBuilder builder = new CuratorCacheBuilder();
+        builder.path = Objects.requireNonNull(path, "path cannot be null");
+        builder.singleNode = true;
+        return builder;
+    }
+
+    public CuratorCache build(CuratorFramework client)
+    {
+        if ( singleNode )
+        {
+            return new InternalNodeCache(client, path, cacheFilter, cacheBuilder.<String, CachedNode>build());
+        }
+        return new InternalCuratorCache(client, path, cacheFilter, cacheBuilder.<String, CachedNode>build());
+    }
+
+    public CuratorCacheBuilder usingWeakValues()
+    {
+        cacheBuilder = cacheBuilder.weakValues();
+        return this;
+    }
+
+    public CuratorCacheBuilder usingSoftValues()
+    {
+        cacheBuilder = cacheBuilder.softValues();
+        return this;
+    }
+
+    public CuratorCacheBuilder thatExpiresAfterWrite(long duration, TimeUnit unit)
+    {
+        cacheBuilder = cacheBuilder.expireAfterWrite(duration, unit);
+        return this;
+    }
+
+    public CuratorCacheBuilder thatExpiresAfterAccess(long duration, TimeUnit unit)
+    {
+        cacheBuilder = cacheBuilder.expireAfterAccess(duration, unit);
+        return this;
+    }
+
+    public CuratorCacheBuilder withCacheFilter(CacheFilter cacheFilter)
+    {
+        this.cacheFilter = Objects.requireNonNull(cacheFilter, "cacheFilter cannot be null");
+        return this;
+    }
+
+    private CuratorCacheBuilder()
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
new file mode 100644
index 0000000..08006a1
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.utils.ThreadUtils;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+class InternalCuratorCache extends CuratorCacheBase implements Watcher
+{
+    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
+    private final PersistentWatcher watcher;
+    private final CuratorFramework client;
+    private final String basePath;
+    private final CacheFilter cacheFilter;
+    private final ListenerContainer<CacheListener> listeners = new ListenerContainer<>();
+    private static final CachedNode nullNode = new CachedNode();
+
+    private enum State
+    {
+        LATENT,
+        STARTED,
+        CLOSED
+    }
+
+    InternalCuratorCache(CuratorFramework client, String path, CacheFilter cacheFilter, Cache<String, CachedNode> cache)
+    {
+        super(cache);
+        this.client = Objects.requireNonNull(client, "client cannot be null");
+        basePath = path;
+        this.cacheFilter = Objects.requireNonNull(cacheFilter, "cacheFilter cannot be null");
+        watcher = new PersistentWatcher(client, path)
+        {
+            @Override
+            protected void watcherSet()
+            {
+                refreshAll();
+            }
+        };
+        watcher.getListenable().addListener(this);
+    }
+
+    @Override
+    public void start()
+    {
+        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "already started");
+        watcher.start();
+    }
+
+    @Override
+    public void close()
+    {
+        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+        {
+            watcher.getListenable().removeListener(this);
+            listeners.clear();
+            watcher.close();
+        }
+    }
+
+    @Override
+    public Listenable<CacheListener> getListenable()
+    {
+        return listeners;
+    }
+
+    @Override
+    public void process(WatchedEvent event)
+    {
+        switch ( event.getType() )
+        {
+            default:
+            {
+                // NOP
+                break;
+            }
+
+            case NodeDeleted:
+            {
+                if ( cache.asMap().remove(event.getPath()) != null )
+                {
+                    notifyListeners(CacheEventType.NODE_DELETED, event.getPath());
+                }
+                break;
+            }
+
+            case NodeCreated:
+            case NodeDataChanged:
+            {
+                refresh(event.getPath());
+                break;
+            }
+        }
+    }
+
+    @Override
+    public void refreshAll()
+    {
+        Set<String> keySet = new HashSet<>(cache.asMap().keySet());
+        AtomicInteger counter = new AtomicInteger(keySet.size());
+        for ( String path : keySet )
+        {
+            internalRefresh(path, counter);
+        }
+    }
+
+    @Override
+    public void refresh(String path)
+    {
+        internalRefresh(path, null);
+    }
+
+    private void internalRefresh(final String path, final AtomicInteger counter)
+    {
+        BackgroundCallback callback = new BackgroundCallback()
+        {
+            @Override
+            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+            {
+                if ( event.getType() == CuratorEventType.GET_DATA )
+                {
+                    CachedNode newNode = new CachedNode(event.getStat(), event.getData());
+                    CachedNode oldNode = cache.asMap().put(path, newNode);
+                    if ( oldNode == null )
+                    {
+                        notifyListeners(CacheEventType.NODE_CREATED, path);
+                    }
+                    else if ( !newNode.equals(oldNode) )
+                    {
+                        notifyListeners(CacheEventType.NODE_CHANGED, path);
+                    }
+                }
+
+                if ( counter.decrementAndGet() <= 0 )
+                {
+                    notifyListeners(CacheEventType.REFRESHED, basePath);
+                }
+            }
+        };
+
+        switch ( cacheFilter.actionForPath(path) )
+        {
+            case IGNORE:
+            {
+                // NOP
+                break;
+            }
+
+            case DO_NOT_GET_DATA:
+            {
+                if ( cache.asMap().put(path, nullNode) == null )
+                {
+                    notifyListeners(CacheEventType.NODE_CREATED, path);
+                }
+                break;
+            }
+
+            case GET_DATA:
+            {
+                try
+                {
+                    client.getData().inBackground().forPath(path);
+                }
+                catch ( Exception e )
+                {
+                    ThreadUtils.checkInterrupted(e);
+                    // TODO
+                }
+                break;
+            }
+
+            case GET_COMPRESSED:
+            {
+                try
+                {
+                    client.getData().decompressed().inBackground().forPath(path);
+                }
+                catch ( Exception e )
+                {
+                    ThreadUtils.checkInterrupted(e);
+                    // TODO
+                }
+                break;
+            }
+        }
+    }
+
+    private void notifyListeners(final CacheEventType eventType, final String path)
+    {
+        Function<CacheListener, Void> proc = new Function<CacheListener, Void>()
+        {
+            @Override
+            public Void apply(CacheListener listener)
+            {
+                listener.process(eventType, path);
+                return null;
+            }
+        };
+        listeners.forEach(proc);
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java
new file mode 100644
index 0000000..12571de
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.PathUtils;
+import org.apache.curator.utils.ThreadUtils;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Objects;
+import java.util.concurrent.Exchanger;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+class InternalNodeCache extends CuratorCacheBase
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final WatcherRemoveCuratorFramework client;
+    private final String path;
+    private final CacheFilter cacheFilter;
+    private final AtomicReference<CachedNode> data = new AtomicReference<>(null);
+    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
+    private final ListenerContainer<CacheListener> listeners = new ListenerContainer<>();
+    private final AtomicBoolean isConnected = new AtomicBoolean(true);
+    private final AtomicBoolean resetEventNeeded = new AtomicBoolean(true);
+    private static final CachedNode nullNode = new CachedNode();
+    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
+    {
+        @Override
+        public void stateChanged(CuratorFramework client, ConnectionState newState)
+        {
+            if ( (newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED) )
+            {
+                if ( isConnected.compareAndSet(false, true) )
+                {
+                    try
+                    {
+                        reset();
+                    }
+                    catch ( Exception e )
+                    {
+                        ThreadUtils.checkInterrupted(e);
+                        log.error("Trying to reset after reconnection", e);
+                    }
+                }
+            }
+            else
+            {
+                isConnected.set(false);
+            }
+        }
+    };
+
+    private Watcher watcher = new Watcher()
+    {
+        @Override
+        public void process(WatchedEvent event)
+        {
+            try
+            {
+                reset();
+            }
+            catch ( Exception e )
+            {
+                ThreadUtils.checkInterrupted(e);
+                // TODO
+            }
+        }
+    };
+
+    private enum State
+    {
+        LATENT,
+        STARTED,
+        CLOSED
+    }
+
+    private final BackgroundCallback backgroundCallback = new BackgroundCallback()
+    {
+        @Override
+        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+        {
+            try
+            {
+                processBackgroundResult(event);
+            }
+            catch ( Exception e )
+            {
+                ThreadUtils.checkInterrupted(e);
+                // TODO
+            }
+
+            if ( resetEventNeeded.compareAndSet(true, false) )
+            {
+                notifyListeners(CacheEventType.REFRESHED);
+            }
+        }
+    };
+
+    InternalNodeCache(CuratorFramework client, String path, CacheFilter cacheFilter, Cache<String, CachedNode> cache)
+    {
+        super(cache);
+        this.client = client.newWatcherRemoveCuratorFramework();
+        this.path = PathUtils.validatePath(path);
+        this.cacheFilter = Objects.requireNonNull(cacheFilter, "cacheFilter cannot be null");
+    }
+
+    @Override
+    public void start()
+    {
+        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "already started");
+
+        client.getConnectionStateListenable().addListener(connectionStateListener);
+        refreshAll();
+    }
+
+    @Override
+    public void close()
+    {
+        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+        {
+            client.removeWatchers();
+            listeners.clear();
+            client.getConnectionStateListenable().removeListener(connectionStateListener);
+        }
+    }
+
+    @Override
+    public Listenable<CacheListener> getListenable()
+    {
+        return listeners;
+    }
+
+    @Override
+    public void refreshAll()
+    {
+        try
+        {
+            resetEventNeeded.set(true);
+            reset();
+        }
+        catch ( Exception e )
+        {
+            ThreadUtils.checkInterrupted(e);
+            // TODO
+        }
+    }
+
+    @Override
+    public void refresh(String path)
+    {
+        Preconditions.checkArgument(this.path.equals(path), "Bad path: " + path);
+        refreshAll();
+    }
+
+    private void reset() throws Exception
+    {
+        if ( (state.get() == State.STARTED) && isConnected.get() )
+        {
+            client.checkExists().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
+        }
+    }
+
+    private void processBackgroundResult(CuratorEvent event) throws Exception
+    {
+        switch ( event.getType() )
+        {
+            case GET_DATA:
+            {
+                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+                {
+                    CachedNode cachedNode = new CachedNode(event.getStat(), event.getData());
+                    setNewData(cachedNode);
+                }
+                break;
+            }
+
+            case EXISTS:
+            {
+                if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
+                {
+                    setNewData(null);
+                }
+                else if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+                {
+                    switch ( cacheFilter.actionForPath(path) )
+                    {
+                        default:
+                        case IGNORE:
+                        {
+                            throw new UnsupportedOperationException("Single node cache does not support action: IGNORE");
+                        }
+
+                        case DO_NOT_GET_DATA:
+                        {
+                            setNewData(nullNode);
+                            break;
+                        }
+
+                        case GET_DATA:
+                        {
+                            client.getData().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
+                            break;
+                        }
+
+                        case GET_COMPRESSED:
+                        {
+                            client.getData().decompressed().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
+                            break;
+                        }
+                    }
+                }
+                break;
+            }
+        }
+    }
+
+    @VisibleForTesting
+    volatile Exchanger<Object> rebuildTestExchanger;
+    private void setNewData(CachedNode newData) throws InterruptedException
+    {
+        CachedNode previousData = data.getAndSet(newData);
+        if ( newData == null )
+        {
+            notifyListeners(CacheEventType.NODE_DELETED);
+        }
+        else if ( previousData == null )
+        {
+            notifyListeners(CacheEventType.NODE_CREATED);
+        }
+        else if ( !previousData.equals(newData) )
+        {
+            notifyListeners(CacheEventType.NODE_CHANGED);
+        }
+
+        if ( rebuildTestExchanger != null )
+        {
+            try
+            {
+                rebuildTestExchanger.exchange(new Object());
+            }
+            catch ( InterruptedException e )
+            {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    private void notifyListeners(final CacheEventType event)
+    {
+        listeners.forEach
+        (
+            new Function<CacheListener, Void>()
+            {
+                @Override
+                public Void apply(CacheListener listener)
+                {
+                    try
+                    {
+                        listener.process(event, path);
+                    }
+                    catch ( Exception e )
+                    {
+                        ThreadUtils.checkInterrupted(e);
+                        log.error("Calling listener", e);
+                    }
+                    return null;
+                }
+            }
+        );
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
new file mode 100644
index 0000000..2f136a6
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import java.io.Closeable;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class PersistentWatcher implements Closeable
+{
+    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
+    private final ListenerContainer<Watcher> listeners = new ListenerContainer<>();
+    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
+    {
+        @Override
+        public void stateChanged(CuratorFramework client, ConnectionState newState)
+        {
+            if ( newState.isConnected() )
+            {
+                reset();
+            }
+        }
+    };
+    private final Watcher watcher = new Watcher()
+    {
+        @Override
+        public void process(final WatchedEvent event)
+        {
+            Function<Watcher, Void> function = new Function<Watcher, Void>()
+            {
+                @Override
+                public Void apply(Watcher watcher)
+                {
+                    watcher.process(event);
+                    return null;
+                }
+            };
+            listeners.forEach(function);
+        }
+    };
+    private final CuratorFramework client;
+    private final String basePath;
+
+    private enum State
+    {
+        LATENT,
+        STARTED,
+        CLOSED
+    }
+
+    public PersistentWatcher(CuratorFramework client, String basePath)
+    {
+        this.client = Objects.requireNonNull(client, "client cannot be null");
+        this.basePath = Objects.requireNonNull(basePath, "basePath cannot be null");
+    }
+
+    public void start()
+    {
+        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
+        client.getConnectionStateListenable().addListener(connectionStateListener);
+        reset();
+    }
+
+    @Override
+    public void close()
+    {
+        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+        {
+            client.getConnectionStateListenable().removeListener(connectionStateListener);
+            try
+            {
+                client.watches().remove(watcher).inBackground().forPath(basePath);
+            }
+            catch ( Exception e )
+            {
+                // TODO
+            }
+        }
+    }
+
+    public Listenable<Watcher> getListenable()
+    {
+        return listeners;
+    }
+
+    private void reset()
+    {
+        try
+        {
+            BackgroundCallback callback = new BackgroundCallback()
+            {
+                @Override
+                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+                {
+                    if ( event.getResultCode() == 0 )
+                    {
+                        watcherSet();
+                    }
+                }
+            };
+            client.addPersistentWatch().inBackground().usingWatcher(watcher).forPath(basePath);
+        }
+        catch ( Exception e )
+        {
+            // TODO
+        }
+    }
+
+    protected void watcherSet()
+    {
+        // default is NOP
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/SingleLevelCacheFilter.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/SingleLevelCacheFilter.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/SingleLevelCacheFilter.java
new file mode 100644
index 0000000..1fbe255
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/SingleLevelCacheFilter.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+public class SingleLevelCacheFilter implements CacheFilter
+{
+    private final String levelPath;
+    private final CacheAction defaultAction;
+
+    public SingleLevelCacheFilter(String levelPath)
+    {
+        this(levelPath, CacheAction.GET_DATA);
+    }
+
+    public SingleLevelCacheFilter(String levelPath, CacheAction defaultAction)
+    {
+        this.levelPath = levelPath;
+        this.defaultAction = defaultAction;
+    }
+
+    @Override
+    public CacheAction actionForPath(String path)
+    {
+        if ( levelPath.equals(path) )
+        {
+            return actionForMatchedPath();
+        }
+        return CacheAction.IGNORE;
+    }
+
+    protected CacheAction actionForMatchedPath()
+    {
+        return defaultAction;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/StatsOnlyCacheFilter.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/StatsOnlyCacheFilter.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/StatsOnlyCacheFilter.java
new file mode 100644
index 0000000..d3c7fac
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/StatsOnlyCacheFilter.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+public class StatsOnlyCacheFilter implements CacheFilter
+{
+    @Override
+    public CacheAction actionForPath(String path)
+    {
+        return CacheAction.DO_NOT_GET_DATA;
+    }
+}