You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/28 04:34:27 UTC

[3/6] beam git commit: Fix getAdditionalInputs for SplittableParDo transforms

Fix getAdditionalInputs for SplittableParDo transforms


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a66bcd68
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a66bcd68
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a66bcd68

Branch: refs/heads/master
Commit: a66bcd68a1e56d5d38fccfce2ffeec28ba1c82de
Parents: 58fba59
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Jun 13 10:00:09 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jun 27 21:08:10 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/apex/ApexRunner.java    |  2 +-
 .../core/construction/SplittableParDo.java      | 66 +++++++++++++++-----
 .../core/construction/SplittableParDoTest.java  |  8 +--
 .../direct/ParDoMultiOverrideFactory.java       |  2 +-
 .../flink/FlinkStreamingPipelineTranslator.java |  2 +-
 .../dataflow/SplittableParDoOverrides.java      |  2 +-
 6 files changed, 57 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a66bcd68/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index 95b354a..fd0a1c9 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -381,7 +381,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
         AppliedPTransform<PCollection<InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>>
           transform) {
       return PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(transform),
-          new SplittableParDo<>(transform.getTransform()));
+          SplittableParDo.forJavaParDo(transform.getTransform()));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/a66bcd68/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index 5ccafcb..f31b495 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -18,9 +18,9 @@
 package org.apache.beam.runners.core.construction;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
 
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -40,6 +40,8 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
+import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.WindowingStrategy;
@@ -64,7 +66,11 @@ import org.apache.beam.sdk.values.WindowingStrategy;
 @Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
 public class SplittableParDo<InputT, OutputT, RestrictionT>
     extends PTransform<PCollection<InputT>, PCollectionTuple> {
-  private final ParDo.MultiOutput<InputT, OutputT> parDo;
+
+  private final DoFn<InputT, OutputT> doFn;
+  private final List<PCollectionView<?>> sideInputs;
+  private final TupleTag<OutputT> mainOutputTag;
+  private final TupleTagList additionalOutputTags;
 
   public static final String SPLITTABLE_PROCESS_URN =
       "urn:beam:runners_core:transforms:splittable_process:v1";
@@ -75,24 +81,39 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
   public static final String SPLITTABLE_GBKIKWI_URN =
       "urn:beam:runners_core:transforms:splittable_gbkikwi:v1";
 
+  private SplittableParDo(
+      DoFn<InputT, OutputT> doFn,
+      TupleTag<OutputT> mainOutputTag,
+      List<PCollectionView<?>> sideInputs,
+      TupleTagList additionalOutputTags) {
+    checkArgument(
+        DoFnSignatures.getSignature(doFn.getClass()).processElement().isSplittable(),
+        "fn must be a splittable DoFn");
+    this.doFn = doFn;
+    this.mainOutputTag = mainOutputTag;
+    this.sideInputs = sideInputs;
+    this.additionalOutputTags = additionalOutputTags;
+  }
+
   /**
-   * Creates the transform for the given original multi-output {@link ParDo}.
+   * Creates a {@link SplittableParDo} from an original Java {@link ParDo}.
    *
    * @param parDo The splittable {@link ParDo} transform.
    */
-  public SplittableParDo(ParDo.MultiOutput<InputT, OutputT> parDo) {
-    checkNotNull(parDo, "parDo must not be null");
-    this.parDo = parDo;
-    checkArgument(
-        DoFnSignatures.getSignature(parDo.getFn().getClass()).processElement().isSplittable(),
-        "fn must be a splittable DoFn");
+  public static <InputT, OutputT> SplittableParDo<InputT, OutputT, ?> forJavaParDo(
+      ParDo.MultiOutput<InputT, OutputT> parDo) {
+    checkArgument(parDo != null, "parDo must not be null");
+    return new SplittableParDo(
+        parDo.getFn(),
+        parDo.getMainOutputTag(),
+        parDo.getSideInputs(),
+        parDo.getAdditionalOutputTags());
   }
 
   @Override
   public PCollectionTuple expand(PCollection<InputT> input) {
-    DoFn<InputT, OutputT> fn = parDo.getFn();
     Coder<RestrictionT> restrictionCoder =
-        DoFnInvokers.invokerFor(fn)
+        DoFnInvokers.invokerFor(doFn)
             .invokeGetRestrictionCoder(input.getPipeline().getCoderRegistry());
     Coder<KV<InputT, RestrictionT>> splitCoder = KvCoder.of(input.getCoder(), restrictionCoder);
 
@@ -100,9 +121,10 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
         input
             .apply(
                 "Pair with initial restriction",
-                ParDo.of(new PairWithRestrictionFn<InputT, OutputT, RestrictionT>(fn)))
+                ParDo.of(new PairWithRestrictionFn<InputT, OutputT, RestrictionT>(doFn)))
             .setCoder(splitCoder)
-            .apply("Split restriction", ParDo.of(new SplitRestrictionFn<InputT, RestrictionT>(fn)))
+            .apply(
+                "Split restriction", ParDo.of(new SplitRestrictionFn<InputT, RestrictionT>(doFn)))
             .setCoder(splitCoder)
             // ProcessFn requires all input elements to be in a single window and have a single
             // element per work item. This must precede the unique keying so each key has a single
@@ -115,13 +137,18 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
     return keyedRestrictions.apply(
         "ProcessKeyedElements",
         new ProcessKeyedElements<>(
-            fn,
+            doFn,
             input.getCoder(),
             restrictionCoder,
             (WindowingStrategy<InputT, ?>) input.getWindowingStrategy(),
-            parDo.getSideInputs(),
-            parDo.getMainOutputTag(),
-            parDo.getAdditionalOutputTags()));
+            sideInputs,
+            mainOutputTag,
+            additionalOutputTags));
+  }
+
+  @Override
+  public Map<TupleTag<?>, PValue> getAdditionalInputs() {
+    return PCollectionViews.toAdditionalInputs(sideInputs);
   }
 
   /**
@@ -231,6 +258,11 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
     }
 
     @Override
+    public Map<TupleTag<?>, PValue> getAdditionalInputs() {
+      return PCollectionViews.toAdditionalInputs(sideInputs);
+    }
+
+    @Override
     public String getUrn() {
       return SPLITTABLE_PROCESS_KEYED_ELEMENTS_URN;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/a66bcd68/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
index 6e4d6c4..f4c596e 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
@@ -122,14 +122,14 @@ public class SplittableParDoTest {
         "Applying a bounded SDF to a bounded collection produces a bounded collection",
         PCollection.IsBounded.BOUNDED,
         makeBoundedCollection(pipeline)
-            .apply("bounded to bounded", new SplittableParDo<>(makeParDo(boundedFn)))
+            .apply("bounded to bounded", SplittableParDo.forJavaParDo(makeParDo(boundedFn)))
             .get(MAIN_OUTPUT_TAG)
             .isBounded());
     assertEquals(
         "Applying a bounded SDF to an unbounded collection produces an unbounded collection",
         PCollection.IsBounded.UNBOUNDED,
         makeUnboundedCollection(pipeline)
-            .apply("bounded to unbounded", new SplittableParDo<>(makeParDo(boundedFn)))
+            .apply("bounded to unbounded", SplittableParDo.forJavaParDo(makeParDo(boundedFn)))
             .get(MAIN_OUTPUT_TAG)
             .isBounded());
   }
@@ -143,14 +143,14 @@ public class SplittableParDoTest {
         "Applying an unbounded SDF to a bounded collection produces a bounded collection",
         PCollection.IsBounded.UNBOUNDED,
         makeBoundedCollection(pipeline)
-            .apply("unbounded to bounded", new SplittableParDo<>(makeParDo(unboundedFn)))
+            .apply("unbounded to bounded", SplittableParDo.forJavaParDo(makeParDo(unboundedFn)))
             .get(MAIN_OUTPUT_TAG)
             .isBounded());
     assertEquals(
         "Applying an unbounded SDF to an unbounded collection produces an unbounded collection",
         PCollection.IsBounded.UNBOUNDED,
         makeUnboundedCollection(pipeline)
-            .apply("unbounded to unbounded", new SplittableParDo<>(makeParDo(unboundedFn)))
+            .apply("unbounded to unbounded", SplittableParDo.forJavaParDo(makeParDo(unboundedFn)))
             .get(MAIN_OUTPUT_TAG)
             .isBounded());
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/a66bcd68/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index b20113e..9a26283 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -81,7 +81,7 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
     DoFn<InputT, OutputT> fn = transform.getFn();
     DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
     if (signature.processElement().isSplittable()) {
-      return new SplittableParDo(transform);
+      return (PTransform) SplittableParDo.forJavaParDo(transform);
     } else if (signature.stateDeclarations().size() > 0
         || signature.timerDeclarations().size() > 0) {
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a66bcd68/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
index 27bb4ec..ebc9345 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
@@ -188,7 +188,7 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
                 transform) {
       return PTransformReplacement.of(
           PTransformReplacements.getSingletonMainInput(transform),
-          new SplittableParDo<>(transform.getTransform()));
+          SplittableParDo.forJavaParDo(transform.getTransform()));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/a66bcd68/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
index 9322878..fc010f8 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
@@ -64,7 +64,7 @@ class SplittableParDoOverrides {
             appliedTransform) {
       return PTransformReplacement.of(
           PTransformReplacements.getSingletonMainInput(appliedTransform),
-          new SplittableParDo<>(appliedTransform.getTransform()));
+          SplittableParDo.forJavaParDo(appliedTransform.getTransform()));
     }
 
     @Override