You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2016/04/13 23:10:05 UTC

[10/12] incubator-geode git commit: Make AsyncInvocation implement Future

Make AsyncInvocation implement Future


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/aece5de3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/aece5de3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/aece5de3

Branch: refs/heads/feature/GEODE-17-2
Commit: aece5de3ce5f6078dd28d1e73e577751c672364a
Parents: c0651a4
Author: Kirk Lund <kl...@apache.org>
Authored: Tue Apr 12 17:33:47 2016 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Tue Apr 12 17:33:47 2016 -0700

----------------------------------------------------------------------
 .../gemfire/test/dunit/AsyncInvocation.java     | 187 ++++++++++++++-----
 1 file changed, 140 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/aece5de3/geode-core/src/test/java/com/gemstone/gemfire/test/dunit/AsyncInvocation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/test/dunit/AsyncInvocation.java b/geode-core/src/test/java/com/gemstone/gemfire/test/dunit/AsyncInvocation.java
index 441c48c..a9427d3 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/test/dunit/AsyncInvocation.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/test/dunit/AsyncInvocation.java
@@ -17,6 +17,10 @@
 package com.gemstone.gemfire.test.dunit;
 
 import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -44,19 +48,20 @@ import com.gemstone.gemfire.SystemFailure;
  *   }
  * </pre>
  *
+ * @param <V> The result type returned by this AsyncInvocation's {@code get} methods
  * @see VM#invokeAsync(Class, String)
  */
