You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/07/13 03:06:42 UTC

[30/50] [abbrv] beam git commit: Add more utilities to ParDoTranslation

Add more utilities to ParDoTranslation


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/860e0a08
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/860e0a08
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/860e0a08

Branch: refs/heads/DSL_SQL
Commit: 860e0a08ecd84533220f6ef8e18d1409964d69cd
Parents: 1f17b8a
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 8 13:46:18 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Wed Jul 12 20:01:01 2017 -0700

----------------------------------------------------------------------
 .../core/construction/ParDoTranslation.java     | 48 ++++++++++++++++++++
 1 file changed, 48 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/860e0a08/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index 34e0d86..5f2bcae 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -34,9 +34,11 @@ import com.google.protobuf.BytesValue;
 import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
@@ -74,6 +76,7 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.WindowingStrategy;
 
 /**
@@ -215,11 +218,56 @@ public class ParDoTranslation {
     return doFnAndMainOutputTagFromProto(payload.getDoFn()).getDoFn();
   }
 
+  public static DoFn<?, ?> getDoFn(AppliedPTransform<?, ?, ?> application) throws IOException {
+    return getDoFn(getParDoPayload(application));
+  }
+
   public static TupleTag<?> getMainOutputTag(ParDoPayload payload)
       throws InvalidProtocolBufferException {
     return doFnAndMainOutputTagFromProto(payload.getDoFn()).getMainOutputTag();
   }
 
+  public static TupleTag<?> getMainOutputTag(AppliedPTransform<?, ?, ?> application)
+      throws IOException {
+    return getMainOutputTag(getParDoPayload(application));
+  }
+
+  public static TupleTagList getAdditionalOutputTags(AppliedPTransform<?, ?, ?> application)
+      throws IOException {
+
+    RunnerApi.PTransform protoTransform =
+        PTransformTranslation.toProto(application, SdkComponents.create());
+
+    ParDoPayload payload = protoTransform.getSpec().getParameter().unpack(ParDoPayload.class);
+    TupleTag<?> mainOutputTag = getMainOutputTag(payload);
+    Set<String> outputTags =
+        Sets.difference(
+            protoTransform.getOutputsMap().keySet(), Collections.singleton(mainOutputTag.getId()));
+
+    ArrayList<TupleTag<?>> additionalOutputTags = new ArrayList<>();
+    for (String outputTag : outputTags) {
+      additionalOutputTags.add(new TupleTag<>(outputTag));
+    }
+    return TupleTagList.of(additionalOutputTags);
+  }
+
+  public static List<PCollectionView<?>> getSideInputs(AppliedPTransform<?, ?, ?> application)
+      throws IOException {
+
+    SdkComponents sdkComponents = SdkComponents.create();
+    RunnerApi.PTransform parDoProto =
+        PTransformTranslation.toProto(application, sdkComponents);
+    ParDoPayload payload = parDoProto.getSpec().getParameter().unpack(ParDoPayload.class);
+
+    List<PCollectionView<?>> views = new ArrayList<>();
+    for (Map.Entry<String, SideInput> sideInput : payload.getSideInputsMap().entrySet()) {
+      views.add(
+          fromProto(
+              sideInput.getValue(), sideInput.getKey(), parDoProto, sdkComponents.toComponents()));
+    }
+    return views;
+  }
+
   public static RunnerApi.PCollection getMainInput(
       RunnerApi.PTransform ptransform, Components components) throws IOException {
     checkArgument(