You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/05/10 17:23:07 UTC

[2/2] curator git commit: CachedModeledFrameworkImpl wasn't using executor in right way. Completions need to happen async not setting of the value. This required creating CachedStage which proxies all non-async methods to their async counterparts using t

CachedModeledFrameworkImpl wasn't using executor in right way. Completions need to happen async not setting of the value. This required creating CachedStage which proxies all non-async methods to their async counterparts using the CachedModeledFrameworkImpl's executor


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/6485f165
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/6485f165
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/6485f165

Branch: refs/heads/CURATOR-397
Commit: 6485f1650e37e392e6122f8b13ff432ee0321666
Parents: 80ca587
Author: randgalt <ra...@apache.org>
Authored: Wed May 10 19:23:00 2017 +0200
Committer: randgalt <ra...@apache.org>
Committed: Wed May 10 19:23:00 2017 +0200

----------------------------------------------------------------------
 .../x/async/api/AsyncCuratorFrameworkDsl.java   |   1 -
 .../details/CachedModeledFrameworkImpl.java     |  20 ++--
 .../x/async/modeled/details/CachedStage.java    | 117 +++++++++++++++++++
 .../modeled/TestCachedModeledFramework.java     |  87 ++++++++++++++
 .../x/async/modeled/TestModeledFramework.java   |  10 +-
 5 files changed, 220 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/6485f165/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
index 0807160..bc66bb6 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
@@ -19,7 +19,6 @@
 package org.apache.curator.x.async.api;
 
 import org.apache.curator.framework.api.transaction.CuratorOp;
