You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/11 03:39:24 UTC

[5/8] beam git commit: Port DirectRunner ParDo overrides to SDK-agnostic APIs

Port DirectRunner ParDo overrides to SDK-agnostic APIs


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1ac4b7e6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1ac4b7e6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1ac4b7e6

Branch: refs/heads/master
Commit: 1ac4b7e6f7dbbd68c27c6634cd52767885a42760
Parents: fa61ed1
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 8 13:44:52 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jul 10 20:17:56 2017 -0700

----------------------------------------------------------------------
 .../core/construction/ParDoTranslation.java     | 16 ++++++---
 .../construction/RunnerPCollectionView.java     | 16 +++++++++
 .../direct/ParDoMultiOverrideFactory.java       | 35 +++++++++-----------
 3 files changed, 43 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1ac4b7e6/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index fe8c5aa..90c9aad 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -19,6 +19,7 @@
 package org.apache.beam.runners.core.construction;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static org.apache.beam.runners.core.construction.PTransformTranslation.PAR_DO_TRANSFORM_URN;
 
@@ -262,12 +263,19 @@ public class ParDoTranslation {
     ParDoPayload payload = parDoProto.getSpec().getParameter().unpack(ParDoPayload.class);
 
     List<PCollectionView<?>> views = new ArrayList<>();
-    for (Map.Entry<String, SideInput> sideInput : payload.getSideInputsMap().entrySet()) {
+    for (Map.Entry<String, SideInput> sideInputEntry : payload.getSideInputsMap().entrySet()) {
+      String sideInputTag = sideInputEntry.getKey();
+      RunnerApi.SideInput sideInput = sideInputEntry.getValue();
+      PCollection<?> originalPCollection =
+          checkNotNull(
+              (PCollection<?>) application.getInputs().get(new TupleTag<>(sideInputTag)),
+              "no input with tag %s",
+              sideInputTag);
       views.add(
           viewFromProto(
-              application.getPipeline(),
-              sideInput.getValue(),
-              sideInput.getKey(),
+              sideInput,
+              sideInputTag,
+              originalPCollection,
               parDoProto,
               sdkComponents.toComponents()));
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/1ac4b7e6/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
index b275188..85139e8 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
@@ -19,6 +19,7 @@
 package org.apache.beam.runners.core.construction;
 
 import java.util.Map;
+import java.util.Objects;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput;
@@ -94,4 +95,19 @@ class RunnerPCollectionView<T> extends PValueBase implements PCollectionView<T>
     throw new UnsupportedOperationException(String.format(
         "A %s cannot be expanded", RunnerPCollectionView.class.getSimpleName()));
   }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof PCollectionView)) {
+      return false;
+    }
+    @SuppressWarnings("unchecked")
+    PCollectionView<?> otherView = (PCollectionView<?>) other;
+    return tag.equals(otherView.getTagInternal());
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(tag);
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/1ac4b7e6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 8881967..891d102 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -38,7 +38,6 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.AfterPane;
@@ -73,9 +72,14 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
                   PCollection<? extends InputT>, PCollectionTuple,
                   PTransform<PCollection<? extends InputT>, PCollectionTuple>>
               application) {
-    return PTransformReplacement.of(
-        PTransformReplacements.getSingletonMainInput(application),
-        getReplacementForApplication(application));
+
+    try {
+      return PTransformReplacement.of(
+          PTransformReplacements.getSingletonMainInput(application),
+          getReplacementForApplication(application));
+    } catch (IOException exc) {
+      throw new RuntimeException(exc);
+    }
   }
 
   @SuppressWarnings("unchecked")
@@ -83,31 +87,22 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
       AppliedPTransform<
               PCollection<? extends InputT>, PCollectionTuple,
               PTransform<PCollection<? extends InputT>, PCollectionTuple>>
-          application) {
+          application)
+      throws IOException {
 
-    DoFn<InputT, OutputT> fn;
-    try {
-      fn = (DoFn<InputT, OutputT>) ParDoTranslation.getDoFn(application);
-    } catch (IOException exc) {
-      throw new RuntimeException(exc);
-    }
+    DoFn<InputT, OutputT> fn = (DoFn<InputT, OutputT>) ParDoTranslation.getDoFn(application);
 
     DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
+
     if (signature.processElement().isSplittable()) {
       return (PTransform) SplittableParDo.forAppliedParDo(application);
     } else if (signature.stateDeclarations().size() > 0
         || signature.timerDeclarations().size() > 0) {
-
-      MultiOutput<InputT, OutputT> transform =
-          (MultiOutput<InputT, OutputT>) application.getTransform();
-
-      // Based on the fact that the signature is stateful, DoFnSignatures ensures
-      // that it is also keyed
       return new GbkThenStatefulParDo(
           fn,
-          transform.getMainOutputTag(),
-          transform.getAdditionalOutputTags(),
-          transform.getSideInputs());
+          ParDoTranslation.getMainOutputTag(application),
+          ParDoTranslation.getAdditionalOutputTags(application),
+          ParDoTranslation.getSideInputs(application));
     } else {
       return application.getTransform();
     }