You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/11/03 01:40:14 UTC

[curator] 01/01: wip

This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch persistent-watcher-bridge
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 492eac6b397f9fb2d4ffd7148ef2b45234b548c4
Author: randgalt <ra...@apache.org>
AuthorDate: Thu Oct 24 22:05:25 2019 +0300

    wip
---
 .../framework/recipes/cache/CacheBridge.java       | 39 ++++++++++
 .../recipes/cache/CuratorCacheBridge.java          | 63 ++++++++++++++++
 .../framework/recipes/cache/NodeCacheBridge.java   | 82 ++++++++++++++++++++
 .../recipes/cache/PathChildrenCacheBridge.java     | 88 ++++++++++++++++++++++
 .../framework/recipes/cache/TreeCacheBridge.java   | 78 +++++++++++++++++++
 .../details/CachedModeledFrameworkImpl.java        | 11 +--
 .../x/async/modeled/details/ModeledCacheImpl.java  | 14 +++-
 7 files changed, 364 insertions(+), 11 deletions(-)

diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CacheBridge.java
new file mode 100644
index 0000000..b56f873
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CacheBridge.java
@@ -0,0 +1,39 @@
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import java.io.Closeable;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+
+public interface CacheBridge extends Closeable  // common facade over the legacy caches (NodeCache, PathChildrenCache, TreeCache) and CuratorCache
+{
+    void addListener(TreeCacheListener listener);  // NodeCacheBridge and PathChildrenCacheBridge throw UnsupportedOperationException
+
+    void addListener(NodeCacheListener listener);  // PathChildrenCacheBridge and TreeCacheBridge throw UnsupportedOperationException
+
+    void addListener(PathChildrenCacheListener listener);  // NodeCacheBridge and TreeCacheBridge throw UnsupportedOperationException
+
+    void start();  // starts the underlying cache; the legacy bridges rethrow any checked failure as RuntimeException
+
+    Optional<ChildData> currentData();  // data of the cache's own node; PathChildrenCacheBridge and TreeCacheBridge throw UnsupportedOperationException
+
+    Optional<ChildData> currentData(String fullPath);  // cached data for fullPath, if any; NodeCacheBridge throws UnsupportedOperationException
+
+    @Override
+    void close();  // redeclared without "throws IOException" so callers need no checked-exception handling
+
+    static CacheBridge newPathChildrenCacheBridge(CuratorFramework client, String path, boolean cacheData)  // CuratorCache-backed when Compatibility.hasPersistentWatches(), else PathChildrenCache
+    {
+        return PathChildrenCacheBridge.build(client, path, cacheData);
+    }
+
+    static CacheBridge newTreeCacheBridge(CuratorFramework client, String path, boolean cacheData, boolean compressedData, boolean createParentsIfNeeded, boolean createParentsAsContainers, ExecutorService executor)  // delegates to TreeCacheBridge.build
+    {
+        return TreeCacheBridge.build(client, path, cacheData, compressedData, createParentsIfNeeded, createParentsAsContainers, executor);
+    }
+
+    static CacheBridge newNodeCacheBridge(CuratorFramework client, String path)  // CuratorCache-backed when Compatibility.hasPersistentWatches(), else NodeCache
+    {
+        return NodeCacheBridge.build(client, path);
+    }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java
new file mode 100644
index 0000000..ddf663d
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java
@@ -0,0 +1,63 @@
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import java.util.Optional;
+
+class CuratorCacheBridge implements CacheBridge
+{
+    private final CuratorCache cache;
+    private final CuratorFramework client;
+
+    CuratorCacheBridge(CuratorFramework client, CuratorCacheStorage storage, String path, CuratorCache.Options... options)
+    {
+        this.client = client;
+        cache = CuratorCache.build(client, storage, path, options);
+    }
+
+    @Override
+    public void addListener(TreeCacheListener listener)
+    {
+        cache.listenable().addListener(CuratorCacheListener.builder().forTreeCache(client, listener).build());
+    }
+
+    @Override
+    public void addListener(NodeCacheListener listener)
+    {
+        cache.listenable().addListener(CuratorCacheListener.builder().forNodeCache(listener).build());
+    }
+
+    @Override
+    public void addListener(PathChildrenCacheListener listener)
+    {
+        cache.listenable().addListener(CuratorCacheListener.builder().forPathChildrenCache(client, listener).build());
+    }
+
+    @Override
+    public void start()
+    {
+        cache.start();
+    }
+
+    @Override
+    public Optional<ChildData> currentData()
+    {
+        return cache.getRootData();
+    }
+
+    @Override
+    public Optional<ChildData> currentData(String fullPath)
+    {
+        return cache.storage().get(fullPath);
+    }
+
+    @Override
+    public void close()
+    {
+        cache.close();
+    }
+
+    CuratorCache cache()
+    {
+        return cache;
+    }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheBridge.java
new file mode 100644
index 0000000..b4debd3
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheBridge.java
@@ -0,0 +1,82 @@
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.Compatibility;
+import java.io.IOException;
+import java.util.Optional;
+
+@SuppressWarnings("deprecation")
+class NodeCacheBridge implements CacheBridge
+{
+    private final NodeCache cache;
+
+    static CacheBridge build(CuratorFramework client, String path)
+    {
+        if ( Compatibility.hasPersistentWatches() )
+        {
+            return new CuratorCacheBridge(client, CuratorCacheStorage.standard(), path);
+        }
+        return new NodeCacheBridge(client, path);
+    }
+
+    private NodeCacheBridge(CuratorFramework client, String path)
+    {
+        cache = new NodeCache(client, path);
+    }
+
+    @Override
+    public void addListener(TreeCacheListener listener)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void addListener(NodeCacheListener listener)
+    {
+        cache.getListenable().addListener(listener);
+    }
+
+    @Override
+    public void addListener(PathChildrenCacheListener listener)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void start()
+    {
+        try
+        {
+            cache.start();
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Optional<ChildData> currentData()
+    {
+        return Optional.ofNullable(cache.getCurrentData());
+    }
+
+    @Override
+    public Optional<ChildData> currentData(String fullPath)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void close()
+    {
+        try
+        {
+            cache.close();
+        }
+        catch ( IOException e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheBridge.java
new file mode 100644
index 0000000..cf1b4e0
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheBridge.java
@@ -0,0 +1,88 @@
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.Compatibility;
+import org.apache.curator.utils.ZKPaths;
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.RECURSIVE;
+
+@SuppressWarnings("deprecation")
+class PathChildrenCacheBridge implements CacheBridge
+{
+    private final PathChildrenCache cache;
+
+    static CacheBridge build(CuratorFramework client, String path, boolean cacheData)
+    {
+        if ( Compatibility.hasPersistentWatches() )
+        {
+            CuratorCacheStorage storage = cacheData ? CuratorCacheStorage.standard() : CuratorCacheStorage.bytesNotCached();
+            CuratorCacheBridge curatorCache = new CuratorCacheBridge(client, storage, path, RECURSIVE);
+            curatorCache.cache().setPathFilter(filtering -> ZKPaths.getPathAndNode(filtering).getPath().equals(path));  // mimic PathChildrenCache which only caches the children of the main path
+            return curatorCache;
+        }
+        return new PathChildrenCacheBridge(client, path, cacheData);
+    }
+
+    private PathChildrenCacheBridge(CuratorFramework client, String path, boolean cacheData)
+    {
+        cache = new PathChildrenCache(client, path, cacheData);
+    }
+
+    @Override
+    public void addListener(TreeCacheListener listener)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void addListener(NodeCacheListener listener)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void addListener(PathChildrenCacheListener listener)
+    {
+        cache.getListenable().addListener(listener);
+    }
+
+    @Override
+    public void start()
+    {
+        try
+        {
+            cache.start();
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Optional<ChildData> currentData()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Optional<ChildData> currentData(String fullPath)
+    {
+        return Optional.ofNullable(cache.getCurrentData(fullPath));
+    }
+
+    @Override
+    public void close()  // closes the wrapped PathChildrenCache, converting the checked IOException to unchecked
+    {
+        try
+        {
+            cache.close();
+        }
+        catch ( IOException e )
+        {
+            throw new RuntimeException(e);  // keep the IOException as the cause (matches NodeCacheBridge.close) so failures stay diagnosable
+        }
+    }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java
new file mode 100644
index 0000000..1b173b8
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java
@@ -0,0 +1,78 @@
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.Compatibility;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.RECURSIVE;
+
+@SuppressWarnings("deprecation")
+class TreeCacheBridge implements CacheBridge
+{
+    private final TreeCache cache;
+
+    static CacheBridge build(CuratorFramework client, String path, boolean cacheData, boolean compressedData, boolean createParentsIfNeeded, boolean createParentsAsContainers, ExecutorService executor)  // signature now matches the 7-argument call in CacheBridge.newTreeCacheBridge
+    {
+        if ( Compatibility.hasPersistentWatches() )
+        {
+            CuratorCacheStorage storage = cacheData ? CuratorCacheStorage.standard() : CuratorCacheStorage.bytesNotCached();
+            return new CuratorCacheBridge(client, storage, path, RECURSIVE);  // NOTE(review): compressedData, createParents* and executor are not applied on this path - confirm whether CuratorCache needs them
+        }
+        return new TreeCacheBridge(TreeCache.newBuilder(client, path).setCacheData(cacheData).setDataIsCompressed(compressedData).setExecutor(executor).setCreateParentNodes(createParentsIfNeeded || createParentsAsContainers).build());  // legacy fallback: same builder options ModeledCacheImpl used before the bridge
+    }
+
+    private TreeCacheBridge(TreeCache cache)  // wraps an already-configured (not yet started) TreeCache
+    {
+        this.cache = cache;
+    }
+
+    @Override
+    public void addListener(TreeCacheListener listener)
+    {
+        cache.getListenable().addListener(listener);
+    }
+
+    @Override
+    public void addListener(NodeCacheListener listener)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void addListener(PathChildrenCacheListener listener)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void start()
+    {
+        try
+        {
+            cache.start();
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Optional<ChildData> currentData()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Optional<ChildData> currentData(String fullPath)
+    {
+        return Optional.ofNullable(cache.getCurrentData(fullPath));
+    }
+
+    @Override
+    public void close()
+    {
+        cache.close();
+    }
+}
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
index c897b4e..2dd5625 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
@@ -37,7 +37,6 @@ import org.apache.zookeeper.server.DataTree;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -47,18 +46,16 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
 {
     private final ModeledFramework<T> client;
     private final ModeledCacheImpl<T> cache;
-    private final Executor executor;
 
     CachedModeledFrameworkImpl(ModeledFramework<T> client, ExecutorService executor)
     {
-        this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec(), executor), executor);
+        this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec(), executor));
     }
 
-    private CachedModeledFrameworkImpl(ModeledFramework<T> client, ModeledCacheImpl<T> cache, Executor executor)
+    private CachedModeledFrameworkImpl(ModeledFramework<T> client, ModeledCacheImpl<T> cache)
     {
         this.client = client;
         this.cache = cache;
-        this.executor = executor;
     }
 
     @Override
@@ -118,7 +115,7 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
     @Override
     public CachedModeledFramework<T> child(Object child)
     {
-        return new CachedModeledFrameworkImpl<>(client.child(child), cache, executor);
+        return new CachedModeledFrameworkImpl<>(client.child(child), cache);
     }
 
     @Override
@@ -130,7 +127,7 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
     @Override
     public CachedModeledFramework<T> withPath(ZPath path)
     {
-        return new CachedModeledFrameworkImpl<>(client.withPath(path), cache, executor);
+        return new CachedModeledFrameworkImpl<>(client.withPath(path), cache);
     }
 
     @Override
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
index b95e92d..68baf44 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
@@ -21,6 +21,7 @@ package org.apache.curator.x.async.modeled.details;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.recipes.cache.CacheBridge;
 import org.apache.curator.framework.recipes.cache.TreeCache;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
 import org.apache.curator.framework.recipes.cache.TreeCacheListener;
@@ -42,7 +43,7 @@ import java.util.stream.Collectors;
 
 class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
 {
-    private final TreeCache cache;
+    private final CacheBridge cache;
     private final Map<ZPath, Entry<T>> entries = new ConcurrentHashMap<>();
     private final ModelSerializer<T> serializer;
     private final ListenerContainer<ModeledCacheListener<T>> listenerContainer = new ListenerContainer<>();
@@ -69,19 +70,25 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
 
         basePath = modelSpec.path();
         this.serializer = modelSpec.serializer();
-        cache = TreeCache.newBuilder(client, basePath.fullPath())
+        boolean compress = modelSpec.createOptions().contains(CreateOption.compress);
+        boolean createParentsIfNeeded = modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded);
+        boolean createParentsAsContainers = modelSpec.createOptions().contains(CreateOption.createParentsAsContainers);
+        cache = CacheBridge.newTreeCacheBridge(client, basePath.fullPath(), false, compress, createParentsIfNeeded, createParentsAsContainers, executor);
+/*
+            TreeCache.newBuilder(client, basePath.fullPath())
             .setCacheData(false)
             .setDataIsCompressed(modelSpec.createOptions().contains(CreateOption.compress))
             .setExecutor(executor)
             .setCreateParentNodes(modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded) || modelSpec.createOptions().contains(CreateOption.createParentsAsContainers))
             .build();
+*/
     }
 
     public void start()
     {
         try
         {
-            cache.getListenable().addListener(this);
+            cache.addListener(this);
             cache.start();
         }
         catch ( Exception e )
@@ -92,7 +99,6 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
 
     public void close()
     {
-        cache.getListenable().removeListener(this);
         cache.close();
         entries.clear();
     }