You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/05/10 21:38:14 UTC
curator git commit: More tests, refactoring
Repository: curator
Updated Branches:
refs/heads/CURATOR-397 6485f1650 -> 396d98a51
More tests, refactoring
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/396d98a5
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/396d98a5
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/396d98a5
Branch: refs/heads/CURATOR-397
Commit: 396d98a51495ec1c60156dcd0d6d553644e43689
Parents: 6485f16
Author: randgalt <ra...@apache.org>
Authored: Wed May 10 23:38:07 2017 +0200
Committer: randgalt <ra...@apache.org>
Committed: Wed May 10 23:38:07 2017 +0200
----------------------------------------------------------------------
.../modeled/cached/ModeledCacheListener.java | 7 +-
.../modeled/TestCachedModeledFramework.java | 122 +++++++++++++++----
.../x/async/modeled/TestModeledFramework.java | 37 +-----
.../async/modeled/TestModeledFrameworkBase.java | 64 ++++++++++
4 files changed, 167 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/396d98a5/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java
index 4f1ac70..42498c0 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java
@@ -40,12 +40,7 @@ public interface ModeledCacheListener<T>
/**
* A child was removed from the path
*/
- NODE_REMOVED,
-
- /**
- * Signals that the initial cache has been populated.
- */
- INITIALIZED
+ NODE_REMOVED
}
/**
http://git-wip-us.apache.org/repos/asf/curator/blob/396d98a5/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
index 7be7c28..a9048de 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
@@ -18,18 +18,22 @@
*/
package org.apache.curator.x.async.modeled;
+import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.test.Timing;
import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
+import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
import org.apache.curator.x.async.modeled.models.TestModel;
import org.testng.Assert;
import org.testng.annotations.Test;
+import java.io.IOException;
import java.math.BigInteger;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
-public class TestCachedModeledFramework extends TestModeledFramework
+public class TestCachedModeledFramework extends TestModeledFrameworkBase
{
@Test
public void testThreading()
@@ -42,18 +46,25 @@ public class TestCachedModeledFramework extends TestModeledFramework
complete(client.set(model));
client.start();
- Assert.assertTrue(new Timing().awaitLatch(latch));
-
- AtomicReference<Thread> completionThread = new AtomicReference<>();
- complete(client.read().whenComplete((s, e) -> completionThread.set((e == null) ? Thread.currentThread() : null)));
- Assert.assertNotNull(completionThread.get());
- Assert.assertNotEquals(Thread.currentThread(), completionThread.get(), "Should be different threads");
- completionThread.set(null);
-
- complete(client.at("foo").read().whenComplete((v, e) -> completionThread.set((e != null) ? Thread.currentThread() : null)));
- Assert.assertNotNull(completionThread.get());
- Assert.assertNotEquals(Thread.currentThread(), completionThread.get(), "Should be different threads");
- completionThread.set(null);
+ try
+ {
+ Assert.assertTrue(new Timing().awaitLatch(latch));
+
+ AtomicReference<Thread> completionThread = new AtomicReference<>();
+ complete(client.read().whenComplete((s, e) -> completionThread.set((e == null) ? Thread.currentThread() : null)));
+ Assert.assertNotNull(completionThread.get());
+ Assert.assertNotEquals(Thread.currentThread(), completionThread.get(), "Should be different threads");
+ completionThread.set(null);
+
+ complete(client.at("foo").read().whenComplete((v, e) -> completionThread.set((e != null) ? Thread.currentThread() : null)));
+ Assert.assertNotNull(completionThread.get());
+ Assert.assertNotEquals(Thread.currentThread(), completionThread.get(), "Should be different threads");
+ completionThread.set(null);
+ }
+ finally
+ {
+ client.close();
+ }
}
@Test
@@ -73,15 +84,84 @@ public class TestCachedModeledFramework extends TestModeledFramework
complete(client.set(model));
client.start();
- Assert.assertTrue(new Timing().awaitLatch(latch));
+ try
+ {
+ Assert.assertTrue(new Timing().awaitLatch(latch));
+
+ AtomicReference<Thread> completionThread = new AtomicReference<>();
+ complete(client.read().thenAccept(s -> completionThread.set(Thread.currentThread())));
+ Assert.assertEquals(ourThread.get(), completionThread.get(), "Should be our thread");
+ completionThread.set(null);
+
+ complete(client.at("foo").read().whenComplete((v, e) -> completionThread.set((e != null) ? Thread.currentThread() : null)));
+ Assert.assertEquals(ourThread.get(), completionThread.get(), "Should be our thread");
+ completionThread.set(null);
+ }
+ finally
+ {
+ client.close();
+ }
+ }
+
+ @Test
+ public void testDownServer() throws IOException
+ {
+ Timing timing = new Timing();
+
+ TestModel model = new TestModel("a", "b", "c", 1, BigInteger.ONE);
+ CachedModeledFramework<TestModel> client = ModeledFramework.wrap(async, modelSpec).cached();
+ Semaphore semaphore = new Semaphore(0);
+ client.listenable().addListener((t, p, s, m) -> semaphore.release());
- AtomicReference<Thread> completionThread = new AtomicReference<>();
- complete(client.read().whenComplete((s, e) -> completionThread.set((e == null) ? Thread.currentThread() : null)));
- Assert.assertEquals(ourThread.get(), completionThread.get(), "Should be our thread");
- completionThread.set(null);
+ client.start();
+ try
+ {
+ client.set(model);
+ Assert.assertTrue(timing.acquireSemaphore(semaphore));
+
+ CountDownLatch latch = new CountDownLatch(1);
+ rawClient.getConnectionStateListenable().addListener((__, state) -> {
+ if ( state == ConnectionState.LOST )
+ {
+ latch.countDown();
+ }
+ });
+ server.stop();
+ Assert.assertTrue(timing.awaitLatch(latch));
+
+ complete(client.read().whenComplete((value, e) -> {
+ Assert.assertNotNull(value);
+ Assert.assertNull(e);
+ }));
+ }
+ finally
+ {
+ client.close();
+ }
+ }
+
+ @Test
+ public void testPostInitializedFilter()
+ {
+ TestModel model1 = new TestModel("a", "b", "c", 1, BigInteger.ONE);
+ TestModel model2 = new TestModel("d", "e", "e", 1, BigInteger.ONE);
+ CachedModeledFramework<TestModel> client = ModeledFramework.wrap(async, modelSpec).cached();
+ Semaphore semaphore = new Semaphore(0);
+ ModeledCacheListener<TestModel> listener = (t, p, s, m) -> semaphore.release();
+ client.listenable().addListener(listener.postInitializedOnly());
+
+ complete(client.at("1").set(model1)); // set before cache is started
+ client.start();
+ try
+ {
+ Assert.assertFalse(timing.forSleepingABit().acquireSemaphore(semaphore));
- complete(client.at("foo").read().whenComplete((v, e) -> completionThread.set((e != null) ? Thread.currentThread() : null)));
- Assert.assertEquals(ourThread.get(), completionThread.get(), "Should be our thread");
- completionThread.set(null);
+ client.at("2").set(model2); // set after cache is started - the post-initialized listener should fire
+ Assert.assertTrue(timing.acquireSemaphore(semaphore));
+ }
+ finally
+ {
+ client.close();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/396d98a5/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java
index 98d5ee1..4a08a2b 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java
@@ -25,57 +25,22 @@ import org.apache.curator.framework.schema.Schema;
import org.apache.curator.framework.schema.SchemaSet;
import org.apache.curator.framework.schema.SchemaViolation;
import org.apache.curator.retry.RetryOneTime;
-import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.x.async.AsyncCuratorFramework;
import org.apache.curator.x.async.AsyncStage;
-import org.apache.curator.x.async.CompletableBaseClassForTests;
import org.apache.curator.x.async.modeled.models.TestModel;
import org.apache.curator.x.async.modeled.models.TestNewerModel;
import org.apache.curator.x.async.modeled.versioned.Versioned;
import org.apache.curator.x.async.modeled.versioned.VersionedModeledFramework;
import org.apache.zookeeper.KeeperException;
import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.math.BigInteger;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
-public class TestModeledFramework extends CompletableBaseClassForTests
+public class TestModeledFramework extends TestModeledFrameworkBase
{
- protected static final ZPath path = ZPath.parse("/test/path");
- protected CuratorFramework rawClient;
- protected ModelSpec<TestModel> modelSpec;
- protected ModelSpec<TestNewerModel> newModelSpec;
- protected AsyncCuratorFramework async;
-
- @BeforeMethod
- @Override
- public void setup() throws Exception
- {
- super.setup();
-
- rawClient = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- rawClient.start();
- async = AsyncCuratorFramework.wrap(rawClient);
-
- JacksonModelSerializer<TestModel> serializer = JacksonModelSerializer.build(TestModel.class);
- JacksonModelSerializer<TestNewerModel> newSerializer = JacksonModelSerializer.build(TestNewerModel.class);
-
- modelSpec = ModelSpec.builder(path, serializer).build();
- newModelSpec = ModelSpec.builder(path, newSerializer).build();
- }
-
- @AfterMethod
- @Override
- public void teardown() throws Exception
- {
- CloseableUtils.closeQuietly(rawClient);
- super.teardown();
- }
-
@Test
public void testCrud()
{
http://git-wip-us.apache.org/repos/asf/curator/blob/396d98a5/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java
new file mode 100644
index 0000000..61a4570
--- /dev/null
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.curator.x.async.CompletableBaseClassForTests;
+import org.apache.curator.x.async.modeled.models.TestModel;
+import org.apache.curator.x.async.modeled.models.TestNewerModel;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+
+public class TestModeledFrameworkBase extends CompletableBaseClassForTests
+{
+ protected static final ZPath path = ZPath.parse("/test/path");
+ protected CuratorFramework rawClient;
+ protected ModelSpec<TestModel> modelSpec;
+ protected ModelSpec<TestNewerModel> newModelSpec;
+ protected AsyncCuratorFramework async;
+
+ @BeforeMethod
+ @Override
+ public void setup() throws Exception
+ {
+ super.setup();
+
+ rawClient = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ rawClient.start();
+ async = AsyncCuratorFramework.wrap(rawClient);
+
+ JacksonModelSerializer<TestModel> serializer = JacksonModelSerializer.build(TestModel.class);
+ JacksonModelSerializer<TestNewerModel> newSerializer = JacksonModelSerializer.build(TestNewerModel.class);
+
+ modelSpec = ModelSpec.builder(path, serializer).build();
+ newModelSpec = ModelSpec.builder(path, newSerializer).build();
+ }
+
+ @AfterMethod
+ @Override
+ public void teardown() throws Exception
+ {
+ CloseableUtils.closeQuietly(rawClient);
+ super.teardown();
+ }
+}