You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/04 01:02:30 UTC

[5/6] beam git commit: Remembers the output coders of SplittableParDo

Remembers the output coders of SplittableParDo


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/48690bc6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/48690bc6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/48690bc6

Branch: refs/heads/master
Commit: 48690bc61673e767d4a1fa72e0499c32f160db39
Parents: 95e2a00
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Aug 3 17:35:03 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Aug 3 17:35:03 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/apex/ApexRunner.java    |  5 +-
 .../core/construction/SplittableParDo.java      | 65 ++++++++++----------
 .../core/construction/SplittableParDoTest.java  | 33 +++++-----
 .../core/SplittableParDoViaKeyedWorkItems.java  |  1 +
 .../direct/ParDoMultiOverrideFactory.java       |  2 +-
 .../dataflow/SplittableParDoOverrides.java      |  2 +-
 6 files changed, 56 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/48690bc6/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index cee524e..57d2593 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -379,8 +379,9 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
     public PTransformReplacement<PCollection<InputT>, PCollectionTuple> getReplacementTransform(
         AppliedPTransform<PCollection<InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>>
           transform) {
-      return PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(transform),
-          SplittableParDo.forJavaParDo(transform.getTransform()));
+      return PTransformReplacement.of(
+          PTransformReplacements.getSingletonMainInput(transform),
+          SplittableParDo.forAppliedParDo(transform));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/48690bc6/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index bcc5de8..32d3409 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -19,8 +19,8 @@ package org.apache.beam.runners.core.construction;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import com.google.common.collect.Maps;
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -74,6 +74,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
   private final List<PCollectionView<?>> sideInputs;
   private final TupleTag<OutputT> mainOutputTag;
   private final TupleTagList additionalOutputTags;
+  private final Map<TupleTag<?>, Coder<?>> outputTagsToCoders;
 
   public static final String SPLITTABLE_PROCESS_URN =
       "urn:beam:runners_core:transforms:splittable_process:v1";
@@ -86,34 +87,18 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
 
   private SplittableParDo(
       DoFn<InputT, OutputT> doFn,
-      TupleTag<OutputT> mainOutputTag,
       List<PCollectionView<?>> sideInputs,
-      TupleTagList additionalOutputTags) {
+      TupleTag<OutputT> mainOutputTag,
+      TupleTagList additionalOutputTags,
+      Map<TupleTag<?>, Coder<?>> outputTagsToCoders) {
     checkArgument(
         DoFnSignatures.getSignature(doFn.getClass()).processElement().isSplittable(),
         "fn must be a splittable DoFn");
     this.doFn = doFn;
-    this.mainOutputTag = mainOutputTag;
     this.sideInputs = sideInputs;
+    this.mainOutputTag = mainOutputTag;
     this.additionalOutputTags = additionalOutputTags;
-  }
-
-  /**
-   * Creates a {@link SplittableParDo} from an original Java {@link ParDo}.
-   *
-   * @param parDo The splittable {@link ParDo} transform.
-   */
-  public static <InputT, OutputT> SplittableParDo<InputT, OutputT, ?> forJavaParDo(
-      ParDo.MultiOutput<InputT, OutputT> parDo) {
-    checkArgument(parDo != null, "parDo must not be null");
-    checkArgument(
-        DoFnSignatures.getSignature(parDo.getFn().getClass()).processElement().isSplittable(),
-        "fn must be a splittable DoFn");
-    return new SplittableParDo(
-        parDo.getFn(),
-        parDo.getMainOutputTag(),
-        parDo.getSideInputs(),
-        parDo.getAdditionalOutputTags());
+    this.outputTagsToCoders = outputTagsToCoders;
   }
 
   /**
@@ -122,15 +107,22 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
    * <p>The input may generally be a deserialized transform so it may not actually be a {@link
    * ParDo}. Instead {@link ParDoTranslation} will be used to extract fields.
    */
