You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/11 20:44:47 UTC

[1/2] beam git commit: This closes #2493

Repository: beam
Updated Branches:
  refs/heads/master 7c169a614 -> 7587d29f5


This closes #2493


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7587d29f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7587d29f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7587d29f

Branch: refs/heads/master
Commit: 7587d29f50654397f00a754596bfc5a225d4a64f
Parents: 7c169a6 db81205
Author: Thomas Groh <tg...@google.com>
Authored: Tue Apr 11 13:44:34 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Apr 11 13:44:34 2017 -0700

----------------------------------------------------------------------
 .../direct/TransformExecutorServices.java       | 37 +++++++++++----
 .../direct/TransformExecutorServicesTest.java   | 48 ++++++++++++++++++++
 2 files changed, 77 insertions(+), 8 deletions(-)
----------------------------------------------------------------------



[2/2] beam git commit: Improve Work Rejection handling

Posted by tg...@apache.org.
Improve Work Rejection handling

The timing between checking for a shutdown state and submitting work is
racy. Re-check to see if a work rejection is acceptable because the
underlying executor is shut down before throwing a
RejectedExecutionException.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/db81205a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/db81205a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/db81205a

Branch: refs/heads/master
Commit: db81205af9b38245ff85ce2801af433cc31bce42
Parents: 7c169a6
Author: Thomas Groh <tg...@google.com>
Authored: Tue Apr 11 11:33:53 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Apr 11 13:44:34 2017 -0700

----------------------------------------------------------------------
 .../direct/TransformExecutorServices.java       | 37 +++++++++++----
 .../direct/TransformExecutorServicesTest.java   | 48 ++++++++++++++++++++
 2 files changed, 77 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/db81205a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
index 6733758..53087bf 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
@@ -21,8 +21,11 @@ import com.google.common.base.MoreObjects;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Static factory methods for constructing instances of {@link TransformExecutorService}.
@@ -37,7 +40,7 @@ final class TransformExecutorServices {
    * parallel.
    */
   public static TransformExecutorService parallel(ExecutorService executor) {
-    return new ParallelEvaluationState(executor);
+    return new ParallelTransformExecutor(executor);
   }
 
   /**
@@ -45,7 +48,7 @@ final class TransformExecutorServices {
    * serial.
    */
   public static TransformExecutorService serial(ExecutorService executor) {
-    return new SerialEvaluationState(executor);
+    return new SerialTransformExecutor(executor);
   }
 
   /**
@@ -55,18 +58,36 @@ final class TransformExecutorServices {
    * <p>A principal use of this is for the evaluation of an unkeyed Step. Unkeyed computations are
    * processed in parallel.
    */
-  private static class ParallelEvaluationState implements TransformExecutorService {
+  private static class ParallelTransformExecutor implements TransformExecutorService {
+    private static final Logger LOG = LoggerFactory.getLogger(ParallelTransformExecutor.class);
+
     private final ExecutorService executor;
     private final AtomicBoolean active = new AtomicBoolean(true);
 
-    private ParallelEvaluationState(ExecutorService executor) {
+    private ParallelTransformExecutor(ExecutorService executor) {
       this.executor = executor;
     }
 
     @Override
     public void schedule(TransformExecutor<?> work) {
       if (active.get()) {
-        executor.submit(work);
+        try {
+          executor.submit(work);
+        } catch (RejectedExecutionException rejected) {
+          boolean stillActive = active.get();
+          if (stillActive) {
+            throw new IllegalStateException(
+                String.format(
+                    "Execution of Work %s was rejected, but the %s is still active",
+                    work, ParallelTransformExecutor.class.getSimpleName()));
+          } else {
+            LOG.debug(
+                "Rejected execution of Work {} on executor {}. "
+                    + "Suppressed exception because evaluator is not active",
+                work,
+                this);
+          }
+        }
       }
     }
 
@@ -88,14 +109,14 @@ final class TransformExecutorServices {
    * <p>A principal use of this is for the serial evaluation of a (Step, Key) pair.
    * Keyed computations are processed serially per step.
    */
-  private static class SerialEvaluationState implements TransformExecutorService {
+  private static class SerialTransformExecutor implements TransformExecutorService {
     private final ExecutorService executor;
 
     private AtomicReference<TransformExecutor<?>> currentlyEvaluating;
     private final Queue<TransformExecutor<?>> workQueue;
     private boolean active = true;
 
-    private SerialEvaluationState(ExecutorService executor) {
+    private SerialTransformExecutor(ExecutorService executor) {
       this.executor = executor;
       this.currentlyEvaluating = new AtomicReference<>();
       this.workQueue = new ConcurrentLinkedQueue<>();
@@ -149,7 +170,7 @@ final class TransformExecutorServices {
 
     @Override
     public String toString() {
-      return MoreObjects.toStringHelper(SerialEvaluationState.class)
+      return MoreObjects.toStringHelper(SerialTransformExecutor.class)
           .add("currentlyEvaluating", currentlyEvaluating)
           .add("workQueue", workQueue)
           .toString();

http://git-wip-us.apache.org/repos/asf/beam/blob/db81205a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorServicesTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorServicesTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorServicesTest.java
index b085723..77652b2 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorServicesTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorServicesTest.java
@@ -64,6 +64,31 @@ public class TransformExecutorServicesTest {
   }
 
   @Test
+  public void parallelRejectedStillActiveThrows() {
+    @SuppressWarnings("unchecked")
+    TransformExecutor<Object> first = mock(TransformExecutor.class);
+
+    TransformExecutorService parallel =
+        TransformExecutorServices.parallel(executorService);
+    executorService.shutdown();
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("still active");
+    parallel.schedule(first);
+  }
+
+  @Test
+  public void parallelRejectedShutdownSucceeds() {
+    @SuppressWarnings("unchecked")
+    TransformExecutor<Object> first = mock(TransformExecutor.class);
+
+    TransformExecutorService parallel =
+        TransformExecutorServices.parallel(executorService);
+    executorService.shutdown();
+    parallel.shutdown();
+    parallel.schedule(first);
+  }
+
+  @Test
   public void serialScheduleTwoWaitsForFirstToComplete() {
     @SuppressWarnings("unchecked")
     TransformExecutor<Object> first = mock(TransformExecutor.class);
@@ -97,4 +122,27 @@ public class TransformExecutorServicesTest {
 
     serial.complete(second);
   }
+
+  /**
+   * Tests that a Serial {@link TransformExecutorService} does not schedule follow up work if the
+   * executor is shut down when the initial work completes.
+   */
+  @Test
+  public void serialShutdownCompleteActive() {
+    @SuppressWarnings("unchecked")
+    TransformExecutor<Object> first = mock(TransformExecutor.class);
+    @SuppressWarnings("unchecked")
+    TransformExecutor<Object> second = mock(TransformExecutor.class);
+
+    TransformExecutorService serial = TransformExecutorServices.serial(executorService);
+    serial.schedule(first);
+    verify(first).run();
+
+    serial.schedule(second);
+    verify(second, never()).run();
+
+    serial.shutdown();
+    serial.complete(first);
+    verify(second, never()).run();
+  }
 }