You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/10/23 02:46:21 UTC

[beam] branch master updated: Populate sideinput for SDF.

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b3c46ac  Populate sideinput for SDF.
     new c0a7e66  Merge pull request #13173 from boyuanzz/sdf_sideinput
b3c46ac is described below

commit b3c46ac095f07a7ca2c236be92ec6ddfbfa5ff4a
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Thu Oct 22 17:02:36 2020 -0700

    Populate sideinput for SDF.
---
 .../beam/runners/core/construction/graph/QueryablePipeline.java     | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
index 6e44ad8..0803489 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
@@ -386,7 +386,11 @@ public class QueryablePipeline {
   }
 
   private Set<String> getLocalSideInputNames(PTransform transform) {
-    if (PAR_DO_TRANSFORM_URN.equals(transform.getSpec().getUrn())) {
+    if (PAR_DO_TRANSFORM_URN.equals(transform.getSpec().getUrn())
+        || SPLITTABLE_PAIR_WITH_RESTRICTION_URN.equals(transform.getSpec().getUrn())
+        || SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN.equals(transform.getSpec().getUrn())
+        || SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN.equals(
+            transform.getSpec().getUrn())) {
       try {
         return ParDoPayload.parseFrom(transform.getSpec().getPayload()).getSideInputsMap().keySet();
       } catch (InvalidProtocolBufferException e) {