You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/07/18 13:16:39 UTC

[17/23] curator git commit: more tests

more tests


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/9385d049
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/9385d049
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/9385d049

Branch: refs/heads/master
Commit: 9385d0490d8c684a602b4c57489e39941a9a178b
Parents: 75118e4
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 17:34:00 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 17:34:00 2017 -0500

----------------------------------------------------------------------
 .../async/migrations/TestMigrationManager.java  | 88 +++++++++++++++++++-
 1 file changed, 87 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/9385d049/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
index 80a03bb..786e704 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
@@ -25,6 +25,7 @@ import org.apache.curator.framework.api.transaction.CuratorOp;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.curator.x.async.AsyncWrappers;
 import org.apache.curator.x.async.CompletableBaseClassForTests;
 import org.apache.curator.x.async.migrations.models.ModelV1;
 import org.apache.curator.x.async.migrations.models.ModelV2;
@@ -41,12 +42,20 @@ import org.testng.annotations.Test;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class TestMigrationManager extends CompletableBaseClassForTests
 {
+    private static final String LOCK_PATH = "/migrations/locks";
+    private static final String META_DATA_PATH = "/migrations/metadata";
     private AsyncCuratorFramework client;
     private ModelSpec<ModelV1> v1Spec;
     private ModelSpec<ModelV2> v2Spec;
@@ -57,6 +66,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
     private CuratorOp v2op;
     private CuratorOp v3op;
     private MigrationManager manager;
+    private final AtomicReference<CountDownLatch> filterLatch = new AtomicReference<>();
 
     @BeforeMethod
     @Override
@@ -81,7 +91,27 @@ public class TestMigrationManager extends CompletableBaseClassForTests
         v3op = ModeledFramework.wrap(client, v3Spec).updateOp(new ModelV3("One", "Two", 30));
 
         executor = Executors.newCachedThreadPool();
-        manager = new MigrationManager(client, "/migrations/locks", "/migrations/metadata", executor, Duration.ofMinutes(10));
+        manager = new MigrationManager(client, LOCK_PATH, META_DATA_PATH, executor, Duration.ofMinutes(10))
+        {
+            @Override
+            protected List<Migration> filter(MigrationSet set, List<byte[]> operationHashesInOrder) throws MigrationException
+            {
+                CountDownLatch localLatch = filterLatch.getAndSet(null);
+                if ( localLatch != null )
+                {
+                    try
+                    {
+                        localLatch.await();
+                    }
+                    catch ( InterruptedException e )
+                    {
+                        Thread.currentThread().interrupt();
+                        Throwables.propagate(e);
+                    }
+                }
+                return super.filter(set, operationHashesInOrder);
+            }
+        };
         manager.debugCount = new AtomicInteger();
     }
 
@@ -260,4 +290,60 @@ public class TestMigrationManager extends CompletableBaseClassForTests
 
         Assert.assertNull(client.unwrap().checkExists().forPath("/test"));
     }
+
+    @Test
+    public void testConcurrency1() throws Exception
+    {
+        CuratorOp op1 = client.transactionOp().create().forPath("/test");
+        CuratorOp op2 = client.transactionOp().create().forPath("/test/bar", "first".getBytes());
+        Migration migration = () -> Arrays.asList(op1, op2);
+        MigrationSet migrationSet = MigrationSet.build("1", Collections.singletonList(migration));
+        CountDownLatch latch = new CountDownLatch(1);
+        filterLatch.set(latch);
+        CompletionStage<Void> first = manager.migrate(migrationSet);
+
+        MigrationManager manager2 = new MigrationManager(client, LOCK_PATH, META_DATA_PATH, executor, Duration.ofMillis(timing.forSleepingABit().milliseconds()));
+        try
+        {
+            complete(manager2.migrate(migrationSet));
+            Assert.fail("Should throw");
+        }
+        catch ( Throwable e )
+        {
+            Assert.assertTrue(Throwables.getRootCause(e) instanceof AsyncWrappers.TimeoutException);
+        }
+
+        latch.countDown();
+        complete(first);
+        Assert.assertEquals(client.unwrap().getData().forPath("/test/bar"), "first".getBytes());
+    }
+
+    @Test
+    public void testConcurrency2() throws Exception
+    {
+        CuratorOp op1 = client.transactionOp().create().forPath("/test");
+        CuratorOp op2 = client.transactionOp().create().forPath("/test/bar", "first".getBytes());
+        Migration migration = () -> Arrays.asList(op1, op2);
+        MigrationSet migrationSet = MigrationSet.build("1", Collections.singletonList(migration));
+        CountDownLatch latch = new CountDownLatch(1);
+        filterLatch.set(latch);
+        CompletionStage<Void> first = manager.migrate(migrationSet);
+
+        CompletionStage<Void> second = manager.migrate(migrationSet);
+        try
+        {
+            second.toCompletableFuture().get(timing.forSleepingABit().milliseconds(), TimeUnit.MILLISECONDS);
+            Assert.fail("Should throw");
+        }
+        catch ( Throwable e )
+        {
+            Assert.assertTrue(Throwables.getRootCause(e) instanceof TimeoutException);
+        }
+
+        latch.countDown();
+        complete(first);
+        Assert.assertEquals(client.unwrap().getData().forPath("/test/bar"), "first".getBytes());
+        complete(second);
+        Assert.assertEquals(manager.debugCount.get(), 1);
+    }
 }