You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/11/03 01:40:13 UTC

[curator] branch persistent-watcher-bridge created (now 492eac6)

This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a change to branch persistent-watcher-bridge
in repository https://gitbox.apache.org/repos/asf/curator.git.


      at 492eac6  wip

This branch includes the following new commits:

     new 492eac6  wip

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[curator] 01/01: wip

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch persistent-watcher-bridge
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 492eac6b397f9fb2d4ffd7148ef2b45234b548c4
Author: randgalt <ra...@apache.org>
AuthorDate: Thu Oct 24 22:05:25 2019 +0300

    wip
---
 .../framework/recipes/cache/CacheBridge.java       | 39 ++++++++++
 .../recipes/cache/CuratorCacheBridge.java          | 63 ++++++++++++++++
 .../framework/recipes/cache/NodeCacheBridge.java   | 82 ++++++++++++++++++++
 .../recipes/cache/PathChildrenCacheBridge.java     | 88 ++++++++++++++++++++++
 .../framework/recipes/cache/TreeCacheBridge.java   | 78 +++++++++++++++++++
 .../details/CachedModeledFrameworkImpl.java        | 11 +--
 .../x/async/modeled/details/ModeledCacheImpl.java  | 14 +++-
 7 files changed, 364 insertions(+), 11 deletions(-)

diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CacheBridge.java
new file mode 100644
index 0000000..b56f873
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CacheBridge.java
@@ -0,0 +1,39 @@
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import java.io.Closeable;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Common facade over the cache recipes in this package. The static factory methods delegate to
+ * the package-private {@code *Bridge} classes; those classes return a {@code CuratorCache}-backed
+ * implementation when {@code Compatibility.hasPersistentWatches()} is true and a wrapper around
+ * the corresponding legacy cache otherwise.
+ */
+public interface CacheBridge extends Closeable
+{
+    /**
+     * Registers a tree-cache style listener. Bridges that wrap a legacy cache of a different kind
+     * reject this with {@link UnsupportedOperationException}.
+     *
+     * @param listener listener to register
+     */
+    void addListener(TreeCacheListener listener);
+
+    /**
+     * Registers a node-cache style listener. Bridges that wrap a legacy cache of a different kind
+     * reject this with {@link UnsupportedOperationException}.
+     *
+     * @param listener listener to register
+     */
+    void addListener(NodeCacheListener listener);
+
+    /**
+     * Registers a path-children-cache style listener. Bridges that wrap a legacy cache of a
+     * different kind reject this with {@link UnsupportedOperationException}.
+     *
+     * @param listener listener to register
+     */
+    void addListener(PathChildrenCacheListener listener);
+
+    /**
+     * Starts the underlying cache. The legacy-cache bridges rethrow any start failure wrapped in
+     * a {@link RuntimeException}.
+     */
+    void start();
+
+    /**
+     * Returns the data currently held for the node the cache was created on, if any. The
+     * path-children and tree legacy bridges do not support this overload and throw
+     * {@link UnsupportedOperationException}.
+     *
+     * @return the cached data, or empty when none is available
+     */
+    Optional<ChildData> currentData();
+
+    /**
+     * Returns the data currently held for the given path, if any. The node-cache legacy bridge
+     * does not support this overload and throws {@link UnsupportedOperationException}.
+     *
+     * @param fullPath full path of the node to look up
+     * @return the cached data, or empty when none is available
+     */
+    Optional<ChildData> currentData(String fullPath);
+
+    /**
+     * Closes the underlying cache. Narrows {@link Closeable#close()} so that no checked exception
+     * is declared.
+     */
+    @Override
+    void close();
+
+    /**
+     * Creates a bridge with path-children-cache semantics.
+     *
+     * @param client the client
+     * @param path path whose children are cached
+     * @param cacheData forwarded to {@code PathChildrenCacheBridge.build}; selects the storage
+     *                  used on the {@code CuratorCache} path and is passed to the legacy cache
+     *                  constructor otherwise
+     * @return a new, unstarted bridge
+     */
+    static CacheBridge newPathChildrenCacheBridge(CuratorFramework client, String path, boolean cacheData)
+    {
+        return PathChildrenCacheBridge.build(client, path, cacheData);
+    }
+
+    /**
+     * Creates a bridge with tree-cache semantics. All arguments are forwarded unchanged to
+     * {@code TreeCacheBridge.build}.
+     *
+     * @param client the client
+     * @param path root path of the tree to cache
+     * @param cacheData whether node payloads should be cached
+     * @param compressedData whether node payloads are compressed
+     * @param createParentsIfNeeded whether missing parent nodes should be created
+     * @param createParentsAsContainers whether missing parents should be created as containers
+     * @param executor executor for the cache
+     * @return a new, unstarted bridge
+     */
+    static CacheBridge newTreeCacheBridge(CuratorFramework client, String path, boolean cacheData, boolean compressedData, boolean createParentsIfNeeded, boolean createParentsAsContainers, ExecutorService executor)
+    {
+        return TreeCacheBridge.build(client, path, cacheData, compressedData, createParentsIfNeeded, createParentsAsContainers, executor);
+    }
+
+    /**
+     * Creates a bridge with node-cache semantics.
+     *
+     * @param client the client
+     * @param path path of the single node to cache
+     * @return a new, unstarted bridge
+     */
+    static CacheBridge newNodeCacheBridge(CuratorFramework client, String path)
+    {
+        return NodeCacheBridge.build(client, path);
+    }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java
new file mode 100644
index 0000000..ddf663d
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java
@@ -0,0 +1,63 @@
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import java.util.Optional;
+
+/**
+ * {@link CacheBridge} backed by a {@link CuratorCache}. Listeners written for the legacy cache
+ * recipes are adapted through {@code CuratorCacheListener.builder()} before being registered.
+ */
+class CuratorCacheBridge implements CacheBridge
+{
+    private final CuratorFramework curator;
+    private final CuratorCache delegate;
+
+    /**
+     * @param client client used to build the cache and handed to the listener adapters
+     * @param storage storage implementation for the cache
+     * @param path path the cache is built on
+     * @param options cache options, passed through to {@code CuratorCache.build}
+     */
+    CuratorCacheBridge(CuratorFramework client, CuratorCacheStorage storage, String path, CuratorCache.Options... options)
+    {
+        this.curator = client;
+        this.delegate = CuratorCache.build(client, storage, path, options);
+    }
+
+    /** Wraps the tree-cache listener in a CuratorCache listener and registers it. */
+    @Override
+    public void addListener(TreeCacheListener listener)
+    {
+        delegate.listenable().addListener(
+            CuratorCacheListener.builder()
+                .forTreeCache(curator, listener)
+                .build()
+        );
+    }
+
+    /** Wraps the node-cache listener in a CuratorCache listener and registers it. */
+    @Override
+    public void addListener(NodeCacheListener listener)
+    {
+        delegate.listenable().addListener(
+            CuratorCacheListener.builder()
+                .forNodeCache(listener)
+                .build()
+        );
+    }
+
+    /** Wraps the path-children-cache listener in a CuratorCache listener and registers it. */
+    @Override
+    public void addListener(PathChildrenCacheListener listener)
+    {
+        delegate.listenable().addListener(
+            CuratorCacheListener.builder()
+                .forPathChildrenCache(curator, listener)
+                .build()
+        );
+    }
+
+    /** Starts the wrapped cache. */
+    @Override
+    public void start()
+    {
+        delegate.start();
+    }
+
+    /** Returns whatever the wrapped cache reports as its root data. */
+    @Override
+    public Optional<ChildData> currentData()
+    {
+        return delegate.getRootData();
+    }
+
+    /** Looks the path up directly in the wrapped cache's storage. */
+    @Override
+    public Optional<ChildData> currentData(String fullPath)
+    {
+        return delegate.storage().get(fullPath);
+    }
+
+    /** Closes the wrapped cache. */
+    @Override
+    public void close()
+    {
+        delegate.close();
+    }
+
+    /** Exposes the wrapped cache so sibling bridges can configure it further. */
+    CuratorCache cache()
+    {
+        return delegate;
+    }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheBridge.java
new file mode 100644
index 0000000..b4debd3
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheBridge.java
@@ -0,0 +1,82 @@
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.Compatibility;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * {@link CacheBridge} that wraps a legacy {@link NodeCache}. Only node-cache listeners and the
+ * no-argument {@link #currentData()} are supported; the other operations throw
+ * {@link UnsupportedOperationException}.
+ */
+@SuppressWarnings("deprecation")
+class NodeCacheBridge implements CacheBridge
+{
+    private final NodeCache nodeCache;
+
+    /**
+     * Chooses the implementation: a {@link CuratorCacheBridge} with standard storage when
+     * persistent watches are available, otherwise a bridge around a legacy {@link NodeCache}.
+     *
+     * @param client the client
+     * @param path path of the node to cache
+     * @return a new, unstarted bridge
+     */
+    static CacheBridge build(CuratorFramework client, String path)
+    {
+        return Compatibility.hasPersistentWatches()
+            ? new CuratorCacheBridge(client, CuratorCacheStorage.standard(), path)
+            : new NodeCacheBridge(client, path);
+    }
+
+    private NodeCacheBridge(CuratorFramework client, String path)
+    {
+        this.nodeCache = new NodeCache(client, path);
+    }
+
+    /** Not supported by the node-cache bridge. */
+    @Override
+    public void addListener(TreeCacheListener listener)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Registers the listener on the wrapped {@link NodeCache}. */
+    @Override
+    public void addListener(NodeCacheListener listener)
+    {
+        nodeCache.getListenable().addListener(listener);
+    }
+
+    /** Not supported by the node-cache bridge. */
+    @Override
+    public void addListener(PathChildrenCacheListener listener)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Starts the wrapped cache.
+     *
+     * @throws RuntimeException wrapping any exception raised by the underlying start
+     */
+    @Override
+    public void start()
+    {
+        try
+        {
+            nodeCache.start();
+        }
+        catch ( Exception startFailure )
+        {
+            throw new RuntimeException(startFailure);
+        }
+    }
+
+    /** Returns the wrapped cache's current data, or empty when it reports {@code null}. */
+    @Override
+    public Optional<ChildData> currentData()
+    {
+        ChildData current = nodeCache.getCurrentData();
+        return Optional.ofNullable(current);
+    }
+
+    /** Not supported by the node-cache bridge. */
+    @Override
+    public Optional<ChildData> currentData(String fullPath)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Closes the wrapped cache.
+     *
+     * @throws RuntimeException wrapping any {@link IOException} raised by the underlying close
+     */
+    @Override
+    public void close()
+    {
+        try
+        {
+            nodeCache.close();
+        }
+        catch ( IOException closeFailure )
+        {
+            throw new RuntimeException(closeFailure);
+        }
+    }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheBridge.java
new file mode 100644
index 0000000..cf1b4e0
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheBridge.java
@@ -0,0 +1,88 @@
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.Compatibility;
+import org.apache.curator.utils.ZKPaths;
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.RECURSIVE;
+
+@SuppressWarnings("deprecation")
+class PathChildrenCacheBridge implements CacheBridge
+{
+    private final PathChildrenCache cache;
+
+    static CacheBridge build(CuratorFramework client, String path, boolean cacheData)
+    {
+        if ( Compatibility.hasPersistentWatches() )
+        {
+            CuratorCacheStorage storage = cacheData ? CuratorCacheStorage.standard() : CuratorCacheStorage.bytesNotCached();
+            CuratorCacheBridge curatorCache = new CuratorCacheBridge(client, storage, path, RECURSIVE);
+            curatorCache.cache().setPathFilter(filtering -> ZKPaths.getPathAndNode(filtering).getPath().equals(path));  // mimic PathChildrenCache which only caches the children of the main path
+            return curatorCache;
+        }
+        return new PathChildrenCacheBridge(client, path, cacheData);
+    }
+
+    private PathChildrenCacheBridge(CuratorFramework client, String path, boolean cacheData)
+    {
+        cache = new PathChildrenCache(client, path, cacheData);
+    }
+
+    @Override
+    public void addListener(TreeCacheListener listener)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void addListener(NodeCacheListener listener)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void addListener(PathChildrenCacheListener listener)
+    {
+        cache.getListenable().addListener(listener);
+    }
+
+    @Override
+    public void start()
+    {
+        try
+        {
+            cache.start();
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Optional<ChildData> currentData()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Optional<ChildData> currentData(String fullPath)
+    {
+        return Optional.ofNullable(cache.getCurrentData(fullPath));
+    }
+
+    @Override
+    public void close()
+    {
+        try
+        {
+            cache.close();
+        }
+        catch ( IOException e )
+        {
+            throw new RuntimeException();
+        }
+    }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java
new file mode 100644
index 0000000..1b173b8
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java
@@ -0,0 +1,78 @@
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.Compatibility;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.RECURSIVE;
+
+/**
+ * {@link CacheBridge} that wraps a legacy {@link TreeCache}. Only tree-cache listeners and
+ * {@link #currentData(String)} are supported; the other operations throw
+ * {@link UnsupportedOperationException}.
+ */
+@SuppressWarnings("deprecation")
+class TreeCacheBridge implements CacheBridge
+{
+    private final TreeCache cache;
+
+    /**
+     * Chooses the implementation: a recursive {@link CuratorCacheBridge} when persistent watches
+     * are available, otherwise a bridge around a legacy {@link TreeCache} configured with the
+     * given options.
+     *
+     * @param client the client
+     * @param path root path of the tree to cache
+     * @param cacheData whether node payloads are cached
+     * @param compressedData whether node payloads are compressed (legacy path only)
+     * @param createParentsIfNeeded whether missing parent nodes are created (legacy path only)
+     * @param createParentsAsContainers treated like {@code createParentsIfNeeded} on the legacy
+     *                                  path, matching the TreeCache setup this bridge replaces
+     * @param executor executor for the legacy cache; when {@code null} the builder's own default
+     *                 is left in place
+     * @return a new, unstarted bridge
+     */
+    static CacheBridge build(CuratorFramework client, String path, boolean cacheData, boolean compressedData, boolean createParentsIfNeeded, boolean createParentsAsContainers, ExecutorService executor)
+    {
+        if ( Compatibility.hasPersistentWatches() )
+        {
+            // NOTE(review): compressedData, the createParents* flags and executor are not applied
+            // on this path - confirm whether CuratorCache offers equivalents that should be set
+            CuratorCacheStorage storage = cacheData ? CuratorCacheStorage.standard() : CuratorCacheStorage.bytesNotCached();
+            return new CuratorCacheBridge(client, storage, path, RECURSIVE);
+        }
+
+        TreeCache.Builder builder = TreeCache.newBuilder(client, path)
+            .setCacheData(cacheData)
+            .setDataIsCompressed(compressedData)
+            .setCreateParentNodes(createParentsIfNeeded || createParentsAsContainers);
+        if ( executor != null )
+        {
+            builder.setExecutor(executor);
+        }
+        return new TreeCacheBridge(builder.build());
+    }
+
+    private TreeCacheBridge(TreeCache cache)
+    {
+        this.cache = cache;
+    }
+
+    /** Registers the listener on the wrapped {@link TreeCache}. */
+    @Override
+    public void addListener(TreeCacheListener listener)
+    {
+        cache.getListenable().addListener(listener);
+    }
+
+    /** Not supported by the tree-cache bridge. */
+    @Override
+    public void addListener(NodeCacheListener listener)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Not supported by the tree-cache bridge. */
+    @Override
+    public void addListener(PathChildrenCacheListener listener)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Starts the wrapped cache.
+     *
+     * @throws RuntimeException wrapping any exception raised by the underlying start
+     */
+    @Override
+    public void start()
+    {
+        try
+        {
+            cache.start();
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Not supported by the tree-cache bridge. */
+    @Override
+    public Optional<ChildData> currentData()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Returns the wrapped cache's data for the path, or empty when it reports {@code null}. */
+    @Override
+    public Optional<ChildData> currentData(String fullPath)
+    {
+        return Optional.ofNullable(cache.getCurrentData(fullPath));
+    }
+
+    /** Closes the wrapped cache. */
+    @Override
+    public void close()
+    {
+        cache.close();
+    }
+}
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
index c897b4e..2dd5625 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
@@ -37,7 +37,6 @@ import org.apache.zookeeper.server.DataTree;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -47,18 +46,16 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
 {
     private final ModeledFramework<T> client;
     private final ModeledCacheImpl<T> cache;
-    private final Executor executor;
 
     CachedModeledFrameworkImpl(ModeledFramework<T> client, ExecutorService executor)
     {
-        this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec(), executor), executor);
+        this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec(), executor));
     }
 
-    private CachedModeledFrameworkImpl(ModeledFramework<T> client, ModeledCacheImpl<T> cache, Executor executor)
+    private CachedModeledFrameworkImpl(ModeledFramework<T> client, ModeledCacheImpl<T> cache)
     {
         this.client = client;
         this.cache = cache;
-        this.executor = executor;
     }
 
     @Override
@@ -118,7 +115,7 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
     @Override
     public CachedModeledFramework<T> child(Object child)
     {
-        return new CachedModeledFrameworkImpl<>(client.child(child), cache, executor);
+        return new CachedModeledFrameworkImpl<>(client.child(child), cache);
     }
 
     @Override
@@ -130,7 +127,7 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
     @Override
     public CachedModeledFramework<T> withPath(ZPath path)
     {
-        return new CachedModeledFrameworkImpl<>(client.withPath(path), cache, executor);
+        return new CachedModeledFrameworkImpl<>(client.withPath(path), cache);
     }
 
     @Override
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
index b95e92d..68baf44 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
@@ -21,6 +21,7 @@ package org.apache.curator.x.async.modeled.details;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.recipes.cache.CacheBridge;
 import org.apache.curator.framework.recipes.cache.TreeCache;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
 import org.apache.curator.framework.recipes.cache.TreeCacheListener;
@@ -42,7 +43,7 @@ import java.util.stream.Collectors;
 
 class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
 {
-    private final TreeCache cache;
+    private final CacheBridge cache;
     private final Map<ZPath, Entry<T>> entries = new ConcurrentHashMap<>();
     private final ModelSerializer<T> serializer;
     private final ListenerContainer<ModeledCacheListener<T>> listenerContainer = new ListenerContainer<>();
@@ -69,19 +70,25 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
 
         basePath = modelSpec.path();
         this.serializer = modelSpec.serializer();
-        cache = TreeCache.newBuilder(client, basePath.fullPath())
+        boolean compress = modelSpec.createOptions().contains(CreateOption.compress);
+        boolean createParentsIfNeeded = modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded);
+        boolean createParentsAsContainers = modelSpec.createOptions().contains(CreateOption.createParentsAsContainers);
+        cache = CacheBridge.newTreeCacheBridge(client, basePath.fullPath(), false, compress, createParentsIfNeeded, createParentsAsContainers, executor);
+/*
+            TreeCache.newBuilder(client, basePath.fullPath())
             .setCacheData(false)
             .setDataIsCompressed(modelSpec.createOptions().contains(CreateOption.compress))
             .setExecutor(executor)
             .setCreateParentNodes(modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded) || modelSpec.createOptions().contains(CreateOption.createParentsAsContainers))
             .build();
+*/
     }
 
     public void start()
     {
         try
         {
-            cache.getListenable().addListener(this);
+            cache.addListener(this);
             cache.start();
         }
         catch ( Exception e )
@@ -92,7 +99,6 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
 
     public void close()
     {
-        cache.getListenable().removeListener(this);
         cache.close();
         entries.clear();
     }