You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/05/05 04:15:57 UTC

curator git commit: 1. Allow for an Executor service to be passed to the cache 2. CachedModeledFramework must complete the stages via a thread as the caller expects async processing

Repository: curator
Updated Branches:
  refs/heads/CURATOR-397 29b09ceb5 -> f2370b771


1. Allow for an Executor service to be passed to the cache
2. CachedModeledFramework must complete the stages via a thread as the caller expects async processing


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/f2370b77
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/f2370b77
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/f2370b77

Branch: refs/heads/CURATOR-397
Commit: f2370b7710b85850936f48404c6959d45ed03626
Parents: 29b09ce
Author: randgalt <ra...@apache.org>
Authored: Thu May 4 23:15:14 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Thu May 4 23:15:14 2017 -0500

----------------------------------------------------------------------
 .../x/async/modeled/ModeledFramework.java       | 24 +++++++++--
 .../details/CachedModeledFrameworkImpl.java     | 42 +++++++++++++++-----
 .../x/async/modeled/details/ModelStage.java     | 12 ------
 .../async/modeled/details/ModeledCacheImpl.java |  7 +++-
 .../modeled/details/ModeledFrameworkImpl.java   | 10 ++++-
 .../confluence/modeled-components.confluence    |  4 +-
 .../site/confluence/modeled-typed.confluence    |  4 +-
 7 files changed, 71 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/f2370b77/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java