-  public static SplittableParDo<?, ?, ?> forAppliedParDo(AppliedPTransform<?, ?, ?> parDo) {
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  public static <InputT, OutputT> SplittableParDo<InputT, OutputT, ?> forAppliedParDo(
+      AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?> parDo) {
     checkArgument(parDo != null, "parDo must not be null");
 
     try {
-      return new SplittableParDo<>(
+      Map<TupleTag<?>, Coder<?>> outputTagsToCoders = Maps.newHashMap();
+      for (Map.Entry<TupleTag<?>, PValue> entry : parDo.getOutputs().entrySet()) {
+        outputTagsToCoders.put(entry.getKey(), ((PCollection) entry.getValue()).getCoder());
+      }
+      return new SplittableParDo(
           ParDoTranslation.getDoFn(parDo),
-          (TupleTag) ParDoTranslation.getMainOutputTag(parDo),
           ParDoTranslation.getSideInputs(parDo),
-          ParDoTranslation.getAdditionalOutputTags(parDo));
+          ParDoTranslation.getMainOutputTag(parDo),
+          ParDoTranslation.getAdditionalOutputTags(parDo),
+          outputTagsToCoders);
     } catch (IOException exc) {
       throw new RuntimeException(exc);
     }
@@ -169,7 +161,8 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
             (WindowingStrategy<InputT, ?>) input.getWindowingStrategy(),
             sideInputs,
             mainOutputTag,
-            additionalOutputTags));
+            additionalOutputTags,
+            outputTagsToCoders));
   }
 
   @Override
@@ -203,6 +196,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
     private final List<PCollectionView<?>> sideInputs;
     private final TupleTag<OutputT> mainOutputTag;
     private final TupleTagList additionalOutputTags;
+    private final Map<TupleTag<?>, Coder<?>> outputTagsToCoders;
 
     /**
      * @param fn the splittable {@link DoFn}.
@@ -210,7 +204,8 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
      * @param sideInputs list of side inputs that should be available to the {@link DoFn}.
      * @param mainOutputTag {@link TupleTag Tag} of the {@link DoFn DoFn's} main output.
      * @param additionalOutputTags {@link TupleTagList Tags} of the {@link DoFn DoFn's} additional
-     *     outputs.
+     * @param outputTagsToCoders A map from output tag to the coder for that output, which should
+     *     provide mappings for the main and all additional tags.
      */
     public ProcessKeyedElements(
         DoFn<InputT, OutputT> fn,
@@ -219,7 +214,8 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
         WindowingStrategy<InputT, ?> windowingStrategy,
         List<PCollectionView<?>> sideInputs,
         TupleTag<OutputT> mainOutputTag,
-        TupleTagList additionalOutputTags) {
+        TupleTagList additionalOutputTags,
+        Map<TupleTag<?>, Coder<?>> outputTagsToCoders) {
       this.fn = fn;
       this.elementCoder = elementCoder;
       this.restrictionCoder = restrictionCoder;
@@ -227,6 +223,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
       this.sideInputs = sideInputs;
       this.mainOutputTag = mainOutputTag;
       this.additionalOutputTags = additionalOutputTags;
+      this.outputTagsToCoders = outputTagsToCoders;
     }
 
     public DoFn<InputT, OutputT> getFn() {
@@ -257,10 +254,14 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
       return additionalOutputTags;
     }
 