-public class AsyncInvocation<T> {
-  // TODO: davidw Add the ability to get a return value back from the
+public class AsyncInvocation<V> implements Future<V> {
+  // TODO:davidw: Add the ability to get a return value back from the
   // async method call.  (Use a static ThreadLocal field that is
   // accessible from the Runnable used in VM#invoke)
-  // TODO: reimplement using Futures
+  // TODO:?: reimplement using Futures
 
   private static final long DEFAULT_JOIN_MILLIS = 60 * 1000;
 
   private final Thread thread;
 
-  private final AtomicReference<T> resultValue = new AtomicReference<>();
+  private final AtomicReference<V> resultValue = new AtomicReference<>();
 
   /** An exception thrown while this {@code AsyncInvocation} ran */
   private final AtomicReference<Throwable> resultThrowable = new AtomicReference<>();
@@ -66,6 +71,9 @@ public class AsyncInvocation<T> {
 
   /** The name of the method being invoked */
   private String methodName;
+
+  /** True if this {@code AsyncInvocation} has been cancelled */
+  private boolean cancelled;
   
   /**
    * Creates a new {@code AsyncInvocation}.
@@ -78,7 +86,7 @@ public class AsyncInvocation<T> {
    * @param  work
    *         The actual invocation of the method
    */
-  public AsyncInvocation(final Object target, final String methodName, final Callable<T> work) {
+  public AsyncInvocation(final Object target, final String methodName, final Callable<V> work) {
     this.target = target;
     this.methodName = methodName;
     this.thread = new Thread(new AsyncInvocationGroup(), runnable(work), getName(target, methodName));
@@ -86,6 +94,8 @@ public class AsyncInvocation<T> {
 
   /**
    * Returns the target of this async method invocation.
+   *
+   * @deprecated This method is not required for anything.
    */
   public Object getTarget() {
     return this.target;
@@ -93,6 +103,8 @@ public class AsyncInvocation<T> {
 
   /**
    * Returns the name of the method being invoked remotely.
+   *
+   * @deprecated This method is not required for anything.
    */
   public String getMethodName() {
     return this.methodName;
@@ -115,9 +127,13 @@ public class AsyncInvocation<T> {
    * @throws AssertionError if this {@code AsyncInvocation} is not done.
    */
   public Throwable getException() {
-    checkIsDone("Exception status not available while thread is alive.");
+    try {
+      checkIsDone("Exception status not available while thread is alive.");
+    } catch (IllegalStateException illegalStateException) {
+      throw new AssertionError(illegalStateException);
+    }
 
-    if (this.resultThrowable.get() instanceof RMIException) {
+    if (this.resultThrowable.get() instanceof RMIException) { // TODO:klund: delete our RMIException
       return this.resultThrowable.get().getCause();
 
     } else {
@@ -134,8 +150,8 @@ public class AsyncInvocation<T> {
    * @throws AssertionError wrapping any {@code Exception} thrown by this
    *         {@code AsyncInvocation}.
    */
-  public AsyncInvocation<T> checkException() {
-    if (exceptionOccurred()) {
+  public AsyncInvocation<V> checkException() {
+    if (this.resultThrowable.get() != null) {
       throw new AssertionError("An exception occurred during asynchronous invocation.", getException());
     }
     return this;
@@ -154,11 +170,14 @@ public class AsyncInvocation<T> {
    *         timeout of 60 seconds as defined by {@link #DEFAULT_JOIN_MILLIS}.
    *
    * @throws InterruptedException if the current thread is interrupted.
+   *
+   * @deprecated Please use {@link #get()} instead.
    */
-  public T getResult() throws InterruptedException {
+  public V getResult() throws InterruptedException {
     join();
     checkException();
-    return getReturnValue();
+    checkIsDone("Return value not available while thread is alive.");
+    return this.resultValue.get();
   }
 
   /**
@@ -177,12 +196,17 @@ public class AsyncInvocation<T> {
    *         timeout of {@code millis}.
    *
    * @throws InterruptedException if the current thread is interrupted.
+   *
+   * @deprecated Please use {@link #get(long, TimeUnit)} instead.
    */
-  public T getResult(final long millis) throws InterruptedException {
-    join(millis);
-    timeoutIfAlive(millis);
-    checkException();
-    return getReturnValue();
+  public V getResult(final long millis) throws InterruptedException {
+    try {
+      return get(millis, TimeUnit.MILLISECONDS);
+    } catch (ExecutionException executionException) {
+      throw new AssertionError(executionException);
+    } catch (TimeoutException timeoutException) {
+      throw new AssertionError(timeoutException);
+    }
   }
 
   /**
@@ -192,9 +216,9 @@ public class AsyncInvocation<T> {
    *
    * @throws AssertionError if this {@code AsyncInvocation} is not done.
    *
-   * @deprecated Please use {@link #getResult()} instead.
+   * @deprecated Please use {@link #get()} instead.
    */
-  public T getReturnValue() {
+  public V getReturnValue() {
     checkIsDone("Return value not available while thread is alive.");
     return this.resultValue.get();
   }
@@ -209,18 +233,13 @@ public class AsyncInvocation<T> {
    *
    * @return this {@code AsyncInvocation}
    *
-//   * @throws AssertionError wrapping a {@code TimeoutException} if this
-//   *         {@code AsyncInvocation} fails to complete within the specified
-//   *         timeout of {@code millis}.
-//   *
    * @throws IllegalArgumentException if the value of {@code millis} is
    *         negative.
    *
    * @throws InterruptedException if the current thread is interrupted.
    */
-  public synchronized AsyncInvocation<T> join(final long millis) throws InterruptedException {
+  public synchronized AsyncInvocation<V> join(final long millis) throws InterruptedException {
     this.thread.join(millis);
-//    timeoutIfAlive(millis);
     return this;
   }
 
@@ -235,19 +254,14 @@ public class AsyncInvocation<T> {
    *
    * @return this {@code AsyncInvocation}
    *
-//   * @throws AssertionError wrapping a {@code TimeoutException} if this
-//   *         {@code AsyncInvocation} fails to complete within the specified
-//   *         timeout of {@code millis}.
-//   *
    * @throws IllegalArgumentException
    *         if the value of {@code millis} is negative, or the value
    *         of {@code nanos} is not in the range {@code 0-999999}.
    *
    * @throws InterruptedException if the current thread is interrupted.
    */
-  public synchronized AsyncInvocation<T> join(final long millis, final int nanos) throws InterruptedException {
+  public synchronized AsyncInvocation<V> join(final long millis, final int nanos) throws InterruptedException {
     this.thread.join(millis, nanos);
-//    timeoutIfAlive(millis);
     return this;
   }
 
@@ -257,16 +271,11 @@ public class AsyncInvocation<T> {
    *
    * @return this {@code AsyncInvocation}
    *
-//   * @throws AssertionError wrapping a {@code TimeoutException} if this
-//   *         {@code AsyncInvocation} fails to complete within the default
-//   *         timeout of 60 seconds as defined by {@link #DEFAULT_JOIN_MILLIS}.
-//   *
    * @throws InterruptedException if the current thread is interrupted.
    */
-  public AsyncInvocation<T> join() throws InterruptedException {
+  public AsyncInvocation<V> join() throws InterruptedException {
     // do NOT invoke Thread#join() without a timeout
     join(DEFAULT_JOIN_MILLIS);
-//    timeoutIfAlive(DEFAULT_JOIN_MILLIS);
     return this;
   }
 
@@ -275,7 +284,7 @@ public class AsyncInvocation<T> {
    *
    * @return this {@code AsyncInvocation}
    */
-  public synchronized AsyncInvocation<T> start() {
+  public synchronized AsyncInvocation<V> start() {
     this.thread.start();
     return this;
   }
@@ -300,6 +309,89 @@ public class AsyncInvocation<T> {
     return this.thread.isAlive();
   }
 
+  @Override
+  public synchronized boolean isCancelled() {
+    return this.cancelled;
+  }
+
+  @Override
+  public synchronized boolean isDone() {
+    return !this.thread.isAlive(); //state != NEW;
+  }
+
+  @Override
+  public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
+    if (this.thread.isAlive()) {
+      if (mayInterruptIfRunning) {
+        this.cancelled = true;
+        this.thread.interrupt();
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Waits if necessary for the work to complete, and then returns the result
+   * of this {@code AsyncInvocation}.
+   *
+   * @return the result of this {@code AsyncInvocation}
+   *
+   * @throws AssertionError wrapping any {@code Exception} thrown by this
+   *         {@code AsyncInvocation}.
+   *
+   * @throws AssertionError wrapping a {@code TimeoutException} if this
+   *         {@code AsyncInvocation} fails to complete within the default
+   *         timeout of 60 seconds as defined by {@link #DEFAULT_JOIN_MILLIS}.
+   *
+   * @throws CancellationException if the computation was cancelled
+   *
+   * @throws ExecutionException if the computation threw an exception
+   *
+   * @throws InterruptedException if the current thread is interrupted.
+   */
+  @Override
+  public V get() throws ExecutionException, InterruptedException {
+    try {
+      return get(DEFAULT_JOIN_MILLIS, TimeUnit.MILLISECONDS);
+    } catch (TimeoutException timeoutException) {
+      throw new AssertionError(timeoutException);
+    }
+  }
+
+  /**
+   * Waits if necessary for at most the given time for the computation
+   * to complete, and then retrieves its result, if available.
+   *
+   * @param  timeout the maximum time to wait
+   * @param  unit the time unit of the timeout argument
+   *
+   * @return the result of this {@code AsyncInvocation}
+   *
+   * @throws AssertionError wrapping any {@code Exception} thrown by this
+   *         {@code AsyncInvocation}.
+   *
+   * @throws AssertionError wrapping a {@code TimeoutException} if this
+   *         {@code AsyncInvocation} fails to complete within the default
+   *         timeout of 60 seconds as defined by {@link #DEFAULT_JOIN_MILLIS}.
+   *
+   * @throws CancellationException if the computation was cancelled
+   *
+   * @throws ExecutionException if the computation threw an exception
+   *
+   * @throws InterruptedException if the current thread is interrupted.
+   *
+   * @throws TimeoutException if the wait timed out
+   */
+  @Override
+  public V get(final long timeout, final TimeUnit unit) throws ExecutionException, InterruptedException, TimeoutException {
+    long millis = unit.toMillis(timeout);
+    join(millis);
+    timeoutIfAlive(millis);
+    checkException();
+    return this.resultValue.get();
+  }
+
   /**
    * Returns the identifier of this {@code AsyncInvocation}'s thread. The
    * thread ID is a positive <tt>long</tt> number generated when this thread
@@ -318,18 +410,19 @@ public class AsyncInvocation<T> {
   }
 
   /**
-   * Throws {@code AssertionError} if this {@code AsyncInvocation} is not done.
+   * Throws {@code IllegalStateException} if this {@code AsyncInvocation} is
+   * not done.
    *
    * @param  message
    *         The value to be used in constructing detail message
    *
    * @return this {@code AsyncInvocation}
    *
-   * @throws AssertionError if this {@code AsyncInvocation} is not done.
+   * @throws IllegalStateException if this {@code AsyncInvocation} is not done.
    */
-  private AsyncInvocation<T> checkIsDone(final String message) {
+  private AsyncInvocation<V> checkIsDone(final String message) {
     if (this.thread.isAlive()) {
-      throw new AssertionError(message);
+      throw new IllegalStateException(message);
     }
     return this;
   }
@@ -341,18 +434,18 @@ public class AsyncInvocation<T> {
    *
    * @return this {@code AsyncInvocation}
    *
-   * @throws AssertionError wrapping a {@code TimeoutException} if this
-   *         {@code AsyncInvocation} fails to complete within the default
-   *         timeout of 60 seconds as defined by {@link #DEFAULT_JOIN_MILLIS}.
+   * @throws TimeoutException if this {@code AsyncInvocation} fails to complete
+   *         within the default timeout of 60 seconds as defined by
+   *         {@link #DEFAULT_JOIN_MILLIS}.
    */
-  private AsyncInvocation<T> timeoutIfAlive(final long timeout) {
+  private AsyncInvocation<V> timeoutIfAlive(final long timeout) throws TimeoutException {
     if (this.thread.isAlive()) {
-      throw new AssertionError(new TimeoutException("Timed out waiting " + timeout + " milliseconds for AsyncInvocation to complete."));
+      throw new TimeoutException("Timed out waiting " + timeout + " milliseconds for AsyncInvocation to complete.");
     }
     return this;
   }
 
-  private Runnable runnable(final Callable<T> work) {
+  private Runnable runnable(final Callable<V> work) {
     return () -> {
         try {
           resultValue.set(work.call());