You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/05/10 17:23:07 UTC
[2/2] curator git commit: CachedModeledFrameworkImpl wasn't using
executor in right way. Completions need to happen async not setting of the
value. This required creating CachedStage which proxies all non-async methods
to their async counterparts using the CachedModeledFrameworkImpl's executor
CachedModeledFrameworkImpl wasn't using the executor in the right way. The completion callbacks need to run asynchronously, not the setting of the value. This required creating CachedStage, which proxies all non-async methods to their async counterparts using the CachedModeledFrameworkImpl's executor.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/6485f165
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/6485f165
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/6485f165
Branch: refs/heads/CURATOR-397
Commit: 6485f1650e37e392e6122f8b13ff432ee0321666
Parents: 80ca587
Author: randgalt <ra...@apache.org>
Authored: Wed May 10 19:23:00 2017 +0200
Committer: randgalt <ra...@apache.org>
Committed: Wed May 10 19:23:00 2017 +0200
----------------------------------------------------------------------
.../x/async/api/AsyncCuratorFrameworkDsl.java | 1 -
.../details/CachedModeledFrameworkImpl.java | 20 ++--
.../x/async/modeled/details/CachedStage.java | 117 +++++++++++++++++++
.../modeled/TestCachedModeledFramework.java | 87 ++++++++++++++
.../x/async/modeled/TestModeledFramework.java | 10 +-
5 files changed, 220 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/6485f165/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
index 0807160..bc66bb6 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
@@ -19,7 +19,6 @@
package org.apache.curator.x.async.api;
import org.apache.curator.framework.api.transaction.CuratorOp;
-import org.apache.curator.x.async.WatchMode;
/**
* Zookeeper framework-style client
http://git-wip-us.apache.org/repos/asf/curator/blob/6485f165/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
index 15db3ba..9e1fa8f 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
@@ -203,14 +203,14 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
{
ZPath path = client.modelSpec().path();
Optional<ZNode<T>> data = cache.currentData(path);
- return data.map(node -> completed(new ModelStage<>(null), node.stat())).orElseGet(() -> completed(new ModelStage<>(null), null));
+ return data.map(node -> completed(node.stat())).orElseGet(() -> completed(null));
}
@Override
public AsyncStage<List<ZPath>> children()
{
Set<ZPath> paths = cache.currentChildren(client.modelSpec().path()).keySet();
- return completed(new ModelStage<>(null), Lists.newArrayList(paths));
+ return completed(Lists.newArrayList(paths));
}
@Override
@@ -261,23 +261,25 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
return client.inTransaction(operations);
}
- private <U> ModelStage<U> completed(ModelStage<U> stage, U value)
+ private <U> CachedStage<U> completed(U value)
{
- executor.execute(() -> stage.complete(value));
+ CachedStage<U> stage = new CachedStage<>(executor);
+ stage.complete(value);
return stage;
}
- private <U> ModelStage<U> completedExceptionally(ModelStage<U> stage, Exception e)
+ private <U> CachedStage<U> completedExceptionally(Exception e)
{
- executor.execute(() -> stage.completeExceptionally(e));
+ CachedStage<U> stage = new CachedStage<>(executor);
+ stage.completeExceptionally(e);
return stage;
}
- private <U> ModelStage<U> internalRead(Function<ZNode<T>, U> resolver)
+ private <U> CachedStage<U> internalRead(Function<ZNode<T>, U> resolver)
{
ZPath path = client.modelSpec().path();
Optional<ZNode<T>> data = cache.currentData(path);
- return data.map(node -> completed(new ModelStage<>(null), resolver.apply(node)))
- .orElseGet(() -> completedExceptionally(new ModelStage<>(null), new KeeperException.NoNodeException(path.fullPath())));
+ return data.map(node -> completed(resolver.apply(node)))
+ .orElseGet(() -> completedExceptionally(new KeeperException.NoNodeException(path.fullPath())));
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/6485f165/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedStage.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedStage.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedStage.java
new file mode 100644
index 0000000..edf6b98
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedStage.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.details;
+
+import org.apache.curator.x.async.AsyncStage;
+import org.apache.zookeeper.WatchedEvent;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+class CachedStage<T> extends CompletableFuture<T> implements AsyncStage<T> // CompletableFuture whose plain chaining methods delegate to their *Async counterparts on a fixed executor
+{
+ private final Executor executor; // passed to every *Async call below
+
+ CachedStage(Executor executor)
+ {
+ this.executor = executor;
+ }
+
+ @Override
+ public CompletionStage<WatchedEvent> event()
+ {
+ return null; // always null for this stage; NOTE(review): presumably because cached reads are unwatched - confirm against the AsyncStage.event() contract
+ }
+
+ @Override
+ public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn)
+ {
+ return thenApplyAsync(fn, executor); // fn is submitted to the executor rather than run by the completing or registering thread
+ }
+
+ @Override
+ public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
+ {
+ return thenAcceptAsync(action, executor); // same redirection as thenApply
+ }
+
+ @Override
+ public CompletableFuture<Void> thenRun(Runnable action)
+ {
+ return thenRunAsync(action, executor);
+ }
+
+ @Override
+ public <U, V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn)
+ {
+ return thenCombineAsync(other, fn, executor); // two-stage variants are redirected the same way
+ }
+
+ @Override
+ public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
+ {
+ return thenAcceptBothAsync(other, action, executor);
+ }
+
+ @Override
+ public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
+ {
+ return runAfterBothAsync(other, action, executor);
+ }
+
+ @Override
+ public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
+ {
+ return applyToEitherAsync(other, fn, executor);
+ }
+
+ @Override
+ public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
+ {
+ return acceptEitherAsync(other, action, executor);
+ }
+
+ @Override
+ public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)
+ {
+ return runAfterEitherAsync(other, action, executor);
+ }
+
+ @Override
+ public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
+ {
+ return thenComposeAsync(fn, executor);
+ }
+
+ @Override
+ public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
+ {
+ return whenCompleteAsync(action, executor); // receives both the value and the failure, still on the executor
+ }
+
+ @Override
+ public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
+ {
+ return handleAsync(fn, executor);
+ }
+} // NOTE(review): exceptionally(...) and the executor-less *Async overloads are not overridden here, so they keep the inherited CompletableFuture behaviour
http://git-wip-us.apache.org/repos/asf/curator/blob/6485f165/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
new file mode 100644
index 0000000..7be7c28
--- /dev/null
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled;
+
+import org.apache.curator.test.Timing;
+import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
+import org.apache.curator.x.async.modeled.models.TestModel;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.math.BigInteger;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class TestCachedModeledFramework extends TestModeledFramework // checks which thread runs completion callbacks of cached reads
+{
+ @Test
+ public void testThreading() // cached() without an executor: callbacks must not run on the test thread
+ {
+ TestModel model = new TestModel("a", "b", "c", 1, BigInteger.ONE);
+ CachedModeledFramework<TestModel> client = ModeledFramework.wrap(async, modelSpec).cached();
+
+ CountDownLatch latch = new CountDownLatch(1);
+ client.listenable().addListener((type, path1, stat, model1) -> latch.countDown()); // released by the first listener notification
+
+ complete(client.set(model));
+ client.start();
+ Assert.assertTrue(new Timing().awaitLatch(latch)); // wait for the listener to fire before reading
+
+ AtomicReference<Thread> completionThread = new AtomicReference<>();
+ complete(client.read().whenComplete((s, e) -> completionThread.set((e == null) ? Thread.currentThread() : null))); // records the callback thread only when the read succeeded
+ Assert.assertNotNull(completionThread.get());
+ Assert.assertNotEquals(Thread.currentThread(), completionThread.get(), "Should be different threads");
+ completionThread.set(null); // reset so the next check cannot pass on a stale value
+
+ complete(client.at("foo").read().whenComplete((v, e) -> completionThread.set((e != null) ? Thread.currentThread() : null))); // records the callback thread only when the read failed
+ Assert.assertNotNull(completionThread.get());
+ Assert.assertNotEquals(Thread.currentThread(), completionThread.get(), "Should be different threads");
+ completionThread.set(null);
+ }
+
+ @Test
+ public void testCustomThreading() // cached(executor): callbacks must run on the supplied executor's thread
+ {
+ AtomicReference<Thread> ourThread = new AtomicReference<>();
+ ExecutorService executor = Executors.newSingleThreadExecutor(r -> { // thread factory remembers the most recent thread it creates
+ Thread thread = new Thread(r, "testCustomThreading");
+ ourThread.set(thread);
+ return thread;
+ }); // NOTE(review): this executor is never shut down in this method
+ TestModel model = new TestModel("a", "b", "c", 1, BigInteger.ONE);
+ CachedModeledFramework<TestModel> client = ModeledFramework.wrap(async, modelSpec).cached(executor);
+
+ CountDownLatch latch = new CountDownLatch(1);
+ client.listenable().addListener((type, path1, stat, model1) -> latch.countDown());
+
+ complete(client.set(model));
+ client.start();
+ Assert.assertTrue(new Timing().awaitLatch(latch));
+
+ AtomicReference<Thread> completionThread = new AtomicReference<>();
+ complete(client.read().whenComplete((s, e) -> completionThread.set((e == null) ? Thread.currentThread() : null))); // success path: thread recorded only when e is null
+ Assert.assertEquals(ourThread.get(), completionThread.get(), "Should be our thread");
+ completionThread.set(null);
+
+ complete(client.at("foo").read().whenComplete((v, e) -> completionThread.set((e != null) ? Thread.currentThread() : null))); // failure path: thread recorded only when e is non-null
+ Assert.assertEquals(ourThread.get(), completionThread.get(), "Should be our thread");
+ completionThread.set(null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/6485f165/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java
index b21c8ca..98d5ee1 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java
@@ -45,11 +45,11 @@ import java.util.concurrent.CountDownLatch;
public class TestModeledFramework extends CompletableBaseClassForTests
{
- private static final ZPath path = ZPath.parse("/test/path");
- private CuratorFramework rawClient;
- private ModelSpec<TestModel> modelSpec;
- private ModelSpec<TestNewerModel> newModelSpec;
- private AsyncCuratorFramework async;
+ protected static final ZPath path = ZPath.parse("/test/path");
+ protected CuratorFramework rawClient;
+ protected ModelSpec<TestModel> modelSpec;
+ protected ModelSpec<TestNewerModel> newModelSpec;
+ protected AsyncCuratorFramework async;
@BeforeMethod
@Override