You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/04/16 01:11:38 UTC
[2/3] incubator-beam git commit: Move Shared construction code to
ParDoInProcessEvaluator
Move Shared construction code to ParDoInProcessEvaluator
Remove duplicate code in ParDo(Single/Multi)EvaluatorFactory; the
factories now only extract the appropriate elements and pass them to
ParDoInProcessEvaluator.create.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/64144259
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/64144259
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/64144259
Branch: refs/heads/master
Commit: 64144259551f1cf627545e0329c5a0daf087e7d2
Parents: 7a5b7ad
Author: Thomas Groh <tg...@google.com>
Authored: Tue Mar 29 17:38:22 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Apr 15 16:11:32 2016 -0700
----------------------------------------------------------------------
.../inprocess/ParDoInProcessEvaluator.java | 50 +++++++++++++++++-
.../inprocess/ParDoMultiEvaluatorFactory.java | 55 +++++---------------
.../inprocess/ParDoSingleEvaluatorFactory.java | 52 +++++-------------
3 files changed, 75 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/64144259/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java
index a68fa53..7365527 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java
@@ -18,15 +18,19 @@
package org.apache.beam.sdk.runners.inprocess;
import org.apache.beam.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext;
+import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.DoFnRunner;
+import org.apache.beam.sdk.util.DoFnRunners;
import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.common.CounterSet;
import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import java.util.ArrayList;
@@ -36,13 +40,57 @@ import java.util.List;
import java.util.Map;
class ParDoInProcessEvaluator<T> implements TransformEvaluator<T> {
+ public static <InputT, OutputT> ParDoInProcessEvaluator<InputT> create(
+ InProcessEvaluationContext evaluationContext,
+ CommittedBundle<InputT> inputBundle,
+ AppliedPTransform<PCollection<InputT>, ?, ?> application,
+ DoFn<InputT, OutputT> fn,
+ List<PCollectionView<?>> sideInputs,
+ TupleTag<OutputT> mainOutputTag,
+ List<TupleTag<?>> sideOutputTags,
+ Map<TupleTag<?>, PCollection<?>> outputs) {
+ InProcessExecutionContext executionContext =
+ evaluationContext.getExecutionContext(application, inputBundle.getKey());
+ String stepName = evaluationContext.getStepName(application);
+ InProcessStepContext stepContext =
+ executionContext.getOrCreateStepContext(stepName, stepName, null);
+
+ CounterSet counters = evaluationContext.createCounterSet();
+
+ Map<TupleTag<?>, UncommittedBundle<?>> outputBundles = new HashMap<>();
+ for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) {
+ outputBundles.put(
+ outputEntry.getKey(),
+ evaluationContext.createBundle(inputBundle, outputEntry.getValue()));
+ }
+
+ DoFnRunner<InputT, OutputT> runner =
+ DoFnRunners.createDefault(
+ evaluationContext.getPipelineOptions(),
+ fn,
+ evaluationContext.createSideInputReader(sideInputs),
+ BundleOutputManager.create(outputBundles),
+ mainOutputTag,
+ sideOutputTags,
+ stepContext,
+ counters.getAddCounterMutator(),
+ application.getInput().getWindowingStrategy());
+
+ runner.startBundle();
+
+ return new ParDoInProcessEvaluator<>(
+ runner, application, counters, outputBundles.values(), stepContext);
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////
+
private final DoFnRunner<T, ?> fnRunner;
private final AppliedPTransform<PCollection<T>, ?, ?> transform;
private final CounterSet counters;
private final Collection<UncommittedBundle<?>> outputBundles;
private final InProcessStepContext stepContext;
- public ParDoInProcessEvaluator(
+ private ParDoInProcessEvaluator(
DoFnRunner<T, ?> fnRunner,
AppliedPTransform<PCollection<T>, ?, ?> transform,
CounterSet counters,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/64144259/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java
index 2b95574..299d3a8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java
@@ -16,23 +16,15 @@
* limitations under the License.
*/
package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext;
import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import org.apache.beam.sdk.runners.inprocess.ParDoInProcessEvaluator.BundleOutputManager;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo.BoundMulti;
-import org.apache.beam.sdk.util.DoFnRunner;
-import org.apache.beam.sdk.util.DoFnRunners;
-import org.apache.beam.sdk.util.common.CounterSet;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
-import java.util.HashMap;
import java.util.Map;
/**
@@ -45,9 +37,9 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
AppliedPTransform<?, ?, ?> application,
CommittedBundle<?> inputBundle,
InProcessEvaluationContext evaluationContext) {
- @SuppressWarnings({"cast", "unchecked", "rawtypes"})
- TransformEvaluator<T> evaluator = (TransformEvaluator<T>) createMultiEvaluator(
- (AppliedPTransform) application, inputBundle, evaluationContext);
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ TransformEvaluator<T> evaluator =
+ createMultiEvaluator((AppliedPTransform) application, inputBundle, evaluationContext);
return evaluator;
}
@@ -55,38 +47,17 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
AppliedPTransform<PCollection<InT>, PCollectionTuple, BoundMulti<InT, OuT>> application,
CommittedBundle<InT> inputBundle,
InProcessEvaluationContext evaluationContext) {
- PCollectionTuple output = application.getOutput();
- Map<TupleTag<?>, PCollection<?>> outputs = output.getAll();
- Map<TupleTag<?>, UncommittedBundle<?>> outputBundles = new HashMap<>();
- for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) {
- outputBundles.put(
- outputEntry.getKey(),
- evaluationContext.createBundle(inputBundle, outputEntry.getValue()));
- }
- InProcessExecutionContext executionContext =
- evaluationContext.getExecutionContext(application, inputBundle.getKey());
- String stepName = evaluationContext.getStepName(application);
- InProcessStepContext stepContext =
- executionContext.getOrCreateStepContext(stepName, stepName, null);
-
- CounterSet counters = evaluationContext.createCounterSet();
-
+ Map<TupleTag<?>, PCollection<?>> outputs = application.getOutput().getAll();
DoFn<InT, OuT> fn = application.getTransform().getFn();
- DoFnRunner<InT, OuT> runner =
- DoFnRunners.createDefault(
- evaluationContext.getPipelineOptions(),
- fn,
- evaluationContext.createSideInputReader(application.getTransform().getSideInputs()),
- BundleOutputManager.create(outputBundles),
- application.getTransform().getMainOutputTag(),
- application.getTransform().getSideOutputTags().getAll(),
- stepContext,
- counters.getAddCounterMutator(),
- application.getInput().getWindowingStrategy());
-
- runner.startBundle();
- return new ParDoInProcessEvaluator<>(
- runner, application, counters, outputBundles.values(), stepContext);
+ return ParDoInProcessEvaluator.create(
+ evaluationContext,
+ inputBundle,
+ application,
+ fn,
+ application.getTransform().getSideInputs(),
+ application.getTransform().getMainOutputTag(),
+ application.getTransform().getSideOutputTags().getAll(),
+ outputs);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/64144259/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java
index 044b7e0..4d38448 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java
@@ -17,19 +17,15 @@
*/
package org.apache.beam.sdk.runners.inprocess;
-import org.apache.beam.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext;
import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import org.apache.beam.sdk.runners.inprocess.ParDoInProcessEvaluator.BundleOutputManager;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo.Bound;
-import org.apache.beam.sdk.util.DoFnRunner;
-import org.apache.beam.sdk.util.DoFnRunners;
-import org.apache.beam.sdk.util.common.CounterSet;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
+import com.google.common.collect.ImmutableMap;
+
import java.util.Collections;
/**
@@ -42,9 +38,9 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
final AppliedPTransform<?, ?, ?> application,
CommittedBundle<?> inputBundle,
InProcessEvaluationContext evaluationContext) {
- @SuppressWarnings({"cast", "unchecked", "rawtypes"})
- TransformEvaluator<T> evaluator = (TransformEvaluator<T>) createSingleEvaluator(
- (AppliedPTransform) application, inputBundle, evaluationContext);
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ TransformEvaluator<T> evaluator =
+ createSingleEvaluator((AppliedPTransform) application, inputBundle, evaluationContext);
return evaluator;
}
@@ -53,37 +49,15 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
Bound<InputT, OutputT>> application,
CommittedBundle<InputT> inputBundle, InProcessEvaluationContext evaluationContext) {
TupleTag<OutputT> mainOutputTag = new TupleTag<>("out");
- UncommittedBundle<OutputT> outputBundle =
- evaluationContext.createBundle(inputBundle, application.getOutput());
-
- InProcessExecutionContext executionContext =
- evaluationContext.getExecutionContext(application, inputBundle.getKey());
- String stepName = evaluationContext.getStepName(application);
- InProcessStepContext stepContext =
- executionContext.getOrCreateStepContext(stepName, stepName, null);
-
- CounterSet counters = evaluationContext.createCounterSet();
-
- DoFnRunner<InputT, OutputT> runner =
- DoFnRunners.createDefault(
- evaluationContext.getPipelineOptions(),
- application.getTransform().getFn(),
- evaluationContext.createSideInputReader(application.getTransform().getSideInputs()),
- BundleOutputManager.create(
- Collections.<TupleTag<?>, UncommittedBundle<?>>singletonMap(
- mainOutputTag, outputBundle)),
- mainOutputTag,
- Collections.<TupleTag<?>>emptyList(),
- stepContext,
- counters.getAddCounterMutator(),
- application.getInput().getWindowingStrategy());
- runner.startBundle();
- return new ParDoInProcessEvaluator<InputT>(
- runner,
+ return ParDoInProcessEvaluator.create(
+ evaluationContext,
+ inputBundle,
application,
- counters,
- Collections.<UncommittedBundle<?>>singleton(outputBundle),
- stepContext);
+ application.getTransform().getFn(),
+ application.getTransform().getSideInputs(),
+ mainOutputTag,
+ Collections.<TupleTag<?>>emptyList(),
+ ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, application.getOutput()));
}
}