You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/08/03 17:20:07 UTC

curator git commit: CachedModeledFramework can use the new CuratorCache

Repository: curator
Updated Branches:
  refs/heads/persistent-watch 2bbdbfd5f -> fc2219ea9


CachedModeledFramework can use the new CuratorCache


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/fc2219ea
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/fc2219ea
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/fc2219ea

Branch: refs/heads/persistent-watch
Commit: fc2219ea9dc365cf0226593f9260361407a4ef6f
Parents: 2bbdbfd
Author: randgalt <ra...@apache.org>
Authored: Thu Aug 3 12:20:02 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Thu Aug 3 12:20:02 2017 -0500

----------------------------------------------------------------------
 .../framework/recipes/watch/CacheAction.java    |  5 ++
 .../framework/recipes/watch/CacheSelectors.java | 13 +++++
 .../recipes/watch/CuratorCacheBuilder.java      |  1 +
 .../recipes/watch/InternalCuratorCache.java     |  2 +
 .../async/modeled/details/ModeledCacheImpl.java | 56 ++++++++++----------
 5 files changed, 50 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/fc2219ea/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java
index 57a48f1..8cfca93 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java
@@ -38,6 +38,11 @@ public enum CacheAction
     PATH_ONLY,
 
     /**
+     * The node and its {@link Stat} are stored. Events, however, also carry the node's uncompressed data (which is sent but not stored)
+     */
+    UNCOMPRESSED_STAT_ONLY,
+
+    /**
      * The node and its {@link Stat} are stored
      */
     STAT_ONLY,

