You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/19 01:12:29 UTC
[5/7] beam git commit: Creates ProcessFnRunner and wires it through
ParDoEvaluator
Creates ProcessFnRunner and wires it through ParDoEvaluator
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b93de58f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b93de58f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b93de58f
Branch: refs/heads/master
Commit: b93de58f5a3a10877997815a793725cb0e53cc2d
Parents: 7e1a267
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Apr 17 14:52:23 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 18:02:07 2017 -0700
----------------------------------------------------------------------
.../apache/beam/runners/core/DoFnRunners.java | 32 +++++
.../beam/runners/core/ProcessFnRunner.java | 127 +++++++++++++++++++
.../beam/runners/direct/ParDoEvaluator.java | 114 +++++++++++++----
.../runners/direct/ParDoEvaluatorFactory.java | 11 +-
...littableProcessElementsEvaluatorFactory.java | 106 ++++++++++++----
.../direct/StatefulParDoEvaluatorFactory.java | 4 +-
.../direct/TransformEvaluatorRegistry.java | 4 +-
.../beam/runners/direct/ParDoEvaluatorTest.java | 3 +-
8 files changed, 341 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index b09ee08..8501e72 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -17,8 +17,10 @@
*/
package org.apache.beam.runners.core;
+import java.util.Collection;
import java.util.List;
import org.apache.beam.runners.core.ExecutionContext.StepContext;
+import org.apache.beam.runners.core.SplittableParDo.ProcessFn;
import org.apache.beam.runners.core.StatefulDoFnRunner.CleanupTimer;
import org.apache.beam.runners.core.StatefulDoFnRunner.StateCleaner;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -26,10 +28,12 @@ import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
/**
@@ -146,4 +150,32 @@ public class DoFnRunners {
stateCleaner,
droppedDueToLateness);
}
+
+  /**
+   * Creates a {@link ProcessFnRunner} that executes the given {@link ProcessFn}.
+   *
+   * <p>The {@code fn} is first wrapped in a {@link #simpleRunner simple runner}; the returned
+   * {@link ProcessFnRunner} consults {@code sideInputReader} for every view in {@code views}
+   * before delegating a work item to that simple runner.
+   *
+   * @param fn the splittable-processing function to run
+   * @param views side inputs that must be ready before a work item is processed
+   * @param sideInputReader used both for readiness checks and by the simple runner
+   * @return a pushback-capable runner wrapping {@code fn}
+   */
+  public static <InputT, OutputT, RestrictionT>
+      ProcessFnRunner<InputT, OutputT, RestrictionT> newProcessFnRunner(
+          ProcessFn<InputT, OutputT, RestrictionT, ?> fn,
+          PipelineOptions options,
+          Collection<PCollectionView<?>> views,
+          ReadyCheckingSideInputReader sideInputReader,
+          OutputManager outputManager,
+          TupleTag<OutputT> mainOutputTag,
+          List<TupleTag<?>> additionalOutputTags,
+          StepContext stepContext,
+          AggregatorFactory aggregatorFactory,
+          WindowingStrategy<?, ?> windowingStrategy) {
+    DoFnRunner<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
+        simple =
+            simpleRunner(
+                options,
+                fn,
+                sideInputReader,
+                outputManager,
+                mainOutputTag,
+                additionalOutputTags,
+                stepContext,
+                aggregatorFactory,
+                windowingStrategy);
+    return new ProcessFnRunner<>(simple, views, sideInputReader);
+  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
new file mode 100644
index 0000000..3ae3f50
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.runners.core.SplittableParDo.ProcessFn;
+
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.joda.time.Instant;
+
+/**
+ * Runs a {@link ProcessFn} by constructing the appropriate contexts and passing them in.
+ *
+ * <p>Each input is a {@link KeyedWorkItem} that is itself in no window or only in the
+ * {@link GlobalWindow}. Side-input readiness is judged against the window carried inside the work
+ * item: the sole window of its only element, or, if it has no elements, the window of its only
+ * timer's {@link WindowNamespace}. A work item whose side inputs are not all ready in that window
+ * is returned to the caller instead of being processed.
+ */
+public class ProcessFnRunner<InputT, OutputT, RestrictionT>
+    implements PushbackSideInputDoFnRunner<
+        KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> {
+  /** Runner that actually invokes the {@link ProcessFn} once a work item is ready. */
+  private final DoFnRunner<
+          KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
+      underlying;
+
+  /** Side inputs that must all be ready before a work item may be processed. */
+  private final Collection<PCollectionView<?>> views;
+
+  /** Answers whether a given side input is ready in a given window. */
+  private final ReadyCheckingSideInputReader sideInputReader;
+
+  ProcessFnRunner(
+      DoFnRunner<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
+          underlying,
+      Collection<PCollectionView<?>> views,
+      ReadyCheckingSideInputReader sideInputReader) {
+    this.sideInputReader = sideInputReader;
+    this.views = views;
+    this.underlying = underlying;
+  }
+
+  @Override
+  public void startBundle() {
+    underlying.startBundle();
+  }
+
+  /**
+   * Processes {@code windowedKWI} if every side input is ready in its underlying window.
+   *
+   * @return an empty iterable if the work item was processed, otherwise a singleton holding
+   *     {@code windowedKWI} so the caller can retry it later
+   * @throws IllegalArgumentException if the work item itself is in multiple windows or in a
+   *     non-global window
+   */
+  @Override
+  public Iterable<WindowedValue<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>>
+      processElementInReadyWindows(
+          WindowedValue<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
+              windowedKWI) {
+    checkTrivialOuterWindows(windowedKWI);
+    BoundedWindow payloadWindow = getUnderlyingWindow(windowedKWI.getValue());
+    if (isReady(payloadWindow)) {
+      underlying.processElement(windowedKWI);
+      return Collections.emptyList();
+    }
+    return Collections.singletonList(windowedKWI);
+  }
+
+  @Override
+  public void finishBundle() {
+    underlying.finishBundle();
+  }
+
+  /** User timers are not supported for a {@link ProcessFn}; this method always throws. */
+  @Override
+  public void onTimer(
+      String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+    throw new UnsupportedOperationException("User timers unsupported in ProcessFn");
+  }
+
+  /**
+   * Verifies that the work item is in no window at all, or in exactly one window that is the
+   * {@link GlobalWindow}.
+   *
+   * @throws IllegalArgumentException if either condition is violated
+   */
+  private static <T> void checkTrivialOuterWindows(
+      WindowedValue<KeyedWorkItem<String, T>> windowedKWI) {
+    // In practice it will be in 0 or 1 windows (ValueInEmptyWindows or ValueInGlobalWindow)
+    Collection<? extends BoundedWindow> outerWindows = windowedKWI.getWindows();
+    if (outerWindows.isEmpty()) {
+      return;
+    }
+    checkArgument(
+        outerWindows.size() == 1,
+        "The KeyedWorkItem itself must not be in multiple windows, but was in: %s",
+        outerWindows);
+    BoundedWindow soleWindow = Iterables.getOnlyElement(outerWindows);
+    checkArgument(
+        soleWindow instanceof GlobalWindow,
+        "KeyedWorkItem must be in the Global window, but was in: %s",
+        soleWindow);
+  }
+
+  /**
+   * Returns the window that the work item's payload belongs to.
+   *
+   * <p>If the work item has elements, this is the only window of its only element. Otherwise it
+   * is the window of the namespace of its only timer, which is cast to {@link WindowNamespace}.
+   * {@link Iterables#getOnlyElement} fails if either expectation of "exactly one" is violated.
+   */
+  private static <T> BoundedWindow getUnderlyingWindow(KeyedWorkItem<String, T> kwi) {
+    if (!Iterables.isEmpty(kwi.elementsIterable())) {
+      // KWI must have a single element in elementsIterable, because it follows a GBK by a
+      // uniquely generated key.
+      // Additionally, windows must be exploded before GBKIntoKeyedWorkItems, so there's also
+      // only a single window.
+      WindowedValue<T> onlyElement = Iterables.getOnlyElement(kwi.elementsIterable());
+      return Iterables.getOnlyElement(onlyElement.getWindows());
+    }
+    // ProcessFn sets only a single timer.
+    TimerData onlyTimer = Iterables.getOnlyElement(kwi.timersIterable());
+    return ((WindowNamespace) onlyTimer.getNamespace()).getWindow();
+  }
+
+  /**
+   * Returns {@code true} iff every view in {@link #views} is ready in the side-input window that
+   * the view's window mapping assigns to {@code mainInputWindow}.
+   */
+  private boolean isReady(BoundedWindow mainInputWindow) {
+    for (PCollectionView<?> sideInput : views) {
+      BoundedWindow mappedWindow =
+          sideInput.getWindowMappingFn().getSideInputWindow(mainInputWindow);
+      if (!sideInputReader.isReady(sideInput, mappedWindow)) {
+        return false;
+      }
+    }
+    return true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index bab7b2c..cab11db 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -30,6 +30,7 @@ import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -43,6 +44,50 @@ import org.apache.beam.sdk.values.TupleTag;
class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
+  /**
+   * Builds the {@link PushbackSideInputDoFnRunner} that a {@link ParDoEvaluator} drives.
+   *
+   * <p>Lets callers change how a {@link DoFn} is executed (for instance, running a splittable
+   * {@code ProcessFn}) while reusing the evaluator's bundle and output plumbing.
+   */
+  public interface DoFnRunnerFactory<InputT, OutputT> {
+    /**
+     * Creates a runner for {@code fn} that emits to {@code outputManager} and reads side inputs
+     * {@code sideInputs} through {@code sideInputReader}.
+     */
+    PushbackSideInputDoFnRunner<InputT, OutputT> createRunner(
+        PipelineOptions options,
+        DoFn<InputT, OutputT> fn,
+        List<PCollectionView<?>> sideInputs,
+        ReadyCheckingSideInputReader sideInputReader,
+        OutputManager outputManager,
+        TupleTag<OutputT> mainOutputTag,
+        List<TupleTag<?>> additionalOutputTags,
+        DirectStepContext stepContext,
+        AggregatorContainer.Mutator aggregatorChanges,
+        WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy);
+  }
+
+  /**
+   * Returns the factory for ordinary {@link DoFn}s: a {@link DoFnRunners#simpleRunner} wrapped in
+   * a {@link SimplePushbackSideInputDoFnRunner} over the given side inputs and reader.
+   */
+  public static <InputT, OutputT> DoFnRunnerFactory<InputT, OutputT> defaultRunnerFactory() {
+    return new DoFnRunnerFactory<InputT, OutputT>() {
+      @Override
+      public PushbackSideInputDoFnRunner<InputT, OutputT> createRunner(
+          PipelineOptions options,
+          DoFn<InputT, OutputT> fn,
+          List<PCollectionView<?>> sideInputs,
+          ReadyCheckingSideInputReader sideInputReader,
+          OutputManager outputManager,
+          TupleTag<OutputT> mainOutputTag,
+          List<TupleTag<?>> additionalOutputTags,
+          DirectStepContext stepContext,
+          AggregatorContainer.Mutator aggregatorChanges,
+          WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy) {
+        return SimplePushbackSideInputDoFnRunner.create(
+            DoFnRunners.simpleRunner(
+                options,
+                fn,
+                sideInputReader,
+                outputManager,
+                mainOutputTag,
+                additionalOutputTags,
+                stepContext,
+                aggregatorChanges,
+                windowingStrategy),
+            sideInputs,
+            sideInputReader);
+      }
+    };
+  }
+
public static <InputT, OutputT> ParDoEvaluator<InputT> create(
EvaluationContext evaluationContext,
DirectStepContext stepContext,
@@ -53,9 +98,43 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
List<PCollectionView<?>> sideInputs,
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> additionalOutputTags,
- Map<TupleTag<?>, PCollection<?>> outputs) {
+ Map<TupleTag<?>, PCollection<?>> outputs,
+ DoFnRunnerFactory<InputT, OutputT> runnerFactory) {
AggregatorContainer.Mutator aggregatorChanges = evaluationContext.getAggregatorMutator();
+ BundleOutputManager outputManager = createOutputManager(evaluationContext, key, outputs);
+
+ ReadyCheckingSideInputReader sideInputReader =
+ evaluationContext.createSideInputReader(sideInputs);
+
+ PushbackSideInputDoFnRunner<InputT, OutputT> runner = runnerFactory.createRunner(
+ evaluationContext.getPipelineOptions(),
+ fn,
+ sideInputs,
+ sideInputReader,
+ outputManager,
+ mainOutputTag,
+ additionalOutputTags,
+ stepContext,
+ aggregatorChanges,
+ windowingStrategy);
+
+ return create(runner, stepContext, application, aggregatorChanges, outputManager);
+ }
+
+  /**
+   * Wraps an already-built runner in a {@link ParDoEvaluator}.
+   *
+   * <p>Constructing the evaluator calls {@code runner.startBundle()}; an exception from that call
+   * is rethrown via {@link UserCodeException#wrap}.
+   */
+  public static <InputT, OutputT> ParDoEvaluator<InputT> create(
+      PushbackSideInputDoFnRunner<InputT, OutputT> runner,
+      DirectStepContext stepContext,
+      AppliedPTransform<?, ?, ?> application,
+      AggregatorContainer.Mutator aggregatorChanges,
+      BundleOutputManager outputManager) {
+    ParDoEvaluator<InputT> evaluator =
+        new ParDoEvaluator<>(runner, application, aggregatorChanges, outputManager, stepContext);
+    return evaluator;
+  }
+
+ static BundleOutputManager createOutputManager(
+ EvaluationContext evaluationContext,
+ StructuralKey<?> key,
+ Map<TupleTag<?>, PCollection<?>> outputs) {
Map<TupleTag<?>, UncommittedBundle<?>> outputBundles = new HashMap<>();
for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) {
// Just trust the context's decision as to whether the output should be keyed.
@@ -69,32 +148,7 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
outputEntry.getKey(), evaluationContext.createBundle(outputEntry.getValue()));
}
}
- BundleOutputManager outputManager = BundleOutputManager.create(outputBundles);
-
- ReadyCheckingSideInputReader sideInputReader =
- evaluationContext.createSideInputReader(sideInputs);
-
- DoFnRunner<InputT, OutputT> underlying =
- DoFnRunners.simpleRunner(
- evaluationContext.getPipelineOptions(),
- fn,
- sideInputReader,
- outputManager,
- mainOutputTag,
- additionalOutputTags,
- stepContext,
- aggregatorChanges,
- windowingStrategy);
- PushbackSideInputDoFnRunner<InputT, OutputT> runner =
- SimplePushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader);
-
- try {
- runner.startBundle();
- } catch (Exception e) {
- throw UserCodeException.wrap(e);
- }
-
- return new ParDoEvaluator<>(runner, application, aggregatorChanges, outputManager, stepContext);
+ return BundleOutputManager.create(outputBundles);
}
////////////////////////////////////////////////////////////////////////////////////////////////
@@ -119,6 +173,12 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
this.stepContext = stepContext;
this.aggregatorChanges = aggregatorChanges;
this.unprocessedElements = ImmutableList.builder();
+
+ try {
+ fnRunner.startBundle();
+ } catch (Exception e) {
+ throw UserCodeException.wrap(e);
+ }
}
public BundleOutputManager getOutputManager() {
http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index 93f204a..b00c2b6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -43,9 +43,13 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
private static final Logger LOG = LoggerFactory.getLogger(ParDoEvaluatorFactory.class);
private final LoadingCache<DoFn<?, ?>, DoFnLifecycleManager> fnClones;
private final EvaluationContext evaluationContext;
+ private final ParDoEvaluator.DoFnRunnerFactory<InputT, OutputT> runnerFactory;
- ParDoEvaluatorFactory(EvaluationContext evaluationContext) {
+ ParDoEvaluatorFactory(
+ EvaluationContext evaluationContext,
+ ParDoEvaluator.DoFnRunnerFactory<InputT, OutputT> runnerFactory) {
this.evaluationContext = evaluationContext;
+ this.runnerFactory = runnerFactory;
fnClones =
CacheBuilder.newBuilder()
.build(
@@ -148,7 +152,8 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
sideInputs,
mainOutputTag,
additionalOutputTags,
- pcollections(application.getOutputs()));
+ pcollections(application.getOutputs()),
+ runnerFactory);
} catch (Exception e) {
try {
fnManager.remove();
@@ -162,7 +167,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
}
}
- private Map<TupleTag<?>, PCollection<?>> pcollections(Map<TupleTag<?>, PValue> outputs) {
+ static Map<TupleTag<?>, PCollection<?>> pcollections(Map<TupleTag<?>, PValue> outputs) {
Map<TupleTag<?>, PCollection<?>> pcs = new HashMap<>();
for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
pcs.put(output.getKey(), (PCollection<?>) output.getValue());
http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index 00b16dd..7efdb52 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -18,25 +18,34 @@
package org.apache.beam.runners.direct;
import java.util.Collection;
+import java.util.List;
import java.util.concurrent.Executors;
+import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.DoFnRunners.OutputManager;
import org.apache.beam.runners.core.ElementAndRestriction;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker;
import org.apache.beam.runners.core.OutputWindowedValue;
+import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SplittableParDo;
+import org.apache.beam.runners.core.SplittableParDo.ProcessFn;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternalsFactory;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -51,7 +60,11 @@ class SplittableProcessElementsEvaluatorFactory<
SplittableProcessElementsEvaluatorFactory(EvaluationContext evaluationContext) {
this.evaluationContext = evaluationContext;
- this.delegateFactory = new ParDoEvaluatorFactory<>(evaluationContext);
+ this.delegateFactory =
+ new ParDoEvaluatorFactory<>(
+ evaluationContext,
+ SplittableProcessElementsEvaluatorFactory
+ .<InputT, OutputT, RestrictionT>processFnRunnerFactory());
}
@Override
@@ -82,12 +95,12 @@ class SplittableProcessElementsEvaluatorFactory<
final SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform =
application.getTransform();
- SplittableParDo.ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn =
+ ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn =
transform.newProcessFn(transform.getFn());
DoFnLifecycleManager fnManager = DoFnLifecycleManager.of(processFn);
processFn =
- ((SplittableParDo.ProcessFn<InputT, OutputT, RestrictionT, TrackerT>)
+ ((ProcessFn<InputT, OutputT, RestrictionT, TrackerT>)
fnManager
.<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
get());
@@ -98,7 +111,7 @@ class SplittableProcessElementsEvaluatorFactory<
.getExecutionContext(application, inputBundle.getKey())
.getOrCreateStepContext(stepName, stepName);
- ParDoEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
+ final ParDoEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
parDoEvaluator =
delegateFactory.createParDoEvaluator(
application,
@@ -127,34 +140,36 @@ class SplittableProcessElementsEvaluatorFactory<
}
});
- final OutputManager outputManager = parDoEvaluator.getOutputManager();
+ OutputWindowedValue<OutputT> outputWindowedValue =
+ new OutputWindowedValue<OutputT>() {
+ private final OutputManager outputManager = parDoEvaluator.getOutputManager();
+
+ @Override
+ public void outputWindowedValue(
+ OutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ outputManager.output(
+ transform.getMainOutputTag(), WindowedValue.of(output, timestamp, windows, pane));
+ }
+
+ @Override
+ public <AdditionalOutputT> void outputWindowedValue(
+ TupleTag<AdditionalOutputT> tag,
+ AdditionalOutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ outputManager.output(tag, WindowedValue.of(output, timestamp, windows, pane));
+ }
+ };
processFn.setProcessElementInvoker(
new OutputAndTimeBoundedSplittableProcessElementInvoker<
InputT, OutputT, RestrictionT, TrackerT>(
transform.getFn(),
evaluationContext.getPipelineOptions(),
- new OutputWindowedValue<OutputT>() {
- @Override
- public void outputWindowedValue(
- OutputT output,
- Instant timestamp,
- Collection<? extends BoundedWindow> windows,
- PaneInfo pane) {
- outputManager.output(
- transform.getMainOutputTag(),
- WindowedValue.of(output, timestamp, windows, pane));
- }
-
- @Override
- public <AdditionalOutputT> void outputWindowedValue(
- TupleTag<AdditionalOutputT> tag,
- AdditionalOutputT output,
- Instant timestamp,
- Collection<? extends BoundedWindow> windows,
- PaneInfo pane) {
- outputManager.output(tag, WindowedValue.of(output, timestamp, windows, pane));
- }
- },
+ outputWindowedValue,
evaluationContext.createSideInputReader(transform.getSideInputs()),
// TODO: For better performance, use a higher-level executor?
Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()),
@@ -163,4 +178,41 @@ class SplittableProcessElementsEvaluatorFactory<
return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(parDoEvaluator, fnManager);
}
+
+  /**
+   * Returns a {@link ParDoEvaluator.DoFnRunnerFactory} that runs a splittable {@link ProcessFn}
+   * through {@link DoFnRunners#newProcessFnRunner}.
+   *
+   * <p>The returned factory expects the {@code fn} it is given to be a {@link ProcessFn}. Any other
+   * {@link DoFn} fails the downcast with a {@link ClassCastException}. Presumably only the
+   * {@link ProcessFn} created by this evaluator factory ever reaches it; verify this if the factory
+   * is reused elsewhere.
+   */
+  private static <InputT, OutputT, RestrictionT>
+      ParDoEvaluator.DoFnRunnerFactory<
+          KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
+      processFnRunnerFactory() {
+    return new ParDoEvaluator.DoFnRunnerFactory<
+        KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>() {
+      @Override
+      public PushbackSideInputDoFnRunner<
+              KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
+          createRunner(
+              PipelineOptions options,
+              DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> fn,
+              List<PCollectionView<?>> sideInputs,
+              ReadyCheckingSideInputReader sideInputReader,
+              OutputManager outputManager,
+              TupleTag<OutputT> mainOutputTag,
+              List<TupleTag<?>> additionalOutputTags,
+              DirectExecutionContext.DirectStepContext stepContext,
+              AggregatorContainer.Mutator aggregatorChanges,
+              WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy) {
+        // Cast to the parameterized ProcessFn instead of the raw type. A
+        // ProcessFn<InputT, OutputT, RestrictionT, ?> is a DoFn over
+        // KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>> producing OutputT, so
+        // the input/output/restriction type arguments are already fixed by fn's static type. Only
+        // the tracker type stays a wildcard, and no raw-type conversion is needed.
+        ProcessFn<InputT, OutputT, RestrictionT, ?> processFn =
+            (ProcessFn<InputT, OutputT, RestrictionT, ?>) fn;
+        return DoFnRunners.newProcessFnRunner(
+            processFn,
+            options,
+            sideInputs,
+            sideInputReader,
+            outputManager,
+            mainOutputTag,
+            additionalOutputTags,
+            stepContext,
+            aggregatorChanges,
+            windowingStrategy);
+      }
+    };
+  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index be77ea1..8793ae8 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -65,7 +65,9 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
private final ParDoEvaluatorFactory<KV<K, InputT>, OutputT> delegateFactory;
StatefulParDoEvaluatorFactory(EvaluationContext evaluationContext) {
- this.delegateFactory = new ParDoEvaluatorFactory<>(evaluationContext);
+ this.delegateFactory =
+ new ParDoEvaluatorFactory<>(
+ evaluationContext, ParDoEvaluator.<KV<K, InputT>, OutputT>defaultRunnerFactory());
this.cleanupRegistry =
CacheBuilder.newBuilder()
.weakValues()
http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index ae7ad93..d06c460 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -52,7 +52,9 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
ImmutableMap.<Class<? extends PTransform>, TransformEvaluatorFactory>builder()
.put(Read.Bounded.class, new BoundedReadEvaluatorFactory(ctxt))
.put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory(ctxt))
- .put(ParDo.MultiOutput.class, new ParDoEvaluatorFactory<>(ctxt))
+ .put(
+ ParDo.MultiOutput.class,
+ new ParDoEvaluatorFactory<>(ctxt, ParDoEvaluator.defaultRunnerFactory()))
.put(StatefulParDo.class, new StatefulParDoEvaluatorFactory<>(ctxt))
.put(PCollections.class, new FlattenEvaluatorFactory(ctxt))
.put(WriteView.class, new ViewEvaluatorFactory(ctxt))
http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 2be0f9d..e99e4bf 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -169,7 +169,8 @@ public class ParDoEvaluatorTest {
ImmutableList.<PCollectionView<?>>of(singletonView),
mainOutputTag,
additionalOutputTags,
- ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, output));
+ ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, output),
+ ParDoEvaluator.<Integer, Integer>defaultRunnerFactory());
}
private static class RecorderFn extends DoFn<Integer, Integer> {