You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/22 20:05:37 UTC
[1/2] beam git commit: Support waitUntilFinish, cancel in the DirectRunner
Repository: beam
Updated Branches:
refs/heads/master 2d9bf2747 -> 752a4c9d9
Support waitUntilFinish, cancel in the DirectRunner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/caacf297
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/caacf297
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/caacf297
Branch: refs/heads/master
Commit: caacf297023d0d6e4a6bfe2f7ccd6edb73914d89
Parents: 2d9bf27
Author: Thomas Groh <tg...@google.com>
Authored: Mon Oct 17 11:21:02 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Mar 22 09:43:47 2017 -0700
----------------------------------------------------------------------
.../beam/runners/direct/DirectRunner.java | 43 +++---
.../direct/ExecutorServiceParallelExecutor.java | 137 ++++++++++++++-----
.../beam/runners/direct/PipelineExecutor.java | 23 +++-
.../direct/TransformExecutorService.java | 6 +
.../direct/TransformExecutorServices.java | 22 ++-
.../beam/runners/direct/DirectRunnerTest.java | 55 ++++++++
.../src/main/resources/beam/findbugs-filter.xml | 34 +++++
7 files changed, 264 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/caacf297/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 06aa3b1..4992c6a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -22,7 +22,6 @@ import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@@ -420,14 +419,34 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
* {@link DirectOptions#isShutdownUnboundedProducersWithMaxWatermark()} set to false,
* this method will never return.
*
- * <p>See also {@link PipelineExecutor#awaitCompletion()}.
+ * <p>See also {@link PipelineExecutor#waitUntilFinish(Duration)}.
*/
@Override
public State waitUntilFinish() {
- if (!state.isTerminal()) {
+ return waitUntilFinish(Duration.ZERO);
+ }
+
+ @Override
+ public State cancel() {
+ this.state = executor.getPipelineState();
+ if (!this.state.isTerminal()) {
+ executor.stop();
+ this.state = executor.getPipelineState();
+ }
+ return executor.getPipelineState();
+ }
+
+ @Override
+ public State waitUntilFinish(Duration duration) {
+ State startState = this.state;
+ if (!startState.isTerminal()) {
try {
- executor.awaitCompletion();
- state = State.DONE;
+ state = executor.waitUntilFinish(duration);
+ } catch (UserCodeException uce) {
+ // Emulates the behavior of Pipeline#run(), where a stack trace caused by a
+ // UserCodeException is truncated and replaced with the stack starting at the call to
+ // waitUntilFinish
+ throw new Pipeline.PipelineExecutionException(uce.getCause());
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
@@ -438,19 +457,7 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
throw new RuntimeException(e);
}
}
- return state;
- }
-
- @Override
- public State cancel() throws IOException {
- throw new UnsupportedOperationException("DirectPipelineResult does not support cancel.");
- }
-
- @Override
- public State waitUntilFinish(Duration duration) {
- throw new UnsupportedOperationException(
- "DirectPipelineResult does not support waitUntilFinish with a Duration parameter. See"
- + " BEAM-596.");
+ return this.state;
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/caacf297/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 935104a..8b9f995 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -25,6 +25,8 @@ import com.google.common.base.Optional;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
import com.google.common.collect.Iterables;
import java.util.ArrayList;
import java.util.Collection;
@@ -48,6 +50,7 @@ import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.UserCodeException;
@@ -55,6 +58,8 @@ import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -99,6 +104,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
* {@link CompletionCallback} decrement this value.
*/
private final AtomicLong outstandingWork = new AtomicLong();
+ private AtomicReference<State> pipelineState = new AtomicReference<>(State.RUNNING);
public static ExecutorServiceParallelExecutor create(
int targetParallelism,
@@ -138,7 +144,10 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
// Executing TransformExecutorServices have a strong reference to their TransformExecutorService
// which stops the TransformExecutorServices from being prematurely garbage collected
executorServices =
- CacheBuilder.newBuilder().weakValues().build(serialTransformExecutorServiceCacheLoader());
+ CacheBuilder.newBuilder()
+ .weakValues()
+ .removalListener(shutdownExecutorServiceListener())
+ .build(serialTransformExecutorServiceCacheLoader());
this.allUpdates = new ConcurrentLinkedQueue<>();
this.visibleUpdates = new LinkedBlockingQueue<>();
@@ -159,6 +168,19 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
};
}
+ private RemovalListener<StepAndKey, TransformExecutorService> shutdownExecutorServiceListener() {
+ return new RemovalListener<StepAndKey, TransformExecutorService>() {
+ @Override
+ public void onRemoval(
+ RemovalNotification<StepAndKey, TransformExecutorService> notification) {
+ TransformExecutorService service = notification.getValue();
+ if (service != null) {
+ service.shutdown();
+ }
+ }
+ };
+ }
+
@Override
public void start(Collection<AppliedPTransform<?, ?, ?>> roots) {
int numTargetSplits = Math.max(3, targetParallelism);
@@ -179,7 +201,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
}
@SuppressWarnings("unchecked")
- public void scheduleConsumption(
+ private void scheduleConsumption(
AppliedPTransform<?, ?, ?> consumer,
CommittedBundle<?> bundle,
CompletionCallback onComplete) {
@@ -219,7 +241,9 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
onComplete,
transformExecutor);
outstandingWork.incrementAndGet();
- transformExecutor.schedule(callable);
+ if (!pipelineState.get().isTerminal()) {
+ transformExecutor.schedule(callable);
+ }
}
private boolean isKeyed(PValue pvalue) {
@@ -234,20 +258,66 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
}
@Override
- public void awaitCompletion() throws Exception {
- VisibleExecutorUpdate update;
- do {
- // Get an update; don't block forever if another thread has handled it
- update = visibleUpdates.poll(2L, TimeUnit.SECONDS);
- if (update == null && executorService.isShutdown()) {
+ public State waitUntilFinish(Duration duration) throws Exception {
+ Instant completionTime;
+ if (duration.equals(Duration.ZERO)) {
+ completionTime = new Instant(Long.MAX_VALUE);
+ } else {
+ completionTime = Instant.now().plus(duration);
+ }
+
+ VisibleExecutorUpdate update = null;
+ while (Instant.now().isBefore(completionTime)
+ && (update == null || isTerminalStateUpdate(update))) {
+ // Get an update; don't block forever if another thread has handled it. The call to poll will
+ // wait the entire timeout; this call primarily exists to relinquish any core.
+ update = visibleUpdates.poll(25L, TimeUnit.MILLISECONDS);
+ if (update == null && pipelineState.get().isTerminal()) {
// there are no updates to process and no updates will ever be published because the
// executor is shutdown
- return;
+ return pipelineState.get();
} else if (update != null && update.exception.isPresent()) {
throw update.exception.get();
}
- } while (update == null || !update.isDone());
+ }
+ return pipelineState.get();
+ }
+
+ @Override
+ public State getPipelineState() {
+ return pipelineState.get();
+ }
+
+ private boolean isTerminalStateUpdate(VisibleExecutorUpdate update) {
+ return !(update.getNewState() == null && update.getNewState().isTerminal());
+ }
+
+ @Override
+ public void stop() {
+ shutdownIfNecessary(State.CANCELLED);
+ while (!visibleUpdates.offer(VisibleExecutorUpdate.cancelled())) {
+ // Make sure "This Pipeline was Cancelled" notification arrives.
+ visibleUpdates.poll();
+ }
+ }
+
+ private void shutdownIfNecessary(State newState) {
+ if (!newState.isTerminal()) {
+ return;
+ }
+ LOG.debug("Pipeline has terminated. Shutting down.");
+ pipelineState.compareAndSet(State.RUNNING, newState);
+ // Stop accepting new work before shutting down the executor. This ensures that threads don't try
+ // to add work to the shutdown executor.
+ executorServices.invalidateAll();
+ executorServices.cleanUp();
+ parallelExecutorService.shutdown();
executorService.shutdown();
+ try {
+ registry.cleanup();
+ } catch (Exception e) {
+ visibleUpdates.add(VisibleExecutorUpdate.fromException(e));
+ }
}
/**
@@ -341,29 +411,35 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
}
/**
- * An update of interest to the user. Used in {@link #awaitCompletion} to decide whether to
+ * An update of interest to the user. Used in {@link #waitUntilFinish} to decide whether to
* return normally or throw an exception.
*/
private static class VisibleExecutorUpdate {
private final Optional<? extends Exception> exception;
- private final boolean done;
+ @Nullable
+ private final State newState;
public static VisibleExecutorUpdate fromException(Exception e) {
- return new VisibleExecutorUpdate(false, e);
+ return new VisibleExecutorUpdate(null, e);
}
public static VisibleExecutorUpdate finished() {
- return new VisibleExecutorUpdate(true, null);
+ return new VisibleExecutorUpdate(State.DONE, null);
+ }
+
+ public static VisibleExecutorUpdate cancelled() {
+ return new VisibleExecutorUpdate(State.CANCELLED, null);
}
- private VisibleExecutorUpdate(boolean done, @Nullable Exception exception) {
+ private VisibleExecutorUpdate(State newState, @Nullable Exception exception) {
this.exception = Optional.fromNullable(exception);
- this.done = done;
+ this.newState = newState;
}
- public boolean isDone() {
- return done;
+ public State getNewState() {
+ return newState;
}
+
}
private class MonitorRunnable implements Runnable {
@@ -475,22 +551,15 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
}
private boolean shouldShutdown() {
- boolean shouldShutdown = exceptionThrown || evaluationContext.isDone();
- if (shouldShutdown) {
- LOG.debug("Pipeline has terminated. Shutting down.");
- executorService.shutdown();
- try {
- registry.cleanup();
- } catch (Exception e) {
- visibleUpdates.add(VisibleExecutorUpdate.fromException(e));
- }
- if (evaluationContext.isDone()) {
- while (!visibleUpdates.offer(VisibleExecutorUpdate.finished())) {
- visibleUpdates.poll();
- }
- }
+ State nextState = State.UNKNOWN;
+ if (exceptionThrown) {
+ nextState = State.FAILED;
+ } else if (evaluationContext.isDone()) {
+ visibleUpdates.offer(VisibleExecutorUpdate.finished());
+ nextState = State.DONE;
}
- return shouldShutdown;
+ shutdownIfNecessary(nextState);
+ return pipelineState.get().isTerminal();
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/caacf297/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java
index f900a22..82f59a7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java
@@ -19,8 +19,11 @@ package org.apache.beam.runners.direct;
import java.util.Collection;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
+import org.joda.time.Duration;
/**
* An executor that schedules and executes {@link AppliedPTransform AppliedPTransforms} for both
@@ -40,8 +43,24 @@ interface PipelineExecutor {
* root {@link AppliedPTransform AppliedPTransforms} have completed, and all
* {@link CommittedBundle Bundles} have been consumed. Jobs may also terminate abnormally.
*
- * @throws Throwable whenever an executor thread throws anything, transfers the throwable to the
+ * <p>Waits for up to the provided duration, or forever if the provided duration is less than or
+ * equal to zero.
+ *
+ * @return The terminal state of the Pipeline.
+ * @throws Exception whenever an executor thread throws anything, transfers it to the
* waiting thread and rethrows it
*/
- void awaitCompletion() throws Exception;
+ State waitUntilFinish(Duration duration) throws Exception;
+
+ /**
+ * Gets the current state of the {@link Pipeline}.
+ */
+ State getPipelineState();
+
+ /**
+ * Shuts down the executor.
+ *
+ * <p>The executor may continue to run for a short time after this method returns.
+ */
+ void stop();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/caacf297/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorService.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorService.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorService.java
index 837b858..c6f770f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorService.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorService.java
@@ -32,4 +32,10 @@ interface TransformExecutorService {
* {@link TransformExecutor TransformExecutors} to be evaluated.
*/
void complete(TransformExecutor<?> completed);
+
+ /**
+ * Cancel any outstanding work, if possible. Any future calls to schedule should ignore any
+ * work.
+ */
+ void shutdown();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/caacf297/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
index 876da9d..6733758 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
@@ -21,6 +21,7 @@ import com.google.common.base.MoreObjects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -56,6 +57,7 @@ final class TransformExecutorServices {
*/
private static class ParallelEvaluationState implements TransformExecutorService {
private final ExecutorService executor;
+ private final AtomicBoolean active = new AtomicBoolean(true);
private ParallelEvaluationState(ExecutorService executor) {
this.executor = executor;
@@ -63,12 +65,19 @@ final class TransformExecutorServices {
@Override
public void schedule(TransformExecutor<?> work) {
- executor.submit(work);
+ if (active.get()) {
+ executor.submit(work);
+ }
}
@Override
public void complete(TransformExecutor<?> completed) {
}
+
+ @Override
+ public void shutdown() {
+ active.set(false);
+ }
}
/**
@@ -84,6 +93,7 @@ final class TransformExecutorServices {
private AtomicReference<TransformExecutor<?>> currentlyEvaluating;
private final Queue<TransformExecutor<?>> workQueue;
+ private boolean active = true;
private SerialEvaluationState(ExecutorService executor) {
this.executor = executor;
@@ -113,12 +123,20 @@ final class TransformExecutorServices {
updateCurrentlyEvaluating();
}
+ @Override
+ public void shutdown() {
+ synchronized (this) {
+ active = false;
+ }
+ workQueue.clear();
+ }
+
private void updateCurrentlyEvaluating() {
if (currentlyEvaluating.get() == null) {
// Only synchronize if we need to update what's currently evaluating
synchronized (this) {
TransformExecutor<?> newWork = workQueue.poll();
- if (newWork != null) {
+ if (active && newWork != null) {
if (currentlyEvaluating.compareAndSet(null, newWork)) {
executor.submit(newWork);
} else {
http://git-wip-us.apache.org/repos/asf/beam/blob/caacf297/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index d2b6d1d..e601fcf 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -32,9 +32,16 @@ import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
@@ -49,6 +56,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
@@ -65,6 +73,7 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.hamcrest.Matchers;
+import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
@@ -222,6 +231,52 @@ public class DirectRunnerTest implements Serializable {
}
@Test
+ public void cancelShouldStopPipeline() throws Exception {
+ PipelineOptions opts = TestPipeline.testingPipelineOptions();
+ opts.as(DirectOptions.class).setBlockOnRun(false);
+ opts.setRunner(DirectRunner.class);
+
+ final Pipeline p = Pipeline.create(opts);
+ p.apply(CountingInput.unbounded().withRate(1L, Duration.standardSeconds(1)));
+
+ final BlockingQueue<PipelineResult> resultExchange = new ArrayBlockingQueue<>(1);
+ Runnable cancelRunnable = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ resultExchange.take().cancel();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException(e);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ };
+
+ Callable<PipelineResult> runPipelineRunnable = new Callable<PipelineResult>() {
+ @Override
+ public PipelineResult call() {
+ PipelineResult res = p.run();
+ try {
+ resultExchange.put(res);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException(e);
+ }
+ return res;
+ }
+ };
+
+ ExecutorService executor = Executors.newCachedThreadPool();
+ executor.submit(cancelRunnable);
+ Future<PipelineResult> result = executor.submit(runPipelineRunnable);
+
+ // If cancel doesn't work, this will hang forever
+ result.get().waitUntilFinish();
+ }
+
+ @Test
public void transformDisplayDataExceptionShouldFail() {
DoFn<Integer, Integer> brokenDoFn = new DoFn<Integer, Integer>() {
@ProcessElement
http://git-wip-us.apache.org/repos/asf/beam/blob/caacf297/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
index 714fd00..2799b00 100644
--- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
@@ -155,6 +155,40 @@
</Match>
<Match>
+ <Class name="org.apache.beam.runners.direct.ExecutorServiceParallelExecutor$MonitorRunnable" />
+ <Method name="shouldShutdown" />
+ <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE" />
+ <!-- visibleUpdates is a non-capacity-limited LinkedBlockingQueue, which
+ can never refuse an offered update -->
+ </Match>
+
+ <Match>
+ <Class name="org.apache.beam.runners.spark.util.BroadcastHelper$CodedBroadcastHelper"/>
+ <Or>
+ <Field name="bcast" />
+ <Field name="value" />
+ </Or>
+ <Bug pattern="IS2_INCONSISTENT_SYNC"/>
+ <!--
+ Spark's Broadcast variables are distributed and cached objects
+ and should not be treated as "normal" objects.
+ -->
+ </Match>
+
+ <Match>
+ <Class name="org.apache.beam.runners.spark.util.BroadcastHelper$DirectBroadcastHelper"/>
+ <Or>
+ <Field name="bcast" />
+ <Field name="value" />
+ </Or>
+ <Bug pattern="IS2_INCONSISTENT_SYNC"/>
+ <!--
+ Spark's Broadcast variables are distributed and cached objects
+ and should not be treated as "normal" objects.
+ -->
+ </Match>
+
+ <Match>
<Class name="org.apache.beam.runners.spark.metrics.sink.CsvSink"/>
<Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS"/>
<!-- Intentionally overriding parent name because inheritors should replace the parent. -->
[2/2] beam git commit: This closes #1115
Posted by tg...@apache.org.
This closes #1115
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/752a4c9d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/752a4c9d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/752a4c9d
Branch: refs/heads/master
Commit: 752a4c9d92b00ab70a7c99f476a5bba7e4d03eaf
Parents: 2d9bf27 caacf29
Author: Thomas Groh <tg...@google.com>
Authored: Wed Mar 22 13:05:26 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Mar 22 13:05:26 2017 -0700
----------------------------------------------------------------------
.../beam/runners/direct/DirectRunner.java | 43 +++---
.../direct/ExecutorServiceParallelExecutor.java | 137 ++++++++++++++-----
.../beam/runners/direct/PipelineExecutor.java | 23 +++-
.../direct/TransformExecutorService.java | 6 +
.../direct/TransformExecutorServices.java | 22 ++-
.../beam/runners/direct/DirectRunnerTest.java | 55 ++++++++
.../src/main/resources/beam/findbugs-filter.xml | 34 +++++
7 files changed, 264 insertions(+), 56 deletions(-)
----------------------------------------------------------------------