You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/22 20:05:37 UTC

[1/2] beam git commit: Support waitUntilFinish, cancel in the DirectRunner

Repository: beam
Updated Branches:
  refs/heads/master 2d9bf2747 -> 752a4c9d9


Support waitUntilFinish, cancel in the DirectRunner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/caacf297
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/caacf297
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/caacf297

Branch: refs/heads/master
Commit: caacf297023d0d6e4a6bfe2f7ccd6edb73914d89
Parents: 2d9bf27
Author: Thomas Groh <tg...@google.com>
Authored: Mon Oct 17 11:21:02 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Mar 22 09:43:47 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunner.java       |  43 +++---
 .../direct/ExecutorServiceParallelExecutor.java | 137 ++++++++++++++-----
 .../beam/runners/direct/PipelineExecutor.java   |  23 +++-
 .../direct/TransformExecutorService.java        |   6 +
 .../direct/TransformExecutorServices.java       |  22 ++-
 .../beam/runners/direct/DirectRunnerTest.java   |  55 ++++++++
 .../src/main/resources/beam/findbugs-filter.xml |  34 +++++
 7 files changed, 264 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/caacf297/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 06aa3b1..4992c6a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -22,7 +22,6 @@ import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -420,14 +419,34 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
      * {@link DirectOptions#isShutdownUnboundedProducersWithMaxWatermark()} set to false,
      * this method will never return.
      *
-     * <p>See also {@link PipelineExecutor#awaitCompletion()}.
+     * <p>See also {@link PipelineExecutor#waitUntilFinish(Duration)}.
      */
     @Override
     public State waitUntilFinish() {
-      if (!state.isTerminal()) {
+      return waitUntilFinish(Duration.ZERO);
+    }
+
+    @Override
+    public State cancel() {
+      this.state = executor.getPipelineState();
+      if (!this.state.isTerminal()) {
+        executor.stop();
+        this.state = executor.getPipelineState();
+      }
+      return executor.getPipelineState();
+    }
+
+    @Override
+    public State waitUntilFinish(Duration duration) {
+      State startState = this.state;
+      if (!startState.isTerminal()) {
         try {
-          executor.awaitCompletion();
-          state = State.DONE;
+          state = executor.waitUntilFinish(duration);
+        } catch (UserCodeException uce) {
+          // Emulates the behavior of Pipeline#run(), where a stack trace caused by a
+          // UserCodeException is truncated and replaced with the stack starting at the call to
+          // waitToFinish
+          throw new Pipeline.PipelineExecutionException(uce.getCause());
         } catch (Exception e) {
           if (e instanceof InterruptedException) {
             Thread.currentThread().interrupt();
@@ -438,19 +457,7 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
           throw new RuntimeException(e);
         }
       }
-      return state;
-    }
-
-    @Override
-    public State cancel() throws IOException {
-      throw new UnsupportedOperationException("DirectPipelineResult does not support cancel.");
-    }
-
-    @Override
-    public State waitUntilFinish(Duration duration) {
-      throw new UnsupportedOperationException(
-          "DirectPipelineResult does not support waitUntilFinish with a Duration parameter. See"
-              + " BEAM-596.");
+      return this.state;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/caacf297/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 935104a..8b9f995 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -25,6 +25,8 @@ import com.google.common.base.Optional;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import com.google.common.collect.Iterables;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -48,6 +50,7 @@ import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.UserCodeException;
@@ -55,6 +58,8 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -99,6 +104,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
    * {@link CompletionCallback} decrement this value.
    */
   private final AtomicLong outstandingWork = new AtomicLong();
+  private AtomicReference<State> pipelineState = new AtomicReference<>(State.RUNNING);
 
   public static ExecutorServiceParallelExecutor create(
       int targetParallelism,
@@ -138,7 +144,10 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
     // Executing TransformExecutorServices have a strong reference to their TransformExecutorService
     // which stops the TransformExecutorServices from being prematurely garbage collected
     executorServices =
-        CacheBuilder.newBuilder().weakValues().build(serialTransformExecutorServiceCacheLoader());
+        CacheBuilder.newBuilder()
+            .weakValues()
+            .removalListener(shutdownExecutorServiceListener())
+            .build(serialTransformExecutorServiceCacheLoader());
 
     this.allUpdates = new ConcurrentLinkedQueue<>();
     this.visibleUpdates = new LinkedBlockingQueue<>();
@@ -159,6 +168,19 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
     };
   }
 
+  private RemovalListener<StepAndKey, TransformExecutorService> shutdownExecutorServiceListener() {
+    return new RemovalListener<StepAndKey, TransformExecutorService>() {
+      @Override
+      public void onRemoval(
+          RemovalNotification<StepAndKey, TransformExecutorService> notification) {
+        TransformExecutorService service = notification.getValue();
+        if (service != null) {
+          service.shutdown();
+        }
+      }
+    };
+  }
+
   @Override
   public void start(Collection<AppliedPTransform<?, ?, ?>> roots) {
     int numTargetSplits = Math.max(3, targetParallelism);
@@ -179,7 +201,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
   }
 
   @SuppressWarnings("unchecked")
-  public void scheduleConsumption(
+  private void scheduleConsumption(
       AppliedPTransform<?, ?, ?> consumer,
       CommittedBundle<?> bundle,
       CompletionCallback onComplete) {
@@ -219,7 +241,9 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
             onComplete,
             transformExecutor);
     outstandingWork.incrementAndGet();
-    transformExecutor.schedule(callable);
+    if (!pipelineState.get().isTerminal()) {
+      transformExecutor.schedule(callable);
+    }
   }
 
   private boolean isKeyed(PValue pvalue) {
@@ -234,20 +258,66 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
   }
 
   @Override
-  public void awaitCompletion() throws Exception {
-    VisibleExecutorUpdate update;
-    do {
-      // Get an update; don't block forever if another thread has handled it
-      update = visibleUpdates.poll(2L, TimeUnit.SECONDS);
-      if (update == null && executorService.isShutdown()) {
+  public State waitUntilFinish(Duration duration) throws Exception {
+    Instant completionTime;
+    if (duration.equals(Duration.ZERO)) {
+      completionTime = new Instant(Long.MAX_VALUE);
+    } else {
+      completionTime = Instant.now().plus(duration);
+    }
+
+    VisibleExecutorUpdate update = null;
+    while (Instant.now().isBefore(completionTime)
+        && (update == null || isTerminalStateUpdate(update))) {
+      // Get an update; don't block forever if another thread has handled it. The call to poll will
+      // wait the entire timeout; this call primarily exists to relinquish any core.
+      update = visibleUpdates.poll(25L, TimeUnit.MILLISECONDS);
+      if (update == null && pipelineState.get().isTerminal()) {
         // there are no updates to process and no updates will ever be published because the
         // executor is shutdown
-        return;
+        return pipelineState.get();
       } else if (update != null && update.exception.isPresent()) {
         throw update.exception.get();
       }
-    } while (update == null || !update.isDone());
+    }
+    return pipelineState.get();
+  }
+
+  @Override
+  public State getPipelineState() {
+    return pipelineState.get();
+  }
+
+  private boolean isTerminalStateUpdate(VisibleExecutorUpdate update) {
+    return !(update.getNewState() == null && update.getNewState().isTerminal());
+  }
+
+  @Override
+  public void stop() {
+    shutdownIfNecessary(State.CANCELLED);
+    while (!visibleUpdates.offer(VisibleExecutorUpdate.cancelled())) {
+      // Make sure "This Pipeline was Cancelled" notification arrives.
+      visibleUpdates.poll();
+    }
+  }
+
+  private void shutdownIfNecessary(State newState) {
+    if (!newState.isTerminal()) {
+      return;
+    }
+    LOG.debug("Pipeline has terminated. Shutting down.");
+    pipelineState.compareAndSet(State.RUNNING, newState);
+    // Stop accepting new work before shutting down the executor. This ensures that thread don't try
+    // to add work to the shutdown executor.
+    executorServices.invalidateAll();
+    executorServices.cleanUp();
+    parallelExecutorService.shutdown();
     executorService.shutdown();
+    try {
+      registry.cleanup();
+    } catch (Exception e) {
+      visibleUpdates.add(VisibleExecutorUpdate.fromException(e));
+    }
   }
 
   /**
@@ -341,29 +411,35 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
   }
 
   /**
-   * An update of interest to the user. Used in {@link #awaitCompletion} to decide whether to
+   * An update of interest to the user. Used in {@link #waitUntilFinish} to decide whether to
    * return normally or throw an exception.
    */
   private static class VisibleExecutorUpdate {
     private final Optional<? extends Exception> exception;
-    private final boolean done;
+    @Nullable
+    private final State newState;
 
     public static VisibleExecutorUpdate fromException(Exception e) {
-      return new VisibleExecutorUpdate(false, e);
+      return new VisibleExecutorUpdate(null, e);
     }
 
     public static VisibleExecutorUpdate finished() {
-      return new VisibleExecutorUpdate(true, null);
+      return new VisibleExecutorUpdate(State.DONE, null);
+    }
+
+    public static VisibleExecutorUpdate cancelled() {
+      return new VisibleExecutorUpdate(State.CANCELLED, null);
     }
 
-    private VisibleExecutorUpdate(boolean done, @Nullable Exception exception) {
+    private VisibleExecutorUpdate(State newState, @Nullable Exception exception) {
       this.exception = Optional.fromNullable(exception);
-      this.done = done;
+      this.newState = newState;
     }
 
-    public boolean isDone() {
-      return done;
+    public State getNewState() {
+      return newState;
     }
+
   }
 
   private class MonitorRunnable implements Runnable {
@@ -475,22 +551,15 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
     }
 
     private boolean shouldShutdown() {
-      boolean shouldShutdown = exceptionThrown || evaluationContext.isDone();
-      if (shouldShutdown) {
-        LOG.debug("Pipeline has terminated. Shutting down.");
-        executorService.shutdown();
-        try {
-          registry.cleanup();
-        } catch (Exception e) {
-          visibleUpdates.add(VisibleExecutorUpdate.fromException(e));
-        }
-        if (evaluationContext.isDone()) {
-          while (!visibleUpdates.offer(VisibleExecutorUpdate.finished())) {
-            visibleUpdates.poll();
-          }
-        }
+      State nextState = State.UNKNOWN;
+      if (exceptionThrown) {
+        nextState = State.FAILED;
+      } else if (evaluationContext.isDone()) {
+        visibleUpdates.offer(VisibleExecutorUpdate.finished());
+        nextState = State.DONE;
       }
-      return shouldShutdown;
+      shutdownIfNecessary(nextState);
+      return pipelineState.get().isTerminal();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/beam/blob/caacf297/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java
index f900a22..82f59a7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java
@@ -19,8 +19,11 @@ package org.apache.beam.runners.direct;
 
 import java.util.Collection;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.joda.time.Duration;
 
 /**
  * An executor that schedules and executes {@link AppliedPTransform AppliedPTransforms} for both
@@ -40,8 +43,24 @@ interface PipelineExecutor {
    * root {@link AppliedPTransform AppliedPTransforms} have completed, and all
    * {@link CommittedBundle Bundles} have been consumed. Jobs may also terminate abnormally.
    *
-   * @throws Throwable whenever an executor thread throws anything, transfers the throwable to the
+   * <p>Waits for up to the provided duration, or forever if the provided duration is less than or
+   * equal to zero.
+   *
+   * @return The terminal state of the Pipeline.
+   * @throws Exception whenever an executor thread throws anything, transfers to the
    *                   waiting thread and rethrows it
    */
-  void awaitCompletion() throws Exception;
+  State waitUntilFinish(Duration duration) throws Exception;
+
+  /**
+   * Gets the current state of the {@link Pipeline}.
+   */
+  State getPipelineState();
+
+  /**
+   * Shuts down the executor.
+   *
+   * <p>The executor may continue to run for a short time after this method returns.
+   */
+  void stop();
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/caacf297/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorService.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorService.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorService.java
index 837b858..c6f770f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorService.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorService.java
@@ -32,4 +32,10 @@ interface TransformExecutorService {
    * {@link TransformExecutor TransformExecutors} to be evaluated.
    */
   void complete(TransformExecutor<?> completed);
+
+  /**
+   * Cancel any outstanding work, if possible. Any future calls to schedule should ignore any
+   * work.
+   */
+  void shutdown();
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/caacf297/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
index 876da9d..6733758 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
@@ -21,6 +21,7 @@ import com.google.common.base.MoreObjects;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -56,6 +57,7 @@ final class TransformExecutorServices {
    */
   private static class ParallelEvaluationState implements TransformExecutorService {
     private final ExecutorService executor;
+    private final AtomicBoolean active = new AtomicBoolean(true);
 
     private ParallelEvaluationState(ExecutorService executor) {
       this.executor = executor;
@@ -63,12 +65,19 @@ final class TransformExecutorServices {
 
     @Override
     public void schedule(TransformExecutor<?> work) {
-      executor.submit(work);
+      if (active.get()) {
+        executor.submit(work);
+      }
     }
 
     @Override
     public void complete(TransformExecutor<?> completed) {
     }
+
+    @Override
+    public void shutdown() {
+      active.set(false);
+    }
   }
 
   /**
@@ -84,6 +93,7 @@ final class TransformExecutorServices {
 
     private AtomicReference<TransformExecutor<?>> currentlyEvaluating;
     private final Queue<TransformExecutor<?>> workQueue;
+    private boolean active = true;
 
     private SerialEvaluationState(ExecutorService executor) {
       this.executor = executor;
@@ -113,12 +123,20 @@ final class TransformExecutorServices {
       updateCurrentlyEvaluating();
     }
 
+    @Override
+    public void shutdown() {
+      synchronized (this) {
+        active = false;
+      }
+      workQueue.clear();
+    }
+
     private void updateCurrentlyEvaluating() {
       if (currentlyEvaluating.get() == null) {
         // Only synchronize if we need to update what's currently evaluating
         synchronized (this) {
           TransformExecutor<?> newWork = workQueue.poll();
-          if (newWork != null) {
+          if (active && newWork != null) {
             if (currentlyEvaluating.compareAndSet(null, newWork)) {
               executor.submit(newWork);
             } else {

http://git-wip-us.apache.org/repos/asf/beam/blob/caacf297/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index d2b6d1d..e601fcf 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -32,9 +32,16 @@ import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
@@ -49,6 +56,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -65,6 +73,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.hamcrest.Matchers;
+import org.joda.time.Duration;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.internal.matchers.ThrowableMessageMatcher;
@@ -222,6 +231,52 @@ public class DirectRunnerTest implements Serializable {
   }
 
   @Test
+  public void cancelShouldStopPipeline() throws Exception {
+    PipelineOptions opts = TestPipeline.testingPipelineOptions();
+    opts.as(DirectOptions.class).setBlockOnRun(false);
+    opts.setRunner(DirectRunner.class);
+
+    final Pipeline p = Pipeline.create(opts);
+    p.apply(CountingInput.unbounded().withRate(1L, Duration.standardSeconds(1)));
+
+    final BlockingQueue<PipelineResult> resultExchange = new ArrayBlockingQueue<>(1);
+    Runnable cancelRunnable = new Runnable() {
+      @Override
+      public void run() {
+        try {
+          resultExchange.take().cancel();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IllegalStateException(e);
+        } catch (IOException e) {
+          throw new IllegalStateException(e);
+        }
+      }
+    };
+
+    Callable<PipelineResult> runPipelineRunnable = new Callable<PipelineResult>() {
+      @Override
+      public PipelineResult call() {
+        PipelineResult res = p.run();
+        try {
+          resultExchange.put(res);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IllegalStateException(e);
+        }
+        return res;
+      }
+    };
+
+    ExecutorService executor = Executors.newCachedThreadPool();
+    executor.submit(cancelRunnable);
+    Future<PipelineResult> result = executor.submit(runPipelineRunnable);
+
+    // If cancel doesn't work, this will hang forever
+    result.get().waitUntilFinish();
+  }
+
+  @Test
   public void transformDisplayDataExceptionShouldFail() {
     DoFn<Integer, Integer> brokenDoFn = new DoFn<Integer, Integer>() {
       @ProcessElement

http://git-wip-us.apache.org/repos/asf/beam/blob/caacf297/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
index 714fd00..2799b00 100644
--- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
@@ -155,6 +155,40 @@
   </Match>
 
   <Match>
+    <Class name="org.apache.beam.runners.direct.ExecutorServiceParallelExecutor$MonitorRunnable" />
+    <Method name="shouldShutdown" />
+    <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE" />
+    <!-- visibleUpdates is a non-capacity-limited LinkedBlockingQueue, which
+      can never refuse an offered update -->
+  </Match>
+
+  <Match>
+    <Class name="org.apache.beam.runners.spark.util.BroadcastHelper$CodedBroadcastHelper"/>
+    <Or>
+      <Field name="bcast" />
+      <Field name="value" />
+    </Or>
+    <Bug pattern="IS2_INCONSISTENT_SYNC"/>
+    <!--
+      Spark's Broadcast variables are a distributed and cached objects
+      and should not be treated as "normal" objects.
+    -->
+  </Match>
+
+  <Match>
+    <Class name="org.apache.beam.runners.spark.util.BroadcastHelper$DirectBroadcastHelper"/>
+    <Or>
+      <Field name="bcast" />
+      <Field name="value" />
+    </Or>
+    <Bug pattern="IS2_INCONSISTENT_SYNC"/>
+    <!--
+      Spark's Broadcast variables are a distributed and cached objects
+      and should not be treated as "normal" objects.
+    -->
+  </Match>
+
+  <Match>
     <Class name="org.apache.beam.runners.spark.metrics.sink.CsvSink"/>
     <Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS"/>
     <!-- Intentionally overriding parent name because inheritors should replace the parent. -->


[2/2] beam git commit: This closes #1115

Posted by tg...@apache.org.
This closes #1115


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/752a4c9d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/752a4c9d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/752a4c9d

Branch: refs/heads/master
Commit: 752a4c9d92b00ab70a7c99f476a5bba7e4d03eaf
Parents: 2d9bf27 caacf29
Author: Thomas Groh <tg...@google.com>
Authored: Wed Mar 22 13:05:26 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Mar 22 13:05:26 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunner.java       |  43 +++---
 .../direct/ExecutorServiceParallelExecutor.java | 137 ++++++++++++++-----
 .../beam/runners/direct/PipelineExecutor.java   |  23 +++-
 .../direct/TransformExecutorService.java        |   6 +
 .../direct/TransformExecutorServices.java       |  22 ++-
 .../beam/runners/direct/DirectRunnerTest.java   |  55 ++++++++
 .../src/main/resources/beam/findbugs-filter.xml |  34 +++++
 7 files changed, 264 insertions(+), 56 deletions(-)
----------------------------------------------------------------------