You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/11/03 02:13:31 UTC
[curator] 02/02: Created new replacement cache recipe,
CuratorCache. Replaces TreeCache,
NodeCache and PathChildrenCache when Persistent Watchers are available.
This is an automated email from the ASF dual-hosted git repository.
randgalt pushed a commit to branch persistent-watcher-cache-zk35
in repository https://gitbox.apache.org/repos/asf/curator.git
commit fab42ae7412640f75cf2b3cb69894faf46e69648
Author: randgalt <ra...@apache.org>
AuthorDate: Mon Oct 7 15:48:57 2019 +0300
Created new replacement cache recipe, CuratorCache. Replaces TreeCache, NodeCache and PathChildrenCache when Persistent Watchers are available.
---
.../recipes/cache/CuratorCacheBuilderImpl.java | 7 +-
.../recipes/cache/CuratorCacheStorage.java | 1 -
.../recipes/cache/CuratorTreeCacheBridge.java | 129 +++++++++++++++++++++
3 files changed, 135 insertions(+), 2 deletions(-)
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java
index d47d779..1b5cd9c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java
@@ -19,6 +19,7 @@
package org.apache.curator.framework.recipes.cache;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.Compatibility;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
@@ -68,6 +69,10 @@ class CuratorCacheBuilderImpl implements CuratorCacheBuilder
@Override
public CuratorCache build()
{
- return new CuratorCacheImpl(client, storage, path, options, executor, exceptionHandler);
+ if ( Compatibility.hasPersistentWatches() )
+ {
+ return new CuratorCacheImpl(client, storage, path, options, executor, exceptionHandler);
+ }
+ return new CuratorTreeCacheBridge(client, path, options, executor, exceptionHandler);
}
}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
index 34b187f..427c139 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
@@ -21,7 +21,6 @@ package org.apache.curator.framework.recipes.cache;
import java.util.AbstractMap;
import java.util.Map;
import java.util.Optional;
-import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorTreeCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorTreeCacheBridge.java
new file mode 100644
index 0000000..dc15767
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorTreeCacheBridge.java
@@ -0,0 +1,129 @@
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.collect.Sets;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.listen.Listenable;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+class CuratorTreeCacheBridge implements CuratorCache, CuratorCacheStorage
+{
+ private final TreeCache treeCache;
+ private final String path;
+
+ CuratorTreeCacheBridge(CuratorFramework client, String path, Options[] optionsArg, Executor executor, Consumer<Exception> exceptionHandler)
+ {
+ Set<Options> options = (optionsArg != null) ? Sets.newHashSet(optionsArg) : Collections.emptySet();
+ this.path = path;
+ TreeCache.Builder builder = TreeCache.newBuilder(client, path);
+ if ( options.contains(Options.SINGLE_NODE_CACHE) )
+ {
+ builder.setMaxDepth(0);
+ }
+ if ( options.contains(Options.COMPRESSED_DATA) )
+ {
+ builder.setDataIsCompressed(true);
+ }
+ if ( executor != null )
+ {
+ //builder = builder.setExecutor()
+ }
+ treeCache = builder.build();
+ if ( exceptionHandler != null )
+ {
+ treeCache.getUnhandledErrorListenable().addListener((message, e) -> {
+ if ( e instanceof Exception )
+ {
+ exceptionHandler.accept((Exception)e);
+ }
+ else
+ {
+ exceptionHandler.accept(new RuntimeException(e));
+ }
+ });
+ }
+ treeCache.getListenable().addListener((__, event) -> callListeners(event));
+ }
+
+ @Override
+ public void start()
+ {
+ try
+ {
+ treeCache.start();
+ }
+ catch ( Exception e )
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close()
+ {
+ treeCache.close();
+ }
+
+ @Override
+ public CuratorCacheStorage storage()
+ {
+ return this;
+ }
+
+ @Override
+ public Listenable<CuratorCacheListener> listenable()
+ {
+ return null;
+ }
+
+ @Override
+ public Optional<ChildData> put(ChildData data)
+ {
+ throw new UnsupportedOperationException("CuratorTreeCacheBridge does not support put()");
+ }
+
+ @Override
+ public Optional<ChildData> remove(String path)
+ {
+ throw new UnsupportedOperationException("CuratorTreeCacheBridge does not support remove()");
+ }
+
+ @Override
+ public Optional<ChildData> get(String path)
+ {
+ return Optional.ofNullable(treeCache.getCurrentData(path));
+ }
+
+ @Override
+ public int size()
+ {
+ throw new UnsupportedOperationException("CuratorTreeCacheBridge does not support size()");
+ }
+
+ @Override
+ public Stream<ChildData> stream()
+ {
+ throw new UnsupportedOperationException("CuratorTreeCacheBridge does not support stream()");
+ }
+
+ @Override
+ public Stream<ChildData> streamImmediateChildren(String fromParent)
+ {
+ return treeCache.getCurrentChildren(path).values().stream();
+ }
+
+ @Override
+ public void clear()
+ {
+ throw new UnsupportedOperationException("CuratorTreeCacheBridge does not support clear()");
+ }
+
+ private void callListeners(TreeCacheEvent event)
+ {
+ // TODO
+ }
+}