You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/07/13 03:06:42 UTC
[30/50] [abbrv] beam git commit: Add more utilities to ParDoTranslation
Add more utilities to ParDoTranslation
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/860e0a08
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/860e0a08
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/860e0a08
Branch: refs/heads/DSL_SQL
Commit: 860e0a08ecd84533220f6ef8e18d1409964d69cd
Parents: 1f17b8a
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 8 13:46:18 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Wed Jul 12 20:01:01 2017 -0700
----------------------------------------------------------------------
.../core/construction/ParDoTranslation.java | 48 ++++++++++++++++++++
1 file changed, 48 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/860e0a08/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index 34e0d86..5f2bcae 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -34,9 +34,11 @@ import com.google.protobuf.BytesValue;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
@@ -74,6 +76,7 @@ import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.WindowingStrategy;
/**
@@ -215,11 +218,56 @@ public class ParDoTranslation {
return doFnAndMainOutputTagFromProto(payload.getDoFn()).getDoFn();
}
+  /**
+   * Returns the {@link DoFn} of the given applied transform.
+   *
+   * <p>The application's {@link ParDoPayload} is obtained through {@code getParDoPayload} and
+   * then handed to the payload-based {@code getDoFn} overload.
+   *
+   * @param application the applied transform to inspect
+   * @return the {@link DoFn} extracted from the application's payload
+   * @throws IOException propagated from obtaining or decoding the payload
+   */
+  public static DoFn<?, ?> getDoFn(AppliedPTransform<?, ?, ?> application) throws IOException {
+    ParDoPayload parDoPayload = getParDoPayload(application);
+    return getDoFn(parDoPayload);
+  }
+
/**
 * Returns the main output tag stored with the {@code DoFn} in the given payload.
 *
 * <p>The payload's {@code getDoFn()} value is decoded with {@code doFnAndMainOutputTagFromProto},
 * and the main output tag of the decoded result is returned.
 *
 * @param payload the payload whose {@code DoFn} spec is decoded
 * @return the main output tag of the decoded result
 * @throws InvalidProtocolBufferException propagated from decoding the payload's {@code DoFn} spec
 */
public static TupleTag<?> getMainOutputTag(ParDoPayload payload)
throws InvalidProtocolBufferException {
return doFnAndMainOutputTagFromProto(payload.getDoFn()).getMainOutputTag();
}
+  /**
+   * Returns the main output tag of the given applied transform.
+   *
+   * <p>The application's {@link ParDoPayload} is obtained through {@code getParDoPayload} and
+   * then handed to {@link #getMainOutputTag(ParDoPayload)}.
+   *
+   * @param application the applied transform to inspect
+   * @return the main output tag extracted from the application's payload
+   * @throws IOException propagated from obtaining or decoding the payload
+   */
+  public static TupleTag<?> getMainOutputTag(AppliedPTransform<?, ?, ?> application)
+      throws IOException {
+    ParDoPayload parDoPayload = getParDoPayload(application);
+    return getMainOutputTag(parDoPayload);
+  }
+
+  /**
+   * Returns the tags of all outputs of the given applied transform except its main output.
+   *
+   * <p>The application is translated to its proto form against a fresh {@link SdkComponents}.
+   * The keys of the proto's outputs map, minus the id of the main output tag, are each wrapped
+   * in a new {@link TupleTag}.
+   *
+   * @param application the applied transform to inspect
+   * @return a {@link TupleTagList} holding one tag per non-main output id
+   * @throws IOException propagated from translating the application or unpacking its payload
+   */
+  public static TupleTagList getAdditionalOutputTags(AppliedPTransform<?, ?, ?> application)
+      throws IOException {
+    RunnerApi.PTransform transformProto =
+        PTransformTranslation.toProto(application, SdkComponents.create());
+    ParDoPayload parDoPayload =
+        transformProto.getSpec().getParameter().unpack(ParDoPayload.class);
+
+    // Everything in the outputs map that is not the main output is an additional output.
+    String mainOutputId = getMainOutputTag(parDoPayload).getId();
+    Set<String> additionalOutputIds =
+        Sets.difference(
+            transformProto.getOutputsMap().keySet(), Collections.singleton(mainOutputId));
+
+    List<TupleTag<?>> tags = new ArrayList<>();
+    for (String id : additionalOutputIds) {
+      tags.add(new TupleTag<>(id));
+    }
+    return TupleTagList.of(tags);
+  }
+
+  /**
+   * Returns the side-input views of the given applied transform.
+   *
+   * <p>The application is translated to its proto form against a fresh {@link SdkComponents}.
+   * Each entry of the payload's side-inputs map is then converted back into a
+   * {@link PCollectionView} with {@code fromProto}, using the entry key as the side-input id.
+   *
+   * @param application the applied transform to inspect
+   * @return one view per entry of the payload's side-inputs map, in the map's iteration order
+   * @throws IOException propagated from translating the application, unpacking its payload, or
+   *     converting a side input
+   */
+  public static List<PCollectionView<?>> getSideInputs(AppliedPTransform<?, ?, ?> application)
+      throws IOException {
+    SdkComponents sdkComponents = SdkComponents.create();
+    RunnerApi.PTransform parDoProto = PTransformTranslation.toProto(application, sdkComponents);
+    ParDoPayload payload = parDoProto.getSpec().getParameter().unpack(ParDoPayload.class);
+
+    // The components do not change while iterating, so build the snapshot once instead of
+    // rebuilding it for every side input.
+    Components components = sdkComponents.toComponents();
+    Map<String, SideInput> sideInputs = payload.getSideInputsMap();
+
+    List<PCollectionView<?>> views = new ArrayList<>(sideInputs.size());
+    for (Map.Entry<String, SideInput> sideInput : sideInputs.entrySet()) {
+      views.add(fromProto(sideInput.getValue(), sideInput.getKey(), parDoProto, components));
+    }
+    return views;
+  }
+
public static RunnerApi.PCollection getMainInput(
RunnerApi.PTransform ptransform, Components components) throws IOException {
checkArgument(