-import org.apache.curator.x.async.WatchMode;
 
 /**
  * Zookeeper framework-style client

http://git-wip-us.apache.org/repos/asf/curator/blob/6485f165/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
index 15db3ba..9e1fa8f 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
@@ -203,14 +203,14 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
     {
         ZPath path = client.modelSpec().path();
         Optional<ZNode<T>> data = cache.currentData(path);
-        return data.map(node -> completed(new ModelStage<>(null), node.stat())).orElseGet(() -> completed(new ModelStage<>(null), null));
+        return data.map(node -> completed(node.stat())).orElseGet(() -> completed(null));
     }
 
     @Override
     public AsyncStage<List<ZPath>> children()
     {
         Set<ZPath> paths = cache.currentChildren(client.modelSpec().path()).keySet();
-        return completed(new ModelStage<>(null), Lists.newArrayList(paths));
+        return completed(Lists.newArrayList(paths));
     }
 
     @Override
@@ -261,23 +261,25 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
         return client.inTransaction(operations);
     }
 
-    private <U> ModelStage<U> completed(ModelStage<U> stage, U value)
+    private <U> CachedStage<U> completed(U value)
     {
-        executor.execute(() -> stage.complete(value));
+        CachedStage<U> stage = new CachedStage<>(executor);
+        stage.complete(value);
         return stage;
     }
 
-    private <U> ModelStage<U> completedExceptionally(ModelStage<U> stage, Exception e)
+    private <U> CachedStage<U> completedExceptionally(Exception e)
     {
-        executor.execute(() -> stage.completeExceptionally(e));
+        CachedStage<U> stage = new CachedStage<>(executor);
+        stage.completeExceptionally(e);
         return stage;
     }
 
-    private <U> ModelStage<U> internalRead(Function<ZNode<T>, U> resolver)
+    private <U> CachedStage<U> internalRead(Function<ZNode<T>, U> resolver)
     {
         ZPath path = client.modelSpec().path();
         Optional<ZNode<T>> data = cache.currentData(path);
-        return data.map(node -> completed(new ModelStage<>(null), resolver.apply(node)))
-            .orElseGet(() -> completedExceptionally(new ModelStage<>(null), new KeeperException.NoNodeException(path.fullPath())));
+        return data.map(node -> completed(resolver.apply(node)))
+            .orElseGet(() -> completedExceptionally(new KeeperException.NoNodeException(path.fullPath())));
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/6485f165/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedStage.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedStage.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedStage.java
new file mode 100644
index 0000000..edf6b98
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedStage.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.details;
+
+import org.apache.curator.x.async.AsyncStage;
+import org.apache.zookeeper.WatchedEvent;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+class CachedStage<T> extends CompletableFuture<T> implements AsyncStage<T>
+{
+    private final Executor executor;
+
+    CachedStage(Executor executor)
+    {
+        this.executor = executor;
+    }
+
+    @Override
+    public CompletionStage<WatchedEvent> event()
+    {
+        return null;
+    }
+
+    @Override
+    public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn)
+    {
+        return thenApplyAsync(fn, executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
+    {
+        return thenAcceptAsync(action, executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> thenRun(Runnable action)
+    {
+        return thenRunAsync(action, executor);
+    }
+
+    @Override
+    public <U, V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn)
+    {
+        return thenCombineAsync(other, fn, executor);
+    }
+
+    @Override
+    public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
+    {
+        return thenAcceptBothAsync(other, action, executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
+    {
+        return runAfterBothAsync(other, action, executor);
+    }
+
+    @Override
+    public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
+    {
+        return applyToEitherAsync(other, fn, executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
+    {
+        return acceptEitherAsync(other, action, executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)
+    {
+        return runAfterEitherAsync(other, action, executor);
+    }
+
+    @Override
+    public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
+    {
+        return thenComposeAsync(fn, executor);
+    }
+
+    @Override
+    public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
+    {
+        return whenCompleteAsync(action, executor);
+    }
+
+    @Override
+    public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
+    {
+        return handleAsync(fn, executor);
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/6485f165/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
new file mode 100644
index 0000000..7be7c28
--- /dev/null
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled;
+
+import org.apache.curator.test.Timing;
+import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
+import org.apache.curator.x.async.modeled.models.TestModel;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.math.BigInteger;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class TestCachedModeledFramework extends TestModeledFramework
+{
+    @Test
+    public void testThreading()
+    {
+        TestModel model = new TestModel("a", "b", "c", 1, BigInteger.ONE);
+        CachedModeledFramework<TestModel> client = ModeledFramework.wrap(async, modelSpec).cached();
+
+        CountDownLatch latch = new CountDownLatch(1);
+        client.listenable().addListener((type, path1, stat, model1) -> latch.countDown());
+
+        complete(client.set(model));
+        client.start();
+        Assert.assertTrue(new Timing().awaitLatch(latch));
+
+        AtomicReference<Thread> completionThread = new AtomicReference<>();
+        complete(client.read().whenComplete((s, e) -> completionThread.set((e == null) ? Thread.currentThread() : null)));
+        Assert.assertNotNull(completionThread.get());
+        Assert.assertNotEquals(Thread.currentThread(), completionThread.get(), "Should be different threads");
+        completionThread.set(null);
+
+        complete(client.at("foo").read().whenComplete((v, e) -> completionThread.set((e != null) ? Thread.currentThread() : null)));
+        Assert.assertNotNull(completionThread.get());
+        Assert.assertNotEquals(Thread.currentThread(), completionThread.get(), "Should be different threads");
+        completionThread.set(null);
+    }
+
+    @Test
+    public void testCustomThreading()
+    {
+        AtomicReference<Thread> ourThread = new AtomicReference<>();
+        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
+            Thread thread = new Thread(r, "testCustomThreading");
+            ourThread.set(thread);
+            return thread;
+        });
+        TestModel model = new TestModel("a", "b", "c", 1, BigInteger.ONE);
+        CachedModeledFramework<TestModel> client = ModeledFramework.wrap(async, modelSpec).cached(executor);
+
+        CountDownLatch latch = new CountDownLatch(1);
+        client.listenable().addListener((type, path1, stat, model1) -> latch.countDown());
+
+        complete(client.set(model));
+        client.start();
+        Assert.assertTrue(new Timing().awaitLatch(latch));
+
+        AtomicReference<Thread> completionThread = new AtomicReference<>();
+        complete(client.read().whenComplete((s, e) -> completionThread.set((e == null) ? Thread.currentThread() : null)));
+        Assert.assertEquals(ourThread.get(), completionThread.get(), "Should be our thread");
+        completionThread.set(null);
+
+        complete(client.at("foo").read().whenComplete((v, e) -> completionThread.set((e != null) ? Thread.currentThread() : null)));
+        Assert.assertEquals(ourThread.get(), completionThread.get(), "Should be our thread");
+        completionThread.set(null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/6485f165/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java
index b21c8ca..98d5ee1 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java
@@ -45,11 +45,11 @@ import java.util.concurrent.CountDownLatch;
 
 public class TestModeledFramework extends CompletableBaseClassForTests
 {
-    private static final ZPath path = ZPath.parse("/test/path");
-    private CuratorFramework rawClient;
-    private ModelSpec<TestModel> modelSpec;
-    private ModelSpec<TestNewerModel> newModelSpec;
-    private AsyncCuratorFramework async;
+    protected static final ZPath path = ZPath.parse("/test/path");
+    protected CuratorFramework rawClient;
+    protected ModelSpec<TestModel> modelSpec;
+    protected ModelSpec<TestNewerModel> newModelSpec;
+    protected AsyncCuratorFramework async;
 
     @BeforeMethod
     @Override