You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/06/11 03:22:13 UTC
curator git commit: Made the API of AsyncLocker cleaner
Repository: curator
Updated Branches:
refs/heads/CURATOR-397 e943763f0 -> fbf25adcf
Made the API of AsyncLocker cleaner
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/fbf25adc
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/fbf25adc
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/fbf25adc
Branch: refs/heads/CURATOR-397
Commit: fbf25adcf88b4198fdecd18a494b615c70bf763c
Parents: e943763
Author: randgalt <ra...@apache.org>
Authored: Sat Jun 10 22:22:08 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Jun 10 22:22:08 2017 -0500
----------------------------------------------------------------------
.../org/apache/curator/x/async/AsyncLocker.java | 124 ++++++++-----------
.../apache/curator/x/async/TestAsyncLocker.java | 17 ++-
2 files changed, 60 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/fbf25adc/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java
index 8b65a4a..42f7041 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java
@@ -34,22 +34,21 @@ import java.util.concurrent.TimeUnit;
* Canonical usage:
* <code><pre>
* InterProcessMutex mutex = new InterProcessMutex(...) // or any InterProcessLock
- * AsyncLocker.lockAsync(mutex).handle((state, e) -> {
- * if ( e != null )
+ * AsyncLocker.lockAsync(mutex).thenAccept(dummy -> {
+ * try
* {
- * // handle the error
+ * // do work while holding the lock
* }
- * else if ( state.hasTheLock() )
+ * finally
* {
- * try
- * {
- * // do work while holding the lock
- * }
- * finally
- * {
- * state.release();
- * }
+ * AsyncLocker.release(mutex);
* }
+ * }).exceptionally(e -> {
+ * if ( e instanceOf TimeoutException ) {
+ * // timed out trying to acquire the lock
+ * }
+ * // handle the error
+ * return null;
* });
* </pre></code>
* </p>
@@ -57,23 +56,11 @@ import java.util.concurrent.TimeUnit;
public class AsyncLocker
{
/**
- * State of the lock
+ * Set as the completion stage's exception when trying to acquire a lock
+ * times out
*/
- public interface LockState
+ public static class TimeoutException extends RuntimeException
{
- /**
- * Returns true if you own the lock
- *
- * @return true/false
- */
- boolean hasTheLock();
-
- /**
- * Safe release of the lock. Only tries to release
- * if you own the lock. The lock ownership is changed
- * to <code>false</code> by this method.
- */
- void release();
}
/**
@@ -86,13 +73,18 @@ public class AsyncLocker
* @param executor executor to use to asynchronously acquire
* @return stage
*/
- public static CompletionStage<LockState> lockAsync(InterProcessLock lock, long timeout, TimeUnit unit, Executor executor)
+ public static CompletionStage<Void> lockAsync(InterProcessLock lock, long timeout, TimeUnit unit, Executor executor)
{
+ CompletableFuture<Void> future = new CompletableFuture<>();
if ( executor == null )
{
- return CompletableFuture.supplyAsync(() -> lock(lock, timeout, unit));
+ CompletableFuture.runAsync(() -> lock(future, lock, timeout, unit));
}
- return CompletableFuture.supplyAsync(() -> lock(lock, timeout, unit), executor);
+ else
+ {
+ CompletableFuture.runAsync(() -> lock(future, lock, timeout, unit), executor);
+ }
+ return future;
}
/**
@@ -103,7 +95,7 @@ public class AsyncLocker
* @param executor executor to use to asynchronously acquire
* @return stage
*/
- public static CompletionStage<LockState> lockAsync(InterProcessLock lock, Executor executor)
+ public static CompletionStage<Void> lockAsync(InterProcessLock lock, Executor executor)
{
return lockAsync(lock, 0, null, executor);
}
@@ -117,7 +109,7 @@ public class AsyncLocker
* @param unit time unit of timeout
* @return stage
*/
- public static CompletionStage<LockState> lockAsync(InterProcessLock lock, long timeout, TimeUnit unit)
+ public static CompletionStage<Void> lockAsync(InterProcessLock lock, long timeout, TimeUnit unit)
{
return lockAsync(lock, timeout, unit, null);
}
@@ -129,23 +121,21 @@ public class AsyncLocker
* {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.)
* @return stage
*/
- public static CompletionStage<LockState> lockAsync(InterProcessLock lock)
+ public static CompletionStage<Void> lockAsync(InterProcessLock lock)
{
return lockAsync(lock, 0, null, null);
}
- private static LockState lock(InterProcessLock lock, long timeout, TimeUnit unit)
+ /**
+ * Release the lock and wrap any exception in <code>RuntimeException</code>
+ *
+ * @param lock lock to release
+ */
+ public static void release(InterProcessLock lock)
{
try
{
- if ( unit != null )
- {
- boolean hasTheLock = lock.acquire(timeout, unit);
- return new InternalLockState(lock, hasTheLock);
- }
-
- lock.acquire();
- return new InternalLockState(lock, true);
+ lock.release();
}
catch ( Exception e )
{
@@ -154,43 +144,35 @@ public class AsyncLocker
}
}
- private AsyncLocker()
- {
- }
-
- private static class InternalLockState implements LockState
+ private static void lock(CompletableFuture<Void> future, InterProcessLock lock, long timeout, TimeUnit unit)
{
- private final InterProcessLock lock;
- private volatile boolean hasTheLock;
-
- public InternalLockState(InterProcessLock lock, boolean hasTheLock)
- {
- this.lock = lock;
- this.hasTheLock = hasTheLock;
- }
-
- @Override
- public boolean hasTheLock()
- {
- return hasTheLock;
- }
-
- @Override
- public void release()
+ try
{
- if ( hasTheLock )
+ if ( unit != null )
{
- hasTheLock = false;
- try
+ if ( lock.acquire(timeout, unit) )
{
- lock.release();
+ future.complete(null);
}
- catch ( Exception e )
+ else
{
- ThreadUtils.checkInterrupted(e);
- throw new RuntimeException(e);
+ future.completeExceptionally(new TimeoutException());
}
}
+ else
+ {
+ lock.acquire();
+ future.complete(null);
+ }
}
+ catch ( Exception e )
+ {
+ ThreadUtils.checkInterrupted(e);
+ future.completeExceptionally(e);
+ }
+ }
+
+ private AsyncLocker()
+ {
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/fbf25adc/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncLocker.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncLocker.java b/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncLocker.java
index 7ea2d08..2553620 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncLocker.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncLocker.java
@@ -37,10 +37,9 @@ public class TestAsyncLocker extends CompletableBaseClassForTests
client.start();
InterProcessMutex lock = new InterProcessMutex(client, "/one/two");
- complete(AsyncLocker.lockAsync(lock), (state, e) -> {
+ complete(AsyncLocker.lockAsync(lock), (__, e) -> {
Assert.assertNull(e);
- Assert.assertTrue(state.hasTheLock());
- state.release();
+ AsyncLocker.release(lock);
});
}
}
@@ -55,20 +54,18 @@ public class TestAsyncLocker extends CompletableBaseClassForTests
InterProcessMutex lock1 = new InterProcessMutex(client, "/one/two");
InterProcessMutex lock2 = new InterProcessMutex(client, "/one/two");
CountDownLatch latch = new CountDownLatch(1);
- AsyncLocker.lockAsync(lock1).thenAccept(state -> {
- if ( state.hasTheLock() )
- {
- latch.countDown(); // don't release the lock
- }
+ AsyncLocker.lockAsync(lock1).thenAccept(__ -> {
+ latch.countDown(); // don't release the lock
});
Assert.assertTrue(timing.awaitLatch(latch));
CountDownLatch latch2 = new CountDownLatch(1);
- AsyncLocker.lockAsync(lock2, timing.forSleepingABit().milliseconds(), TimeUnit.MILLISECONDS).thenAccept(state -> {
- if ( !state.hasTheLock() )
+ AsyncLocker.lockAsync(lock2, timing.forSleepingABit().milliseconds(), TimeUnit.MILLISECONDS).exceptionally(e -> {
+ if ( e instanceof AsyncLocker.TimeoutException )
{
latch2.countDown(); // lock should still be held
}
+ return null;
});
Assert.assertTrue(timing.awaitLatch(latch2));
}