You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/08/03 17:20:07 UTC
curator git commit: CachedModeledFramework can use the new
CuratorCache
Repository: curator
Updated Branches:
refs/heads/persistent-watch 2bbdbfd5f -> fc2219ea9
CachedModeledFramework can use the new CuratorCache
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/fc2219ea
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/fc2219ea
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/fc2219ea
Branch: refs/heads/persistent-watch
Commit: fc2219ea9dc365cf0226593f9260361407a4ef6f
Parents: 2bbdbfd
Author: randgalt <ra...@apache.org>
Authored: Thu Aug 3 12:20:02 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Thu Aug 3 12:20:02 2017 -0500
----------------------------------------------------------------------
.../framework/recipes/watch/CacheAction.java | 5 ++
.../framework/recipes/watch/CacheSelectors.java | 13 +++++
.../recipes/watch/CuratorCacheBuilder.java | 1 +
.../recipes/watch/InternalCuratorCache.java | 2 +
.../async/modeled/details/ModeledCacheImpl.java | 56 ++++++++++----------
5 files changed, 50 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/fc2219ea/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java
index 57a48f1..8cfca93 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java
@@ -38,6 +38,11 @@ public enum CacheAction
PATH_ONLY,
/**
+ * The node and its {@link Stat} are stored - in events, however, uncompressed data is sent (but not stored)
+ */
+ UNCOMPRESSED_STAT_ONLY,
+
+ /**
* The node and its {@link Stat} are stored
*/
STAT_ONLY,
http://git-wip-us.apache.org/repos/asf/curator/blob/fc2219ea/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java
index db1a8a8..eaf1145 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java
@@ -35,6 +35,7 @@ public class CacheSelectors
{
private static final CacheSelector statAndData = new StandardCacheSelector(CacheAction.STAT_AND_DATA);
private static final CacheSelector uncompressedStatAndData = new StandardCacheSelector(CacheAction.STAT_AND_UNCOMPRESSED_DATA);
+ private static final CacheSelector uncompressedStatOnly = new StandardCacheSelector(CacheAction.UNCOMPRESSED_STAT_ONLY);
private static final CacheSelector statOnly = new StandardCacheSelector(CacheAction.STAT_ONLY);
private static final CacheSelector pathOnly = new StandardCacheSelector(CacheAction.PATH_ONLY);
@@ -156,6 +157,18 @@ public class CacheSelectors
/**
* Returns a cache selector that stores only the stat and processes the entire tree
+ * from the root path given to the cache builder. In events, however, uncompressed data is
+ * sent (but not stored).
+ *
+ * @return selector
+ */
+ public static CacheSelector getUncompressedStatOnly()
+ {
+ return uncompressedStatOnly;
+ }
+
+ /**
+ * Returns a cache selector that stores only the stat and processes the entire tree
* from the root path given to the cache builder
*
* @return selector
http://git-wip-us.apache.org/repos/asf/curator/blob/fc2219ea/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
index 19c27a9..10f8bc2 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
@@ -65,6 +65,7 @@ public class CuratorCacheBuilder
if ( singleNodeCacheAction != null )
{
Preconditions.checkState(cacheSelector == null, "Single node mode does not support CacheSelectors");
+ Preconditions.checkState(singleNodeCacheAction != CacheAction.UNCOMPRESSED_STAT_ONLY, "Single node mode does not support UNCOMPRESSED_STAT_ONLY");
return new InternalNodeCache(client, path, singleNodeCacheAction, cachedNodeMap, sendRefreshEvents, refreshOnStart);
}
http://git-wip-us.apache.org/repos/asf/curator/blob/fc2219ea/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
index 9d94c34..e2b1bf3 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
@@ -231,6 +231,7 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
}
case STAT_AND_UNCOMPRESSED_DATA:
+ case UNCOMPRESSED_STAT_ONLY:
{
try
{
@@ -289,6 +290,7 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
}
case STAT_ONLY:
+ case UNCOMPRESSED_STAT_ONLY:
{
putNode = new CachedNode(newNode.getStat());
break;
http://git-wip-us.apache.org/repos/asf/curator/blob/fc2219ea/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
index 72e6762..ce73a9b 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
@@ -21,17 +21,20 @@ package org.apache.curator.x.async.modeled.details;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.listen.ListenerContainer;
-import org.apache.curator.framework.recipes.cache.TreeCache;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
-import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+import org.apache.curator.framework.recipes.watch.CacheEvent;
+import org.apache.curator.framework.recipes.watch.CacheListener;
+import org.apache.curator.framework.recipes.watch.CacheSelectors;
+import org.apache.curator.framework.recipes.watch.CachedNode;
+import org.apache.curator.framework.recipes.watch.CuratorCache;
+import org.apache.curator.framework.recipes.watch.CuratorCacheBuilder;
import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.x.async.api.CreateOption;
import org.apache.curator.x.async.modeled.ModelSerializer;
import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ZNode;
import org.apache.curator.x.async.modeled.ZPath;
import org.apache.curator.x.async.modeled.cached.ModeledCache;
import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
-import org.apache.curator.x.async.modeled.ZNode;
import org.apache.zookeeper.data.Stat;
import java.util.AbstractMap;
import java.util.Map;
@@ -40,9 +43,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
-class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
+class ModeledCacheImpl<T> implements CacheListener, ModeledCache<T>
{
- private final TreeCache cache;
+ private final CuratorCache cache;
private final Map<ZPath, Entry<T>> entries = new ConcurrentHashMap<>();
private final ModelSerializer<T> serializer;
private final ListenerContainer<ModeledCacheListener<T>> listenerContainer = new ListenerContainer<>();
@@ -69,11 +72,10 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
basePath = modelSpec.path();
this.serializer = modelSpec.serializer();
- cache = TreeCache.newBuilder(client, basePath.fullPath())
- .setCacheData(false)
- .setDataIsCompressed(modelSpec.createOptions().contains(CreateOption.compress))
- .setExecutor(executor)
- .setCreateParentNodes(modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded) || modelSpec.createOptions().contains(CreateOption.createParentsAsContainers))
+ boolean dataIsCompressed = modelSpec.createOptions().contains(CreateOption.compress);
+ cache = CuratorCacheBuilder.builder(client, basePath.fullPath())
+ .withCacheSelector(dataIsCompressed ? CacheSelectors.uncompressedStatAndData() : CacheSelectors.getUncompressedStatOnly())
+ .sendingRefreshEvents(true)
.build();
}
@@ -134,11 +136,11 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
}
@Override
- public void childEvent(CuratorFramework client, TreeCacheEvent event)
+ public void process(CacheEvent event, String path, CachedNode affectedNode)
{
try
{
- internalChildEvent(event);
+ internalChildEvent(event, path, affectedNode);
}
catch ( Exception e )
{
@@ -151,42 +153,42 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
}
}
- private void internalChildEvent(TreeCacheEvent event) throws Exception
+ private void internalChildEvent(CacheEvent event, String pathStr, CachedNode affectedNode) throws Exception
{
- switch ( event.getType() )
+ switch ( event )
{
- case NODE_ADDED:
- case NODE_UPDATED:
+ case NODE_CREATED:
+ case NODE_CHANGED:
{
- ZPath path = ZPath.parse(event.getData().getPath());
+ ZPath path = ZPath.parse(pathStr);
if ( !path.equals(basePath) )
{
- byte[] bytes = event.getData().getData();
+ byte[] bytes = affectedNode.getData();
if ( (bytes != null) && (bytes.length > 0) ) // otherwise it's probably just a parent node being created
{
T model = serializer.deserialize(bytes);
- entries.put(path, new Entry<>(event.getData().getStat(), model));
- ModeledCacheListener.Type type = (event.getType() == TreeCacheEvent.Type.NODE_ADDED) ? ModeledCacheListener.Type.NODE_ADDED : ModeledCacheListener.Type.NODE_UPDATED;
- accept(type, path, event.getData().getStat(), model);
+ entries.put(path, new Entry<>(affectedNode.getStat(), model));
+ ModeledCacheListener.Type type = (event == CacheEvent.NODE_CREATED) ? ModeledCacheListener.Type.NODE_ADDED : ModeledCacheListener.Type.NODE_UPDATED;
+ accept(type, path, affectedNode.getStat(), model);
}
}
break;
}
- case NODE_REMOVED:
+ case NODE_DELETED:
{
- ZPath path = ZPath.parse(event.getData().getPath());
+ ZPath path = ZPath.parse(pathStr);
if ( !path.equals(basePath) )
{
Entry<T> entry = entries.remove(path);
- T model = (entry != null) ? entry.model : serializer.deserialize(event.getData().getData());
- Stat stat = (entry != null) ? entry.stat : event.getData().getStat();
+ T model = (entry != null) ? entry.model : serializer.deserialize(affectedNode.getData());
+ Stat stat = (entry != null) ? entry.stat : affectedNode.getStat();
accept(ModeledCacheListener.Type.NODE_REMOVED, path, stat, model);
}
break;
}
- case INITIALIZED:
+ case CACHE_REFRESHED:
{
listenerContainer.forEach(l -> {
l.initialized();