You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/06/28 14:46:36 UTC
curator git commit: Added asyncEnsureContainers
Repository: curator
Updated Branches:
refs/heads/CURATOR-397 f7d410f8e -> 09f9bc06a
Added asyncEnsureContainers
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/09f9bc06
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/09f9bc06
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/09f9bc06
Branch: refs/heads/CURATOR-397
Commit: 09f9bc06accf7c42d3a5dcf3d92c20d706e56ea6
Parents: f7d410f
Author: randgalt <ra...@apache.org>
Authored: Wed Jun 28 09:46:32 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Wed Jun 28 09:46:32 2017 -0500
----------------------------------------------------------------------
.../org/apache/curator/x/async/AsyncLocker.java | 251 ----------------
.../apache/curator/x/async/AsyncWrappers.java | 296 +++++++++++++++++++
.../apache/curator/x/async/TestAsyncLocker.java | 73 -----
.../curator/x/async/TestAsyncWrappers.java | 73 +++++
4 files changed, 369 insertions(+), 324 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/09f9bc06/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java
deleted file mode 100644
index b15fd4b..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async;
-
-import org.apache.curator.framework.recipes.locks.InterProcessLock;
-import org.apache.curator.utils.ThreadUtils;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-
-/**
- * <p>
- * Utility for acquiring a lock asynchronously
- * </p>
- *
- * <p>
- * Canonical usage:
- * <code><pre>
- * InterProcessMutex mutex = new InterProcessMutex(...) // or any InterProcessLock
- * AsyncLocker.lockAsync(mutex).thenAccept(dummy -> {
- * try
- * {
- * // do work while holding the lock
- * }
- * finally
- * {
- * AsyncLocker.release(mutex);
- * }
- * }).exceptionally(e -> {
- * if ( e instanceOf TimeoutException ) {
- * // timed out trying to acquire the lock
- * }
- * // handle the error
- * return null;
- * });
- * </pre></code>
- * </p>
- */
-public class AsyncLocker
-{
- /**
- * Set as the completion stage's exception when trying to acquire a lock
- * times out
- */
- public static class TimeoutException extends RuntimeException
- {
- }
-
- /**
- * Attempt to acquire the given lock asynchronously using the given timeout and executor. If the lock
- * is not acquired within the timeout stage is completedExceptionally with {@link org.apache.curator.x.async.AsyncLocker.TimeoutException}
- *
- * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex},
- * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.)
- * @param timeout max timeout to acquire lock
- * @param unit time unit of timeout
- * @param executor executor to use to asynchronously acquire
- * @return stage
- */
- public static CompletionStage<Void> lockAsync(InterProcessLock lock, long timeout, TimeUnit unit, Executor executor)
- {
- CompletableFuture<Void> future = new CompletableFuture<>();
- if ( executor == null )
- {
- CompletableFuture.runAsync(() -> lock(future, lock, timeout, unit));
- }
- else
- {
- CompletableFuture.runAsync(() -> lock(future, lock, timeout, unit), executor);
- }
- return future;
- }
-
- /**
- * Attempt to acquire the given lock asynchronously using the given timeout and executor. The stage
- * is completed with a Boolean that indicates whether or not the lock was acquired.
- *
- * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex},
- * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.)
- * @param timeout max timeout to acquire lock
- * @param unit time unit of timeout
- * @param executor executor to use to asynchronously acquire
- * @return stage
- */
- public static CompletionStage<Boolean> lockAsyncIf(InterProcessLock lock, long timeout, TimeUnit unit, Executor executor)
- {
- CompletableFuture<Boolean> future = new CompletableFuture<>();
- if ( executor == null )
- {
- CompletableFuture.runAsync(() -> lockIf(future, lock, timeout, unit));
- }
- else
- {
- CompletableFuture.runAsync(() -> lockIf(future, lock, timeout, unit), executor);
- }
- return future;
- }
-
- /**
- * Attempt to acquire the given lock asynchronously using the given executor and without a timeout.
- *
- * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex},
- * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.)
- * @param executor executor to use to asynchronously acquire
- * @return stage
- */
- public static CompletionStage<Void> lockAsync(InterProcessLock lock, Executor executor)
- {
- return lockAsync(lock, 0, null, executor);
- }
-
- /**
- * Attempt to acquire the given lock asynchronously using the given timeout using the {@link java.util.concurrent.ForkJoinPool#commonPool()}.
- * If the lock is not acquired within the timeout stage is completedExceptionally with {@link org.apache.curator.x.async.AsyncLocker.TimeoutException}
- *
- * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex},
- * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.)
- * @param timeout max timeout to acquire lock
- * @param unit time unit of timeout
- * @return stage
- */
- public static CompletionStage<Void> lockAsync(InterProcessLock lock, long timeout, TimeUnit unit)
- {
- return lockAsync(lock, timeout, unit, null);
- }
-
- /**
- * Attempt to acquire the given lock asynchronously using the given timeout using the {@link java.util.concurrent.ForkJoinPool#commonPool()}.
- * The stage is completed with a Boolean that indicates whether or not the lock was acquired.
- *
- * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex},
- * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.)
- * @param timeout max timeout to acquire lock
- * @param unit time unit of timeout
- * @return stage
- */
- public static CompletionStage<Boolean> lockAsyncIf(InterProcessLock lock, long timeout, TimeUnit unit)
- {
- return lockAsyncIf(lock, timeout, unit, null);
- }
-
- /**
- * Attempt to acquire the given lock asynchronously without timeout using the {@link java.util.concurrent.ForkJoinPool#commonPool()}.
- *
- * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex},
- * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.)
- * @return stage
- */
- public static CompletionStage<Void> lockAsync(InterProcessLock lock)
- {
- return lockAsync(lock, 0, null, null);
- }
-
- /**
- * Release the lock and wrap any exception in <code>RuntimeException</code>
- *
- * @param lock lock to release
- */
- public static void release(InterProcessLock lock)
- {
- release(lock, true);
- }
-
- /**
- * Release the lock and wrap any exception in <code>RuntimeException</code>
- *
- * @param lock lock to release
- * @param ignoreNoLockExceptions if true {@link java.lang.IllegalStateException} is ignored
- */
- public static void release(InterProcessLock lock, boolean ignoreNoLockExceptions)
- {
- try
- {
- lock.release();
- }
- catch ( IllegalStateException e )
- {
- if ( !ignoreNoLockExceptions )
- {
- throw new RuntimeException(e);
- }
- }
- catch ( Exception e )
- {
- ThreadUtils.checkInterrupted(e);
- throw new RuntimeException(e);
- }
- }
-
- private static void lockIf(CompletableFuture<Boolean> future, InterProcessLock lock, long timeout, TimeUnit unit)
- {
- try
- {
- future.complete(lock.acquire(timeout, unit));
- }
- catch ( Exception e )
- {
- ThreadUtils.checkInterrupted(e);
- future.completeExceptionally(e);
- }
- }
-
- private static void lock(CompletableFuture<Void> future, InterProcessLock lock, long timeout, TimeUnit unit)
- {
- try
- {
- if ( unit != null )
- {
- if ( lock.acquire(timeout, unit) )
- {
- future.complete(null);
- }
- else
- {
- future.completeExceptionally(new TimeoutException());
- }
- }
- else
- {
- lock.acquire();
- future.complete(null);
- }
- }
- catch ( Exception e )
- {
- ThreadUtils.checkInterrupted(e);
- future.completeExceptionally(e);
- }
- }
-
- private AsyncLocker()
- {
- }
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/09f9bc06/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
new file mode 100644
index 0000000..8ff507c
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async;
+
+import org.apache.curator.framework.EnsureContainers;
+import org.apache.curator.framework.recipes.locks.InterProcessLock;
+import org.apache.curator.utils.ThreadUtils;
+import org.apache.curator.x.async.modeled.ZPath;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * <p>
+ * Utility for adding asynchronous behavior
+ * </p>
+ *
+ * <p>
+ * E.g. locks:
+ * <code><pre>
+ * InterProcessMutex mutex = new InterProcessMutex(...) // or any InterProcessLock
+ * AsyncWrappers.lockAsync(mutex, executor).thenAccept(dummy -> {
+ * try
+ * {
+ * // do work while holding the lock
+ * }
+ * finally
+ * {
+ * AsyncWrappers.release(mutex);
+ * }
+ * }).exceptionally(e -> {
+ * if ( e instanceof TimeoutException ) {
+ * // timed out trying to acquire the lock
+ * }
+ * // handle the error
+ * return null;
+ * });
+ * </pre></code>
+ * </p>
+ *
+ * <p>
+ * E.g. EnsureContainers
+ * <code><pre>
+ * AsyncWrappers.asyncEnsureContainers(client, path, executor).thenAccept(dummy -> {
+ * // execute after ensuring containers
+ * });
+ * </pre></code>
+ * </p>
+ */
+public class AsyncWrappers
+{
+ /**
+ * Asynchronously call {@link org.apache.curator.framework.EnsureContainers} using the {@link java.util.concurrent.ForkJoinPool#commonPool()}.
+ *
+ * @param client client
+ * @param path path to ensure
+ * @return stage
+ */
+ public static CompletionStage<Void> asyncEnsureContainers(AsyncCuratorFramework client, ZPath path)
+ {
+ return asyncEnsureContainers(client, path, null);
+ }
+
+ /**
+ * Asynchronously call {@link org.apache.curator.framework.EnsureContainers} using the given executor
+ *
+ * @param client client
+ * @param path path to ensure
+ * @return stage
+ */
+ public static CompletionStage<Void> asyncEnsureContainers(AsyncCuratorFramework client, ZPath path, Executor executor)
+ {
+ Runnable proc = () -> {
+ try
+ {
+ new EnsureContainers(client.unwrap(), path.fullPath()).ensure();
+ }
+ catch ( Exception e )
+ {
+ throw new RuntimeException(e);
+ }
+ };
+ return (executor != null) ? CompletableFuture.runAsync(proc, executor) : CompletableFuture.runAsync(proc);
+ }
+
+ /**
+ * Set as the completion stage's exception when trying to acquire a lock
+ * times out
+ */
+ public static class TimeoutException extends RuntimeException
+ {
+ }
+
+ /**
+ * Attempt to acquire the given lock asynchronously using the given timeout and executor. If the lock
+ * is not acquired within the timeout, the stage is completed exceptionally with {@link AsyncWrappers.TimeoutException}
+ *
+ * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex},
+ * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.)
+ * @param timeout max timeout to acquire lock
+ * @param unit time unit of timeout
+ * @param executor executor to use to asynchronously acquire
+ * @return stage
+ */
+ public static CompletionStage<Void> lockAsync(InterProcessLock lock, long timeout, TimeUnit unit, Executor executor)
+ {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ if ( executor == null )
+ {
+ CompletableFuture.runAsync(() -> lock(future, lock, timeout, unit));
+ }
+ else
+ {
+ CompletableFuture.runAsync(() -> lock(future, lock, timeout, unit), executor);
+ }
+ return future;
+ }
+
+ /**
+ * Attempt to acquire the given lock asynchronously using the given timeout and executor. The stage
+ * is completed with a Boolean that indicates whether or not the lock was acquired.
+ *
+ * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex},
+ * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.)
+ * @param timeout max timeout to acquire lock
+ * @param unit time unit of timeout
+ * @param executor executor to use to asynchronously acquire
+ * @return stage
+ */
+ public static CompletionStage<Boolean> lockAsyncIf(InterProcessLock lock, long timeout, TimeUnit unit, Executor executor)
+ {
+ CompletableFuture<Boolean> future = new CompletableFuture<>();
+ if ( executor == null )
+ {
+ CompletableFuture.runAsync(() -> lockIf(future, lock, timeout, unit));
+ }
+ else
+ {
+ CompletableFuture.runAsync(() -> lockIf(future, lock, timeout, unit), executor);
+ }
+ return future;
+ }
+
+ /**
+ * Attempt to acquire the given lock asynchronously using the given executor and without a timeout.
+ *
+ * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex},
+ * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.)
+ * @param executor executor to use to asynchronously acquire
+ * @return stage
+ */
+ public static CompletionStage<Void> lockAsync(InterProcessLock lock, Executor executor)
+ {
+ return lockAsync(lock, 0, null, executor);
+ }
+
+ /**
+ * Attempt to acquire the given lock asynchronously using the given timeout and the {@link java.util.concurrent.ForkJoinPool#commonPool()}.
+ * If the lock is not acquired within the timeout, the stage is completed exceptionally with {@link AsyncWrappers.TimeoutException}
+ *
+ * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex},
+ * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.)
+ * @param timeout max timeout to acquire lock
+ * @param unit time unit of timeout
+ * @return stage
+ */
+ public static CompletionStage<Void> lockAsync(InterProcessLock lock, long timeout, TimeUnit unit)
+ {
+ return lockAsync(lock, timeout, unit, null);
+ }
+
+ /**
+ * Attempt to acquire the given lock asynchronously using the given timeout and the {@link java.util.concurrent.ForkJoinPool#commonPool()}.
+ * The stage is completed with a Boolean that indicates whether or not the lock was acquired.
+ *
+ * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex},
+ * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.)
+ * @param timeout max timeout to acquire lock
+ * @param unit time unit of timeout
+ * @return stage
+ */
+ public static CompletionStage<Boolean> lockAsyncIf(InterProcessLock lock, long timeout, TimeUnit unit)
+ {
+ return lockAsyncIf(lock, timeout, unit, null);
+ }
+
+ /**
+ * Attempt to acquire the given lock asynchronously without timeout using the {@link java.util.concurrent.ForkJoinPool#commonPool()}.
+ *
+ * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex},
+ * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.)
+ * @return stage
+ */
+ public static CompletionStage<Void> lockAsync(InterProcessLock lock)
+ {
+ return lockAsync(lock, 0, null, null);
+ }
+
+ /**
+ * Release the lock and wrap any exception in <code>RuntimeException</code>
+ *
+ * @param lock lock to release
+ */
+ public static void release(InterProcessLock lock)
+ {
+ release(lock, true);
+ }
+
+ /**
+ * Release the lock and wrap any exception in <code>RuntimeException</code>
+ *
+ * @param lock lock to release
+ * @param ignoreNoLockExceptions if true {@link java.lang.IllegalStateException} is ignored
+ */
+ public static void release(InterProcessLock lock, boolean ignoreNoLockExceptions)
+ {
+ try
+ {
+ lock.release();
+ }
+ catch ( IllegalStateException e )
+ {
+ if ( !ignoreNoLockExceptions )
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ catch ( Exception e )
+ {
+ ThreadUtils.checkInterrupted(e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static void lockIf(CompletableFuture<Boolean> future, InterProcessLock lock, long timeout, TimeUnit unit)
+ {
+ try
+ {
+ future.complete(lock.acquire(timeout, unit));
+ }
+ catch ( Exception e )
+ {
+ ThreadUtils.checkInterrupted(e);
+ future.completeExceptionally(e);
+ }
+ }
+
+ private static void lock(CompletableFuture<Void> future, InterProcessLock lock, long timeout, TimeUnit unit)
+ {
+ try
+ {
+ if ( unit != null )
+ {
+ if ( lock.acquire(timeout, unit) )
+ {
+ future.complete(null);
+ }
+ else
+ {
+ future.completeExceptionally(new TimeoutException());
+ }
+ }
+ else
+ {
+ lock.acquire();
+ future.complete(null);
+ }
+ }
+ catch ( Exception e )
+ {
+ ThreadUtils.checkInterrupted(e);
+ future.completeExceptionally(e);
+ }
+ }
+
+ private AsyncWrappers()
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/09f9bc06/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncLocker.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncLocker.java b/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncLocker.java
deleted file mode 100644
index 2553620..0000000
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncLocker.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
-import org.apache.curator.retry.RetryOneTime;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-public class TestAsyncLocker extends CompletableBaseClassForTests
-{
- @Test
- public void testBasic()
- {
- try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
- {
- client.start();
-
- InterProcessMutex lock = new InterProcessMutex(client, "/one/two");
- complete(AsyncLocker.lockAsync(lock), (__, e) -> {
- Assert.assertNull(e);
- AsyncLocker.release(lock);
- });
- }
- }
-
- @Test
- public void testContention() throws Exception
- {
- try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
- {
- client.start();
-
- InterProcessMutex lock1 = new InterProcessMutex(client, "/one/two");
- InterProcessMutex lock2 = new InterProcessMutex(client, "/one/two");
- CountDownLatch latch = new CountDownLatch(1);
- AsyncLocker.lockAsync(lock1).thenAccept(__ -> {
- latch.countDown(); // don't release the lock
- });
- Assert.assertTrue(timing.awaitLatch(latch));
-
- CountDownLatch latch2 = new CountDownLatch(1);
- AsyncLocker.lockAsync(lock2, timing.forSleepingABit().milliseconds(), TimeUnit.MILLISECONDS).exceptionally(e -> {
- if ( e instanceof AsyncLocker.TimeoutException )
- {
- latch2.countDown(); // lock should still be held
- }
- return null;
- });
- Assert.assertTrue(timing.awaitLatch(latch2));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/09f9bc06/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncWrappers.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncWrappers.java b/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncWrappers.java
new file mode 100644
index 0000000..7ce7904
--- /dev/null
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncWrappers.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.retry.RetryOneTime;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class TestAsyncWrappers extends CompletableBaseClassForTests
+{
+ @Test
+ public void testBasic()
+ {
+ try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+ {
+ client.start();
+
+ InterProcessMutex lock = new InterProcessMutex(client, "/one/two");
+ complete(AsyncWrappers.lockAsync(lock), (__, e) -> {
+ Assert.assertNull(e);
+ AsyncWrappers.release(lock);
+ });
+ }
+ }
+
+ @Test
+ public void testContention() throws Exception
+ {
+ try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+ {
+ client.start();
+
+ InterProcessMutex lock1 = new InterProcessMutex(client, "/one/two");
+ InterProcessMutex lock2 = new InterProcessMutex(client, "/one/two");
+ CountDownLatch latch = new CountDownLatch(1);
+ AsyncWrappers.lockAsync(lock1).thenAccept(__ -> {
+ latch.countDown(); // don't release the lock
+ });
+ Assert.assertTrue(timing.awaitLatch(latch));
+
+ CountDownLatch latch2 = new CountDownLatch(1);
+ AsyncWrappers.lockAsync(lock2, timing.forSleepingABit().milliseconds(), TimeUnit.MILLISECONDS).exceptionally(e -> {
+ if ( e instanceof AsyncWrappers.TimeoutException )
+ {
+ latch2.countDown(); // lock should still be held
+ }
+ return null;
+ });
+ Assert.assertTrue(timing.awaitLatch(latch2));
+ }
+ }
+}