You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/11 20:44:47 UTC
[1/2] beam git commit: This closes #2493
Repository: beam
Updated Branches:
refs/heads/master 7c169a614 -> 7587d29f5
This closes #2493
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7587d29f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7587d29f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7587d29f
Branch: refs/heads/master
Commit: 7587d29f50654397f00a754596bfc5a225d4a64f
Parents: 7c169a6 db81205
Author: Thomas Groh <tg...@google.com>
Authored: Tue Apr 11 13:44:34 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Apr 11 13:44:34 2017 -0700
----------------------------------------------------------------------
.../direct/TransformExecutorServices.java | 37 +++++++++++----
.../direct/TransformExecutorServicesTest.java | 48 ++++++++++++++++++++
2 files changed, 77 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
[2/2] beam git commit: Improve Work Rejection handling
Posted by tg...@apache.org.
Improve Work Rejection handling
The timing between checking for a shutdown state and submitting work is
racy. Re-check to see if a work rejection is acceptable because the
underlying executor is shut down before throwing a
RejectedExecutionException.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/db81205a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/db81205a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/db81205a
Branch: refs/heads/master
Commit: db81205af9b38245ff85ce2801af433cc31bce42
Parents: 7c169a6
Author: Thomas Groh <tg...@google.com>
Authored: Tue Apr 11 11:33:53 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Apr 11 13:44:34 2017 -0700
----------------------------------------------------------------------
.../direct/TransformExecutorServices.java | 37 +++++++++++----
.../direct/TransformExecutorServicesTest.java | 48 ++++++++++++++++++++
2 files changed, 77 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/db81205a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
index 6733758..53087bf 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
@@ -21,8 +21,11 @@ import com.google.common.base.MoreObjects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Static factory methods for constructing instances of {@link TransformExecutorService}.
@@ -37,7 +40,7 @@ final class TransformExecutorServices {
* parallel.
*/
public static TransformExecutorService parallel(ExecutorService executor) {
- return new ParallelEvaluationState(executor);
+ return new ParallelTransformExecutor(executor);
}
/**
@@ -45,7 +48,7 @@ final class TransformExecutorServices {
* serial.
*/
public static TransformExecutorService serial(ExecutorService executor) {
- return new SerialEvaluationState(executor);
+ return new SerialTransformExecutor(executor);
}
/**
@@ -55,18 +58,36 @@ final class TransformExecutorServices {
* <p>A principal use of this is for the evaluation of an unkeyed Step. Unkeyed computations are
* processed in parallel.
*/
- private static class ParallelEvaluationState implements TransformExecutorService {
+ private static class ParallelTransformExecutor implements TransformExecutorService {
+ private static final Logger LOG = LoggerFactory.getLogger(ParallelTransformExecutor.class);
+
private final ExecutorService executor;
private final AtomicBoolean active = new AtomicBoolean(true);
- private ParallelEvaluationState(ExecutorService executor) {
+ private ParallelTransformExecutor(ExecutorService executor) {
this.executor = executor;
}
@Override
public void schedule(TransformExecutor<?> work) {
if (active.get()) {
- executor.submit(work);
+ try {
+ executor.submit(work);
+ } catch (RejectedExecutionException rejected) {
+ boolean stillActive = active.get();
+ if (stillActive) {
+ throw new IllegalStateException(
+ String.format(
+ "Execution of Work %s was rejected, but the %s is still active",
+ work, ParallelTransformExecutor.class.getSimpleName()));
+ } else {
+ LOG.debug(
+ "Rejected execution of Work {} on executor {}. "
+ + "Suppressed exception because evaluator is not active",
+ work,
+ this);
+ }
+ }
}
}
@@ -88,14 +109,14 @@ final class TransformExecutorServices {
* <p>A principal use of this is for the serial evaluation of a (Step, Key) pair.
* Keyed computations are processed serially per step.
*/
- private static class SerialEvaluationState implements TransformExecutorService {
+ private static class SerialTransformExecutor implements TransformExecutorService {
private final ExecutorService executor;
private AtomicReference<TransformExecutor<?>> currentlyEvaluating;
private final Queue<TransformExecutor<?>> workQueue;
private boolean active = true;
- private SerialEvaluationState(ExecutorService executor) {
+ private SerialTransformExecutor(ExecutorService executor) {
this.executor = executor;
this.currentlyEvaluating = new AtomicReference<>();
this.workQueue = new ConcurrentLinkedQueue<>();
@@ -149,7 +170,7 @@ final class TransformExecutorServices {
@Override
public String toString() {
- return MoreObjects.toStringHelper(SerialEvaluationState.class)
+ return MoreObjects.toStringHelper(SerialTransformExecutor.class)
.add("currentlyEvaluating", currentlyEvaluating)
.add("workQueue", workQueue)
.toString();
http://git-wip-us.apache.org/repos/asf/beam/blob/db81205a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorServicesTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorServicesTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorServicesTest.java
index b085723..77652b2 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorServicesTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorServicesTest.java
@@ -64,6 +64,31 @@ public class TransformExecutorServicesTest {
}
@Test
+ public void parallelRejectedStillActiveThrows() {
+ @SuppressWarnings("unchecked")
+ TransformExecutor<Object> first = mock(TransformExecutor.class);
+
+ TransformExecutorService parallel =
+ TransformExecutorServices.parallel(executorService);
+ executorService.shutdown();
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage("still active");
+ parallel.schedule(first);
+ }
+
+ @Test
+ public void parallelRejectedShutdownSucceeds() {
+ @SuppressWarnings("unchecked")
+ TransformExecutor<Object> first = mock(TransformExecutor.class);
+
+ TransformExecutorService parallel =
+ TransformExecutorServices.parallel(executorService);
+ executorService.shutdown();
+ parallel.shutdown();
+ parallel.schedule(first);
+ }
+
+ @Test
public void serialScheduleTwoWaitsForFirstToComplete() {
@SuppressWarnings("unchecked")
TransformExecutor<Object> first = mock(TransformExecutor.class);
@@ -97,4 +122,27 @@ public class TransformExecutorServicesTest {
serial.complete(second);
}
+
+ /**
+ * Tests that a Serial {@link TransformExecutorService} does not schedule follow up work if the
+ * executor is shut down when the initial work completes.
+ */
+ @Test
+ public void serialShutdownCompleteActive() {
+ @SuppressWarnings("unchecked")
+ TransformExecutor<Object> first = mock(TransformExecutor.class);
+ @SuppressWarnings("unchecked")
+ TransformExecutor<Object> second = mock(TransformExecutor.class);
+
+ TransformExecutorService serial = TransformExecutorServices.serial(executorService);
+ serial.schedule(first);
+ verify(first).run();
+
+ serial.schedule(second);
+ verify(second, never()).run();
+
+ serial.shutdown();
+ serial.complete(first);
+ verify(second, never()).run();
+ }
}