You are viewing a plain-text version of this content. The canonical link for it (the "here" hyperlink in the original page) is not preserved in this plain-text version.
Posted to commits@beam.apache.org by ro...@apache.org on 2018/03/07 06:31:04 UTC

[beam] 01/02: Revert "extracting the scheduled executor service in a factory variable in SDF direct runner factory"

This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch release-2.4.0
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 1b2ba1e2b1a183010ee1ce4d317592ea0386b41b
Author: Thomas Groh <tg...@google.com>
AuthorDate: Fri Mar 2 09:27:48 2018 -0800

    Revert "extracting the scheduled executor service in a factory variable in SDF direct runner factory"
    
    This reverts commit 0044cbf385a4991a7d5191a91b881d8525d747c0.
    
    Breaks FlinkRunner Compilation
---
 .../org/apache/beam/runners/core/DoFnRunners.java  |   2 +-
 .../apache/beam/runners/core/ProcessFnRunner.java  |  10 +-
 .../apache/beam/runners/core/SimpleDoFnRunner.java |   4 -
 ...LifecycleManagerRemovingTransformEvaluator.java |   4 -
 .../direct/ExecutorServiceParallelExecutor.java    |  40 ++------
 .../apache/beam/runners/direct/ParDoEvaluator.java |   8 --
 .../beam/runners/direct/ParDoEvaluatorFactory.java |   2 +-
 .../SplittableProcessElementsEvaluatorFactory.java | 109 +++++++++++----------
 .../beam/runners/direct/DirectRunnerTest.java      |  35 -------
 9 files changed, 67 insertions(+), 147 deletions(-)

diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index 41116f1..80c830a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -52,7 +52,7 @@ public class DoFnRunners {
    * compressed {@link WindowedValue}. It is the responsibility of the runner to perform any key
    * partitioning needed, etc.
    */
-  public static <InputT, OutputT> SimpleDoFnRunner<InputT, OutputT> simpleRunner(
+  public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner(
       PipelineOptions options,
       DoFn<InputT, OutputT> fn,
       SideInputReader sideInputReader,
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
index 6690f58..e4dfd13 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
@@ -25,7 +25,6 @@ import java.util.Collections;
 import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.sdk.state.TimeDomain;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -40,13 +39,12 @@ import org.joda.time.Instant;
 public class ProcessFnRunner<InputT, OutputT, RestrictionT>
     implements PushbackSideInputDoFnRunner<
         KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> {
-  private final SimpleDoFnRunner<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT>
-    underlying;
+  private final DoFnRunner<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> underlying;
   private final Collection<PCollectionView<?>> views;
   private final ReadyCheckingSideInputReader sideInputReader;
 
   public ProcessFnRunner(
-      SimpleDoFnRunner<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> underlying,
+      DoFnRunner<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> underlying,
       Collection<PCollectionView<?>> views,
       ReadyCheckingSideInputReader sideInputReader) {
     this.underlying = underlying;
@@ -54,10 +52,6 @@ public class ProcessFnRunner<InputT, OutputT, RestrictionT>
     this.sideInputReader = sideInputReader;
   }
 
-  public DoFn<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> getFn() {
-    return underlying.getFn();
-  }
-
   @Override
   public void startBundle() {
     underlying.startBundle();
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 36b42ef..d4c5775 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -120,10 +120,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     this.allowedLateness = windowingStrategy.getAllowedLateness();
   }
 
-  public DoFn<InputT, OutputT> getFn() {
-    return fn;
-  }
-
   @Override
   public void startBundle() {
     // This can contain user code. Wrap it in case it throws an exception.
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
index eed81b3..e537962 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
@@ -45,10 +45,6 @@ class DoFnLifecycleManagerRemovingTransformEvaluator<InputT> implements Transfor
     this.lifecycleManager = lifecycleManager;
   }
 
-  public ParDoEvaluator<InputT> getParDoEvaluator() {
-    return underlying;
-  }
-
   @Override
   public void processElement(WindowedValue<InputT> element) throws Exception {
     try {
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 4a1afc6..652f388 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -25,7 +25,6 @@ import com.google.common.cache.RemovalListener;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
@@ -35,7 +34,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.local.ExecutionDriver;
 import org.apache.beam.runners.local.ExecutionDriver.DriverState;
@@ -274,41 +272,17 @@ final class ExecutorServiceParallelExecutor
       return;
     }
     LOG.debug("Pipeline has terminated. Shutting down.");
-
-    final Collection<Exception> errors = new ArrayList<>();
+    pipelineState.compareAndSet(State.RUNNING, newState);
     // Stop accepting new work before shutting down the executor. This ensures that thread don't try
     // to add work to the shutdown executor.
-    try {
-      serialExecutorServices.invalidateAll();
-    } catch (final RuntimeException re) {
-      errors.add(re);
-    }
-    try {
-      serialExecutorServices.cleanUp();
-    } catch (final RuntimeException re) {
-      errors.add(re);
-    }
-    try {
-      parallelExecutorService.shutdown();
-    } catch (final RuntimeException re) {
-      errors.add(re);
-    }
-    try {
-      executorService.shutdown();
-    } catch (final RuntimeException re) {
-      errors.add(re);
-    }
+    serialExecutorServices.invalidateAll();
+    serialExecutorServices.cleanUp();
+    parallelExecutorService.shutdown();
+    executorService.shutdown();
     try {
       registry.cleanup();
-    } catch (final Exception e) {
-      errors.add(e);
-    }
-    pipelineState.compareAndSet(State.RUNNING, newState); // ensure we hit a terminal node
-    if (!errors.isEmpty()) {
-      throw new IllegalStateException(
-        "Error" + (errors.size() == 1 ? "" : "s") + " during executor shutdown:\n"
-        + errors.stream().map(Exception::getMessage)
-          .collect(Collectors.joining("\n- ", "- ", "")));
+    } catch (Exception e) {
+      visibleUpdates.failed(e);
     }
   }
 
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index c2b877f..7694b94 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -169,14 +169,6 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
     }
   }
 
-  public PushbackSideInputDoFnRunner<InputT, ?> getFnRunner() {
-    return fnRunner;
-  }
-
-  public DirectStepContext getStepContext() {
-    return stepContext;
-  }
-
   public BundleOutputManager getOutputManager() {
     return outputManager;
   }
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index 2963118..5774f17 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -42,7 +42,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
 
   private static final Logger LOG = LoggerFactory.getLogger(ParDoEvaluatorFactory.class);
   private final LoadingCache<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager> fnClones;
-  final EvaluationContext evaluationContext;
+  private final EvaluationContext evaluationContext;
   private final ParDoEvaluator.DoFnRunnerFactory<InputT, OutputT> runnerFactory;
 
   ParDoEvaluatorFactory(
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index cd38c8c..f4c4895 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -17,22 +17,18 @@
  */
 package org.apache.beam.runners.direct;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.cache.CacheLoader;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.Collection;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker;
 import org.apache.beam.runners.core.OutputWindowedValue;
-import org.apache.beam.runners.core.ProcessFnRunner;
 import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessElements;
 import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessFn;
+import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -54,30 +50,16 @@ class SplittableProcessElementsEvaluatorFactory<
     implements TransformEvaluatorFactory {
   private final ParDoEvaluatorFactory<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT>
       delegateFactory;
-  private final ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor(
-            new ThreadFactoryBuilder()
-                    .setThreadFactory(MoreExecutors.platformThreadFactory())
-                    .setNameFormat("direct-splittable-process-element-checkpoint-executor")
-                    .build());
+  private final EvaluationContext evaluationContext;
 
   SplittableProcessElementsEvaluatorFactory(EvaluationContext evaluationContext) {
+    this.evaluationContext = evaluationContext;
     this.delegateFactory =
-      new ParDoEvaluatorFactory<>(
-        evaluationContext,
-        SplittableProcessElementsEvaluatorFactory.
-          <InputT, OutputT, RestrictionT>processFnRunnerFactory(),
-          new CacheLoader<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager>() {
-            @Override
-            public DoFnLifecycleManager load(final AppliedPTransform<?, ?, ?> application) {
-              checkArgument(
-                ProcessElements.class.isInstance(application.getTransform()),
-                "No know extraction of the fn from " + application);
-              final ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform =
-                (ProcessElements<InputT, OutputT, RestrictionT, TrackerT>)
-                  application.getTransform();
-              return DoFnLifecycleManager.of(transform.newProcessFn(transform.getFn()));
-            }
-          });
+        new ParDoEvaluatorFactory<>(
+            evaluationContext,
+            SplittableProcessElementsEvaluatorFactory
+                .<InputT, OutputT, RestrictionT>processFnRunnerFactory(),
+            ParDoEvaluatorFactory.basicDoFnCacheLoader());
   }
 
   @Override
@@ -86,14 +68,12 @@ class SplittableProcessElementsEvaluatorFactory<
     @SuppressWarnings({"unchecked", "rawtypes"})
     TransformEvaluator<T> evaluator =
         (TransformEvaluator<T>)
-            createEvaluator((AppliedPTransform) application,
-                    (CommittedBundle) inputBundle);
+            createEvaluator((AppliedPTransform) application, (CommittedBundle) inputBundle);
     return evaluator;
   }
 
   @Override
   public void cleanup() throws Exception {
-    ses.shutdownNow(); // stop before cleaning
     delegateFactory.cleanup();
   }
 
@@ -106,29 +86,43 @@ class SplittableProcessElementsEvaluatorFactory<
       CommittedBundle<InputT> inputBundle)
       throws Exception {
     final ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform =
-      application.getTransform();
-    final DoFnLifecycleManagerRemovingTransformEvaluator
-      <KeyedWorkItem<String, KV<InputT, RestrictionT>>> evaluator =
-      delegateFactory.createEvaluator(
-        (AppliedPTransform) application,
-        (PCollection<KeyedWorkItem<String, KV<InputT, RestrictionT>>>) inputBundle.getPCollection(),
-        inputBundle.getKey(),
-        application.getTransform().getSideInputs(),
-        application.getTransform().getMainOutputTag(),
-        application.getTransform().getAdditionalOutputTags().getAll());
-
-    final ParDoEvaluator<KeyedWorkItem<String, KV<InputT, RestrictionT>>> pde =
-      evaluator.getParDoEvaluator();
-    final ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn =
-      (ProcessFn<InputT, OutputT, RestrictionT, TrackerT>)
-        ProcessFnRunner.class.cast(pde.getFnRunner()).getFn();
-    final DirectExecutionContext.DirectStepContext stepContext = pde.getStepContext();
-    processFn.setStateInternalsFactory(key -> stepContext.stateInternals());
+        application.getTransform();
+
+    ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn =
+        transform.newProcessFn(transform.getFn());
+
+    DoFnLifecycleManager fnManager = DoFnLifecycleManager.of(processFn);
+    processFn =
+        ((ProcessFn<InputT, OutputT, RestrictionT, TrackerT>)
+            fnManager.<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT>get());
+
+    String stepName = evaluationContext.getStepName(application);
+    final DirectExecutionContext.DirectStepContext stepContext =
+        evaluationContext
+            .getExecutionContext(application, inputBundle.getKey())
+            .getStepContext(stepName);
+
+    final ParDoEvaluator<KeyedWorkItem<String, KV<InputT, RestrictionT>>>
+        parDoEvaluator =
+            delegateFactory.createParDoEvaluator(
+                application,
+                inputBundle.getKey(),
+                (PCollection<KeyedWorkItem<String, KV<InputT, RestrictionT>>>)
+                    inputBundle.getPCollection(),
+                transform.getSideInputs(),
+                transform.getMainOutputTag(),
+                transform.getAdditionalOutputTags().getAll(),
+                stepContext,
+                processFn,
+                fnManager);
+
+    processFn.setStateInternalsFactory(key -> (StateInternals) stepContext.stateInternals());
+
     processFn.setTimerInternalsFactory(key -> stepContext.timerInternals());
 
     OutputWindowedValue<OutputT> outputWindowedValue =
         new OutputWindowedValue<OutputT>() {
-          private final OutputManager outputManager = pde.getOutputManager();
+          private final OutputManager outputManager = parDoEvaluator.getOutputManager();
 
           @Override
           public void outputWindowedValue(
@@ -150,18 +144,27 @@ class SplittableProcessElementsEvaluatorFactory<
             outputManager.output(tag, WindowedValue.of(output, timestamp, windows, pane));
           }
         };
-      processFn.setProcessElementInvoker(
+    processFn.setProcessElementInvoker(
         new OutputAndTimeBoundedSplittableProcessElementInvoker<>(
             transform.getFn(),
-            delegateFactory.evaluationContext.getPipelineOptions(),
+            evaluationContext.getPipelineOptions(),
             outputWindowedValue,
-            delegateFactory.evaluationContext.createSideInputReader(transform.getSideInputs()),
-            ses,
+            evaluationContext.createSideInputReader(transform.getSideInputs()),
+            // TODO: For better performance, use a higher-level executor?
+            // TODO: (BEAM-723) Create a shared ExecutorService for maintenance tasks in the
+            // DirectRunner.
+            Executors.newSingleThreadScheduledExecutor(
+                new ThreadFactoryBuilder()
+                    .setThreadFactory(MoreExecutors.platformThreadFactory())
+                    .setDaemon(true)
+                    .setNameFormat("direct-splittable-process-element-checkpoint-executor")
+                    .build()),
             // Setting small values here to stimulate frequent checkpointing and better exercise
             // splittable DoFn's in that respect.
             100,
             Duration.standardSeconds(1)));
-    return evaluator;
+
+    return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(parDoEvaluator, fnManager);
   }
 
   private static <InputT, OutputT, RestrictionT>
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 1c8b144..830d0c1 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.direct;
 
 import static com.google.common.base.Preconditions.checkState;
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.isA;
 import static org.junit.Assert.assertThat;
@@ -40,7 +39,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
@@ -301,39 +299,6 @@ public class DirectRunnerTest implements Serializable {
     assertThat(result.getState(), is(State.RUNNING));
   }
 
-  private static final AtomicLong TEARDOWN_CALL = new AtomicLong(-1);
-
-  @Test
-  public void tearsDownFnsBeforeFinishing() {
-    TEARDOWN_CALL.set(-1);
-    final Pipeline pipeline = getPipeline();
-    pipeline.apply(Create.of("a"))
-      .apply(ParDo.of(new DoFn<String, String>() {
-        @ProcessElement
-        public void onElement(final ProcessContext ctx) {
-          // no-op
-        }
-
-        @Teardown
-        public void teardown() {
-          // just to not have a fast execution hiding an issue until we have a shutdown callback
-          try {
-              Thread.sleep(1000);
-          } catch (final InterruptedException e) {
-              fail();
-          }
-          TEARDOWN_CALL.set(System.nanoTime());
-        }
-      }));
-    final PipelineResult pipelineResult = pipeline.run();
-    pipelineResult.waitUntilFinish();
-
-    final long doneTs = System.nanoTime();
-    final long tearDownTs = TEARDOWN_CALL.get();
-    assertThat(tearDownTs, greaterThan(0L));
-    assertThat(doneTs, greaterThan(tearDownTs));
-  }
-
   @Test
   public void transformDisplayDataExceptionShouldFail() {
     DoFn<Integer, Integer> brokenDoFn = new DoFn<Integer, Integer>() {

-- 
To stop receiving notification emails like this one, please contact
robertwb@apache.org.