You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/11/03 02:13:31 UTC

[curator] 02/02: Created new replacement cache recipe, CuratorCache. Replaces TreeCache, NodeCache and PathChildrenCache when Persistent Watchers are available.

This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch persistent-watcher-cache-zk35
in repository https://gitbox.apache.org/repos/asf/curator.git

commit fab42ae7412640f75cf2b3cb69894faf46e69648
Author: randgalt <ra...@apache.org>
AuthorDate: Mon Oct 7 15:48:57 2019 +0300

    Created new replacement cache recipe, CuratorCache. Replaces TreeCache, NodeCache and PathChildrenCache when Persistent Watchers are available.
---
 .../recipes/cache/CuratorCacheBuilderImpl.java     |   7 +-
 .../recipes/cache/CuratorCacheStorage.java         |   1 -
 .../recipes/cache/CuratorTreeCacheBridge.java      | 129 +++++++++++++++++++++
 3 files changed, 135 insertions(+), 2 deletions(-)

diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java
index d47d779..1b5cd9c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java
@@ -19,6 +19,7 @@
 package org.apache.curator.framework.recipes.cache;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.Compatibility;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 
@@ -68,6 +69,10 @@ class CuratorCacheBuilderImpl implements CuratorCacheBuilder
     @Override
     public CuratorCache build()
     {
-        return new CuratorCacheImpl(client, storage, path, options, executor, exceptionHandler);
+        if ( Compatibility.hasPersistentWatches() )
+        {
+            return new CuratorCacheImpl(client, storage, path, options, executor, exceptionHandler);
+        }
+        return new CuratorTreeCacheBridge(client, path, options, executor, exceptionHandler);
     }
 }
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
index 34b187f..427c139 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
@@ -21,7 +21,6 @@ package org.apache.curator.framework.recipes.cache;
 import java.util.AbstractMap;
 import java.util.Map;
 import java.util.Optional;
-import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorTreeCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorTreeCacheBridge.java
new file mode 100644
index 0000000..dc15767
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorTreeCacheBridge.java
@@ -0,0 +1,129 @@
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.collect.Sets;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.listen.Listenable;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+class CuratorTreeCacheBridge implements CuratorCache, CuratorCacheStorage
+{
+    private final TreeCache treeCache;
+    private final String path;
+
+    CuratorTreeCacheBridge(CuratorFramework client, String path, Options[] optionsArg, Executor executor, Consumer<Exception> exceptionHandler)
+    {
+        Set<Options> options = (optionsArg != null) ? Sets.newHashSet(optionsArg) : Collections.emptySet();
+        this.path = path;
+        TreeCache.Builder builder = TreeCache.newBuilder(client, path);
+        if ( options.contains(Options.SINGLE_NODE_CACHE) )
+        {
+            builder.setMaxDepth(0);
+        }
+        if ( options.contains(Options.COMPRESSED_DATA) )
+        {
+             builder.setDataIsCompressed(true);
+        }
+        if ( executor != null )
+        {
+            //builder = builder.setExecutor()
+        }
+        treeCache = builder.build();
+        if ( exceptionHandler != null )
+        {
+            treeCache.getUnhandledErrorListenable().addListener((message, e) -> {
+                if ( e instanceof Exception )
+                {
+                    exceptionHandler.accept((Exception)e);
+                }
+                else
+                {
+                    exceptionHandler.accept(new RuntimeException(e));
+                }
+            });
+        }
+        treeCache.getListenable().addListener((__, event) -> callListeners(event));
+    }
+
+    @Override
+    public void start()
+    {
+        try
+        {
+            treeCache.start();
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void close()
+    {
+        treeCache.close();
+    }
+
+    @Override
+    public CuratorCacheStorage storage()
+    {
+        return this;
+    }
+
+    @Override
+    public Listenable<CuratorCacheListener> listenable()
+    {
+        return null;
+    }
+
+    @Override
+    public Optional<ChildData> put(ChildData data)
+    {
+        throw new UnsupportedOperationException("CuratorTreeCacheBridge does not support put()");
+    }
+
+    @Override
+    public Optional<ChildData> remove(String path)
+    {
+        throw new UnsupportedOperationException("CuratorTreeCacheBridge does not support remove()");
+    }
+
+    @Override
+    public Optional<ChildData> get(String path)
+    {
+        return Optional.ofNullable(treeCache.getCurrentData(path));
+    }
+
+    @Override
+    public int size()
+    {
+        throw new UnsupportedOperationException("CuratorTreeCacheBridge does not support size()");
+    }
+
+    @Override
+    public Stream<ChildData> stream()
+    {
+        throw new UnsupportedOperationException("CuratorTreeCacheBridge does not support stream()");
+    }
+
+    @Override
+    public Stream<ChildData> streamImmediateChildren(String fromParent)
+    {
+        return treeCache.getCurrentChildren(path).values().stream();
+    }
+
+    @Override
+    public void clear()
+    {
+        throw new UnsupportedOperationException("CuratorTreeCacheBridge does not support clear()");
+    }
+
+    private void callListeners(TreeCacheEvent event)
+    {
+        // TODO
+    }
+}