You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/11 03:39:23 UTC

[4/8] beam git commit: Enable SplittableParDo on rehydrated ParDo transform

Enable SplittableParDo on rehydrated ParDo transform


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fa61ed17
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fa61ed17
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fa61ed17

Branch: refs/heads/master
Commit: fa61ed17424083fa53b8aa8e70908fb6194ad4ad
Parents: de39f32
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 8 14:27:02 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jul 10 20:15:49 2017 -0700

----------------------------------------------------------------------
 .../core/construction/SplittableParDo.java      | 25 ++++++++++++++
 .../direct/ParDoMultiOverrideFactory.java       | 36 ++++++++++++++------
 .../flink/FlinkStreamingPipelineTranslator.java |  2 +-
 3 files changed, 52 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fa61ed17/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index f31b495..e71187b 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.core.construction;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -26,6 +27,7 @@ import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTrans
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -103,6 +105,9 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
   public static <InputT, OutputT> SplittableParDo<InputT, OutputT, ?> forJavaParDo(
       ParDo.MultiOutput<InputT, OutputT> parDo) {
     checkArgument(parDo != null, "parDo must not be null");
+    checkArgument(
+        DoFnSignatures.getSignature(parDo.getFn().getClass()).processElement().isSplittable(),
+        "fn must be a splittable DoFn");
     return new SplittableParDo(
         parDo.getFn(),
         parDo.getMainOutputTag(),
@@ -110,6 +115,26 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
         parDo.getAdditionalOutputTags());
   }
 
+  /**
+   * Creates the transform for a {@link ParDo}-compatible {@link AppliedPTransform}.
+   *
+   * <p>The input may generally be a deserialized transform so it may not actually be a {@link
+   * ParDo}. Instead {@link ParDoTranslation} will be used to extract fields.
+   */
+  public static SplittableParDo<?, ?, ?> forAppliedParDo(AppliedPTransform<?, ?, ?> parDo) {
+    checkArgument(parDo != null, "parDo must not be null");
+
+    try {
+      return new SplittableParDo<>(
+          ParDoTranslation.getDoFn(parDo),
+          (TupleTag) ParDoTranslation.getMainOutputTag(parDo),
+          ParDoTranslation.getSideInputs(parDo),
+          ParDoTranslation.getAdditionalOutputTags(parDo));
+    } catch (IOException exc) {
+      throw new RuntimeException(exc);
+    }
+  }
+
   @Override
   public PCollectionTuple expand(PCollection<InputT> input) {
     Coder<RestrictionT> restrictionCoder =

http://git-wip-us.apache.org/repos/asf/beam/blob/fa61ed17/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 2904bc1..8881967 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.direct;
 
 import static com.google.common.base.Preconditions.checkState;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.KeyedWorkItem;
@@ -26,6 +27,7 @@ import org.apache.beam.runners.core.KeyedWorkItemCoder;
 import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.ParDoTranslation;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.runners.core.construction.SplittableParDo;
 import org.apache.beam.sdk.coders.Coder;
@@ -62,29 +64,43 @@ import org.apache.beam.sdk.values.WindowingStrategy;
  */
 class ParDoMultiOverrideFactory<InputT, OutputT>
     implements PTransformOverrideFactory<
-        PCollection<? extends InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>> {
+        PCollection<? extends InputT>, PCollectionTuple,
+        PTransform<PCollection<? extends InputT>, PCollectionTuple>> {
   @Override
   public PTransformReplacement<PCollection<? extends InputT>, PCollectionTuple>
       getReplacementTransform(
           AppliedPTransform<
-                  PCollection<? extends InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>>
-              transform) {
+                  PCollection<? extends InputT>, PCollectionTuple,
+                  PTransform<PCollection<? extends InputT>, PCollectionTuple>>
+              application) {
     return PTransformReplacement.of(
-        PTransformReplacements.getSingletonMainInput(transform),
-        getReplacementTransform(transform.getTransform()));
+        PTransformReplacements.getSingletonMainInput(application),
+        getReplacementForApplication(application));
   }
 
   @SuppressWarnings("unchecked")
-  private PTransform<PCollection<? extends InputT>, PCollectionTuple> getReplacementTransform(
-      MultiOutput<InputT, OutputT> transform) {
+  private PTransform<PCollection<? extends InputT>, PCollectionTuple> getReplacementForApplication(
+      AppliedPTransform<
+              PCollection<? extends InputT>, PCollectionTuple,
+              PTransform<PCollection<? extends InputT>, PCollectionTuple>>
+          application) {
+
+    DoFn<InputT, OutputT> fn;
+    try {
+      fn = (DoFn<InputT, OutputT>) ParDoTranslation.getDoFn(application);
+    } catch (IOException exc) {
+      throw new RuntimeException(exc);
+    }
 
-    DoFn<InputT, OutputT> fn = transform.getFn();
     DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
     if (signature.processElement().isSplittable()) {
-      return (PTransform) SplittableParDo.forJavaParDo(transform);
+      return (PTransform) SplittableParDo.forAppliedParDo(application);
     } else if (signature.stateDeclarations().size() > 0
         || signature.timerDeclarations().size() > 0) {
 
+      MultiOutput<InputT, OutputT> transform =
+          (MultiOutput<InputT, OutputT>) application.getTransform();
+
       // Based on the fact that the signature is stateful, DoFnSignatures ensures
       // that it is also keyed
       return new GbkThenStatefulParDo(
@@ -93,7 +109,7 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
           transform.getAdditionalOutputTags(),
           transform.getSideInputs());
     } else {
-      return transform;
+      return application.getTransform();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/fa61ed17/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
index ebc9345..f733e2e 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
@@ -188,7 +188,7 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
                 transform) {
       return PTransformReplacement.of(
           PTransformReplacements.getSingletonMainInput(transform),
-          SplittableParDo.forJavaParDo(transform.getTransform()));
+          (SplittableParDo<InputT, OutputT, ?>) SplittableParDo.forAppliedParDo(transform));
     }
 
     @Override