http://git-wip-us.apache.org/repos/asf/curator/blob/fc2219ea/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java
index db1a8a8..eaf1145 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java
@@ -35,6 +35,7 @@ public class CacheSelectors
 {
     private static final CacheSelector statAndData = new StandardCacheSelector(CacheAction.STAT_AND_DATA);
     private static final CacheSelector uncompressedStatAndData = new StandardCacheSelector(CacheAction.STAT_AND_UNCOMPRESSED_DATA);
+    private static final CacheSelector uncompressedStatOnly = new StandardCacheSelector(CacheAction.UNCOMPRESSED_STAT_ONLY);
     private static final CacheSelector statOnly = new StandardCacheSelector(CacheAction.STAT_ONLY);
     private static final CacheSelector pathOnly = new StandardCacheSelector(CacheAction.PATH_ONLY);
 
@@ -156,6 +157,18 @@ public class CacheSelectors
 
     /**
      * Returns a cache selector that stores only the stat and processes the entire tree
+     * from the root path given to the cache builder. In events, however, uncompressed data is
+     * sent (but not stored).
+     *
+     * @return selector
+     */
+    public static CacheSelector getUncompressedStatOnly()
+    {
+        return uncompressedStatOnly;
+    }
+
+    /**
+     * Returns a cache selector that stores only the stat and processes the entire tree
      * from the root path given to the cache builder
      *
      * @return selector

http://git-wip-us.apache.org/repos/asf/curator/blob/fc2219ea/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
index 19c27a9..10f8bc2 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
@@ -65,6 +65,7 @@ public class CuratorCacheBuilder
         if ( singleNodeCacheAction != null )
         {
             Preconditions.checkState(cacheSelector == null, "Single node mode does not support CacheSelectors");
+            Preconditions.checkState(singleNodeCacheAction != CacheAction.UNCOMPRESSED_STAT_ONLY, "Single node mode does not support UNCOMPRESSED_STAT_ONLY");
             return new InternalNodeCache(client, path, singleNodeCacheAction, cachedNodeMap, sendRefreshEvents, refreshOnStart);
         }
 

http://git-wip-us.apache.org/repos/asf/curator/blob/fc2219ea/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
index 9d94c34..e2b1bf3 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
@@ -231,6 +231,7 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
             }
 
             case STAT_AND_UNCOMPRESSED_DATA:
+            case UNCOMPRESSED_STAT_ONLY:
             {
                 try
                 {
@@ -289,6 +290,7 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
             }
 
             case STAT_ONLY:
+            case UNCOMPRESSED_STAT_ONLY:
             {
                 putNode = new CachedNode(newNode.getStat());
                 break;

http://git-wip-us.apache.org/repos/asf/curator/blob/fc2219ea/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
index 72e6762..ce73a9b 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
@@ -21,17 +21,20 @@ package org.apache.curator.x.async.modeled.details;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.listen.ListenerContainer;
-import org.apache.curator.framework.recipes.cache.TreeCache;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
-import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+import org.apache.curator.framework.recipes.watch.CacheEvent;
+import org.apache.curator.framework.recipes.watch.CacheListener;
+import org.apache.curator.framework.recipes.watch.CacheSelectors;
+import org.apache.curator.framework.recipes.watch.CachedNode;
+import org.apache.curator.framework.recipes.watch.CuratorCache;
+import org.apache.curator.framework.recipes.watch.CuratorCacheBuilder;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.x.async.api.CreateOption;
 import org.apache.curator.x.async.modeled.ModelSerializer;
 import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ZNode;
 import org.apache.curator.x.async.modeled.ZPath;
 import org.apache.curator.x.async.modeled.cached.ModeledCache;
 import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
-import org.apache.curator.x.async.modeled.ZNode;
 import org.apache.zookeeper.data.Stat;
 import java.util.AbstractMap;
 import java.util.Map;
@@ -40,9 +43,9 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
-class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
+class ModeledCacheImpl<T> implements CacheListener, ModeledCache<T>
 {
-    private final TreeCache cache;
+    private final CuratorCache cache;
     private final Map<ZPath, Entry<T>> entries = new ConcurrentHashMap<>();
     private final ModelSerializer<T> serializer;
     private final ListenerContainer<ModeledCacheListener<T>> listenerContainer = new ListenerContainer<>();
@@ -69,11 +72,10 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
 
         basePath = modelSpec.path();
         this.serializer = modelSpec.serializer();
-        cache = TreeCache.newBuilder(client, basePath.fullPath())
-            .setCacheData(false)
-            .setDataIsCompressed(modelSpec.createOptions().contains(CreateOption.compress))
-            .setExecutor(executor)
-            .setCreateParentNodes(modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded) || modelSpec.createOptions().contains(CreateOption.createParentsAsContainers))
+        boolean dataIsCompressed = modelSpec.createOptions().contains(CreateOption.compress);
+        cache = CuratorCacheBuilder.builder(client, basePath.fullPath())
+            .withCacheSelector(dataIsCompressed ? CacheSelectors.uncompressedStatAndData() : CacheSelectors.getUncompressedStatOnly())
+            .sendingRefreshEvents(true)
             .build();
     }
 
@@ -134,11 +136,11 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
     }
 
     @Override
-    public void childEvent(CuratorFramework client, TreeCacheEvent event)
+    public void process(CacheEvent event, String path, CachedNode affectedNode)
     {
         try
         {
-            internalChildEvent(event);
+            internalChildEvent(event, path, affectedNode);
         }
         catch ( Exception e )
         {
@@ -151,42 +153,42 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
         }
     }
 
-    private void internalChildEvent(TreeCacheEvent event) throws Exception
+    private void internalChildEvent(CacheEvent event, String pathStr, CachedNode affectedNode) throws Exception
     {
-        switch ( event.getType() )
+        switch ( event )
         {
-        case NODE_ADDED:
-        case NODE_UPDATED:
+        case NODE_CREATED:
+        case NODE_CHANGED:
         {
-            ZPath path = ZPath.parse(event.getData().getPath());
+            ZPath path = ZPath.parse(pathStr);
             if ( !path.equals(basePath) )
             {
-                byte[] bytes = event.getData().getData();
+                byte[] bytes = affectedNode.getData();
                 if ( (bytes != null) && (bytes.length > 0) )    // otherwise it's probably just a parent node being created
                 {
                     T model = serializer.deserialize(bytes);
-                    entries.put(path, new Entry<>(event.getData().getStat(), model));
-                    ModeledCacheListener.Type type = (event.getType() == TreeCacheEvent.Type.NODE_ADDED) ? ModeledCacheListener.Type.NODE_ADDED : ModeledCacheListener.Type.NODE_UPDATED;
-                    accept(type, path, event.getData().getStat(), model);
+                    entries.put(path, new Entry<>(affectedNode.getStat(), model));
+                    ModeledCacheListener.Type type = (event == CacheEvent.NODE_CREATED) ? ModeledCacheListener.Type.NODE_ADDED : ModeledCacheListener.Type.NODE_UPDATED;
+                    accept(type, path, affectedNode.getStat(), model);
                 }
             }
             break;
         }
 
-        case NODE_REMOVED:
+        case NODE_DELETED:
         {
-            ZPath path = ZPath.parse(event.getData().getPath());
+            ZPath path = ZPath.parse(pathStr);
             if ( !path.equals(basePath) )
             {
                 Entry<T> entry = entries.remove(path);
-                T model = (entry != null) ? entry.model : serializer.deserialize(event.getData().getData());
-                Stat stat = (entry != null) ? entry.stat : event.getData().getStat();
+                T model = (entry != null) ? entry.model : serializer.deserialize(affectedNode.getData());
+                Stat stat = (entry != null) ? entry.stat : affectedNode.getStat();
                 accept(ModeledCacheListener.Type.NODE_REMOVED, path, stat, model);
             }
             break;
         }
 
-        case INITIALIZED:
+        case CACHE_REFRESHED:
         {
             listenerContainer.forEach(l -> {
                 l.initialized();