You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2023/02/14 18:35:17 UTC
[beam] branch master updated: Fixing issue with ErrorCapture transform where pipeline issues are caused by lack of proper expansion (#25465)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 67b7962f8e7 Fixing issue with ErrorCapture transform where pipeline issues are caused by lack of proper expansion (#25465)
67b7962f8e7 is described below
commit 67b7962f8e7c93e7a3e7fefc5911de45c2693644
Author: Pablo Estrada <pa...@users.noreply.github.com>
AuthorDate: Tue Feb 14 10:35:09 2023 -0800
Fixing issue with ErrorCapture transform where pipeline issues are caused by lack of proper expansion (#25465)
---
.../sql/expansion/SqlTransformSchemaTransformProvider.java | 9 +++++++++
1 file changed, 9 insertions(+)
diff --git a/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java
index 0649a0978e4..7502d0881bb 100644
--- a/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java
+++ b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java
@@ -37,11 +37,13 @@ import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
@@ -126,6 +128,13 @@ public class SqlTransformSchemaTransformProvider implements SchemaTransformProvi
@Override
public PDone expand(PCollection<Row> input) {
+ input.apply(
+ "noop_" + inputs.size(),
+ MapElements.into(TypeDescriptors.nulls())
+ .via(
+ err -> {
+ return null;
+ }));
inputs.add(input);
return PDone.in(input.getPipeline());
}