You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2016/04/13 23:10:05 UTC
[10/12] incubator-geode git commit: Make AsyncInvocation implement
Future
Make AsyncInvocation implement Future
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/aece5de3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/aece5de3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/aece5de3
Branch: refs/heads/feature/GEODE-17-2
Commit: aece5de3ce5f6078dd28d1e73e577751c672364a
Parents: c0651a4
Author: Kirk Lund <kl...@apache.org>
Authored: Tue Apr 12 17:33:47 2016 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Tue Apr 12 17:33:47 2016 -0700
----------------------------------------------------------------------
.../gemfire/test/dunit/AsyncInvocation.java | 187 ++++++++++++++-----
1 file changed, 140 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/aece5de3/geode-core/src/test/java/com/gemstone/gemfire/test/dunit/AsyncInvocation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/test/dunit/AsyncInvocation.java b/geode-core/src/test/java/com/gemstone/gemfire/test/dunit/AsyncInvocation.java
index 441c48c..a9427d3 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/test/dunit/AsyncInvocation.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/test/dunit/AsyncInvocation.java
@@ -17,6 +17,10 @@
package com.gemstone.gemfire.test.dunit;
import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
@@ -44,19 +48,20 @@ import com.gemstone.gemfire.SystemFailure;
* }
* </pre>
*
+ * @param <V> The result type returned by this AsyncInvocation's {@code get} methods
* @see VM#invokeAsync(Class, String)
*/
-public class AsyncInvocation<T> {
- // TODO: davidw Add the ability to get a return value back from the
+public class AsyncInvocation<V> implements Future<V> {
+ // TODO:davidw: Add the ability to get a return value back from the
// async method call. (Use a static ThreadLocal field that is
// accessible from the Runnable used in VM#invoke)
- // TODO: reimplement using Futures
+ // TODO:?: reimplement using Futures
private static final long DEFAULT_JOIN_MILLIS = 60 * 1000;
private final Thread thread;
- private final AtomicReference<T> resultValue = new AtomicReference<>();
+ private final AtomicReference<V> resultValue = new AtomicReference<>();
/** An exception thrown while this {@code AsyncInvocation} ran */
private final AtomicReference<Throwable> resultThrowable = new AtomicReference<>();
@@ -66,6 +71,9 @@ public class AsyncInvocation<T> {
/** The name of the method being invoked */
private String methodName;
+
+ /** True if this {@code AsyncInvocation} has been cancelled */
+ private boolean cancelled;
/**
* Creates a new {@code AsyncInvocation}.
@@ -78,7 +86,7 @@ public class AsyncInvocation<T> {
* @param work
* The actual invocation of the method
*/
- public AsyncInvocation(final Object target, final String methodName, final Callable<T> work) {
+ public AsyncInvocation(final Object target, final String methodName, final Callable<V> work) {
this.target = target;
this.methodName = methodName;
this.thread = new Thread(new AsyncInvocationGroup(), runnable(work), getName(target, methodName));
@@ -86,6 +94,8 @@ public class AsyncInvocation<T> {
/**
* Returns the target of this async method invocation.
+ *
+ * @deprecated This method is not required for anything.
*/
public Object getTarget() {
return this.target;
@@ -93,6 +103,8 @@ public class AsyncInvocation<T> {
/**
* Returns the name of the method being invoked remotely.
+ *
+ * @deprecated This method is not required for anything.
*/
public String getMethodName() {
return this.methodName;
@@ -115,9 +127,13 @@ public class AsyncInvocation<T> {
* @throws AssertionError if this {@code AsyncInvocation} is not done.
*/
public Throwable getException() {
- checkIsDone("Exception status not available while thread is alive.");
+ try {
+ checkIsDone("Exception status not available while thread is alive.");
+ } catch (IllegalStateException illegalStateException) {
+ throw new AssertionError(illegalStateException);
+ }
- if (this.resultThrowable.get() instanceof RMIException) {
+ if (this.resultThrowable.get() instanceof RMIException) { // TODO:klund: delete our RMIException
return this.resultThrowable.get().getCause();
} else {
@@ -134,8 +150,8 @@ public class AsyncInvocation<T> {
* @throws AssertionError wrapping any {@code Exception} thrown by this
* {@code AsyncInvocation}.
*/
- public AsyncInvocation<T> checkException() {
- if (exceptionOccurred()) {
+ public AsyncInvocation<V> checkException() {
+ if (this.resultThrowable.get() != null) {
throw new AssertionError("An exception occurred during asynchronous invocation.", getException());
}
return this;
@@ -154,11 +170,14 @@ public class AsyncInvocation<T> {
* timeout of 60 seconds as defined by {@link #DEFAULT_JOIN_MILLIS}.
*
* @throws InterruptedException if the current thread is interrupted.
+ *
+ * @deprecated Please use {@link #get()} instead.
*/
- public T getResult() throws InterruptedException {
+ public V getResult() throws InterruptedException {
join();
checkException();
- return getReturnValue();
+ checkIsDone("Return value not available while thread is alive.");
+ return this.resultValue.get();
}
/**
@@ -177,12 +196,17 @@ public class AsyncInvocation<T> {
* timeout of {@code millis}.
*
* @throws InterruptedException if the current thread is interrupted.
+ *
+ * @deprecated Please use {@link #get(long, TimeUnit)} instead.
*/
- public T getResult(final long millis) throws InterruptedException {
- join(millis);
- timeoutIfAlive(millis);
- checkException();
- return getReturnValue();
+ public V getResult(final long millis) throws InterruptedException {
+ try {
+ return get(millis, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException executionException) {
+ throw new AssertionError(executionException);
+ } catch (TimeoutException timeoutException) {
+ throw new AssertionError(timeoutException);
+ }
}
/**
@@ -192,9 +216,9 @@ public class AsyncInvocation<T> {
*
* @throws AssertionError if this {@code AsyncInvocation} is not done.
*
- * @deprecated Please use {@link #getResult()} instead.
+ * @deprecated Please use {@link #get()} instead.
*/
- public T getReturnValue() {
+ public V getReturnValue() {
checkIsDone("Return value not available while thread is alive.");
return this.resultValue.get();
}
@@ -209,18 +233,13 @@ public class AsyncInvocation<T> {
*
* @return this {@code AsyncInvocation}
*
-// * @throws AssertionError wrapping a {@code TimeoutException} if this
-// * {@code AsyncInvocation} fails to complete within the specified
-// * timeout of {@code millis}.
-// *
* @throws IllegalArgumentException if the value of {@code millis} is
* negative.
*
* @throws InterruptedException if the current thread is interrupted.
*/
- public synchronized AsyncInvocation<T> join(final long millis) throws InterruptedException {
+ public synchronized AsyncInvocation<V> join(final long millis) throws InterruptedException {
this.thread.join(millis);
-// timeoutIfAlive(millis);
return this;
}
@@ -235,19 +254,14 @@ public class AsyncInvocation<T> {
*
* @return this {@code AsyncInvocation}
*
-// * @throws AssertionError wrapping a {@code TimeoutException} if this
-// * {@code AsyncInvocation} fails to complete within the specified
-// * timeout of {@code millis}.
-// *
* @throws IllegalArgumentException
* if the value of {@code millis} is negative, or the value
* of {@code nanos} is not in the range {@code 0-999999}.
*
* @throws InterruptedException if the current thread is interrupted.
*/
- public synchronized AsyncInvocation<T> join(final long millis, final int nanos) throws InterruptedException {
+ public synchronized AsyncInvocation<V> join(final long millis, final int nanos) throws InterruptedException {
this.thread.join(millis, nanos);
-// timeoutIfAlive(millis);
return this;
}
@@ -257,16 +271,11 @@ public class AsyncInvocation<T> {
*
* @return this {@code AsyncInvocation}
*
-// * @throws AssertionError wrapping a {@code TimeoutException} if this
-// * {@code AsyncInvocation} fails to complete within the default
-// * timeout of 60 seconds as defined by {@link #DEFAULT_JOIN_MILLIS}.
-// *
* @throws InterruptedException if the current thread is interrupted.
*/
- public AsyncInvocation<T> join() throws InterruptedException {
+ public AsyncInvocation<V> join() throws InterruptedException {
// do NOT invoke Thread#join() without a timeout
join(DEFAULT_JOIN_MILLIS);
-// timeoutIfAlive(DEFAULT_JOIN_MILLIS);
return this;
}
@@ -275,7 +284,7 @@ public class AsyncInvocation<T> {
*
* @return this {@code AsyncInvocation}
*/
- public synchronized AsyncInvocation<T> start() {
+ public synchronized AsyncInvocation<V> start() {
this.thread.start();
return this;
}
@@ -300,6 +309,89 @@ public class AsyncInvocation<T> {
return this.thread.isAlive();
}
+ @Override
+ public synchronized boolean isCancelled() {
+ return this.cancelled;
+ }
+
+ @Override
+ public synchronized boolean isDone() {
+ return !this.thread.isAlive(); //state != NEW;
+ }
+
+ @Override
+ public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
+ if (this.thread.isAlive()) {
+ if (mayInterruptIfRunning) {
+ this.cancelled = true;
+ this.thread.interrupt();
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Waits if necessary for the work to complete, and then returns the result
+ * of this {@code AsyncInvocation}.
+ *
+ * @return the result of this {@code AsyncInvocation}
+ *
+ * @throws AssertionError wrapping any {@code Exception} thrown by this
+ * {@code AsyncInvocation}.
+ *
+ * @throws AssertionError wrapping a {@code TimeoutException} if this
+ * {@code AsyncInvocation} fails to complete within the default
+ * timeout of 60 seconds as defined by {@link #DEFAULT_JOIN_MILLIS}.
+ *
+ * @throws CancellationException if the computation was cancelled
+ *
+ * @throws ExecutionException if the computation threw an exception
+ *
+ * @throws InterruptedException if the current thread is interrupted.
+ */
+ @Override
+ public V get() throws ExecutionException, InterruptedException {
+ try {
+ return get(DEFAULT_JOIN_MILLIS, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException timeoutException) {
+ throw new AssertionError(timeoutException);
+ }
+ }
+
+ /**
+ * Waits if necessary for at most the given time for the computation
+ * to complete, and then retrieves its result, if available.
+ *
+ * @param timeout the maximum time to wait
+ * @param unit the time unit of the timeout argument
+ *
+ * @return the result of this {@code AsyncInvocation}
+ *
+ * @throws AssertionError wrapping any {@code Exception} thrown by this
+ * {@code AsyncInvocation}.
+ *
+ * @throws AssertionError wrapping a {@code TimeoutException} if this
+ * {@code AsyncInvocation} fails to complete within the default
+ * timeout of 60 seconds as defined by {@link #DEFAULT_JOIN_MILLIS}.
+ *
+ * @throws CancellationException if the computation was cancelled
+ *
+ * @throws ExecutionException if the computation threw an exception
+ *
+ * @throws InterruptedException if the current thread is interrupted.
+ *
+ * @throws TimeoutException if the wait timed out
+ */
+ @Override
+ public V get(final long timeout, final TimeUnit unit) throws ExecutionException, InterruptedException, TimeoutException {
+ long millis = unit.toMillis(timeout);
+ join(millis);
+ timeoutIfAlive(millis);
+ checkException();
+ return this.resultValue.get();
+ }
+
/**
* Returns the identifier of this {@code AsyncInvocation}'s thread. The
* thread ID is a positive <tt>long</tt> number generated when this thread
@@ -318,18 +410,19 @@ public class AsyncInvocation<T> {
}
/**
- * Throws {@code AssertionError} if this {@code AsyncInvocation} is not done.
+ * Throws {@code IllegalStateException} if this {@code AsyncInvocation} is
+ * not done.
*
* @param message
* The value to be used in constructing detail message
*
* @return this {@code AsyncInvocation}
*
- * @throws AssertionError if this {@code AsyncInvocation} is not done.
+ * @throws IllegalStateException if this {@code AsyncInvocation} is not done.
*/
- private AsyncInvocation<T> checkIsDone(final String message) {
+ private AsyncInvocation<V> checkIsDone(final String message) {
if (this.thread.isAlive()) {
- throw new AssertionError(message);
+ throw new IllegalStateException(message);
}
return this;
}
@@ -341,18 +434,18 @@ public class AsyncInvocation<T> {
*
* @return this {@code AsyncInvocation}
*
- * @throws AssertionError wrapping a {@code TimeoutException} if this
- * {@code AsyncInvocation} fails to complete within the default
- * timeout of 60 seconds as defined by {@link #DEFAULT_JOIN_MILLIS}.
+ * @throws TimeoutException if this {@code AsyncInvocation} fails to complete
+ * within the default timeout of 60 seconds as defined by
+ * {@link #DEFAULT_JOIN_MILLIS}.
*/
- private AsyncInvocation<T> timeoutIfAlive(final long timeout) {
+ private AsyncInvocation<V> timeoutIfAlive(final long timeout) throws TimeoutException {
if (this.thread.isAlive()) {
- throw new AssertionError(new TimeoutException("Timed out waiting " + timeout + " milliseconds for AsyncInvocation to complete."));
+ throw new TimeoutException("Timed out waiting " + timeout + " milliseconds for AsyncInvocation to complete.");
}
return this;
}
- private Runnable runnable(final Callable<T> work) {
+ private Runnable runnable(final Callable<V> work) {
return () -> {
try {
resultValue.set(work.call());