You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/04/16 01:11:38 UTC

[2/3] incubator-beam git commit: Move Shared construction code to ParDoInProcessEvaluator

Move Shared construction code to ParDoInProcessEvaluator

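Remove the duplicated evaluator-construction code from
ParDoSingleEvaluatorFactory and ParDoMultiEvaluatorFactory. Each factory
now only extracts its transform-specific arguments (the DoFn, side inputs,
main output tag, side output tags, and output PCollections) and passes
them to the new static ParDoInProcessEvaluator.create method, which builds
the step context, counters, output bundles, and DoFnRunner, and starts the
bundle.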


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/64144259
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/64144259
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/64144259

Branch: refs/heads/master
Commit: 64144259551f1cf627545e0329c5a0daf087e7d2
Parents: 7a5b7ad
Author: Thomas Groh <tg...@google.com>
Authored: Tue Mar 29 17:38:22 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Apr 15 16:11:32 2016 -0700

----------------------------------------------------------------------
 .../inprocess/ParDoInProcessEvaluator.java      | 50 +++++++++++++++++-
 .../inprocess/ParDoMultiEvaluatorFactory.java   | 55 +++++---------------
 .../inprocess/ParDoSingleEvaluatorFactory.java  | 52 +++++-------------
 3 files changed, 75 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/64144259/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java
index a68fa53..7365527 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java
@@ -18,15 +18,19 @@
 package org.apache.beam.sdk.runners.inprocess;
 
 import org.apache.beam.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext;
+import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
 import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.util.DoFnRunner;
+import org.apache.beam.sdk.util.DoFnRunners;
 import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.common.CounterSet;
 import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 
 import java.util.ArrayList;
@@ -36,13 +40,57 @@ import java.util.List;
 import java.util.Map;
 
 class ParDoInProcessEvaluator<T> implements TransformEvaluator<T> {
+  public static <InputT, OutputT> ParDoInProcessEvaluator<InputT> create(
+      InProcessEvaluationContext evaluationContext,
+      CommittedBundle<InputT> inputBundle,
+      AppliedPTransform<PCollection<InputT>, ?, ?> application,
+      DoFn<InputT, OutputT> fn,
+      List<PCollectionView<?>> sideInputs,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      Map<TupleTag<?>, PCollection<?>> outputs) {
+    InProcessExecutionContext executionContext =
+        evaluationContext.getExecutionContext(application, inputBundle.getKey());
+    String stepName = evaluationContext.getStepName(application);
+    InProcessStepContext stepContext =
+        executionContext.getOrCreateStepContext(stepName, stepName, null);
+
+    CounterSet counters = evaluationContext.createCounterSet();
+
+    Map<TupleTag<?>, UncommittedBundle<?>> outputBundles = new HashMap<>();
+    for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) {
+      outputBundles.put(
+          outputEntry.getKey(),
+          evaluationContext.createBundle(inputBundle, outputEntry.getValue()));
+    }
+
+    DoFnRunner<InputT, OutputT> runner =
+        DoFnRunners.createDefault(
+            evaluationContext.getPipelineOptions(),
+            fn,
+            evaluationContext.createSideInputReader(sideInputs),
+            BundleOutputManager.create(outputBundles),
+            mainOutputTag,
+            sideOutputTags,
+            stepContext,
+            counters.getAddCounterMutator(),
+            application.getInput().getWindowingStrategy());
+
+    runner.startBundle();
+
+    return new ParDoInProcessEvaluator<>(
+        runner, application, counters, outputBundles.values(), stepContext);
+  }
+
+  ////////////////////////////////////////////////////////////////////////////////////////////////
+
   private final DoFnRunner<T, ?> fnRunner;
   private final AppliedPTransform<PCollection<T>, ?, ?> transform;
   private final CounterSet counters;
   private final Collection<UncommittedBundle<?>> outputBundles;
   private final InProcessStepContext stepContext;
 
-  public ParDoInProcessEvaluator(
+  private ParDoInProcessEvaluator(
       DoFnRunner<T, ?> fnRunner,
       AppliedPTransform<PCollection<T>, ?, ?> transform,
       CounterSet counters,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/64144259/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java
index 2b95574..299d3a8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java
@@ -16,23 +16,15 @@
  * limitations under the License.
  */
 package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext;
 import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import org.apache.beam.sdk.runners.inprocess.ParDoInProcessEvaluator.BundleOutputManager;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo.BoundMulti;
-import org.apache.beam.sdk.util.DoFnRunner;
-import org.apache.beam.sdk.util.DoFnRunners;
-import org.apache.beam.sdk.util.common.CounterSet;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.TupleTag;
 
-import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -45,9 +37,9 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
       AppliedPTransform<?, ?, ?> application,
       CommittedBundle<?> inputBundle,
       InProcessEvaluationContext evaluationContext) {
-    @SuppressWarnings({"cast", "unchecked", "rawtypes"})
-    TransformEvaluator<T> evaluator = (TransformEvaluator<T>) createMultiEvaluator(
-            (AppliedPTransform) application, inputBundle, evaluationContext);
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    TransformEvaluator<T> evaluator =
+        createMultiEvaluator((AppliedPTransform) application, inputBundle, evaluationContext);
     return evaluator;
   }
 
@@ -55,38 +47,17 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
       AppliedPTransform<PCollection<InT>, PCollectionTuple, BoundMulti<InT, OuT>> application,
       CommittedBundle<InT> inputBundle,
       InProcessEvaluationContext evaluationContext) {
-    PCollectionTuple output = application.getOutput();
-    Map<TupleTag<?>, PCollection<?>> outputs = output.getAll();
-    Map<TupleTag<?>, UncommittedBundle<?>> outputBundles = new HashMap<>();
-    for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) {
-      outputBundles.put(
-          outputEntry.getKey(),
-          evaluationContext.createBundle(inputBundle, outputEntry.getValue()));
-    }
-    InProcessExecutionContext executionContext =
-        evaluationContext.getExecutionContext(application, inputBundle.getKey());
-    String stepName = evaluationContext.getStepName(application);
-    InProcessStepContext stepContext =
-        executionContext.getOrCreateStepContext(stepName, stepName, null);
-
-    CounterSet counters = evaluationContext.createCounterSet();
-
+    Map<TupleTag<?>, PCollection<?>> outputs = application.getOutput().getAll();
     DoFn<InT, OuT> fn = application.getTransform().getFn();
-    DoFnRunner<InT, OuT> runner =
-        DoFnRunners.createDefault(
-            evaluationContext.getPipelineOptions(),
-            fn,
-            evaluationContext.createSideInputReader(application.getTransform().getSideInputs()),
-            BundleOutputManager.create(outputBundles),
-            application.getTransform().getMainOutputTag(),
-            application.getTransform().getSideOutputTags().getAll(),
-            stepContext,
-            counters.getAddCounterMutator(),
-            application.getInput().getWindowingStrategy());
-
-    runner.startBundle();
 
-    return new ParDoInProcessEvaluator<>(
-        runner, application, counters, outputBundles.values(), stepContext);
+    return ParDoInProcessEvaluator.create(
+        evaluationContext,
+        inputBundle,
+        application,
+        fn,
+        application.getTransform().getSideInputs(),
+        application.getTransform().getMainOutputTag(),
+        application.getTransform().getSideOutputTags().getAll(),
+        outputs);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/64144259/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java
index 044b7e0..4d38448 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java
@@ -17,19 +17,15 @@
  */
 package org.apache.beam.sdk.runners.inprocess;
 
-import org.apache.beam.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext;
 import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import org.apache.beam.sdk.runners.inprocess.ParDoInProcessEvaluator.BundleOutputManager;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo.Bound;
-import org.apache.beam.sdk.util.DoFnRunner;
-import org.apache.beam.sdk.util.DoFnRunners;
-import org.apache.beam.sdk.util.common.CounterSet;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TupleTag;
 
+import com.google.common.collect.ImmutableMap;
+
 import java.util.Collections;
 
 /**
@@ -42,9 +38,9 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
       final AppliedPTransform<?, ?, ?> application,
       CommittedBundle<?> inputBundle,
       InProcessEvaluationContext evaluationContext) {
-    @SuppressWarnings({"cast", "unchecked", "rawtypes"})
-    TransformEvaluator<T> evaluator = (TransformEvaluator<T>) createSingleEvaluator(
-            (AppliedPTransform) application, inputBundle, evaluationContext);
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    TransformEvaluator<T> evaluator =
+        createSingleEvaluator((AppliedPTransform) application, inputBundle, evaluationContext);
     return evaluator;
   }
 
@@ -53,37 +49,15 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
           Bound<InputT, OutputT>> application,
       CommittedBundle<InputT> inputBundle, InProcessEvaluationContext evaluationContext) {
     TupleTag<OutputT> mainOutputTag = new TupleTag<>("out");
-    UncommittedBundle<OutputT> outputBundle =
-        evaluationContext.createBundle(inputBundle, application.getOutput());
-
-    InProcessExecutionContext executionContext =
-        evaluationContext.getExecutionContext(application, inputBundle.getKey());
-    String stepName = evaluationContext.getStepName(application);
-    InProcessStepContext stepContext =
-        executionContext.getOrCreateStepContext(stepName, stepName, null);
-
-    CounterSet counters = evaluationContext.createCounterSet();
-
-    DoFnRunner<InputT, OutputT> runner =
-        DoFnRunners.createDefault(
-            evaluationContext.getPipelineOptions(),
-            application.getTransform().getFn(),
-            evaluationContext.createSideInputReader(application.getTransform().getSideInputs()),
-            BundleOutputManager.create(
-                Collections.<TupleTag<?>, UncommittedBundle<?>>singletonMap(
-                    mainOutputTag, outputBundle)),
-            mainOutputTag,
-            Collections.<TupleTag<?>>emptyList(),
-            stepContext,
-            counters.getAddCounterMutator(),
-            application.getInput().getWindowingStrategy());
 
-    runner.startBundle();
-    return new ParDoInProcessEvaluator<InputT>(
-        runner,
+    return ParDoInProcessEvaluator.create(
+        evaluationContext,
+        inputBundle,
         application,
-        counters,
-        Collections.<UncommittedBundle<?>>singleton(outputBundle),
-        stepContext);
+        application.getTransform().getFn(),
+        application.getTransform().getSideInputs(),
+        mainOutputTag,
+        Collections.<TupleTag<?>>emptyList(),
+        ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, application.getOutput()));
   }
 }