You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2016/12/29 04:10:23 UTC
curator git commit: Finished addPersistentWatcher DSL,
re-wrote new version of cache code to handle all cases and deprecated
other versions
Repository: curator
Updated Branches:
refs/heads/persistent-watch 32a2fb759 -> 94a0205d4
Finished addPersistentWatcher DSL, re-wrote new version of cache code to handle all cases and deprecated other versions
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/94a0205d
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/94a0205d
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/94a0205d
Branch: refs/heads/persistent-watch
Commit: 94a0205d4c3d34b1e1384ab5af1b997f74d2a912
Parents: 32a2fb7
Author: randgalt <ra...@apache.org>
Authored: Wed Dec 28 23:10:15 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Wed Dec 28 23:10:15 2016 -0500
----------------------------------------------------------------------
.../curator/framework/api/CuratorEventType.java | 7 +-
.../imps/AddPersistentWatchBuilderImpl.java | 166 ++++++++++
.../framework/imps/CuratorFrameworkImpl.java | 2 +-
.../framework/recipes/cache/NodeCache.java | 4 +
.../recipes/cache/PathChildrenCache.java | 4 +
.../recipes/cache/PersistentWatcherCache.java | 317 -------------------
.../cache/PersistentWatcherCacheFilter.java | 14 -
.../cache/PersistentWatcherCacheListener.java | 12 -
.../framework/recipes/cache/TreeCache.java | 4 +
.../recipes/nodes/PersistentWatcher.java | 104 ------
.../framework/recipes/watch/CacheAction.java | 27 ++
.../framework/recipes/watch/CacheEventType.java | 27 ++
.../framework/recipes/watch/CacheFilter.java | 24 ++
.../framework/recipes/watch/CacheListener.java | 24 ++
.../framework/recipes/watch/CachedNode.java | 93 ++++++
.../framework/recipes/watch/CuratorCache.java | 123 +++++++
.../recipes/watch/CuratorCacheBase.java | 105 ++++++
.../recipes/watch/CuratorCacheBuilder.java | 98 ++++++
.../recipes/watch/InternalCuratorCache.java | 232 ++++++++++++++
.../recipes/watch/InternalNodeCache.java | 302 ++++++++++++++++++
.../recipes/watch/PersistentWatcher.java | 140 ++++++++
.../recipes/watch/SingleLevelCacheFilter.java | 51 +++
.../recipes/watch/StatsOnlyCacheFilter.java | 28 ++
23 files changed, 1459 insertions(+), 449 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
index 5dea211..4766ca5 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
@@ -96,5 +96,10 @@ public enum CuratorEventType
/**
* Event sent when client is being closed
*/
- CLOSING
+ CLOSING,
+
+ /**
+ * Corresponds to {@link CuratorFramework#addPersistentWatch()}
+ */
+ ADD_PERSISTENT_WATCH
}
http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
new file mode 100644
index 0000000..bf4dfb6
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.RetryLoop;
+import org.apache.curator.drivers.OperationTrace;
+import org.apache.curator.framework.api.AddPersistentWatchBuilder;
+import org.apache.curator.framework.api.AddPersistentWatchable;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.api.Pathable;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.Watcher;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+
+/**
+ * Builder returned by {@code CuratorFrameworkImpl#addPersistentWatch()}. It records the
+ * requested backgrounding mode and the watcher, then registers a persistent watch on the
+ * (namespace-adjusted) path either synchronously with retries or as a background operation.
+ */
+class AddPersistentWatchBuilderImpl implements AddPersistentWatchBuilder, Pathable<Void>, BackgroundOperation<String>
+{
+    private final CuratorFrameworkImpl client;
+    // set by usingWatcher(...); NOTE(review): presumably the DSL types force usingWatcher()
+    // to be called before forPath(), otherwise this is still null there - confirm
+    private Watching watching = null;
+    // default: foreground operation
+    private Backgrounding backgrounding = new Backgrounding();
+
+    AddPersistentWatchBuilderImpl(CuratorFrameworkImpl client)
+    {
+        this.client = client;
+    }
+
+    /**
+     * Requests background execution with no callback and no context.
+     */
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> inBackground()
+    {
+        // the no-arg Backgrounding constructor is the foreground default; the boolean
+        // overload is needed to actually switch background mode on
+        backgrounding = new Backgrounding(true);
+        return this;
+    }
+
+    /**
+     * Sets the ZooKeeper watcher to register.
+     */
+    @Override
+    public Pathable<Void> usingWatcher(Watcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    /**
+     * Sets the Curator watcher to register.
+     */
+    @Override
+    public Pathable<Void> usingWatcher(CuratorWatcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    /**
+     * Requests background execution, passing {@code context} through to the result event.
+     */
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> inBackground(Object context)
+    {
+        backgrounding = new Backgrounding(context);
+        return this;
+    }
+
+    /**
+     * Requests background execution with the given completion callback.
+     */
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback)
+    {
+        backgrounding = new Backgrounding(callback);
+        return this;
+    }
+
+    /**
+     * Requests background execution with the given completion callback and context.
+     */
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context)
+    {
+        backgrounding = new Backgrounding(callback, context);
+        return this;
+    }
+
+    /**
+     * Requests background execution with the given completion callback, which is to be
+     * invoked through {@code executor}.
+     */
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Executor executor)
+    {
+        // use the client-taking overload, as the 4-argument variant below does:
+        // Backgrounding(callback, executor) would bind to the (callback, Object context)
+        // constructor and silently treat the executor as a context object
+        backgrounding = new Backgrounding(client, callback, executor);
+        return this;
+    }
+
+    /**
+     * Requests background execution with the given completion callback and context; the
+     * callback is to be invoked through {@code executor}.
+     */
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context, Executor executor)
+    {
+        backgrounding = new Backgrounding(client, callback, context, executor);
+        return this;
+    }
+
+    /**
+     * Registers the persistent watch for {@code path}, in the background if one of the
+     * {@code inBackground} variants was selected, otherwise synchronously.
+     *
+     * @param path the (un-namespaced) path to watch
+     * @return always {@code null}
+     * @throws Exception errors from the foreground call or from queuing the background operation
+     */
+    @Override
+    public Void forPath(String path) throws Exception
+    {
+        if ( backgrounding.inBackground() )
+        {
+            client.processBackgroundOperation(new OperationAndData<>(this, path, backgrounding.getCallback(), null, backgrounding.getContext(), watching), null);
+        }
+        else
+        {
+            pathInForeground(path);
+        }
+        return null;
+    }
+
+    /**
+     * Issues the asynchronous addPersistentWatch call for a queued background operation and
+     * feeds the result back to the client as an {@code ADD_PERSISTENT_WATCH} event.
+     *
+     * @param data the queued operation; its data is the un-namespaced path
+     */
+    @Override
+    public void performBackgroundOperation(final OperationAndData<String> data) throws Exception
+    {
+        String path = data.getData();
+        String fixedPath = client.fixForNamespace(path);
+        try
+        {
+            final OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("AddPersistentWatchBuilderImpl-Background");
+            client.getZooKeeper().addPersistentWatch
+            (
+                fixedPath,
+                watching.getWatcher(path),
+                new AsyncCallback.VoidCallback()
+                {
+                    @Override
+                    public void processResult(int rc, String path, Object ctx)
+                    {
+                        // "path" here is the callback's argument, shadowing the outer variable
+                        trace.setReturnCode(rc).setWithWatcher(true).setPath(path).commit();
+                        CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.ADD_PERSISTENT_WATCH, rc, path, null, ctx, null, null, null, null, null, null);
+                        client.processBackgroundOperation(data, event);
+                    }
+                },
+                backgrounding.getContext()
+            );
+        }
+        catch ( Throwable e )
+        {
+            // hand any failure to the backgrounding error handling instead of propagating it
+            backgrounding.checkError(e, watching);
+        }
+    }
+
+    /**
+     * Registers the watch synchronously inside Curator's retry loop.
+     * The trace is committed only when the call completes without throwing.
+     */
+    private void pathInForeground(final String path) throws Exception
+    {
+        final String fixedPath = client.fixForNamespace(path);
+        OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("AddPersistentWatchBuilderImpl-Foreground");
+        RetryLoop.callWithRetry
+        (
+            client.getZookeeperClient(),
+            new Callable<Void>()
+            {
+                @Override
+                public Void call() throws Exception
+                {
+                    client.getZooKeeper().addPersistentWatch(fixedPath, watching.getWatcher(path));
+                    return null;
+                }
+            }
+        );
+        trace.setPath(fixedPath).setWithWatcher(true).commit();
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 04113fd..dd995e5 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -563,7 +563,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
@Override
public AddPersistentWatchBuilder addPersistentWatch()
{
- return null; // TODO
+ return new AddPersistentWatchBuilderImpl(this);
}
ACLProvider getAclProvider()
http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
index 9a6eaa7..b288f2a 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
@@ -28,6 +28,7 @@ import org.apache.curator.framework.WatcherRemoveCuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.recipes.watch.CuratorCache;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.PathUtils;
@@ -53,7 +54,10 @@ import java.util.concurrent.atomic.AtomicReference;
* <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must
* be prepared for false-positives and false-negatives. Additionally, always use the version number
* when updating data to avoid overwriting another process' change.</p>
+ *
+ * @deprecated use {@link CuratorCache}
*/
+@Deprecated
public class NodeCache implements Closeable
{
private final Logger log = LoggerFactory.getLogger(getClass());
http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index 91a3a98..c5b9eba 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -32,6 +32,7 @@ import org.apache.curator.framework.EnsureContainers;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.recipes.watch.CuratorCache;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.CloseableExecutorService;
@@ -61,7 +62,10 @@ import java.util.concurrent.atomic.AtomicReference;
* <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must
* be prepared for false-positives and false-negatives. Additionally, always use the version number
* when updating data to avoid overwriting another process' change.</p>
+ *
+ * @deprecated use {@link CuratorCache}
*/
+@Deprecated
@SuppressWarnings("NullableProblems")
public class PathChildrenCache implements Closeable
{
http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCache.java
deleted file mode 100644
index 0e7a448..0000000
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCache.java
+++ /dev/null
@@ -1,317 +0,0 @@
-package org.apache.curator.framework.recipes.cache;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.CuratorEventType;
-import org.apache.curator.framework.listen.Listenable;
-import org.apache.curator.framework.listen.ListenerContainer;
-import org.apache.curator.framework.recipes.nodes.PersistentWatcher;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import java.io.Closeable;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicReference;
-
-public class PersistentWatcherCache implements Closeable
-{
- private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
- private final PersistentWatcher persistentWatcher;
- private final ListenerContainer<PersistentWatcherCacheListener> listeners = new ListenerContainer<>();
- private final ConcurrentMap<String, ChildData> cache = new ConcurrentHashMap<>();
- private final AtomicReference<PersistentWatcherCacheFilter> cacheFilter = new AtomicReference<>(defaultCacheFilter);
-
- private static final PersistentWatcherCacheFilter defaultCacheFilter = new PersistentWatcherCacheFilter()
- {
- @Override
- public Action actionForPath(String path)
- {
- return Action.IGNORE;
- }
- };
- private final CuratorFramework client;
-
- private enum State
- {
- LATENT,
- STARTED,
- CLOSED
- }
-
- private final Watcher watcher = new Watcher()
- {
- @Override
- public void process(final WatchedEvent event)
- {
- processEvent(event);
- }
- };
-
- public PersistentWatcherCache(CuratorFramework client, String basePath)
- {
- this.client = Objects.requireNonNull(client, "client cannot be null");
- persistentWatcher = new PersistentWatcher(client, basePath)
- {
- @Override
- protected void reset()
- {
- super.reset();
- refreshData();
- }
- };
- }
-
- public void start()
- {
- Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
- persistentWatcher.getListenable().addListener(watcher);
- persistentWatcher.start();
- }
-
- @Override
- public void close()
- {
- if ( state.compareAndSet(State.STARTED, State.CLOSED) )
- {
- persistentWatcher.close();
- }
- }
-
- public Listenable<PersistentWatcherCacheListener> getListenable()
- {
- return listeners;
- }
-
- public void setCacheFilter(PersistentWatcherCacheFilter cacheFilter)
- {
- this.cacheFilter.set(Objects.requireNonNull(cacheFilter, "cacheFilter cannot be null"));
- }
-
- /**
- * Return the current data. There are no guarantees of accuracy. This is
- * merely the most recent view of the data.
- *
- * @return list of children and data
- */
- public Map<String, ChildData> getCurrentData()
- {
- return ImmutableMap.copyOf(cache);
- }
-
- /**
- * Return the current data for the given path. There are no guarantees of accuracy. This is
- * merely the most recent view of the data. If there is no child with that path, <code>null</code>
- * is returned.
- *
- * @param fullPath full path to the node to check
- * @return data or null
- */
- public ChildData getCurrentData(String fullPath)
- {
- return cache.get(fullPath);
- }
-
- /**
- * As a memory optimization, you can clear the cached data bytes for a node. Subsequent
- * calls to {@link ChildData#getData()} for this node will return <code>null</code>.
- *
- * @param fullPath the path of the node to clear
- */
- public void clearDataBytes(String fullPath)
- {
- clearDataBytes(fullPath, -1);
- }
-
- /**
- * As a memory optimization, you can clear the cached data bytes for a node. Subsequent
- * calls to {@link ChildData#getData()} for this node will return <code>null</code>.
- *
- * @param fullPath the path of the node to clear
- * @param ifVersion if non-negative, only clear the data if the data's version matches this version
- * @return true if the data was cleared
- */
- public boolean clearDataBytes(String fullPath, int ifVersion)
- {
- ChildData data = cache.get(fullPath);
- if ( data != null )
- {
- if ( (ifVersion < 0) || ((data.getStat() != null) && (ifVersion == data.getStat().getVersion())) )
- {
- if ( data.getData() != null )
- {
- cache.replace(fullPath, data, new ChildData(data.getPath(), data.getStat(), null));
- }
- return true;
- }
- }
- return false;
- }
-
- /**
- * Clears the current data without beginning a new query and without generating any events
- * for listeners.
- */
- public void clear()
- {
- cache.clear();
- }
-
- public void refreshData()
- {
- for ( String path : cache.keySet() )
- {
- PersistentWatcherCacheFilter.Action action = cacheFilter.get().actionForPath(path);
- if ( (action == PersistentWatcherCacheFilter.Action.GET_DATA_THEN_SAVE) || (action == PersistentWatcherCacheFilter.Action.SAVE_THEN_GET_DATA) )
- {
- getData(path);
- }
- }
- }
-
- private void processEvent(final WatchedEvent event)
- {
- switch ( event.getType() )
- {
- default:
- {
- // ignore
- break;
- }
-
- case NodeCreated:
- case NodeDataChanged:
- {
- updateNode(event.getPath());
- break;
- }
-
- case NodeDeleted:
- {
- deleteNode(event.getPath());
- break;
- }
- }
- }
-
- private void deleteNode(final String path)
- {
- if ( cache.remove(path) != null )
- {
- Function<PersistentWatcherCacheListener, Void> proc = new Function<PersistentWatcherCacheListener, Void>()
- {
- @Override
- public Void apply(PersistentWatcherCacheListener listener)
- {
- listener.nodeDeleted(path);
- return null;
- }
- };
- listeners.forEach(proc);
- }
- }
-
- private void updateNode(final String path)
- {
- boolean putAndCallListeners;
- boolean doGetData;
- switch ( cacheFilter.get().actionForPath(path) )
- {
- default:
- case IGNORE:
- {
- putAndCallListeners = doGetData = false;
- break;
- }
-
- case SAVE_ONLY:
- {
- putAndCallListeners = true;
- doGetData = false;
- break;
- }
-
- case SAVE_THEN_GET_DATA:
- {
- putAndCallListeners = doGetData = true;
- break;
- }
-
- case GET_DATA_THEN_SAVE:
- {
- doGetData = true;
- putAndCallListeners = false;
- break;
- }
- }
-
- if ( putAndCallListeners )
- {
- final ChildData oldData = cache.put(path, new ChildData(path, null, null));
- Function<PersistentWatcherCacheListener, Void> proc = new Function<PersistentWatcherCacheListener, Void>()
- {
- @Override
- public Void apply(PersistentWatcherCacheListener listener)
- {
- if ( oldData == null )
- {
- listener.nodeCreated(path);
- }
- else
- {
- listener.nodeDataChanged(path);
- }
- return null;
- }
- };
- listeners.forEach(proc);
- }
-
- if ( doGetData )
- {
- getData(path);
- }
- }
-
- private void getData(String path)
- {
- try
- {
- BackgroundCallback callback = new BackgroundCallback()
- {
- @Override
- public void processResult(CuratorFramework client, final CuratorEvent event) throws Exception
- {
- if ( event.getType() == CuratorEventType.GET_DATA )
- {
- ChildData newData = new ChildData(event.getPath(), event.getStat(), event.getData());
- ChildData oldData = cache.put(event.getPath(), newData);
- if ( !newData.equals(oldData) )
- {
- Function<PersistentWatcherCacheListener, Void> proc = new Function<PersistentWatcherCacheListener, Void>()
- {
- @Override
- public Void apply(PersistentWatcherCacheListener listener)
- {
- listener.nodeDataAvailable(event.getPath());
- return null;
- }
- };
- listeners.forEach(proc);
- }
- }
- }
- };
- client.getData().inBackground().forPath(path);
- }
- catch ( Exception e )
- {
- // TODO
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCacheFilter.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCacheFilter.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCacheFilter.java
deleted file mode 100644
index b6dc694..0000000
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCacheFilter.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package org.apache.curator.framework.recipes.cache;
-
-public interface PersistentWatcherCacheFilter
-{
- enum Action
- {
- IGNORE,
- SAVE_ONLY,
- SAVE_THEN_GET_DATA,
- GET_DATA_THEN_SAVE
- }
-
- Action actionForPath(String path);
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCacheListener.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCacheListener.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCacheListener.java
deleted file mode 100644
index 7a01fd4..0000000
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCacheListener.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package org.apache.curator.framework.recipes.cache;
-
-public interface PersistentWatcherCacheListener
-{
- void nodeCreated(String path);
-
- void nodeDeleted(String path);
-
- void nodeDataChanged(String path);
-
- void nodeDataAvailable(String path);
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
index ed32223..50e3fd6 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -31,6 +31,7 @@ import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.recipes.watch.CuratorCache;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.PathUtils;
@@ -67,7 +68,10 @@ import static org.apache.curator.utils.PathUtils.validatePath;
* <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must
* be prepared for false-positives and false-negatives. Additionally, always use the version number
* when updating data to avoid overwriting another process' change.</p>
+ *
+ * @deprecated use {@link CuratorCache}
*/
+@Deprecated
public class TreeCache implements Closeable
{
private static final Logger LOG = LoggerFactory.getLogger(TreeCache.class);
http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentWatcher.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentWatcher.java
deleted file mode 100644
index 9324d55..0000000
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentWatcher.java
+++ /dev/null
@@ -1,104 +0,0 @@
-package org.apache.curator.framework.recipes.nodes;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.listen.Listenable;
-import org.apache.curator.framework.listen.ListenerContainer;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import java.io.Closeable;
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicReference;
-
-public class PersistentWatcher implements Closeable
-{
- private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
- private final ListenerContainer<Watcher> listeners = new ListenerContainer<>();
- private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
- {
- @Override
- public void stateChanged(CuratorFramework client, ConnectionState newState)
- {
- if ( newState.isConnected() )
- {
- reset();
- }
- }
- };
- private final Watcher watcher = new Watcher()
- {
- @Override
- public void process(final WatchedEvent event)
- {
- Function<Watcher, Void> function = new Function<Watcher, Void>()
- {
- @Override
- public Void apply(Watcher watcher)
- {
- watcher.process(event);
- return null;
- }
- };
- listeners.forEach(function);
- }
- };
- private final CuratorFramework client;
- private final String basePath;
-
- private enum State
- {
- LATENT,
- STARTED,
- CLOSED
- }
-
- public PersistentWatcher(CuratorFramework client, String basePath)
- {
- this.client = Objects.requireNonNull(client, "client cannot be null");
- this.basePath = Objects.requireNonNull(basePath, "basePath cannot be null");
- }
-
- public void start()
- {
- Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
- client.getConnectionStateListenable().addListener(connectionStateListener);
- reset();
- }
-
- @Override
- public void close()
- {
- if ( state.compareAndSet(State.STARTED, State.CLOSED) )
- {
- client.getConnectionStateListenable().removeListener(connectionStateListener);
- try
- {
- client.watches().remove(watcher).inBackground().forPath(basePath);
- }
- catch ( Exception e )
- {
- // TODO
- }
- }
- }
-
- public Listenable<Watcher> getListenable()
- {
- return listeners;
- }
-
- protected void reset()
- {
- try
- {
- client.addPersistentWatch().inBackground().usingWatcher(watcher).forPath(basePath);
- }
- catch ( Exception e )
- {
- // TODO
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java
new file mode 100644
index 0000000..77508c1
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+/**
+ * The possible results of {@link CacheFilter#actionForPath(String)}.
+ * NOTE(review): the exact effect of each constant is defined by the cache implementation,
+ * which is not part of this file - confirm there before relying on the names alone.
+ */
+public enum CacheAction
+{
+ IGNORE,
+ DO_NOT_GET_DATA,
+ GET_DATA,
+ GET_COMPRESSED
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheEventType.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheEventType.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheEventType.java
new file mode 100644
index 0000000..8094df2
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheEventType.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+/**
+ * Kinds of events in the {@code recipes.watch} cache package.
+ * NOTE(review): presumably these are what cache listeners are notified with; the emitting
+ * code is not part of this file - confirm.
+ */
+public enum CacheEventType
+{
+ NODE_CREATED,
+ NODE_DELETED,
+ NODE_CHANGED,
+ REFRESHED
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilter.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilter.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilter.java
new file mode 100644
index 0000000..9923174
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilter.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+/**
+ * Decides, per path, which {@link CacheAction} a cache applies when it refreshes a node.
+ * Supply an instance via {@link CuratorCacheBuilder#withCacheFilter(CacheFilter)}.
+ */
+public interface CacheFilter
+{
+ /**
+ * Return the action to take for the node at the given path
+ *
+ * @param path node full path
+ * @return the action the cache should apply to this path
+ */
+ CacheAction actionForPath(String path);
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheListener.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheListener.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheListener.java
new file mode 100644
index 0000000..9f36042
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheListener.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+/**
+ * Listener for cache events. Register instances through {@link CuratorCache#getListenable()}.
+ */
+public interface CacheListener
+{
+ /**
+ * Called when the cache reports an event
+ *
+ * @param eventType the kind of event
+ * @param path the path the event refers to
+ */
+ void process(CacheEventType eventType, String path);
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java
new file mode 100644
index 0000000..0f438a2
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import org.apache.zookeeper.data.Stat;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * Value held in the cache for one node: the node's {@link Stat} and its data bytes.
+ * Neither part is ever <code>null</code>; a node without data carries an empty array.
+ */
+public class CachedNode
+{
+    private static final byte[] defaultData = new byte[0];
+
+    private final Stat stat;
+    private final byte[] data;
+
+    /**
+     * Create a node with a default {@link Stat} and empty data
+     */
+    public CachedNode()
+    {
+        this(new Stat(), defaultData);
+    }
+
+    /**
+     * Create a node with the given stat and empty data
+     *
+     * @param stat node stat, cannot be null
+     */
+    public CachedNode(Stat stat)
+    {
+        this(stat, defaultData);
+    }
+
+    /**
+     * Create a node with the given stat and data
+     *
+     * @param stat node stat, cannot be null
+     * @param data node data, cannot be null
+     * @throws NullPointerException if either argument is null
+     */
+    public CachedNode(Stat stat, byte[] data)
+    {
+        this.stat = Objects.requireNonNull(stat, "stat cannot be null");
+        this.data = Objects.requireNonNull(data, "data cannot be null");
+    }
+
+    /**
+     * @return the node's stat (never null)
+     */
+    public Stat getStat()
+    {
+        return stat;
+    }
+
+    /**
+     * @return the node's data bytes (never null). This is the internal array, not a copy.
+     */
+    public byte[] getData()
+    {
+        return data;
+    }
+
+    /**
+     * Two nodes are equal when they are of the same class, their stats are equal and
+     * their data arrays have the same contents.
+     */
+    @Override
+    public boolean equals(Object o)
+    {
+        if ( this == o )
+        {
+            return true;
+        }
+        if ( (o == null) || (getClass() != o.getClass()) )
+        {
+            return false;
+        }
+
+        CachedNode other = (CachedNode)o;
+        return stat.equals(other.stat) && Arrays.equals(data, other.data);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return (31 * stat.hashCode()) + Arrays.hashCode(data);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "CachedNode{" + "stat=" + stat + ", data=" + Arrays.toString(data) + '}';
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
new file mode 100644
index 0000000..131cc2e
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import org.apache.curator.framework.listen.Listenable;
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * General interface for client-cached nodes. Create instances
+ * using {@link CuratorCacheBuilder}
+ */
+public interface CuratorCache extends Closeable
+{
+ /**
+ * Start the cache
+ */
+ void start();
+
+ /**
+ * Close the cache. Unlike {@link Closeable#close()}, this override declares
+ * no checked exception.
+ */
+ @Override
+ void close();
+
+ /**
+ * Get listenable container used to add/remove listeners
+ *
+ * @return listener container
+ */
+ Listenable<CacheListener> getListenable();
+
+ /**
+ * Refresh all cached nodes and send {@link CacheEventType#REFRESHED} when completed
+ */
+ void refreshAll();
+
+ /**
+ * Refresh the given cached node
+ *
+ * @param path node full path
+ */
+ void refresh(String path);
+
+ /**
+ * Remove the given path from the cache.
+ *
+ * @param path node full path
+ * @return true if the node was in the cache
+ */
+ boolean clear(String path);
+
+ /**
+ * Remove all nodes from the cache
+ */
+ void clearAll();
+
+ /**
+ * Return true if there is a cached node at the given path
+ *
+ * @param path node full path
+ * @return true/false
+ */
+ boolean exists(String path);
+
+ /**
+ * Returns the set of paths in the cache. The returned set behaves in the same manner
+ * as {@link ConcurrentHashMap#keySet()}
+ *
+ * @return set of paths
+ */
+ Set<String> paths();
+
+ /**
+ * Returns the collection of node values in the cache. The returned collection behaves in the same manner
+ * as {@link ConcurrentHashMap#values()}
+ *
+ * @return node values
+ */
+ Collection<CachedNode> nodes();
+
+ /**
+ * Returns the set of node entries in the cache. The returned set behaves in the same manner
+ * as {@link ConcurrentHashMap#entrySet()}
+ *
+ * @return node entries
+ */
+ Set<Map.Entry<String, CachedNode>> entries();
+
+ /**
+ * As a memory optimization, you can clear the cached data bytes for a node. Subsequent
+ * calls to {@link CachedNode#getData()} for this node will return an empty array
+ * (never <code>null</code>).
+ *
+ * @param path the path of the node to clear
+ */
+ void clearDataBytes(String path);
+
+ /**
+ * As a memory optimization, you can clear the cached data bytes for a node. Subsequent
+ * calls to {@link CachedNode#getData()} for this node will return an empty array
+ * (never <code>null</code>).
+ *
+ * @param path the path of the node to clear
+ * @param ifVersion if non-negative, only clear the data if the data's version matches this version
+ * @return true if the data was cleared
+ */
+ boolean clearDataBytes(String path, int ifVersion);
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
new file mode 100644
index 0000000..0affa18
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import com.google.common.cache.Cache;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Shared implementation of the query and clear operations of {@link CuratorCache},
+ * backed by a Guava {@link Cache} keyed by node path.
+ */
+abstract class CuratorCacheBase implements CuratorCache
+{
+    protected final Cache<String, CachedNode> cache;
+
+    /**
+     * @param cache backing cache, cannot be null
+     */
+    protected CuratorCacheBase(Cache<String, CachedNode> cache)
+    {
+        this.cache = Objects.requireNonNull(cache, "cache cannot be null");
+    }
+
+    @Override
+    public final boolean clear(String path)
+    {
+        CachedNode removed = cache.asMap().remove(path);
+        return removed != null;
+    }
+
+    @Override
+    public final void clearAll()
+    {
+        cache.invalidateAll();
+    }
+
+    @Override
+    public final boolean exists(String path)
+    {
+        return cache.asMap().containsKey(path);
+    }
+
+    @Override
+    public final Set<String> paths()
+    {
+        return cache.asMap().keySet();
+    }
+
+    @Override
+    public final Collection<CachedNode> nodes()
+    {
+        return cache.asMap().values();
+    }
+
+    @Override
+    public final Set<Map.Entry<String, CachedNode>> entries()
+    {
+        return cache.asMap().entrySet();
+    }
+
+    /**
+     * As a memory optimization, you can clear the cached data bytes for a node. The entry
+     * is replaced by one holding only the stat, so subsequent calls to
+     * {@link CachedNode#getData()} for this node will return an empty array.
+     *
+     * @param path the path of the node to clear
+     */
+    @Override
+    public final void clearDataBytes(String path)
+    {
+        // a negative version means "clear regardless of version"
+        clearDataBytes(path, -1);
+    }
+
+    /**
+     * As a memory optimization, you can clear the cached data bytes for a node. The entry
+     * is replaced by one holding only the stat, so subsequent calls to
+     * {@link CachedNode#getData()} for this node will return an empty array.
+     *
+     * @param path the path of the node to clear
+     * @param ifVersion if non-negative, only clear the data if the data's version matches this version
+     * @return true if the data was cleared
+     */
+    @Override
+    public final boolean clearDataBytes(String path, int ifVersion)
+    {
+        CachedNode current = cache.asMap().get(path);
+        if ( current == null )
+        {
+            return false;
+        }
+
+        boolean unconditional = ifVersion < 0;
+        if ( !unconditional && ((current.getStat() == null) || (ifVersion != current.getStat().getVersion())) )
+        {
+            return false;
+        }
+
+        // conditional replace: fails (returns false) if the entry changed since it was read
+        return cache.asMap().replace(path, current, new CachedNode(current.getStat()));
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
new file mode 100644
index 0000000..8ee707b
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import com.google.common.cache.CacheBuilder;
+import org.apache.curator.framework.CuratorFramework;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Builder for {@link CuratorCache} instances. Start with {@link #forPath(String)} or
+ * {@link #forNode(String)}, optionally tune the backing cache and the filter, then call
+ * {@link #build(CuratorFramework)}.
+ */
+public class CuratorCacheBuilder
+{
+    private CacheFilter cacheFilter = new CacheFilter()
+    {
+        @Override
+        public CacheAction actionForPath(String path)
+        {
+            return CacheAction.DO_NOT_GET_DATA;
+        }
+    };
+    private CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder();
+    private String path;
+    private boolean singleNode;
+
+    private CuratorCacheBuilder()
+    {
+    }
+
+    /**
+     * Start a builder for a cache rooted at the given path
+     *
+     * @param path the path, cannot be null
+     * @return new builder
+     */
+    public static CuratorCacheBuilder forPath(String path)
+    {
+        return newBuilder(path, false);
+    }
+
+    /**
+     * Start a builder for a cache of the single node at the given path
+     *
+     * @param path the node's path, cannot be null
+     * @return new builder
+     */
+    public static CuratorCacheBuilder forNode(String path)
+    {
+        return newBuilder(path, true);
+    }
+
+    /**
+     * Build the cache instance using the options set so far
+     *
+     * @param client the client
+     * @return new cache (not yet started)
+     */
+    public CuratorCache build(CuratorFramework client)
+    {
+        return singleNode
+            ? new InternalNodeCache(client, path, cacheFilter, cacheBuilder.<String, CachedNode>build())
+            : new InternalCuratorCache(client, path, cacheFilter, cacheBuilder.<String, CachedNode>build());
+    }
+
+    /**
+     * Applies {@link CacheBuilder#weakValues()} to the backing cache
+     *
+     * @return this
+     */
+    public CuratorCacheBuilder usingWeakValues()
+    {
+        cacheBuilder = cacheBuilder.weakValues();
+        return this;
+    }
+
+    /**
+     * Applies {@link CacheBuilder#softValues()} to the backing cache
+     *
+     * @return this
+     */
+    public CuratorCacheBuilder usingSoftValues()
+    {
+        cacheBuilder = cacheBuilder.softValues();
+        return this;
+    }
+
+    /**
+     * Applies {@link CacheBuilder#expireAfterWrite(long, TimeUnit)} to the backing cache
+     *
+     * @param duration expiration duration
+     * @param unit unit of the duration
+     * @return this
+     */
+    public CuratorCacheBuilder thatExpiresAfterWrite(long duration, TimeUnit unit)
+    {
+        cacheBuilder = cacheBuilder.expireAfterWrite(duration, unit);
+        return this;
+    }
+
+    /**
+     * Applies {@link CacheBuilder#expireAfterAccess(long, TimeUnit)} to the backing cache
+     *
+     * @param duration expiration duration
+     * @param unit unit of the duration
+     * @return this
+     */
+    public CuratorCacheBuilder thatExpiresAfterAccess(long duration, TimeUnit unit)
+    {
+        cacheBuilder = cacheBuilder.expireAfterAccess(duration, unit);
+        return this;
+    }
+
+    /**
+     * Replace the default filter (which returns {@link CacheAction#DO_NOT_GET_DATA} for every path)
+     *
+     * @param cacheFilter the filter, cannot be null
+     * @return this
+     */
+    public CuratorCacheBuilder withCacheFilter(CacheFilter cacheFilter)
+    {
+        this.cacheFilter = Objects.requireNonNull(cacheFilter, "cacheFilter cannot be null");
+        return this;
+    }
+
+    // common body of the two static factories
+    private static CuratorCacheBuilder newBuilder(String path, boolean singleNode)
+    {
+        CuratorCacheBuilder created = new CuratorCacheBuilder();
+        created.path = Objects.requireNonNull(path, "path cannot be null");
+        created.singleNode = singleNode;
+        return created;
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
new file mode 100644
index 0000000..08006a1
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.utils.ThreadUtils;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * {@link CuratorCache} implementation for a base path. A {@link PersistentWatcher} created
+ * for that path delivers ZooKeeper events to {@link #process(WatchedEvent)}, which updates
+ * the backing cache and notifies the registered {@link CacheListener}s.
+ */
+class InternalCuratorCache extends CuratorCacheBase implements Watcher
+{
+    private static final CachedNode nullNode = new CachedNode();
+
+    // fully qualified so that this file needs no additional imports
+    private final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(getClass());
+    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
+    private final PersistentWatcher watcher;
+    private final CuratorFramework client;
+    private final String basePath;
+    private final CacheFilter cacheFilter;
+    private final ListenerContainer<CacheListener> listeners = new ListenerContainer<>();
+
+    private enum State
+    {
+        LATENT,
+        STARTED,
+        CLOSED
+    }
+
+    /**
+     * @param client the client, cannot be null
+     * @param path base path to watch
+     * @param cacheFilter decides the action taken for each refreshed path, cannot be null
+     * @param cache backing cache
+     */
+    InternalCuratorCache(CuratorFramework client, String path, CacheFilter cacheFilter, Cache<String, CachedNode> cache)
+    {
+        super(cache);
+        this.client = Objects.requireNonNull(client, "client cannot be null");
+        basePath = path;
+        this.cacheFilter = Objects.requireNonNull(cacheFilter, "cacheFilter cannot be null");
+        watcher = new PersistentWatcher(client, path)
+        {
+            @Override
+            protected void watcherSet()
+            {
+                refreshAll();
+            }
+        };
+        watcher.getListenable().addListener(this);
+    }
+
+    /**
+     * Start the cache by starting the underlying watcher
+     *
+     * @throws IllegalStateException if the cache is not in its initial state
+     */
+    @Override
+    public void start()
+    {
+        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "already started");
+        watcher.start();
+    }
+
+    /**
+     * Close the cache: detach from the watcher, drop all listeners and close the watcher.
+     * Has no effect unless the cache is currently started.
+     */
+    @Override
+    public void close()
+    {
+        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+        {
+            watcher.getListenable().removeListener(this);
+            listeners.clear();
+            watcher.close();
+        }
+    }
+
+    @Override
+    public Listenable<CacheListener> getListenable()
+    {
+        return listeners;
+    }
+
+    /**
+     * Handle an event from the watcher: deletions remove the node from the cache,
+     * creations and data changes trigger a refresh of the node. Other events are ignored.
+     */
+    @Override
+    public void process(WatchedEvent event)
+    {
+        switch ( event.getType() )
+        {
+            case NodeDeleted:
+            {
+                removeAndNotify(event.getPath());
+                break;
+            }
+
+            case NodeCreated:
+            case NodeDataChanged:
+            {
+                refresh(event.getPath());
+                break;
+            }
+
+            default:
+            {
+                // NOP
+                break;
+            }
+        }
+    }
+
+    /**
+     * Refresh every path currently in the cache. {@link CacheEventType#REFRESHED} is sent
+     * once every path has been handled - immediately if the cache is empty.
+     */
+    @Override
+    public void refreshAll()
+    {
+        // snapshot so the count and the iteration agree even if the cache changes meanwhile
+        Set<String> keySet = new HashSet<>(cache.asMap().keySet());
+        if ( keySet.isEmpty() )
+        {
+            notifyListeners(CacheEventType.REFRESHED, basePath);
+            return;
+        }
+
+        AtomicInteger counter = new AtomicInteger(keySet.size());
+        for ( String path : keySet )
+        {
+            internalRefresh(path, counter);
+        }
+    }
+
+    @Override
+    public void refresh(String path)
+    {
+        internalRefresh(path, null);
+    }
+
+    /**
+     * Apply the filter's action to one path.
+     *
+     * @param path path to refresh
+     * @param counter outstanding-path counter of a {@link #refreshAll()}, or null for a single refresh
+     */
+    private void internalRefresh(final String path, final AtomicInteger counter)
+    {
+        switch ( cacheFilter.actionForPath(path) )
+        {
+            case IGNORE:
+            {
+                // nothing to do for this path, but it still counts as handled
+                refreshCompleted(counter);
+                break;
+            }
+
+            case DO_NOT_GET_DATA:
+            {
+                if ( cache.asMap().put(path, nullNode) == null )
+                {
+                    notifyListeners(CacheEventType.NODE_CREATED, path);
+                }
+                refreshCompleted(counter);
+                break;
+            }
+
+            case GET_DATA:
+            {
+                getDataInBackground(path, false, counter);
+                break;
+            }
+
+            case GET_COMPRESSED:
+            {
+                getDataInBackground(path, true, counter);
+                break;
+            }
+        }
+    }
+
+    /**
+     * Issue the background read for a path. The path is reported as handled when the
+     * result arrives or when the request cannot be issued.
+     */
+    private void getDataInBackground(final String path, boolean compressed, final AtomicInteger counter)
+    {
+        BackgroundCallback callback = new BackgroundCallback()
+        {
+            @Override
+            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+            {
+                try
+                {
+                    if ( event.getType() == CuratorEventType.GET_DATA )
+                    {
+                        applyGetDataResult(path, event);
+                    }
+                }
+                finally
+                {
+                    refreshCompleted(counter);
+                }
+            }
+        };
+
+        try
+        {
+            // the callback must be handed to inBackground() or the result is never processed
+            if ( compressed )
+            {
+                client.getData().decompressed().inBackground(callback).forPath(path);
+            }
+            else
+            {
+                client.getData().inBackground(callback).forPath(path);
+            }
+        }
+        catch ( Exception e )
+        {
+            ThreadUtils.checkInterrupted(e);
+            log.error("Could not refresh path: " + path, e);
+            refreshCompleted(counter);
+        }
+    }
+
+    /**
+     * Store the outcome of a background read. Only a successful result carries a stat, so
+     * the result code is checked first; a missing node is dropped from the cache.
+     */
+    private void applyGetDataResult(String path, CuratorEvent event)
+    {
+        int resultCode = event.getResultCode();
+        if ( resultCode == org.apache.zookeeper.KeeperException.Code.OK.intValue() )
+        {
+            Stat stat = event.getStat();
+            byte[] bytes = event.getData();
+            // CachedNode rejects null data - fall back to the stat-only form
+            CachedNode newNode = (bytes != null) ? new CachedNode(stat, bytes) : new CachedNode(stat);
+            CachedNode oldNode = cache.asMap().put(path, newNode);
+            if ( oldNode == null )
+            {
+                notifyListeners(CacheEventType.NODE_CREATED, path);
+            }
+            else if ( !newNode.equals(oldNode) )
+            {
+                notifyListeners(CacheEventType.NODE_CHANGED, path);
+            }
+        }
+        else if ( resultCode == org.apache.zookeeper.KeeperException.Code.NONODE.intValue() )
+        {
+            removeAndNotify(path);
+        }
+    }
+
+    // remove a path from the cache and report the deletion if it was present
+    private void removeAndNotify(String path)
+    {
+        if ( cache.asMap().remove(path) != null )
+        {
+            notifyListeners(CacheEventType.NODE_DELETED, path);
+        }
+    }
+
+    // mark one path of a refreshAll() as handled; a null counter means a single refresh
+    private void refreshCompleted(AtomicInteger counter)
+    {
+        if ( (counter != null) && (counter.decrementAndGet() == 0) )
+        {
+            notifyListeners(CacheEventType.REFRESHED, basePath);
+        }
+    }
+
+    private void notifyListeners(final CacheEventType eventType, final String path)
+    {
+        Function<CacheListener, Void> proc = new Function<CacheListener, Void>()
+        {
+            @Override
+            public Void apply(CacheListener listener)
+            {
+                listener.process(eventType, path);
+                return null;
+            }
+        };
+        listeners.forEach(proc);
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java
new file mode 100644
index 0000000..12571de
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.PathUtils;
+import org.apache.curator.utils.ThreadUtils;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Objects;
+import java.util.concurrent.Exchanger;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * {@link CuratorCache} implementation for a single node. The node is (re)read with a
+ * watcher whenever the watcher fires, the connection is re-established or a refresh is
+ * requested; the result is stored in the backing cache and reported to the listeners.
+ */
+class InternalNodeCache extends CuratorCacheBase
+{
+    private static final CachedNode nullNode = new CachedNode();
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final WatcherRemoveCuratorFramework client;
+    private final String path;
+    private final CacheFilter cacheFilter;
+    private final AtomicReference<CachedNode> data = new AtomicReference<>(null);
+    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
+    private final ListenerContainer<CacheListener> listeners = new ListenerContainer<>();
+    private final AtomicBoolean isConnected = new AtomicBoolean(true);
+    private final AtomicBoolean resetEventNeeded = new AtomicBoolean(true);
+    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
+    {
+        @Override
+        public void stateChanged(CuratorFramework client, ConnectionState newState)
+        {
+            if ( (newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED) )
+            {
+                // re-read only on the transition from disconnected to connected
+                if ( isConnected.compareAndSet(false, true) )
+                {
+                    try
+                    {
+                        reset();
+                    }
+                    catch ( Exception e )
+                    {
+                        ThreadUtils.checkInterrupted(e);
+                        log.error("Trying to reset after reconnection", e);
+                    }
+                }
+            }
+            else
+            {
+                isConnected.set(false);
+            }
+        }
+    };
+
+    private final Watcher watcher = new Watcher()
+    {
+        @Override
+        public void process(WatchedEvent event)
+        {
+            try
+            {
+                reset();
+            }
+            catch ( Exception e )
+            {
+                ThreadUtils.checkInterrupted(e);
+                log.error("Trying to reset after watcher event for path: " + path, e);
+            }
+        }
+    };
+
+    private enum State
+    {
+        LATENT,
+        STARTED,
+        CLOSED
+    }
+
+    private final BackgroundCallback backgroundCallback = new BackgroundCallback()
+    {
+        @Override
+        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+        {
+            // a failure ends the refresh, so it counts as complete
+            boolean complete = true;
+            try
+            {
+                complete = processBackgroundResult(event);
+            }
+            catch ( Exception e )
+            {
+                ThreadUtils.checkInterrupted(e);
+                log.error("Processing background result for path: " + path, e);
+            }
+
+            // REFRESHED is held back while a follow-up read of the node's data is pending
+            if ( complete && resetEventNeeded.compareAndSet(true, false) )
+            {
+                notifyListeners(CacheEventType.REFRESHED);
+            }
+        }
+    };
+
+    /**
+     * @param client the client
+     * @param path path of the node to cache; validated with {@link PathUtils#validatePath(String)}
+     * @param cacheFilter decides the action taken for the node, cannot be null
+     * @param cache backing cache
+     */
+    InternalNodeCache(CuratorFramework client, String path, CacheFilter cacheFilter, Cache<String, CachedNode> cache)
+    {
+        super(cache);
+        this.client = client.newWatcherRemoveCuratorFramework();
+        this.path = PathUtils.validatePath(path);
+        this.cacheFilter = Objects.requireNonNull(cacheFilter, "cacheFilter cannot be null");
+    }
+
+    /**
+     * Start the cache: listen for connection changes and read the node
+     *
+     * @throws IllegalStateException if the cache is not in its initial state
+     */
+    @Override
+    public void start()
+    {
+        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "already started");
+
+        client.getConnectionStateListenable().addListener(connectionStateListener);
+        refreshAll();
+    }
+
+    /**
+     * Close the cache: remove the watchers, drop all listeners and stop listening for
+     * connection changes. Has no effect unless the cache is currently started.
+     */
+    @Override
+    public void close()
+    {
+        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+        {
+            client.removeWatchers();
+            listeners.clear();
+            client.getConnectionStateListenable().removeListener(connectionStateListener);
+        }
+    }
+
+    @Override
+    public Listenable<CacheListener> getListenable()
+    {
+        return listeners;
+    }
+
+    /**
+     * Re-read the node and send {@link CacheEventType#REFRESHED} when the read has completed
+     */
+    @Override
+    public void refreshAll()
+    {
+        try
+        {
+            resetEventNeeded.set(true);
+            reset();
+        }
+        catch ( Exception e )
+        {
+            ThreadUtils.checkInterrupted(e);
+            log.error("Trying to refresh path: " + path, e);
+        }
+    }
+
+    /**
+     * Same as {@link #refreshAll()}; the argument must be this cache's path
+     *
+     * @param path must equal the path this cache was built for
+     * @throws IllegalArgumentException if the path is any other path
+     */
+    @Override
+    public void refresh(String path)
+    {
+        Preconditions.checkArgument(this.path.equals(path), "Bad path: " + path);
+        refreshAll();
+    }
+
+    // issue a watched, background existence check - only while started and connected
+    private void reset() throws Exception
+    {
+        if ( (state.get() == State.STARTED) && isConnected.get() )
+        {
+            client.checkExists().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
+        }
+    }
+
+    /**
+     * Handle the result of a background operation.
+     *
+     * @param event the background result
+     * @return false if a follow-up background read was issued, true if the refresh is finished
+     * @throws Exception errors issuing the follow-up read or an unsupported filter action
+     */
+    private boolean processBackgroundResult(CuratorEvent event) throws Exception
+    {
+        switch ( event.getType() )
+        {
+            case GET_DATA:
+            {
+                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+                {
+                    byte[] bytes = event.getData();
+                    // CachedNode rejects null data - fall back to the stat-only form
+                    CachedNode cachedNode = (bytes != null) ? new CachedNode(event.getStat(), bytes) : new CachedNode(event.getStat());
+                    setNewData(cachedNode);
+                }
+                break;
+            }
+
+            case EXISTS:
+            {
+                if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
+                {
+                    setNewData(null);
+                }
+                else if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+                {
+                    switch ( cacheFilter.actionForPath(path) )
+                    {
+                        default:
+                        case IGNORE:
+                        {
+                            throw new UnsupportedOperationException("Single node cache does not support action: IGNORE");
+                        }
+
+                        case DO_NOT_GET_DATA:
+                        {
+                            setNewData(nullNode);
+                            break;
+                        }
+
+                        case GET_DATA:
+                        {
+                            client.getData().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
+                            return false;
+                        }
+
+                        case GET_COMPRESSED:
+                        {
+                            client.getData().decompressed().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
+                            return false;
+                        }
+                    }
+                }
+                break;
+            }
+        }
+        return true;
+    }
+
+    @VisibleForTesting
+    volatile Exchanger<Object> rebuildTestExchanger;
+
+    /**
+     * Record the node's new state, mirror it into the backing cache so that the query
+     * methods of the base class see it, and notify listeners of the resulting change.
+     *
+     * @param newData the node's new value or null if the node does not exist
+     */
+    private void setNewData(CachedNode newData)
+    {
+        CachedNode previousData = data.getAndSet(newData);
+        if ( newData == null )
+        {
+            cache.invalidate(path);
+            // only a node that was actually cached can have been deleted
+            if ( previousData != null )
+            {
+                notifyListeners(CacheEventType.NODE_DELETED);
+            }
+        }
+        else
+        {
+            cache.put(path, newData);
+            if ( previousData == null )
+            {
+                notifyListeners(CacheEventType.NODE_CREATED);
+            }
+            else if ( !previousData.equals(newData) )
+            {
+                notifyListeners(CacheEventType.NODE_CHANGED);
+            }
+        }
+
+        if ( rebuildTestExchanger != null )
+        {
+            try
+            {
+                rebuildTestExchanger.exchange(new Object());
+            }
+            catch ( InterruptedException e )
+            {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    // call every listener; an exception from one listener is logged and does not stop the others
+    private void notifyListeners(final CacheEventType event)
+    {
+        listeners.forEach
+        (
+            new Function<CacheListener, Void>()
+            {
+                @Override
+                public Void apply(CacheListener listener)
+                {
+                    try
+                    {
+                        listener.process(event, path);
+                    }
+                    catch ( Exception e )
+                    {
+                        ThreadUtils.checkInterrupted(e);
+                        log.error("Calling listener", e);
+                    }
+                    return null;
+                }
+            }
+        );
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
new file mode 100644
index 0000000..2f136a6
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import java.io.Closeable;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+
+ /**
+  * Keeps a persistent watch registered on a base path and forwards every
+  * {@link WatchedEvent} it receives to the {@link Watcher}s registered through
+  * {@link #getListenable()}. The watch is requested when the instance is started and
+  * requested again each time the connection reports a connected state.
+  */
+ public class PersistentWatcher implements Closeable
+ {
+     private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
+     private final ListenerContainer<Watcher> listeners = new ListenerContainer<>();
+     private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
+     {
+         @Override
+         public void stateChanged(CuratorFramework client, ConnectionState newState)
+         {
+             if ( newState.isConnected() )
+             {
+                 reset();
+             }
+         }
+     };
+     // the single watcher handed to the client; it fans events out to the registered listeners
+     private final Watcher watcher = new Watcher()
+     {
+         @Override
+         public void process(final WatchedEvent event)
+         {
+             Function<Watcher, Void> function = new Function<Watcher, Void>()
+             {
+                 @Override
+                 public Void apply(Watcher watcher)
+                 {
+                     watcher.process(event);
+                     return null;
+                 }
+             };
+             listeners.forEach(function);
+         }
+     };
+     private final CuratorFramework client;
+     private final String basePath;
+
+     private enum State
+     {
+         LATENT,
+         STARTED,
+         CLOSED
+     }
+
+     /**
+      * @param client   client used to register and remove the watch; must not be null
+      * @param basePath path to watch; must not be null
+      * @throws NullPointerException if either argument is null
+      */
+     public PersistentWatcher(CuratorFramework client, String basePath)
+     {
+         this.client = Objects.requireNonNull(client, "client cannot be null");
+         this.basePath = Objects.requireNonNull(basePath, "basePath cannot be null");
+     }
+
+     /**
+      * Registers the connection state listener and requests the persistent watch.
+      *
+      * @throws IllegalStateException if this instance is not in its initial (never started) state
+      */
+     public void start()
+     {
+         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
+         client.getConnectionStateListenable().addListener(connectionStateListener);
+         reset();
+     }
+
+     /**
+      * Removes the connection state listener and requests, in the background, that the
+      * watcher be removed from the base path. Does nothing unless the instance is currently
+      * started.
+      */
+     @Override
+     public void close()
+     {
+         if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+         {
+             client.getConnectionStateListenable().removeListener(connectionStateListener);
+             try
+             {
+                 client.watches().remove(watcher).inBackground().forPath(basePath);
+             }
+             catch ( Exception e )
+             {
+                 // removal is best effort, but the interrupt status must not be lost
+                 restoreInterrupt(e);
+                 // TODO - report the failure
+             }
+         }
+     }
+
+     /**
+      * @return the container through which {@link Watcher}s are added and removed
+      */
+     public Listenable<Watcher> getListenable()
+     {
+         return listeners;
+     }
+
+     /**
+      * Requests the persistent watch on the base path in the background. When the request
+      * completes with result code 0, {@link #watcherSet()} is invoked.
+      */
+     private void reset()
+     {
+         // a connection event can still be in flight after close(); do not re-add the watch then
+         if ( state.get() != State.STARTED )
+         {
+             return;
+         }
+
+         try
+         {
+             BackgroundCallback callback = new BackgroundCallback()
+             {
+                 @Override
+                 public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+                 {
+                     // 0 is ZooKeeper's OK result code (KeeperException.Code.OK)
+                     if ( event.getResultCode() == 0 )
+                     {
+                         watcherSet();
+                     }
+                 }
+             };
+             // the callback must be passed here, otherwise watcherSet() is never called
+             client.addPersistentWatch().inBackground(callback).usingWatcher(watcher).forPath(basePath);
+         }
+         catch ( Exception e )
+         {
+             restoreInterrupt(e);
+             // TODO - report the failure
+         }
+     }
+
+     /**
+      * Hook invoked from the background callback each time the persistent watch has been
+      * set successfully. The default implementation does nothing.
+      */
+     protected void watcherSet()
+     {
+         // default is NOP
+     }
+
+     /**
+      * Re-sets the current thread's interrupt flag when {@code e} is an InterruptedException.
+      */
+     private static void restoreInterrupt(Exception e)
+     {
+         if ( e instanceof InterruptedException )
+         {
+             Thread.currentThread().interrupt();
+         }
+     }
+ }
http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/SingleLevelCacheFilter.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/SingleLevelCacheFilter.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/SingleLevelCacheFilter.java
new file mode 100644
index 0000000..1fbe255
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/SingleLevelCacheFilter.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+ /**
+  * {@link CacheFilter} that returns a configurable action for the one path equal to
+  * {@code levelPath} and {@link CacheAction#IGNORE} for every other path.
+  */
+ public class SingleLevelCacheFilter implements CacheFilter
+ {
+     private final String levelPath;
+     private final CacheAction defaultAction;
+
+     /**
+      * Creates a filter that uses {@link CacheAction#GET_DATA} for the matched path.
+      *
+      * @param levelPath the path to match
+      */
+     public SingleLevelCacheFilter(String levelPath)
+     {
+         this(levelPath, CacheAction.GET_DATA);
+     }
+
+     /**
+      * @param levelPath     the path to match
+      * @param defaultAction the action returned by {@link #actionForMatchedPath()} unless overridden
+      */
+     public SingleLevelCacheFilter(String levelPath, CacheAction defaultAction)
+     {
+         this.levelPath = levelPath;
+         this.defaultAction = defaultAction;
+     }
+
+     /**
+      * @param path the path being considered
+      * @return {@link #actionForMatchedPath()} when {@code path} equals the configured level
+      *         path, otherwise {@link CacheAction#IGNORE}
+      */
+     @Override
+     public CacheAction actionForPath(String path)
+     {
+         return levelPath.equals(path) ? actionForMatchedPath() : CacheAction.IGNORE;
+     }
+
+     /**
+      * Action applied to the matched path; subclasses may override.
+      *
+      * @return the action supplied at construction
+      */
+     protected CacheAction actionForMatchedPath()
+     {
+         return defaultAction;
+     }
+ }
http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/StatsOnlyCacheFilter.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/StatsOnlyCacheFilter.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/StatsOnlyCacheFilter.java
new file mode 100644
index 0000000..d3c7fac
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/StatsOnlyCacheFilter.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+ /**
+  * {@link CacheFilter} that answers {@link CacheAction#DO_NOT_GET_DATA} for every path.
+  */
+ public class StatsOnlyCacheFilter implements CacheFilter
+ {
+     // the single action this filter ever returns
+     private static final CacheAction ACTION = CacheAction.DO_NOT_GET_DATA;
+
+     /**
+      * @param path the path being considered (not inspected)
+      * @return always {@link CacheAction#DO_NOT_GET_DATA}
+      */
+     @Override
+     public CacheAction actionForPath(String path)
+     {
+         return ACTION;
+     }
+ }