You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this rendering.
Posted to commits@beam.apache.org by ro...@apache.org on 2018/03/07 06:31:04 UTC
[beam] 01/02: Revert "extracting the scheduled executor service in
a factory variable in SDF direct runner factory"
This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch release-2.4.0
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 1b2ba1e2b1a183010ee1ce4d317592ea0386b41b
Author: Thomas Groh <tg...@google.com>
AuthorDate: Fri Mar 2 09:27:48 2018 -0800
Revert "extracting the scheduled executor service in a factory variable in SDF direct runner factory"
This reverts commit 0044cbf385a4991a7d5191a91b881d8525d747c0.
Reason for the revert: the reverted commit breaks FlinkRunner compilation.
---
.../org/apache/beam/runners/core/DoFnRunners.java | 2 +-
.../apache/beam/runners/core/ProcessFnRunner.java | 10 +-
.../apache/beam/runners/core/SimpleDoFnRunner.java | 4 -
...LifecycleManagerRemovingTransformEvaluator.java | 4 -
.../direct/ExecutorServiceParallelExecutor.java | 40 ++------
.../apache/beam/runners/direct/ParDoEvaluator.java | 8 --
.../beam/runners/direct/ParDoEvaluatorFactory.java | 2 +-
.../SplittableProcessElementsEvaluatorFactory.java | 109 +++++++++++----------
.../beam/runners/direct/DirectRunnerTest.java | 35 -------
9 files changed, 67 insertions(+), 147 deletions(-)
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index 41116f1..80c830a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -52,7 +52,7 @@ public class DoFnRunners {
* compressed {@link WindowedValue}. It is the responsibility of the runner to perform any key
* partitioning needed, etc.
*/
- public static <InputT, OutputT> SimpleDoFnRunner<InputT, OutputT> simpleRunner(
+ public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner(
PipelineOptions options,
DoFn<InputT, OutputT> fn,
SideInputReader sideInputReader,
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
index 6690f58..e4dfd13 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
@@ -25,7 +25,6 @@ import java.util.Collections;
import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.sdk.state.TimeDomain;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
@@ -40,13 +39,12 @@ import org.joda.time.Instant;
public class ProcessFnRunner<InputT, OutputT, RestrictionT>
implements PushbackSideInputDoFnRunner<
KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> {
- private final SimpleDoFnRunner<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT>
- underlying;
+ private final DoFnRunner<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> underlying;
private final Collection<PCollectionView<?>> views;
private final ReadyCheckingSideInputReader sideInputReader;
public ProcessFnRunner(
- SimpleDoFnRunner<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> underlying,
+ DoFnRunner<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> underlying,
Collection<PCollectionView<?>> views,
ReadyCheckingSideInputReader sideInputReader) {
this.underlying = underlying;
@@ -54,10 +52,6 @@ public class ProcessFnRunner<InputT, OutputT, RestrictionT>
this.sideInputReader = sideInputReader;
}
- public DoFn<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> getFn() {
- return underlying.getFn();
- }
-
@Override
public void startBundle() {
underlying.startBundle();
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 36b42ef..d4c5775 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -120,10 +120,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
this.allowedLateness = windowingStrategy.getAllowedLateness();
}
- public DoFn<InputT, OutputT> getFn() {
- return fn;
- }
-
@Override
public void startBundle() {
// This can contain user code. Wrap it in case it throws an exception.
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
index eed81b3..e537962 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
@@ -45,10 +45,6 @@ class DoFnLifecycleManagerRemovingTransformEvaluator<InputT> implements Transfor
this.lifecycleManager = lifecycleManager;
}
- public ParDoEvaluator<InputT> getParDoEvaluator() {
- return underlying;
- }
-
@Override
public void processElement(WindowedValue<InputT> element) throws Exception {
try {
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 4a1afc6..652f388 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -25,7 +25,6 @@ import com.google.common.cache.RemovalListener;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@@ -35,7 +34,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.runners.local.ExecutionDriver;
import org.apache.beam.runners.local.ExecutionDriver.DriverState;
@@ -274,41 +272,17 @@ final class ExecutorServiceParallelExecutor
return;
}
LOG.debug("Pipeline has terminated. Shutting down.");
-
- final Collection<Exception> errors = new ArrayList<>();
+ pipelineState.compareAndSet(State.RUNNING, newState);
// Stop accepting new work before shutting down the executor. This ensures that thread don't try
// to add work to the shutdown executor.
- try {
- serialExecutorServices.invalidateAll();
- } catch (final RuntimeException re) {
- errors.add(re);
- }
- try {
- serialExecutorServices.cleanUp();
- } catch (final RuntimeException re) {
- errors.add(re);
- }
- try {
- parallelExecutorService.shutdown();
- } catch (final RuntimeException re) {
- errors.add(re);
- }
- try {
- executorService.shutdown();
- } catch (final RuntimeException re) {
- errors.add(re);
- }
+ serialExecutorServices.invalidateAll();
+ serialExecutorServices.cleanUp();
+ parallelExecutorService.shutdown();
+ executorService.shutdown();
try {
registry.cleanup();
- } catch (final Exception e) {
- errors.add(e);
- }
- pipelineState.compareAndSet(State.RUNNING, newState); // ensure we hit a terminal node
- if (!errors.isEmpty()) {
- throw new IllegalStateException(
- "Error" + (errors.size() == 1 ? "" : "s") + " during executor shutdown:\n"
- + errors.stream().map(Exception::getMessage)
- .collect(Collectors.joining("\n- ", "- ", "")));
+ } catch (Exception e) {
+ visibleUpdates.failed(e);
}
}
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index c2b877f..7694b94 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -169,14 +169,6 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
}
}
- public PushbackSideInputDoFnRunner<InputT, ?> getFnRunner() {
- return fnRunner;
- }
-
- public DirectStepContext getStepContext() {
- return stepContext;
- }
-
public BundleOutputManager getOutputManager() {
return outputManager;
}
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index 2963118..5774f17 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -42,7 +42,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
private static final Logger LOG = LoggerFactory.getLogger(ParDoEvaluatorFactory.class);
private final LoadingCache<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager> fnClones;
- final EvaluationContext evaluationContext;
+ private final EvaluationContext evaluationContext;
private final ParDoEvaluator.DoFnRunnerFactory<InputT, OutputT> runnerFactory;
ParDoEvaluatorFactory(
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index cd38c8c..f4c4895 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -17,22 +17,18 @@
*/
package org.apache.beam.runners.direct;
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.cache.CacheLoader;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Collection;
import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.DoFnRunners.OutputManager;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker;
import org.apache.beam.runners.core.OutputWindowedValue;
-import org.apache.beam.runners.core.ProcessFnRunner;
import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessElements;
import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessFn;
+import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -54,30 +50,16 @@ class SplittableProcessElementsEvaluatorFactory<
implements TransformEvaluatorFactory {
private final ParDoEvaluatorFactory<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT>
delegateFactory;
- private final ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder()
- .setThreadFactory(MoreExecutors.platformThreadFactory())
- .setNameFormat("direct-splittable-process-element-checkpoint-executor")
- .build());
+ private final EvaluationContext evaluationContext;
SplittableProcessElementsEvaluatorFactory(EvaluationContext evaluationContext) {
+ this.evaluationContext = evaluationContext;
this.delegateFactory =
- new ParDoEvaluatorFactory<>(
- evaluationContext,
- SplittableProcessElementsEvaluatorFactory.
- <InputT, OutputT, RestrictionT>processFnRunnerFactory(),
- new CacheLoader<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager>() {
- @Override
- public DoFnLifecycleManager load(final AppliedPTransform<?, ?, ?> application) {
- checkArgument(
- ProcessElements.class.isInstance(application.getTransform()),
- "No know extraction of the fn from " + application);
- final ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform =
- (ProcessElements<InputT, OutputT, RestrictionT, TrackerT>)
- application.getTransform();
- return DoFnLifecycleManager.of(transform.newProcessFn(transform.getFn()));
- }
- });
+ new ParDoEvaluatorFactory<>(
+ evaluationContext,
+ SplittableProcessElementsEvaluatorFactory
+ .<InputT, OutputT, RestrictionT>processFnRunnerFactory(),
+ ParDoEvaluatorFactory.basicDoFnCacheLoader());
}
@Override
@@ -86,14 +68,12 @@ class SplittableProcessElementsEvaluatorFactory<
@SuppressWarnings({"unchecked", "rawtypes"})
TransformEvaluator<T> evaluator =
(TransformEvaluator<T>)
- createEvaluator((AppliedPTransform) application,
- (CommittedBundle) inputBundle);
+ createEvaluator((AppliedPTransform) application, (CommittedBundle) inputBundle);
return evaluator;
}
@Override
public void cleanup() throws Exception {
- ses.shutdownNow(); // stop before cleaning
delegateFactory.cleanup();
}
@@ -106,29 +86,43 @@ class SplittableProcessElementsEvaluatorFactory<
CommittedBundle<InputT> inputBundle)
throws Exception {
final ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform =
- application.getTransform();
- final DoFnLifecycleManagerRemovingTransformEvaluator
- <KeyedWorkItem<String, KV<InputT, RestrictionT>>> evaluator =
- delegateFactory.createEvaluator(
- (AppliedPTransform) application,
- (PCollection<KeyedWorkItem<String, KV<InputT, RestrictionT>>>) inputBundle.getPCollection(),
- inputBundle.getKey(),
- application.getTransform().getSideInputs(),
- application.getTransform().getMainOutputTag(),
- application.getTransform().getAdditionalOutputTags().getAll());
-
- final ParDoEvaluator<KeyedWorkItem<String, KV<InputT, RestrictionT>>> pde =
- evaluator.getParDoEvaluator();
- final ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn =
- (ProcessFn<InputT, OutputT, RestrictionT, TrackerT>)
- ProcessFnRunner.class.cast(pde.getFnRunner()).getFn();
- final DirectExecutionContext.DirectStepContext stepContext = pde.getStepContext();
- processFn.setStateInternalsFactory(key -> stepContext.stateInternals());
+ application.getTransform();
+
+ ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn =
+ transform.newProcessFn(transform.getFn());
+
+ DoFnLifecycleManager fnManager = DoFnLifecycleManager.of(processFn);
+ processFn =
+ ((ProcessFn<InputT, OutputT, RestrictionT, TrackerT>)
+ fnManager.<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT>get());
+
+ String stepName = evaluationContext.getStepName(application);
+ final DirectExecutionContext.DirectStepContext stepContext =
+ evaluationContext
+ .getExecutionContext(application, inputBundle.getKey())
+ .getStepContext(stepName);
+
+ final ParDoEvaluator<KeyedWorkItem<String, KV<InputT, RestrictionT>>>
+ parDoEvaluator =
+ delegateFactory.createParDoEvaluator(
+ application,
+ inputBundle.getKey(),
+ (PCollection<KeyedWorkItem<String, KV<InputT, RestrictionT>>>)
+ inputBundle.getPCollection(),
+ transform.getSideInputs(),
+ transform.getMainOutputTag(),
+ transform.getAdditionalOutputTags().getAll(),
+ stepContext,
+ processFn,
+ fnManager);
+
+ processFn.setStateInternalsFactory(key -> (StateInternals) stepContext.stateInternals());
+
processFn.setTimerInternalsFactory(key -> stepContext.timerInternals());
OutputWindowedValue<OutputT> outputWindowedValue =
new OutputWindowedValue<OutputT>() {
- private final OutputManager outputManager = pde.getOutputManager();
+ private final OutputManager outputManager = parDoEvaluator.getOutputManager();
@Override
public void outputWindowedValue(
@@ -150,18 +144,27 @@ class SplittableProcessElementsEvaluatorFactory<
outputManager.output(tag, WindowedValue.of(output, timestamp, windows, pane));
}
};
- processFn.setProcessElementInvoker(
+ processFn.setProcessElementInvoker(
new OutputAndTimeBoundedSplittableProcessElementInvoker<>(
transform.getFn(),
- delegateFactory.evaluationContext.getPipelineOptions(),
+ evaluationContext.getPipelineOptions(),
outputWindowedValue,
- delegateFactory.evaluationContext.createSideInputReader(transform.getSideInputs()),
- ses,
+ evaluationContext.createSideInputReader(transform.getSideInputs()),
+ // TODO: For better performance, use a higher-level executor?
+ // TODO: (BEAM-723) Create a shared ExecutorService for maintenance tasks in the
+ // DirectRunner.
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder()
+ .setThreadFactory(MoreExecutors.platformThreadFactory())
+ .setDaemon(true)
+ .setNameFormat("direct-splittable-process-element-checkpoint-executor")
+ .build()),
// Setting small values here to stimulate frequent checkpointing and better exercise
// splittable DoFn's in that respect.
100,
Duration.standardSeconds(1)));
- return evaluator;
+
+ return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(parDoEvaluator, fnManager);
}
private static <InputT, OutputT, RestrictionT>
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 1c8b144..830d0c1 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.direct;
import static com.google.common.base.Preconditions.checkState;
import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isA;
import static org.junit.Assert.assertThat;
@@ -40,7 +39,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
@@ -301,39 +299,6 @@ public class DirectRunnerTest implements Serializable {
assertThat(result.getState(), is(State.RUNNING));
}
- private static final AtomicLong TEARDOWN_CALL = new AtomicLong(-1);
-
- @Test
- public void tearsDownFnsBeforeFinishing() {
- TEARDOWN_CALL.set(-1);
- final Pipeline pipeline = getPipeline();
- pipeline.apply(Create.of("a"))
- .apply(ParDo.of(new DoFn<String, String>() {
- @ProcessElement
- public void onElement(final ProcessContext ctx) {
- // no-op
- }
-
- @Teardown
- public void teardown() {
- // just to not have a fast execution hiding an issue until we have a shutdown callback
- try {
- Thread.sleep(1000);
- } catch (final InterruptedException e) {
- fail();
- }
- TEARDOWN_CALL.set(System.nanoTime());
- }
- }));
- final PipelineResult pipelineResult = pipeline.run();
- pipelineResult.waitUntilFinish();
-
- final long doneTs = System.nanoTime();
- final long tearDownTs = TEARDOWN_CALL.get();
- assertThat(tearDownTs, greaterThan(0L));
- assertThat(doneTs, greaterThan(tearDownTs));
- }
-
@Test
public void transformDisplayDataExceptionShouldFail() {
DoFn<Integer, Integer> brokenDoFn = new DoFn<Integer, Integer>() {
--
To stop receiving notification emails like this one, please contact
robertwb@apache.org.