You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2023/02/14 18:35:17 UTC

[beam] branch master updated: Fixing issue with ErrorCapture transform where pipeline issues are caused by lack of proper expansion (#25465)

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 67b7962f8e7 Fixing issue with ErrorCapture transform where pipeline issues are caused by lack of proper expansion (#25465)
67b7962f8e7 is described below

commit 67b7962f8e7c93e7a3e7fefc5911de45c2693644
Author: Pablo Estrada <pa...@users.noreply.github.com>
AuthorDate: Tue Feb 14 10:35:09 2023 -0800

    Fixing issue with ErrorCapture transform where pipeline issues are caused by lack of proper expansion (#25465)
---
 .../sql/expansion/SqlTransformSchemaTransformProvider.java       | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java
index 0649a0978e4..7502d0881bb 100644
--- a/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java
+++ b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java
@@ -37,11 +37,13 @@ import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 
@@ -126,6 +128,13 @@ public class SqlTransformSchemaTransformProvider implements SchemaTransformProvi
 
     @Override
     public PDone expand(PCollection<Row> input) {
+      input.apply( // result is discarded; NOTE(review): presumably the "proper expansion" fix named in the commit subject - confirm
+          "noop_" + inputs.size(), // step name suffixed with the number of inputs recorded so far
+          MapElements.into(TypeDescriptors.nulls()) // output element type is declared as nulls
+              .via(
+                  err -> {
+                    return null; // every element maps to null; nothing is emitted from the input's contents
+                  }));
       inputs.add(input);
       return PDone.in(input.getPipeline());
     }