You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/05/05 04:15:57 UTC
curator git commit: 1. Allow for an Executor service to be passed to
the cache 2. CachedModeledFramework must complete the stages via a thread as
the caller is expected async processing
Repository: curator
Updated Branches:
refs/heads/CURATOR-397 29b09ceb5 -> f2370b771
1. Allow for an Executor service to be passed to the cache
2. CachedModeledFramework must complete the stages via a thread as the caller is expected async processing
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/f2370b77
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/f2370b77
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/f2370b77
Branch: refs/heads/CURATOR-397
Commit: f2370b7710b85850936f48404c6959d45ed03626
Parents: 29b09ce
Author: randgalt <ra...@apache.org>
Authored: Thu May 4 23:15:14 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Thu May 4 23:15:14 2017 -0500
----------------------------------------------------------------------
.../x/async/modeled/ModeledFramework.java | 24 +++++++++--
.../details/CachedModeledFrameworkImpl.java | 42 +++++++++++++++-----
.../x/async/modeled/details/ModelStage.java | 12 ------
.../async/modeled/details/ModeledCacheImpl.java | 7 +++-
.../modeled/details/ModeledFrameworkImpl.java | 10 ++++-
.../confluence/modeled-components.confluence | 4 +-
.../site/confluence/modeled-typed.confluence | 4 +-
7 files changed, 71 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/f2370b77/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java
index 70671ea..f8cf4c7 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java
@@ -25,6 +25,7 @@ import org.apache.curator.x.async.AsyncStage;
import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
import org.apache.zookeeper.data.Stat;
import java.util.List;
+import java.util.concurrent.ExecutorService;
public interface ModeledFramework<T>
{
@@ -65,16 +66,31 @@ public interface ModeledFramework<T>
}
/**
- * Use an internally created cache as a front for this modeled instance. All read APIs use the internal
- * cache. i.e. read calls always use the cache instead of making direct queries. Note: you must call
- * {@link org.apache.curator.x.async.modeled.cached.CachedModeledFramework#start()} and
- * {@link org.apache.curator.x.async.modeled.cached.CachedModeledFramework#close()} to start/stop
+ * <p>
+ * Use an internally created cache as a front for this modeled instance. All read APIs use the internal
+ * cache. i.e. read calls always use the cache instead of making direct queries. Note: you must call
+ * {@link org.apache.curator.x.async.modeled.cached.CachedModeledFramework#start()} and
+ * {@link org.apache.curator.x.async.modeled.cached.CachedModeledFramework#close()} to start/stop
+ * </p>
+ *
+ * <p>
+ * Note: this method internally allocates an Executor for the cache and read methods. Use
+ * {@link #cached(java.util.concurrent.ExecutorService)} if you'd like to provide your own executor service.
+ * </p>
*
* @return wrapped instance
*/
CachedModeledFramework<T> cached();
/**
+ * Same as {@link #cached()} but allows for providing an executor service
+ *
+ * @param executor thread pool to use for the cache and for read operations
+ * @return wrapped instance
+ */
+ CachedModeledFramework<T> cached(ExecutorService executor);
+
+ /**
* Returns the client that was originally passed to {@link #wrap(org.apache.curator.x.async.AsyncCuratorFramework, ModelSpec)} or
* the builder.
*
http://git-wip-us.apache.org/repos/asf/curator/blob/f2370b77/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
index ba11bc2..2209d37 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
@@ -37,21 +37,25 @@ import org.apache.zookeeper.server.DataTree;
import java.util.List;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
{
private final ModeledFramework<T> client;
private final ModeledCacheImpl<T> cache;
+ private final Executor executor;
- CachedModeledFrameworkImpl(ModeledFramework<T> client)
+ CachedModeledFrameworkImpl(ModeledFramework<T> client, ExecutorService executor)
{
- this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec()));
+ this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec(), executor), executor);
}
- private CachedModeledFrameworkImpl(ModeledFramework<T> client, ModeledCacheImpl<T> cache)
+ private CachedModeledFrameworkImpl(ModeledFramework<T> client, ModeledCacheImpl<T> cache, Executor executor)
{
this.client = client;
this.cache = cache;
+ this.executor = executor;
}
@Override
@@ -81,7 +85,13 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
@Override
public CachedModeledFramework<T> cached()
{
- return this;
+ throw new UnsupportedOperationException("Already a cached instance");
+ }
+
+ @Override
+ public CachedModeledFramework<T> cached(ExecutorService executor)
+ {
+ throw new UnsupportedOperationException("Already a cached instance");
}
@Override
@@ -99,13 +109,13 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
@Override
public CachedModeledFramework<T> at(Object child)
{
- return new CachedModeledFrameworkImpl<>(client.at(child), cache);
+ return new CachedModeledFrameworkImpl<>(client.at(child), cache, executor);
}
@Override
public CachedModeledFramework<T> withPath(ZPath path)
{
- return new CachedModeledFrameworkImpl<>(client.withPath(path), cache);
+ return new CachedModeledFrameworkImpl<>(client.withPath(path), cache, executor);
}
@Override
@@ -136,8 +146,8 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
{
DataTree.copyStat(node.stat(), storingStatIn);
}
- return new ModelStage<>(node.model());
- }).orElseGet(() -> new ModelStage<>(new KeeperException.NoNodeException(path.fullPath())));
+ return completed(new ModelStage<>(), node.model());
+ }).orElseGet(() -> completedExceptionally(new ModelStage<>(), new KeeperException.NoNodeException(path.fullPath())));
}
@Override
@@ -169,14 +179,14 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
{
ZPath path = client.modelSpec().path();
Optional<ZNode<T>> data = cache.currentData(path);
- return data.map(node -> new ModelStage<>(node.stat())).orElseGet(() -> new ModelStage<>((Stat)null));
+ return data.map(node -> completed(new ModelStage<>(), node.stat())).orElseGet(() -> completed(new ModelStage<>(), null));
}
@Override
public AsyncStage<List<ZPath>> children()
{
Set<ZPath> paths = cache.currentChildren(client.modelSpec().path()).keySet();
- return new ModelStage<>(Lists.newArrayList(paths));
+ return completed(new ModelStage<>(), Lists.newArrayList(paths));
}
@Override
@@ -226,4 +236,16 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
{
return client.inTransaction(operations);
}
+
+ private <U> ModelStage<U> completed(ModelStage<U> stage, U value)
+ {
+ executor.execute(() -> stage.complete(value));
+ return stage;
+ }
+
+ private <U> ModelStage<U> completedExceptionally(ModelStage<U> stage, Exception e)
+ {
+ executor.execute(() -> stage.completeExceptionally(e));
+ return stage;
+ }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/f2370b77/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java
index 77caed1..9be9a33 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java
@@ -37,18 +37,6 @@ class ModelStage<T> extends CompletableFuture<T> implements AsyncStage<T>
this.event = event;
}
- ModelStage(T value)
- {
- event = null;
- complete(value);
- }
-
- ModelStage(Exception e)
- {
- event = null;
- completeExceptionally(e);
- }
-
@Override
public CompletionStage<WatchedEvent> event()
{
http://git-wip-us.apache.org/repos/asf/curator/blob/f2370b77/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
index 061ea17..091a727 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
@@ -18,6 +18,7 @@
*/
package org.apache.curator.x.async.modeled.details;
+import com.google.common.util.concurrent.MoreExecutors;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.listen.ListenerContainer;
@@ -36,6 +37,9 @@ import java.util.AbstractMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.stream.Collectors;
class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
@@ -57,12 +61,13 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
}
}
- ModeledCacheImpl(CuratorFramework client, ModelSpec<T> modelSpec)
+ ModeledCacheImpl(CuratorFramework client, ModelSpec<T> modelSpec, ExecutorService executor)
{
this.serializer = modelSpec.serializer();
cache = TreeCache.newBuilder(client, modelSpec.path().fullPath())
.setCacheData(false)
.setDataIsCompressed(modelSpec.createOptions().contains(CreateOption.compress))
+ .setExecutor(executor)
.setCreateParentNodes(modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded) || modelSpec.createOptions().contains(CreateOption.createParentsAsContainers))
.build();
}
http://git-wip-us.apache.org/repos/asf/curator/blob/f2370b77/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
index 754fb3b..b666822 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
@@ -23,6 +23,7 @@ import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
+import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.x.async.AsyncCuratorFramework;
import org.apache.curator.x.async.AsyncStage;
import org.apache.curator.x.async.WatchMode;
@@ -42,6 +43,7 @@ import org.apache.zookeeper.data.Stat;
import java.util.List;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
@@ -98,8 +100,14 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
@Override
public CachedModeledFramework<T> cached()
{
+ return cached(ThreadUtils.newSingleThreadExecutor("CachedModeledFramework"));
+ }
+
+ @Override
+ public CachedModeledFramework<T> cached(ExecutorService executor)
+ {
Preconditions.checkState(!isWatched, "CachedModeledFramework cannot be used with watched instances as the internal cache would bypass the watchers.");
- return new CachedModeledFrameworkImpl<>(this);
+ return new CachedModeledFrameworkImpl<>(this, Objects.requireNonNull(executor, "executor cannot be null"));
}
@Override
http://git-wip-us.apache.org/repos/asf/curator/blob/f2370b77/curator-x-async/src/site/confluence/modeled-components.confluence
----------------------------------------------------------------------
diff --git a/curator-x-async/src/site/confluence/modeled-components.confluence b/curator-x-async/src/site/confluence/modeled-components.confluence
index d44932a..ab49750 100644
--- a/curator-x-async/src/site/confluence/modeled-components.confluence
+++ b/curator-x-async/src/site/confluence/modeled-components.confluence
@@ -45,7 +45,7 @@ A {{ModelSpec}} contains all the metadata needed to operate on a ZooKeeper path:
* Options for how to delete nodes (guaranteed, deleting children, etc.)
ModelSpec instances are created via a builder. The builder sets defaults that should be
-desired for most applications but you can alter any of these as needed.
+useful for most applications but you can alter any of these as needed.
{code}
// a standard model spec for the given path and serializer
@@ -87,7 +87,7 @@ The "set" call in the above example is the equivalent of:
{code}
MyModel instance = ...
String path = "/foo/bar/" + instance.getId();
-byte[] data = serializer.serialize(data);
+byte[] data = serializer.serialize(instance);
client.create()
.withOptions(Sets.newHashSet(CreateOption.createParentsAsContainers, CreateOption.setDataIfExists))
.forPath(path, data);
http://git-wip-us.apache.org/repos/asf/curator/blob/f2370b77/curator-x-async/src/site/confluence/modeled-typed.confluence
----------------------------------------------------------------------
diff --git a/curator-x-async/src/site/confluence/modeled-typed.confluence b/curator-x-async/src/site/confluence/modeled-typed.confluence
index 5b8affd..5a8a597 100644
--- a/curator-x-async/src/site/confluence/modeled-typed.confluence
+++ b/curator-x-async/src/site/confluence/modeled-typed.confluence
@@ -1,6 +1,6 @@
h1. Modeled Curator \- Caching and Typed Parameters
-In addition to the [[main features|modeled-components.html]] Modeled Curator also supports
+In addition to its [[main features|modeled-components.html]] Modeled Curator also supports
integrated caching and typed parameters.
h2. Caching
@@ -35,7 +35,7 @@ Typed interfaces are provided for up to 10 parameters and are named
{{TypedModeledFramework2}}, etc.
Here's an example of a TypedModeledFramework that models a Person and uses two parameters
-to generate the path, a Group and a Organization:
+to generate the path, a Group and an Organization:
{code}
TypedModeledFramework2<Person, Group, Organization> clientTemplate = TypedModeledFramework2.from(