+    public Map<TupleTag<?>, Coder<?>> getOutputTagsToCoders() {
+      return outputTagsToCoders;
+    }
+
     @Override
     public PCollectionTuple expand(PCollection<KV<String, KV<InputT, RestrictionT>>> input) {
       return createPrimitiveOutputFor(
-          input, fn, mainOutputTag, additionalOutputTags, windowingStrategy);
+          input, fn, mainOutputTag, additionalOutputTags, outputTagsToCoders, windowingStrategy);
     }
 
     public static <OutputT> PCollectionTuple createPrimitiveOutputFor(
@@ -268,14 +269,14 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
         DoFn<?, OutputT> fn,
         TupleTag<OutputT> mainOutputTag,
         TupleTagList additionalOutputTags,
+        Map<TupleTag<?>, Coder<?>> outputTagsToCoders,
         WindowingStrategy<?, ?> windowingStrategy) {
       DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
       PCollectionTuple outputs =
           PCollectionTuple.ofPrimitiveOutputsInternal(
               input.getPipeline(),
               TupleTagList.of(mainOutputTag).and(additionalOutputTags.getAll()),
-              // TODO
-              Collections.<TupleTag<?>, Coder<?>>emptyMap(),
+              outputTagsToCoders,
               windowingStrategy,
               input.isBounded().and(signature.isBoundedPerElement()));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/48690bc6/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
index 267232c..05c471d 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 
 import java.io.Serializable;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -29,6 +30,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.junit.Rule;
@@ -106,12 +108,18 @@ public class SplittableParDoTest {
 
   private static final TupleTag<String> MAIN_OUTPUT_TAG = new TupleTag<String>() {};
 
-  private ParDo.MultiOutput<Integer, String> makeParDo(DoFn<Integer, String> fn) {
-    return ParDo.of(fn).withOutputTags(MAIN_OUTPUT_TAG, TupleTagList.empty());
+  private PCollection<String> applySplittableParDo(
+      String name, PCollection<Integer> input, DoFn<Integer, String> fn) {
+    ParDo.MultiOutput<Integer, String> multiOutput =
+        ParDo.of(fn).withOutputTags(MAIN_OUTPUT_TAG, TupleTagList.empty());
+    PCollectionTuple output = multiOutput.expand(input);
+    output.get(MAIN_OUTPUT_TAG).setName("main");
+    AppliedPTransform<PCollection<Integer>, PCollectionTuple, ?> transform =
+        AppliedPTransform.of("ParDo", input.expand(), output.expand(), multiOutput, pipeline);
+    return input.apply(name, SplittableParDo.forAppliedParDo(transform)).get(MAIN_OUTPUT_TAG);
   }
 
-  @Rule
-  public TestPipeline pipeline = TestPipeline.create();
+  @Rule public TestPipeline pipeline = TestPipeline.create();
 
   @Test
   public void testBoundednessForBoundedFn() {
@@ -121,16 +129,12 @@ public class SplittableParDoTest {
     assertEquals(
         "Applying a bounded SDF to a bounded collection produces a bounded collection",
         PCollection.IsBounded.BOUNDED,
-        makeBoundedCollection(pipeline)
-            .apply("bounded to bounded", SplittableParDo.forJavaParDo(makeParDo(boundedFn)))
-            .get(MAIN_OUTPUT_TAG)
+        applySplittableParDo("bounded to bounded", makeBoundedCollection(pipeline), boundedFn)
             .isBounded());
     assertEquals(
         "Applying a bounded SDF to an unbounded collection produces an unbounded collection",
         PCollection.IsBounded.UNBOUNDED,
-        makeUnboundedCollection(pipeline)
-            .apply("bounded to unbounded", SplittableParDo.forJavaParDo(makeParDo(boundedFn)))
-            .get(MAIN_OUTPUT_TAG)
+        applySplittableParDo("bounded to unbounded", makeUnboundedCollection(pipeline), boundedFn)
             .isBounded());
   }
 
@@ -142,16 +146,13 @@ public class SplittableParDoTest {
     assertEquals(
         "Applying an unbounded SDF to a bounded collection produces a bounded collection",
         PCollection.IsBounded.UNBOUNDED,
-        makeBoundedCollection(pipeline)
-            .apply("unbounded to bounded", SplittableParDo.forJavaParDo(makeParDo(unboundedFn)))
-            .get(MAIN_OUTPUT_TAG)
+        applySplittableParDo("unbounded to bounded", makeBoundedCollection(pipeline), unboundedFn)
             .isBounded());
     assertEquals(
         "Applying an unbounded SDF to an unbounded collection produces an unbounded collection",
         PCollection.IsBounded.UNBOUNDED,
-        makeUnboundedCollection(pipeline)
-            .apply("unbounded to unbounded", SplittableParDo.forJavaParDo(makeParDo(unboundedFn)))
-            .get(MAIN_OUTPUT_TAG)
+        applySplittableParDo(
+                "unbounded to unbounded", makeUnboundedCollection(pipeline), unboundedFn)
             .isBounded());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/48690bc6/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index af720fd..251260e 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -184,6 +184,7 @@ public class SplittableParDoViaKeyedWorkItems {
           original.getFn(),
           original.getMainOutputTag(),
           original.getAdditionalOutputTags(),
+          original.getOutputTagsToCoders(),
           original.getInputWindowingStrategy());
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/48690bc6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 3f04b56..26f30b0 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -96,7 +96,7 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
     DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
 
     if (signature.processElement().isSplittable()) {
-      return (PTransform) SplittableParDo.forAppliedParDo(application);
+      return SplittableParDo.forAppliedParDo((AppliedPTransform) application);
     } else if (signature.stateDeclarations().size() > 0
         || signature.timerDeclarations().size() > 0) {
       return new GbkThenStatefulParDo(

http://git-wip-us.apache.org/repos/asf/beam/blob/48690bc6/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
index fc010f8..7b65950 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
@@ -64,7 +64,7 @@ class SplittableParDoOverrides {
             appliedTransform) {
       return PTransformReplacement.of(
           PTransformReplacements.getSingletonMainInput(appliedTransform),
-          SplittableParDo.forJavaParDo(appliedTransform.getTransform()));
+          SplittableParDo.forAppliedParDo(appliedTransform));
     }
 
     @Override