You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/06/11 03:22:13 UTC

curator git commit: Made the API of AsyncLocker cleaner

Repository: curator
Updated Branches:
  refs/heads/CURATOR-397 e943763f0 -> fbf25adcf


Made the API of AsyncLocker cleaner


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/fbf25adc
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/fbf25adc
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/fbf25adc

Branch: refs/heads/CURATOR-397
Commit: fbf25adcf88b4198fdecd18a494b615c70bf763c
Parents: e943763
Author: randgalt <ra...@apache.org>
Authored: Sat Jun 10 22:22:08 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Jun 10 22:22:08 2017 -0500

----------------------------------------------------------------------
 .../org/apache/curator/x/async/AsyncLocker.java | 124 ++++++++-----------
 .../apache/curator/x/async/TestAsyncLocker.java |  17 ++-
 2 files changed, 60 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/fbf25adc/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java
index 8b65a4a..42f7041 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java
@@ -34,22 +34,21 @@ import java.util.concurrent.TimeUnit;
  *     Canonical usage:
  * <code><pre>
  *     InterProcessMutex mutex = new InterProcessMutex(...) // or any InterProcessLock
- *     AsyncLocker.lockAsync(mutex).handle((state, e) -> {
- *         if ( e != null )
+ *     AsyncLocker.lockAsync(mutex).thenAccept(dummy -> {
+ *         try
  *         {
- *             // handle the error
+ *             // do work while holding the lock
  *         }
- *         else if ( state.hasTheLock() )
+ *         finally
  *         {
- *             try
- *             {
- *                 // do work while holding the lock
- *             }
- *             finally
- *             {
- *                 state.release();
- *             }
+ *             AsyncLocker.release(mutex);
  *         }
+ *     }).exceptionally(e -> {
+ *         if ( e instanceof TimeoutException ) {
+ *             // timed out trying to acquire the lock
+ *         }
+ *         // handle the error
+ *         return null;
  *     });
  * </pre></code>
  * </p>
@@ -57,23 +56,11 @@ import java.util.concurrent.TimeUnit;
 public class AsyncLocker
 {
     /**
-     * State of the lock
+     * Set as the completion stage's exception when trying to acquire a lock
+     * times out
      */
-    public interface LockState
+    public static class TimeoutException extends RuntimeException
     {
-        /**
-         * Returns true if you own the lock
-         *
-         * @return true/false
-         */
-        boolean hasTheLock();
-
-        /**
-         * Safe release of the lock. Only tries to release
-         * if you own the lock. The lock ownership is changed
-         * to <code>false</code> by this method.
-         */
-        void release();
     }
 
     /**
@@ -86,13 +73,18 @@ public class AsyncLocker
      * @param executor executor to use to asynchronously acquire
      * @return stage
      */
-    public static CompletionStage<LockState> lockAsync(InterProcessLock lock, long timeout, TimeUnit unit, Executor executor)
+    public static CompletionStage<Void> lockAsync(InterProcessLock lock, long timeout, TimeUnit unit, Executor executor)
     {
+        CompletableFuture<Void> future = new CompletableFuture<>();
         if ( executor == null )
         {
-            return CompletableFuture.supplyAsync(() -> lock(lock, timeout, unit));
+            CompletableFuture.runAsync(() -> lock(future, lock, timeout, unit));
         }
-        return CompletableFuture.supplyAsync(() -> lock(lock, timeout, unit), executor);
+        else
+        {
+            CompletableFuture.runAsync(() -> lock(future, lock, timeout, unit), executor);
+        }
+        return future;
     }
 
     /**
@@ -103,7 +95,7 @@ public class AsyncLocker
      * @param executor executor to use to asynchronously acquire
      * @return stage
      */
-    public static CompletionStage<LockState> lockAsync(InterProcessLock lock, Executor executor)
+    public static CompletionStage<Void> lockAsync(InterProcessLock lock, Executor executor)
     {
         return lockAsync(lock, 0, null, executor);
     }
@@ -117,7 +109,7 @@ public class AsyncLocker
      * @param unit time unit of timeout
      * @return stage
      */
-    public static CompletionStage<LockState> lockAsync(InterProcessLock lock, long timeout, TimeUnit unit)
+    public static CompletionStage<Void> lockAsync(InterProcessLock lock, long timeout, TimeUnit unit)
     {
         return lockAsync(lock, timeout, unit, null);
     }
@@ -129,23 +121,21 @@ public class AsyncLocker
      * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.)
      * @return stage
      */
-    public static CompletionStage<LockState> lockAsync(InterProcessLock lock)
+    public static CompletionStage<Void> lockAsync(InterProcessLock lock)
     {
         return lockAsync(lock, 0, null, null);
     }
 
-    private static LockState lock(InterProcessLock lock, long timeout, TimeUnit unit)
+    /**
+     * Release the lock and wrap any exception in <code>RuntimeException</code>
+     *
+     * @param lock lock to release
+     */
+    public static void release(InterProcessLock lock)
     {
         try
         {
-            if ( unit != null )
-            {
-                boolean hasTheLock = lock.acquire(timeout, unit);
-                return new InternalLockState(lock, hasTheLock);
-            }
-
-            lock.acquire();
-            return new InternalLockState(lock, true);
+            lock.release();
         }
         catch ( Exception e )
         {
@@ -154,43 +144,35 @@ public class AsyncLocker
         }
     }
 
-    private AsyncLocker()
-    {
-    }
-
-    private static class InternalLockState implements LockState
+    private static void lock(CompletableFuture<Void> future, InterProcessLock lock, long timeout, TimeUnit unit)
     {
-        private final InterProcessLock lock;
-        private volatile boolean hasTheLock;
-
-        public InternalLockState(InterProcessLock lock, boolean hasTheLock)
-        {
-            this.lock = lock;
-            this.hasTheLock = hasTheLock;
-        }
-
-        @Override
-        public boolean hasTheLock()
-        {
-            return hasTheLock;
-        }
-
-        @Override
-        public void release()
+        try
         {
-            if ( hasTheLock )
+            if ( unit != null )
             {
-                hasTheLock = false;
-                try
+                if ( lock.acquire(timeout, unit) )
                 {
-                    lock.release();
+                    future.complete(null);
                 }
-                catch ( Exception e )
+                else
                 {
-                    ThreadUtils.checkInterrupted(e);
-                    throw new RuntimeException(e);
+                    future.completeExceptionally(new TimeoutException());
                 }
             }
+            else
+            {
+                lock.acquire();
+                future.complete(null);
+            }
         }
+        catch ( Exception e )
+        {
+            ThreadUtils.checkInterrupted(e);
+            future.completeExceptionally(e);
+        }
+    }
+
+    private AsyncLocker()
+    {
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/fbf25adc/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncLocker.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncLocker.java b/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncLocker.java
index 7ea2d08..2553620 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncLocker.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncLocker.java
@@ -37,10 +37,9 @@ public class TestAsyncLocker extends CompletableBaseClassForTests
             client.start();
 
             InterProcessMutex lock = new InterProcessMutex(client, "/one/two");
-            complete(AsyncLocker.lockAsync(lock), (state, e) -> {
+            complete(AsyncLocker.lockAsync(lock), (__, e) -> {
                 Assert.assertNull(e);
-                Assert.assertTrue(state.hasTheLock());
-                state.release();
+                AsyncLocker.release(lock);
             });
         }
     }
@@ -55,20 +54,18 @@ public class TestAsyncLocker extends CompletableBaseClassForTests
             InterProcessMutex lock1 = new InterProcessMutex(client, "/one/two");
             InterProcessMutex lock2 = new InterProcessMutex(client, "/one/two");
             CountDownLatch latch = new CountDownLatch(1);
-            AsyncLocker.lockAsync(lock1).thenAccept(state -> {
-                if ( state.hasTheLock() )
-                {
-                    latch.countDown();  // don't release the lock
-                }
+            AsyncLocker.lockAsync(lock1).thenAccept(__ -> {
+                latch.countDown();  // don't release the lock
             });
             Assert.assertTrue(timing.awaitLatch(latch));
 
             CountDownLatch latch2 = new CountDownLatch(1);
-            AsyncLocker.lockAsync(lock2, timing.forSleepingABit().milliseconds(), TimeUnit.MILLISECONDS).thenAccept(state -> {
-                if ( !state.hasTheLock() )
+            AsyncLocker.lockAsync(lock2, timing.forSleepingABit().milliseconds(), TimeUnit.MILLISECONDS).exceptionally(e -> {
+                if ( e instanceof AsyncLocker.TimeoutException )
                 {
                     latch2.countDown();  // lock should still be held
                 }
+                return null;
             });
             Assert.assertTrue(timing.awaitLatch(latch2));
         }