index 70671ea..f8cf4c7 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java
@@ -25,6 +25,7 @@ import org.apache.curator.x.async.AsyncStage;
 import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
 import org.apache.zookeeper.data.Stat;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 
 public interface ModeledFramework<T>
 {
@@ -65,16 +66,31 @@ public interface ModeledFramework<T>
     }
 
     /**
-     * Use an internally created cache as a front for this modeled instance. All read APIs use the internal
-     * cache. i.e. read calls always use the cache instead of making direct queries. Note: you must call
-     * {@link org.apache.curator.x.async.modeled.cached.CachedModeledFramework#start()} and
-     * {@link org.apache.curator.x.async.modeled.cached.CachedModeledFramework#close()} to start/stop
+     * <p>
+     *     Use an internally created cache as a front for this modeled instance. All read APIs use the internal
+     *     cache. i.e. read calls always use the cache instead of making direct queries. Note: you must call
+     *     {@link org.apache.curator.x.async.modeled.cached.CachedModeledFramework#start()} and
+     *     {@link org.apache.curator.x.async.modeled.cached.CachedModeledFramework#close()} to start/stop
+     * </p>
+     *
+     * <p>
+     *     Note: this method internally allocates an Executor for the cache and read methods. Use
+     *     {@link #cached(java.util.concurrent.ExecutorService)} if you'd like to provide your own executor service.
+     * </p>
      *
      * @return wrapped instance
      */
     CachedModeledFramework<T> cached();
 
     /**
+     * Same as {@link #cached()} but allows for providing an executor service
+     *
+     * @param executor thread pool to use for the cache and for read operations
+     * @return wrapped instance
+     */
+    CachedModeledFramework<T> cached(ExecutorService executor);
+
+    /**
      * Returns the client that was originally passed to {@link #wrap(org.apache.curator.x.async.AsyncCuratorFramework, ModelSpec)} or
      * the builder.
      *

http://git-wip-us.apache.org/repos/asf/curator/blob/f2370b77/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
index ba11bc2..2209d37 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
@@ -37,21 +37,25 @@ import org.apache.zookeeper.server.DataTree;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 
 class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
 {
     private final ModeledFramework<T> client;
     private final ModeledCacheImpl<T> cache;
+    private final Executor executor;
 
-    CachedModeledFrameworkImpl(ModeledFramework<T> client)
+    CachedModeledFrameworkImpl(ModeledFramework<T> client, ExecutorService executor)
     {
-        this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec()));
+        this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec(), executor), executor);
     }
 
-    private CachedModeledFrameworkImpl(ModeledFramework<T> client, ModeledCacheImpl<T> cache)
+    private CachedModeledFrameworkImpl(ModeledFramework<T> client, ModeledCacheImpl<T> cache, Executor executor)
     {
         this.client = client;
         this.cache = cache;
+        this.executor = executor;
     }
 
     @Override
@@ -81,7 +85,13 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
     @Override
     public CachedModeledFramework<T> cached()
     {
-        return this;
+        throw new UnsupportedOperationException("Already a cached instance");
+    }
+
+    @Override
+    public CachedModeledFramework<T> cached(ExecutorService executor)
+    {
+        throw new UnsupportedOperationException("Already a cached instance");
     }
 
     @Override
@@ -99,13 +109,13 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
     @Override
     public CachedModeledFramework<T> at(Object child)
     {
-        return new CachedModeledFrameworkImpl<>(client.at(child), cache);
+        return new CachedModeledFrameworkImpl<>(client.at(child), cache, executor);
     }
 
     @Override
     public CachedModeledFramework<T> withPath(ZPath path)
     {
-        return new CachedModeledFrameworkImpl<>(client.withPath(path), cache);
+        return new CachedModeledFrameworkImpl<>(client.withPath(path), cache, executor);
     }
 
     @Override
@@ -136,8 +146,8 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
             {
                 DataTree.copyStat(node.stat(), storingStatIn);
             }
-            return new ModelStage<>(node.model());
-        }).orElseGet(() -> new ModelStage<>(new KeeperException.NoNodeException(path.fullPath())));
+            return completed(new ModelStage<>(), node.model());
+        }).orElseGet(() -> completedExceptionally(new ModelStage<>(), new KeeperException.NoNodeException(path.fullPath())));
     }
 
     @Override
@@ -169,14 +179,14 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
     {
         ZPath path = client.modelSpec().path();
         Optional<ZNode<T>> data = cache.currentData(path);
-        return data.map(node -> new ModelStage<>(node.stat())).orElseGet(() -> new ModelStage<>((Stat)null));
+        return data.map(node -> completed(new ModelStage<>(), node.stat())).orElseGet(() -> completed(new ModelStage<>(), null));
     }
 
     @Override
     public AsyncStage<List<ZPath>> children()
     {
         Set<ZPath> paths = cache.currentChildren(client.modelSpec().path()).keySet();
-        return new ModelStage<>(Lists.newArrayList(paths));
+        return completed(new ModelStage<>(), Lists.newArrayList(paths));
     }
 
     @Override
@@ -226,4 +236,16 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
     {
         return client.inTransaction(operations);
     }
+
+    private <U> ModelStage<U> completed(ModelStage<U> stage, U value)
+    {
+        executor.execute(() -> stage.complete(value));
+        return stage;
+    }
+
+    private <U> ModelStage<U> completedExceptionally(ModelStage<U> stage, Exception e)
+    {
+        executor.execute(() -> stage.completeExceptionally(e));
+        return stage;
+    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/f2370b77/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java
index 77caed1..9be9a33 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java
@@ -37,18 +37,6 @@ class ModelStage<T> extends CompletableFuture<T> implements AsyncStage<T>
         this.event = event;
     }
 
-    ModelStage(T value)
-    {
-        event = null;
-        complete(value);
-    }
-
-    ModelStage(Exception e)
-    {
-        event = null;
-        completeExceptionally(e);
-    }
-
     @Override
     public CompletionStage<WatchedEvent> event()
     {

http://git-wip-us.apache.org/repos/asf/curator/blob/f2370b77/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
index 061ea17..091a727 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.curator.x.async.modeled.details;
 
+import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.listen.ListenerContainer;
@@ -36,6 +37,9 @@ import java.util.AbstractMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
 class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
@@ -57,12 +61,13 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
         }
     }
 
-    ModeledCacheImpl(CuratorFramework client, ModelSpec<T> modelSpec)
+    ModeledCacheImpl(CuratorFramework client, ModelSpec<T> modelSpec, ExecutorService executor)
     {
         this.serializer = modelSpec.serializer();
         cache = TreeCache.newBuilder(client, modelSpec.path().fullPath())
             .setCacheData(false)
             .setDataIsCompressed(modelSpec.createOptions().contains(CreateOption.compress))
+            .setExecutor(executor)
             .setCreateParentNodes(modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded) || modelSpec.createOptions().contains(CreateOption.createParentsAsContainers))
             .build();
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/f2370b77/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
index 754fb3b..b666822 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
@@ -23,6 +23,7 @@ import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.api.transaction.CuratorOp;
 import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
+import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.x.async.AsyncCuratorFramework;
 import org.apache.curator.x.async.AsyncStage;
 import org.apache.curator.x.async.WatchMode;
@@ -42,6 +43,7 @@ import org.apache.zookeeper.data.Stat;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
 
@@ -98,8 +100,14 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
     @Override
     public CachedModeledFramework<T> cached()
     {
+        return cached(ThreadUtils.newSingleThreadExecutor("CachedModeledFramework"));
+    }
+
+    @Override
+    public CachedModeledFramework<T> cached(ExecutorService executor)
+    {
         Preconditions.checkState(!isWatched, "CachedModeledFramework cannot be used with watched instances as the internal cache would bypass the watchers.");
-        return new CachedModeledFrameworkImpl<>(this);
+        return new CachedModeledFrameworkImpl<>(this, Objects.requireNonNull(executor, "executor cannot be null"));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/curator/blob/f2370b77/curator-x-async/src/site/confluence/modeled-components.confluence
----------------------------------------------------------------------
diff --git a/curator-x-async/src/site/confluence/modeled-components.confluence b/curator-x-async/src/site/confluence/modeled-components.confluence
index d44932a..ab49750 100644
--- a/curator-x-async/src/site/confluence/modeled-components.confluence
+++ b/curator-x-async/src/site/confluence/modeled-components.confluence
@@ -45,7 +45,7 @@ A {{ModelSpec}} contains all the metadata needed to operate on a ZooKeeper path:
 * Options for how to delete nodes (guaranteed, deleting children, etc.)
 
 ModelSpec instances are created via a builder. The builder sets defaults that should be
-desired for most applications but you can alter any of these as needed.
+useful for most applications but you can alter any of these as needed.
 
 {code}
 // a standard model spec for the given path and serializer
@@ -87,7 +87,7 @@ The "set" call in the above example is the equivalent of:
 {code}
 MyModel instance = ...
 String path = "/foo/bar/" + instance.getId();
-byte[] data = serializer.serialize(data);
+byte[] data = serializer.serialize(instance);
 client.create()
     .withOptions(Sets.newHashSet(CreateOption.createParentsAsContainers, CreateOption.setDataIfExists))
     .forPath(path, data);

http://git-wip-us.apache.org/repos/asf/curator/blob/f2370b77/curator-x-async/src/site/confluence/modeled-typed.confluence
----------------------------------------------------------------------
diff --git a/curator-x-async/src/site/confluence/modeled-typed.confluence b/curator-x-async/src/site/confluence/modeled-typed.confluence
index 5b8affd..5a8a597 100644
--- a/curator-x-async/src/site/confluence/modeled-typed.confluence
+++ b/curator-x-async/src/site/confluence/modeled-typed.confluence
@@ -1,6 +1,6 @@
 h1. Modeled Curator \- Caching and Typed Parameters
 
-In addition to the [[main features|modeled-components.html]] Modeled Curator also supports
+In addition to its [[main features|modeled-components.html]] Modeled Curator also supports
 integrated caching and typed parameters.
 
 h2. Caching
@@ -35,7 +35,7 @@ Typed interfaces are provided for up to 10 parameters and are named
 {{TypedModeledFramework2}}, etc.
 
 Here's an example of a TypedModeledFramework that models a Person and uses two parameters
-to generate the path, a Group and a Organization:
+to generate the path, a Group and an Organization:
 
 {code}
 TypedModeledFramework2<Person, Group, Organization> clientTemplate = TypedModeledFramework2.from(