You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/11/03 01:40:14 UTC
[curator] 01/01: wip
This is an automated email from the ASF dual-hosted git repository.
randgalt pushed a commit to branch persistent-watcher-bridge
in repository https://gitbox.apache.org/repos/asf/curator.git
commit 492eac6b397f9fb2d4ffd7148ef2b45234b548c4
Author: randgalt <ra...@apache.org>
AuthorDate: Thu Oct 24 22:05:25 2019 +0300
wip
---
.../framework/recipes/cache/CacheBridge.java | 39 ++++++++++
.../recipes/cache/CuratorCacheBridge.java | 63 ++++++++++++++++
.../framework/recipes/cache/NodeCacheBridge.java | 82 ++++++++++++++++++++
.../recipes/cache/PathChildrenCacheBridge.java | 88 ++++++++++++++++++++++
.../framework/recipes/cache/TreeCacheBridge.java | 78 +++++++++++++++++++
.../details/CachedModeledFrameworkImpl.java | 11 +--
.../x/async/modeled/details/ModeledCacheImpl.java | 14 +++-
7 files changed, 364 insertions(+), 11 deletions(-)
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CacheBridge.java
new file mode 100644
index 0000000..b56f873
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CacheBridge.java
@@ -0,0 +1,39 @@
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import java.io.Closeable;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+
+public interface CacheBridge extends Closeable
+{
+ void addListener(TreeCacheListener listener);
+
+ void addListener(NodeCacheListener listener);
+
+ void addListener(PathChildrenCacheListener listener);
+
+ void start();
+
+ Optional<ChildData> currentData();
+
+ Optional<ChildData> currentData(String fullPath);
+
+ @Override
+ void close();
+
+ static CacheBridge newPathChildrenCacheBridge(CuratorFramework client, String path, boolean cacheData)
+ {
+ return PathChildrenCacheBridge.build(client, path, cacheData);
+ }
+
+ static CacheBridge newTreeCacheBridge(CuratorFramework client, String path, boolean cacheData, boolean compressedData, boolean createParentsIfNeeded, boolean createParentsAsContainers, ExecutorService executor)
+ {
+ return TreeCacheBridge.build(client, path, cacheData, compressedData, createParentsIfNeeded, createParentsAsContainers, executor);
+ }
+
+ static CacheBridge newNodeCacheBridge(CuratorFramework client, String path)
+ {
+ return NodeCacheBridge.build(client, path);
+ }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java
new file mode 100644
index 0000000..ddf663d
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java
@@ -0,0 +1,63 @@
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import java.util.Optional;
+
+class CuratorCacheBridge implements CacheBridge
+{
+ private final CuratorCache cache;
+ private final CuratorFramework client;
+
+ CuratorCacheBridge(CuratorFramework client, CuratorCacheStorage storage, String path, CuratorCache.Options... options)
+ {
+ this.client = client;
+ cache = CuratorCache.build(client, storage, path, options);
+ }
+
+ @Override
+ public void addListener(TreeCacheListener listener)
+ {
+ cache.listenable().addListener(CuratorCacheListener.builder().forTreeCache(client, listener).build());
+ }
+
+ @Override
+ public void addListener(NodeCacheListener listener)
+ {
+ cache.listenable().addListener(CuratorCacheListener.builder().forNodeCache(listener).build());
+ }
+
+ @Override
+ public void addListener(PathChildrenCacheListener listener)
+ {
+ cache.listenable().addListener(CuratorCacheListener.builder().forPathChildrenCache(client, listener).build());
+ }
+
+ @Override
+ public void start()
+ {
+ cache.start();
+ }
+
+ @Override
+ public Optional<ChildData> currentData()
+ {
+ return cache.getRootData();
+ }
+
+ @Override
+ public Optional<ChildData> currentData(String fullPath)
+ {
+ return cache.storage().get(fullPath);
+ }
+
+ @Override
+ public void close()
+ {
+ cache.close();
+ }
+
+ CuratorCache cache()
+ {
+ return cache;
+ }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheBridge.java
new file mode 100644
index 0000000..b4debd3
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheBridge.java
@@ -0,0 +1,82 @@
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.Compatibility;
+import java.io.IOException;
+import java.util.Optional;
+
+@SuppressWarnings("deprecation")
+class NodeCacheBridge implements CacheBridge
+{
+ private final NodeCache cache;
+
+ static CacheBridge build(CuratorFramework client, String path)
+ {
+ if ( Compatibility.hasPersistentWatches() )
+ {
+ return new CuratorCacheBridge(client, CuratorCacheStorage.standard(), path);
+ }
+ return new NodeCacheBridge(client, path);
+ }
+
+ private NodeCacheBridge(CuratorFramework client, String path)
+ {
+ cache = new NodeCache(client, path);
+ }
+
+ @Override
+ public void addListener(TreeCacheListener listener)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void addListener(NodeCacheListener listener)
+ {
+ cache.getListenable().addListener(listener);
+ }
+
+ @Override
+ public void addListener(PathChildrenCacheListener listener)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void start()
+ {
+ try
+ {
+ cache.start();
+ }
+ catch ( Exception e )
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Optional<ChildData> currentData()
+ {
+ return Optional.ofNullable(cache.getCurrentData());
+ }
+
+ @Override
+ public Optional<ChildData> currentData(String fullPath)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close()
+ {
+ try
+ {
+ cache.close();
+ }
+ catch ( IOException e )
+ {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheBridge.java
new file mode 100644
index 0000000..cf1b4e0
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheBridge.java
@@ -0,0 +1,88 @@
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.Compatibility;
+import org.apache.curator.utils.ZKPaths;
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.RECURSIVE;
+
+@SuppressWarnings("deprecation")
+class PathChildrenCacheBridge implements CacheBridge
+{
+ private final PathChildrenCache cache;
+
+ static CacheBridge build(CuratorFramework client, String path, boolean cacheData)
+ {
+ if ( Compatibility.hasPersistentWatches() )
+ {
+ CuratorCacheStorage storage = cacheData ? CuratorCacheStorage.standard() : CuratorCacheStorage.bytesNotCached();
+ CuratorCacheBridge curatorCache = new CuratorCacheBridge(client, storage, path, RECURSIVE);
+ curatorCache.cache().setPathFilter(filtering -> ZKPaths.getPathAndNode(filtering).getPath().equals(path)); // mimic PathChildrenCache which only caches the children of the main path
+ return curatorCache;
+ }
+ return new PathChildrenCacheBridge(client, path, cacheData);
+ }
+
+ private PathChildrenCacheBridge(CuratorFramework client, String path, boolean cacheData)
+ {
+ cache = new PathChildrenCache(client, path, cacheData);
+ }
+
+ @Override
+ public void addListener(TreeCacheListener listener)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void addListener(NodeCacheListener listener)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void addListener(PathChildrenCacheListener listener)
+ {
+ cache.getListenable().addListener(listener);
+ }
+
+ @Override
+ public void start()
+ {
+ try
+ {
+ cache.start();
+ }
+ catch ( Exception e )
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Optional<ChildData> currentData()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Optional<ChildData> currentData(String fullPath)
+ {
+ return Optional.ofNullable(cache.getCurrentData(fullPath));
+ }
+
+ @Override
+ public void close()
+ {
+ try
+ {
+ cache.close();
+ }
+ catch ( IOException e )
+ {
+ throw new RuntimeException();
+ }
+ }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java
new file mode 100644
index 0000000..1b173b8
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java
@@ -0,0 +1,78 @@
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.Compatibility;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.RECURSIVE;
+
+@SuppressWarnings("deprecation")
+class TreeCacheBridge implements CacheBridge
+{
+ private final TreeCache cache;
+
+ static CacheBridge build(CuratorFramework client, String path, boolean cacheData, boolean createParentsIfNeeded, boolean createParentsAsContainers, ExecutorService executor)
+ {
+ if ( Compatibility.hasPersistentWatches() )
+ {
+ CuratorCacheStorage storage = cacheData ? CuratorCacheStorage.standard() : CuratorCacheStorage.bytesNotCached();
+ return new CuratorCacheBridge(client, storage, path, RECURSIVE);
+ }
+ return new TreeCacheBridge(client, path, cacheData);
+ }
+
+ private TreeCacheBridge(CuratorFramework client, String path, boolean cacheData)
+ {
+ cache = TreeCache.newBuilder(client, path).setCacheData(cacheData).build();
+ }
+
+ @Override
+ public void addListener(TreeCacheListener listener)
+ {
+ cache.getListenable().addListener(listener);
+ }
+
+ @Override
+ public void addListener(NodeCacheListener listener)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void addListener(PathChildrenCacheListener listener)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void start()
+ {
+ try
+ {
+ cache.start();
+ }
+ catch ( Exception e )
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Optional<ChildData> currentData()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Optional<ChildData> currentData(String fullPath)
+ {
+ return Optional.ofNullable(cache.getCurrentData(fullPath));
+ }
+
+ @Override
+ public void close()
+ {
+ cache.close();
+ }
+}
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
index c897b4e..2dd5625 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
@@ -37,7 +37,6 @@ import org.apache.zookeeper.server.DataTree;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -47,18 +46,16 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
{
private final ModeledFramework<T> client;
private final ModeledCacheImpl<T> cache;
- private final Executor executor;
CachedModeledFrameworkImpl(ModeledFramework<T> client, ExecutorService executor)
{
- this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec(), executor), executor);
+ this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec(), executor));
}
- private CachedModeledFrameworkImpl(ModeledFramework<T> client, ModeledCacheImpl<T> cache, Executor executor)
+ private CachedModeledFrameworkImpl(ModeledFramework<T> client, ModeledCacheImpl<T> cache)
{
this.client = client;
this.cache = cache;
- this.executor = executor;
}
@Override
@@ -118,7 +115,7 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
@Override
public CachedModeledFramework<T> child(Object child)
{
- return new CachedModeledFrameworkImpl<>(client.child(child), cache, executor);
+ return new CachedModeledFrameworkImpl<>(client.child(child), cache);
}
@Override
@@ -130,7 +127,7 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
@Override
public CachedModeledFramework<T> withPath(ZPath path)
{
- return new CachedModeledFrameworkImpl<>(client.withPath(path), cache, executor);
+ return new CachedModeledFrameworkImpl<>(client.withPath(path), cache);
}
@Override
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
index b95e92d..68baf44 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
@@ -21,6 +21,7 @@ package org.apache.curator.x.async.modeled.details;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.recipes.cache.CacheBridge;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
@@ -42,7 +43,7 @@ import java.util.stream.Collectors;
class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
{
- private final TreeCache cache;
+ private final CacheBridge cache;
private final Map<ZPath, Entry<T>> entries = new ConcurrentHashMap<>();
private final ModelSerializer<T> serializer;
private final ListenerContainer<ModeledCacheListener<T>> listenerContainer = new ListenerContainer<>();
@@ -69,19 +70,25 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
basePath = modelSpec.path();
this.serializer = modelSpec.serializer();
- cache = TreeCache.newBuilder(client, basePath.fullPath())
+ boolean compress = modelSpec.createOptions().contains(CreateOption.compress);
+ boolean createParentsIfNeeded = modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded);
+ boolean createParentsAsContainers = modelSpec.createOptions().contains(CreateOption.createParentsAsContainers);
+ cache = CacheBridge.newTreeCacheBridge(client, basePath.fullPath(), false, compress, createParentsIfNeeded, createParentsAsContainers, executor);
+/*
+ TreeCache.newBuilder(client, basePath.fullPath())
.setCacheData(false)
.setDataIsCompressed(modelSpec.createOptions().contains(CreateOption.compress))
.setExecutor(executor)
.setCreateParentNodes(modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded) || modelSpec.createOptions().contains(CreateOption.createParentsAsContainers))
.build();
+*/
}
public void start()
{
try
{
- cache.getListenable().addListener(this);
+ cache.addListener(this);
cache.start();
}
catch ( Exception e )
@@ -92,7 +99,6 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
public void close()
{
- cache.getListenable().removeListener(this);
cache.close();
entries.clear();
}