You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/05/22 19:35:03 UTC

[2/3] beam git commit: Extract the Main Input PCollection in ParDos

Extract the Main Input PCollection in ParDos


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/888a5e6a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/888a5e6a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/888a5e6a

Branch: refs/heads/master
Commit: 888a5e6ab9c1c83ee06281de4c906be69c076286
Parents: 10357c2
Author: Thomas Groh <tg...@google.com>
Authored: Fri May 19 14:24:07 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon May 22 12:24:45 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/construction/ParDos.java      | 16 ++++++++++++++++
 .../beam/runners/core/construction/ParDosTest.java  |  4 ++++
 2 files changed, 20 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/888a5e6a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
index 4752bd1..2ecc041 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
@@ -23,6 +23,8 @@ import static com.google.common.base.Preconditions.checkArgument;
 import com.google.auto.service.AutoService;
 import com.google.auto.value.AutoValue;
 import com.google.common.base.Optional;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
 import com.google.protobuf.Any;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.BytesValue;
@@ -161,6 +163,20 @@ public class ParDos {
     return doFnAndMainOutputTagFromProto(payload.getDoFn()).getMainOutputTag();
   }
 
+  public static RunnerApi.PCollection getMainInput( // looks up the PCollection for the sole non-side input
+      RunnerApi.PTransform ptransform, Components components) throws IOException {
+    checkArgument( // rejects any transform whose spec URN is not PAR_DO_PAYLOAD_URN
+        ptransform.getSpec().getUrn().equals(PAR_DO_PAYLOAD_URN),
+        "Unexpected payload type %s",
+        ptransform.getSpec().getUrn());
+    ParDoPayload payload = ptransform.getSpec().getParameter().unpack(ParDoPayload.class);
+    String mainInputId = // the input key that is not also a side-input key
+        Iterables.getOnlyElement( // relies on exactly one key remaining; otherwise Guava throws
+            Sets.difference(
+                ptransform.getInputsMap().keySet(), payload.getSideInputsMap().keySet()));
+    return components.getPcollectionsOrThrow(ptransform.getInputsOrThrow(mainInputId));
+  }
+
   // TODO: Implement
   private static StateSpec toProto(StateDeclaration state) {
     throw new UnsupportedOperationException("Not yet supported");

http://git-wip-us.apache.org/repos/asf/beam/blob/888a5e6a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java
index 74edec1..b6f0b7d 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java
@@ -149,6 +149,10 @@ public class ParDosTest {
               view.getWindowingStrategyInternal().fixDefaults()));
       assertThat(restoredView.getCoderInternal(), equalTo(view.getCoderInternal()));
     }
+    String mainInputId = components.registerPCollection(mainInput);
+    assertThat(
+        ParDos.getMainInput(protoTransform, protoComponents),
+        equalTo(protoComponents.getPcollectionsOrThrow(mainInputId)));
   }
 
   private static class DropElementsFn extends DoFn<KV<Long, String>, Void> {