You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/06/28 14:46:36 UTC

curator git commit: Added asyncEnsureContainers

Repository: curator
Updated Branches:
  refs/heads/CURATOR-397 f7d410f8e -> 09f9bc06a


Added asyncEnsureContainers


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/09f9bc06
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/09f9bc06
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/09f9bc06

Branch: refs/heads/CURATOR-397
Commit: 09f9bc06accf7c42d3a5dcf3d92c20d706e56ea6
Parents: f7d410f
Author: randgalt <ra...@apache.org>
Authored: Wed Jun 28 09:46:32 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Wed Jun 28 09:46:32 2017 -0500

----------------------------------------------------------------------
 .../org/apache/curator/x/async/AsyncLocker.java | 251 ----------------
 .../apache/curator/x/async/AsyncWrappers.java   | 296 +++++++++++++++++++
 .../apache/curator/x/async/TestAsyncLocker.java |  73 -----
 .../curator/x/async/TestAsyncWrappers.java      |  73 +++++
 4 files changed, 369 insertions(+), 324 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/09f9bc06/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java
deleted file mode 100644
index b15fd4b..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async;
-
-import org.apache.curator.framework.recipes.locks.InterProcessLock;
-import org.apache.curator.utils.ThreadUtils;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-
-/**
- * <p>
- *     Utility for acquiring a lock asynchronously
- * </p>
- *
- * <p>
- *     Canonical usage:
- * <code><pre>
- *     InterProcessMutex mutex = new InterProcessMutex(...) // or any InterProcessLock
- *     AsyncLocker.lockAsync(mutex).thenAccept(dummy -> {
- *         try
- *         {
- *             // do work while holding the lock
- *         }
- *         finally
- *         {
- *             AsyncLocker.release(mutex);
- *         }
- *     }).exceptionally(e -> {
- *         if ( e instanceOf TimeoutException ) {
- *             // timed out trying to acquire the lock
- *         }
- *         // handle the error
- *         return null;
- *     });
- * </pre></code>
- * </p>
- */
-public class AsyncLocker
-{
-    /**
-     * Set as the completion stage's exception when trying to acquire a lock
-     * times out
-     */
-    public static class TimeoutException extends RuntimeException
-    {
-    }
-
-    /**
-     * Attempt to acquire the given lock asynchronously using the given timeout and executor. If the lock
-     * is not acquired within the timeout stage is completedExceptionally with {@link org.apache.curator.x.async.AsyncLocker.TimeoutException}
-     *
-     * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex},
-     * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.)
-     * @param timeout max timeout to acquire lock
-     * @param unit time unit of timeout
-     * @param executor executor to use to asynchronously acquire
-     * @return stage
-     */
-    public static CompletionStage<Void> lockAsync(InterProcessLock lock, long timeout, TimeUnit unit, Executor executor)
-    {
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        if ( executor == null )
-        {
-            CompletableFuture.runAsync(() -> lock(future, lock, timeout, unit));
-        }
-        else
-        {
-            CompletableFuture.runAsync(() -> lock(future, lock, timeout, unit), executor);
-        }
-        return future;
-    }
-
-    /**
-     * Attempt to acquire the given lock asynchronously using the given timeout and executor. The stage
-     * is completed with a Boolean that indicates whether or not the lock was acquired.
-     *
-     * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex},
-     * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.)
-     * @param timeout max timeout to acquire lock
-     * @param unit time unit of timeout
-     * @param executor executor to use to asynchronously acquire
-     * @return stage
-     */
-    public static CompletionStage<Boolean> lockAsyncIf(InterProcessLock lock, long timeout, TimeUnit unit, Executor executor)
-    {
-        CompletableFuture<Boolean> future = new CompletableFuture<>();
-        if ( executor == null )
-        {
-            CompletableFuture.runAsync(() -> lockIf(future, lock, timeout, unit));
-        }
-        else
-        {
-            CompletableFuture.runAsync(() -> lockIf(future, lock, timeout, unit), executor);
-        }
-        return future;
-    }
-
-    /**
-     * Attempt to acquire the given lock asynchronously using the given executor and without a timeout.
-     *
-     * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex},
-     * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.)
-     * @param executor executor to use to asynchronously acquire
-     * @return stage
-     */
-    public static CompletionStage<Void> lockAsync(InterProcessLock lock, Executor executor)
-    {
-        return lockAsync(lock, 0, null, executor);
-    }
-
-    /**
-     * Attempt to acquire the given lock asynchronously using the given timeout using the {@link java.util.concurrent.ForkJoinPool#commonPool()}.
-     * If the lock is not acquired within the timeout stage is completedExceptionally with {@link org.apache.curator.x.async.AsyncLocker.TimeoutException}
-     *
-     * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex},
-     * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.)
-     * @param timeout max timeout to acquire lock
-     * @param unit time unit of timeout
-     * @return stage
-     */
-    public static CompletionStage<Void> lockAsync(InterProcessLock lock, long timeout, TimeUnit unit)
-    {
-        return lockAsync(lock, timeout, unit, null);
-    }
-
-    /**
-     * Attempt to acquire the given lock asynchronously using the given timeout using the {@link java.util.concurrent.ForkJoinPool#commonPool()}.
-     * The stage is completed with a Boolean that indicates whether or not the lock was acquired.
-     *
-     * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex},
-     * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.)
-     * @param timeout max timeout to acquire lock
-     * @param unit time unit of timeout
-     * @return stage
-     */
-    public static CompletionStage<Boolean> lockAsyncIf(InterProcessLock lock, long timeout, TimeUnit unit)
-    {
-        return lockAsyncIf(lock, timeout, unit, null);
-    }
-
-    /**
-     * Attempt to acquire the given lock asynchronously without timeout using the {@link java.util.concurrent.ForkJoinPool#commonPool()}.
-     *
-     * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex},
-     * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.)
-     * @return stage
-     */
-    public static CompletionStage<Void> lockAsync(InterProcessLock lock)
-    {
-        return lockAsync(lock, 0, null, null);
-    }
-
-    /**
-     * Release the lock and wrap any exception in <code>RuntimeException</code>
-     *
-     * @param lock lock to release
-     */
-    public static void release(InterProcessLock lock)
-    {
-        release(lock, true);
-    }
-
-    /**
-     * Release the lock and wrap any exception in <code>RuntimeException</code>
-     *
-     * @param lock lock to release
-     * @param ignoreNoLockExceptions if true {@link java.lang.IllegalStateException} is ignored
-     */
-    public static void release(InterProcessLock lock, boolean ignoreNoLockExceptions)
-    {
-        try
-        {
-            lock.release();
-        }
-        catch ( IllegalStateException e )
-        {
-            if ( !ignoreNoLockExceptions )
-            {
-                throw new RuntimeException(e);
-            }
-        }
-        catch ( Exception e )
-        {
-            ThreadUtils.checkInterrupted(e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    private static void lockIf(CompletableFuture<Boolean> future, InterProcessLock lock, long timeout, TimeUnit unit)
-    {
-        try
-        {
-            future.complete(lock.acquire(timeout, unit));
-        }
-        catch ( Exception e )
-        {
-            ThreadUtils.checkInterrupted(e);
-            future.completeExceptionally(e);
-        }
-    }
-
-    private static void lock(CompletableFuture<Void> future, InterProcessLock lock, long timeout, TimeUnit unit)
-    {
-        try
-        {
-            if ( unit != null )
-            {
-                if ( lock.acquire(timeout, unit) )
-                {
-                    future.complete(null);
-                }
-                else
-                {
-                    future.completeExceptionally(new TimeoutException());
-                }
-            }
-            else
-            {
-                lock.acquire();
-                future.complete(null);
-            }
-        }
-        catch ( Exception e )
-        {
-            ThreadUtils.checkInterrupted(e);
-            future.completeExceptionally(e);
-        }
-    }
-
-    private AsyncLocker()
-    {
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/09f9bc06/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
new file mode 100644
index 0000000..8ff507c
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async;
+
+import org.apache.curator.framework.EnsureContainers;
+import org.apache.curator.framework.recipes.locks.InterProcessLock;
+import org.apache.curator.utils.ThreadUtils;
+import org.apache.curator.x.async.modeled.ZPath;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * <p>
+ *     Utility for adding asynchronous behavior
+ * </p>
+ *
+ * <p>
+ *     E.g. locks:
+ * <code><pre>
+ *     InterProcessMutex mutex = new InterProcessMutex(...) // or any InterProcessLock
+ *     AsyncWrappers.lockAsync(mutex, executor).thenAccept(dummy -> {
+ *         try
+ *         {
+ *             // do work while holding the lock
+ *         }
+ *         finally
+ *         {
+ *             AsyncWrappers.release(mutex);
+ *         }
+ *     }).exceptionally(e -> {
+ *         if ( e instanceOf TimeoutException ) {
+ *             // timed out trying to acquire the lock
+ *         }
+ *         // handle the error
+ *         return null;
+ *     });
+ * </pre></code>
+ * </p>
+ *
+ * <p>
+ *     E.g. EnsureContainers
+ * <code><pre>
+ *     AsyncWrappers.(client, path, executor).thenAccept(dummy -> {
+ *         // execute after ensuring containers
+ *     });
+ * </pre></code>
+ * </p>
+ */
+public class AsyncWrappers
+{
+    /**
+     * Asynchronously call {@link org.apache.curator.framework.EnsureContainers} using the {@link java.util.concurrent.ForkJoinPool#commonPool()}.
+     *
+     * @param client client
+     * @param path path to ensure
+     * @return stage
+     */
+    public static CompletionStage<Void> asyncEnsureContainers(AsyncCuratorFramework client, ZPath path)
+    {
+        return asyncEnsureContainers(client, path, null);
+    }
+
+    /**
+     * Asynchronously call {@link org.apache.curator.framework.EnsureContainers} using the given executor
+     *
+     * @param client client
+     * @param path path to ensure
+     * @return stage
+     */
+    public static CompletionStage<Void> asyncEnsureContainers(AsyncCuratorFramework client, ZPath path, Executor executor)
+    {
+        Runnable proc = () -> {
+            try
+            {
+                new EnsureContainers(client.unwrap(), path.fullPath()).ensure();
+            }
+            catch ( Exception e )
+            {
+                throw new RuntimeException(e);
+            }
+        };
+        return (executor != null) ? CompletableFuture.runAsync(proc, executor) : CompletableFuture.runAsync(proc);
+    }
+
+    /**
+     * Set as the completion stage's exception when trying to acquire a lock
+     * times out
+     */
+    public static class TimeoutException extends RuntimeException
+    {
+    }
+
+    /**
+     * Attempt to acquire the given lock asynchronously using the given timeout and executor. If the lock
+     * is not acquired within the timeout stage is completedExceptionally with {@link AsyncWrappers.TimeoutException}
+     *
+     * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex},
+     * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.)
+     * @param timeout max timeout to acquire lock
+     * @param unit time unit of timeout
+     * @param executor executor to use to asynchronously acquire
+     * @return stage
+     */
+    public static CompletionStage<Void> lockAsync(InterProcessLock lock, long timeout, TimeUnit unit, Executor executor)
+    {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        if ( executor == null )
+        {
+            CompletableFuture.runAsync(() -> lock(future, lock, timeout, unit));
+        }
+        else
+        {
+            CompletableFuture.runAsync(() -> lock(future, lock, timeout, unit), executor);
+        }
+        return future;
+    }
+
+    /**
+     * Attempt to acquire the given lock asynchronously using the given timeout and executor. The stage
+     * is completed with a Boolean that indicates whether or not the lock was acquired.
+     *
+     * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex},
+     * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.)
+     * @param timeout max timeout to acquire lock
+     * @param unit time unit of timeout
+     * @param executor executor to use to asynchronously acquire
+     * @return stage
+     */
+    public static CompletionStage<Boolean> lockAsyncIf(InterProcessLock lock, long timeout, TimeUnit unit, Executor executor)
+    {
+        CompletableFuture<Boolean> future = new CompletableFuture<>();
+        if ( executor == null )
+        {
+            CompletableFuture.runAsync(() -> lockIf(future, lock, timeout, unit));
+        }
+        else
+        {
+            CompletableFuture.runAsync(() -> lockIf(future, lock, timeout, unit), executor);
+        }
+        return future;
+    }
+
+    /**
+     * Attempt to acquire the given lock asynchronously using the given executor and without a timeout.
+     *
+     * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex},
+     * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.)
+     * @param executor executor to use to asynchronously acquire
+     * @return stage
+     */
+    public static CompletionStage<Void> lockAsync(InterProcessLock lock, Executor executor)
+    {
+        return lockAsync(lock, 0, null, executor);
+    }
+
+    /**
+     * Attempt to acquire the given lock asynchronously using the given timeout using the {@link java.util.concurrent.ForkJoinPool#commonPool()}.
+     * If the lock is not acquired within the timeout stage is completedExceptionally with {@link AsyncWrappers.TimeoutException}
+     *
+     * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex},
+     * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.)
+     * @param timeout max timeout to acquire lock
+     * @param unit time unit of timeout
+     * @return stage
+     */
+    public static CompletionStage<Void> lockAsync(InterProcessLock lock, long timeout, TimeUnit unit)
+    {
+        return lockAsync(lock, timeout, unit, null);
+    }
+
+    /**
+     * Attempt to acquire the given lock asynchronously using the given timeout using the {@link java.util.concurrent.ForkJoinPool#commonPool()}.
+     * The stage is completed with a Boolean that indicates whether or not the lock was acquired.
+     *
+     * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex},
+     * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.)
+     * @param timeout max timeout to acquire lock
+     * @param unit time unit of timeout
+     * @return stage
+     */
+    public static CompletionStage<Boolean> lockAsyncIf(InterProcessLock lock, long timeout, TimeUnit unit)
+    {
+        return lockAsyncIf(lock, timeout, unit, null);
+    }
+
+    /**
+     * Attempt to acquire the given lock asynchronously without timeout using the {@link java.util.concurrent.ForkJoinPool#commonPool()}.
+     *
+     * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex},
+     * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.)
+     * @return stage
+     */
+    public static CompletionStage<Void> lockAsync(InterProcessLock lock)
+    {
+        return lockAsync(lock, 0, null, null);
+    }
+
+    /**
+     * Release the lock and wrap any exception in <code>RuntimeException</code>
+     *
+     * @param lock lock to release
+     */
+    public static void release(InterProcessLock lock)
+    {
+        release(lock, true);
+    }
+
+    /**
+     * Release the lock and wrap any exception in <code>RuntimeException</code>
+     *
+     * @param lock lock to release
+     * @param ignoreNoLockExceptions if true {@link java.lang.IllegalStateException} is ignored
+     */
+    public static void release(InterProcessLock lock, boolean ignoreNoLockExceptions)
+    {
+        try
+        {
+            lock.release();
+        }
+        catch ( IllegalStateException e )
+        {
+            if ( !ignoreNoLockExceptions )
+            {
+                throw new RuntimeException(e);
+            }
+        }
+        catch ( Exception e )
+        {
+            ThreadUtils.checkInterrupted(e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static void lockIf(CompletableFuture<Boolean> future, InterProcessLock lock, long timeout, TimeUnit unit)
+    {
+        try
+        {
+            future.complete(lock.acquire(timeout, unit));
+        }
+        catch ( Exception e )
+        {
+            ThreadUtils.checkInterrupted(e);
+            future.completeExceptionally(e);
+        }
+    }
+
+    private static void lock(CompletableFuture<Void> future, InterProcessLock lock, long timeout, TimeUnit unit)
+    {
+        try
+        {
+            if ( unit != null )
+            {
+                if ( lock.acquire(timeout, unit) )
+                {
+                    future.complete(null);
+                }
+                else
+                {
+                    future.completeExceptionally(new TimeoutException());
+                }
+            }
+            else
+            {
+                lock.acquire();
+                future.complete(null);
+            }
+        }
+        catch ( Exception e )
+        {
+            ThreadUtils.checkInterrupted(e);
+            future.completeExceptionally(e);
+        }
+    }
+
+    private AsyncWrappers()
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/09f9bc06/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncLocker.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncLocker.java b/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncLocker.java
deleted file mode 100644
index 2553620..0000000
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncLocker.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
-import org.apache.curator.retry.RetryOneTime;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-public class TestAsyncLocker extends CompletableBaseClassForTests
-{
-    @Test
-    public void testBasic()
-    {
-        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
-        {
-            client.start();
-
-            InterProcessMutex lock = new InterProcessMutex(client, "/one/two");
-            complete(AsyncLocker.lockAsync(lock), (__, e) -> {
-                Assert.assertNull(e);
-                AsyncLocker.release(lock);
-            });
-        }
-    }
-
-    @Test
-    public void testContention() throws Exception
-    {
-        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
-        {
-            client.start();
-
-            InterProcessMutex lock1 = new InterProcessMutex(client, "/one/two");
-            InterProcessMutex lock2 = new InterProcessMutex(client, "/one/two");
-            CountDownLatch latch = new CountDownLatch(1);
-            AsyncLocker.lockAsync(lock1).thenAccept(__ -> {
-                latch.countDown();  // don't release the lock
-            });
-            Assert.assertTrue(timing.awaitLatch(latch));
-
-            CountDownLatch latch2 = new CountDownLatch(1);
-            AsyncLocker.lockAsync(lock2, timing.forSleepingABit().milliseconds(), TimeUnit.MILLISECONDS).exceptionally(e -> {
-                if ( e instanceof AsyncLocker.TimeoutException )
-                {
-                    latch2.countDown();  // lock should still be held
-                }
-                return null;
-            });
-            Assert.assertTrue(timing.awaitLatch(latch2));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/09f9bc06/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncWrappers.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncWrappers.java b/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncWrappers.java
new file mode 100644
index 0000000..7ce7904
--- /dev/null
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncWrappers.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.retry.RetryOneTime;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class TestAsyncWrappers extends CompletableBaseClassForTests
+{
+    @Test
+    public void testBasic()
+    {
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+        {
+            client.start();
+
+            InterProcessMutex lock = new InterProcessMutex(client, "/one/two");
+            complete(AsyncWrappers.lockAsync(lock), (__, e) -> {
+                Assert.assertNull(e);
+                AsyncWrappers.release(lock);
+            });
+        }
+    }
+
+    @Test
+    public void testContention() throws Exception
+    {
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+        {
+            client.start();
+
+            InterProcessMutex lock1 = new InterProcessMutex(client, "/one/two");
+            InterProcessMutex lock2 = new InterProcessMutex(client, "/one/two");
+            CountDownLatch latch = new CountDownLatch(1);
+            AsyncWrappers.lockAsync(lock1).thenAccept(__ -> {
+                latch.countDown();  // don't release the lock
+            });
+            Assert.assertTrue(timing.awaitLatch(latch));
+
+            CountDownLatch latch2 = new CountDownLatch(1);
+            AsyncWrappers.lockAsync(lock2, timing.forSleepingABit().milliseconds(), TimeUnit.MILLISECONDS).exceptionally(e -> {
+                if ( e instanceof AsyncWrappers.TimeoutException )
+                {
+                    latch2.countDown();  // lock should still be held
+                }
+                return null;
+            });
+            Assert.assertTrue(timing.awaitLatch(latch2));
+        }
+    }
+}