You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/10/17 21:08:19 UTC

[11/14] beam git commit: Support side inputs in CombineTranslation

Support side inputs in CombineTranslation


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5bc77fcf
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5bc77fcf
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5bc77fcf

Branch: refs/heads/master
Commit: 5bc77fcf619dc6f1272d1cd4143b6a09e0cfbda1
Parents: 11368e0
Author: Kenneth Knowles <ke...@apache.org>
Authored: Tue Oct 17 11:50:46 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Tue Oct 17 12:45:11 2017 -0700

----------------------------------------------------------------------
 .../runners/core/construction/CombineTranslation.java     | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5bc77fcf/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
index 21796aa..ff431fc 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
@@ -23,7 +23,6 @@ import static org.apache.beam.runners.core.construction.PTransformTranslation.CO
 
 import com.google.auto.service.AutoService;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
@@ -49,6 +48,7 @@ import org.apache.beam.sdk.util.AppliedCombineFn;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 
 /**
  * Methods for translating between {@link Combine.PerKey} {@link PTransform PTransforms} and {@link
@@ -170,8 +170,12 @@ public class CombineTranslation {
 
           @Override
           public Map<String, SideInput> getSideInputs() {
-            // TODO: support side inputs
-            return ImmutableMap.of();
+            Map<String, SideInput> sideInputs = new HashMap<>();
+            for (PCollectionView<?> sideInput : combine.getTransform().getSideInputs()) {
+              sideInputs.put(
+                  sideInput.getTagInternal().getId(), ParDoTranslation.toProto(sideInput));
+            }
+            return sideInputs;
           }
         },
         components);