You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/07/18 13:16:39 UTC
[17/23] curator git commit: more tests
more tests
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/9385d049
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/9385d049
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/9385d049
Branch: refs/heads/master
Commit: 9385d0490d8c684a602b4c57489e39941a9a178b
Parents: 75118e4
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 17:34:00 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 17:34:00 2017 -0500
----------------------------------------------------------------------
.../async/migrations/TestMigrationManager.java | 88 +++++++++++++++++++-
1 file changed, 87 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/9385d049/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
index 80a03bb..786e704 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
@@ -25,6 +25,7 @@ import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.curator.x.async.AsyncWrappers;
import org.apache.curator.x.async.CompletableBaseClassForTests;
import org.apache.curator.x.async.migrations.models.ModelV1;
import org.apache.curator.x.async.migrations.models.ModelV2;
@@ -41,12 +42,20 @@ import org.testng.annotations.Test;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
public class TestMigrationManager extends CompletableBaseClassForTests
{
+ private static final String LOCK_PATH = "/migrations/locks";
+ private static final String META_DATA_PATH = "/migrations/metadata";
private AsyncCuratorFramework client;
private ModelSpec<ModelV1> v1Spec;
private ModelSpec<ModelV2> v2Spec;
@@ -57,6 +66,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
private CuratorOp v2op;
private CuratorOp v3op;
private MigrationManager manager;
+ private final AtomicReference<CountDownLatch> filterLatch = new AtomicReference<>();
@BeforeMethod
@Override
@@ -81,7 +91,27 @@ public class TestMigrationManager extends CompletableBaseClassForTests
v3op = ModeledFramework.wrap(client, v3Spec).updateOp(new ModelV3("One", "Two", 30));
executor = Executors.newCachedThreadPool();
- manager = new MigrationManager(client, "/migrations/locks", "/migrations/metadata", executor, Duration.ofMinutes(10));
+ manager = new MigrationManager(client, LOCK_PATH, META_DATA_PATH, executor, Duration.ofMinutes(10))
+ {
+ @Override
+ protected List<Migration> filter(MigrationSet set, List<byte[]> operationHashesInOrder) throws MigrationException
+ {
+ CountDownLatch localLatch = filterLatch.getAndSet(null);
+ if ( localLatch != null )
+ {
+ try
+ {
+ localLatch.await();
+ }
+ catch ( InterruptedException e )
+ {
+ Thread.currentThread().interrupt();
+ Throwables.propagate(e);
+ }
+ }
+ return super.filter(set, operationHashesInOrder);
+ }
+ };
manager.debugCount = new AtomicInteger();
}
@@ -260,4 +290,60 @@ public class TestMigrationManager extends CompletableBaseClassForTests
Assert.assertNull(client.unwrap().checkExists().forPath("/test"));
}
+
+ @Test
+ public void testConcurrency1() throws Exception
+ {
+ CuratorOp op1 = client.transactionOp().create().forPath("/test");
+ CuratorOp op2 = client.transactionOp().create().forPath("/test/bar", "first".getBytes());
+ Migration migration = () -> Arrays.asList(op1, op2);
+ MigrationSet migrationSet = MigrationSet.build("1", Collections.singletonList(migration));
+ CountDownLatch latch = new CountDownLatch(1);
+ filterLatch.set(latch);
+ CompletionStage<Void> first = manager.migrate(migrationSet);
+
+ MigrationManager manager2 = new MigrationManager(client, LOCK_PATH, META_DATA_PATH, executor, Duration.ofMillis(timing.forSleepingABit().milliseconds()));
+ try
+ {
+ complete(manager2.migrate(migrationSet));
+ Assert.fail("Should throw");
+ }
+ catch ( Throwable e )
+ {
+ Assert.assertTrue(Throwables.getRootCause(e) instanceof AsyncWrappers.TimeoutException);
+ }
+
+ latch.countDown();
+ complete(first);
+ Assert.assertEquals(client.unwrap().getData().forPath("/test/bar"), "first".getBytes());
+ }
+
+ @Test
+ public void testConcurrency2() throws Exception
+ {
+ CuratorOp op1 = client.transactionOp().create().forPath("/test");
+ CuratorOp op2 = client.transactionOp().create().forPath("/test/bar", "first".getBytes());
+ Migration migration = () -> Arrays.asList(op1, op2);
+ MigrationSet migrationSet = MigrationSet.build("1", Collections.singletonList(migration));
+ CountDownLatch latch = new CountDownLatch(1);
+ filterLatch.set(latch);
+ CompletionStage<Void> first = manager.migrate(migrationSet);
+
+ CompletionStage<Void> second = manager.migrate(migrationSet);
+ try
+ {
+ second.toCompletableFuture().get(timing.forSleepingABit().milliseconds(), TimeUnit.MILLISECONDS);
+ Assert.fail("Should throw");
+ }
+ catch ( Throwable e )
+ {
+ Assert.assertTrue(Throwables.getRootCause(e) instanceof TimeoutException);
+ }
+
+ latch.countDown();
+ complete(first);
+ Assert.assertEquals(client.unwrap().getData().forPath("/test/bar"), "first".getBytes());
+ complete(second);
+ Assert.assertEquals(manager.debugCount.get(), 1);
+ }
}