You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink in the original page and is not preserved in this plain-text rendering.
Posted to commits@beam.apache.org by pa...@apache.org on 2023/02/13 22:28:10 UTC
[beam] branch master updated: Adding support for DLQ for ZetaSQL (#25426)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 299be58cf99 Adding support for DLQ for ZetaSQL (#25426)
299be58cf99 is described below
commit 299be58cf997d9e1561409034692ea4f2d4ff357
Author: Pablo Estrada <pa...@users.noreply.github.com>
AuthorDate: Mon Feb 13 14:28:01 2023 -0800
Adding support for DLQ for ZetaSQL (#25426)
* Adding support for DLQ for ZetaSQL
* fixed issue when not all fields are selected
* fixup
* fix spotless
* fix test
---
.../apache/beam/sdk/coders/RowCoderGenerator.java | 12 ++-
.../extensions/sql/zetasql/BeamZetaSqlCalcRel.java | 112 ++++++++++++++++-----
.../sql/zetasql/BeamZetaSqlCalcRelTest.java | 70 +++++++++++++
.../sql/zetasql/ZetaSqlDialectSpecTest.java | 14 ++-
4 files changed, 177 insertions(+), 31 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
index 70ee6c1ad81..5ce7358d882 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
@@ -142,7 +142,10 @@ public abstract class RowCoderGenerator {
}
// There should never be duplicate encoding positions.
Preconditions.checkState(
- schema.getFieldCount() == Arrays.stream(encodingPosToRowIndex).distinct().count());
+ schema.getFieldCount() == Arrays.stream(encodingPosToRowIndex).distinct().count(),
+ "The input schema (%s) and map for position encoding (%s) do not match.",
+ schema.getFields(),
+ encodingPosToRowIndex);
// Component coders are ordered by encoding position, but may encode a field with a different
// row index.
@@ -311,7 +314,12 @@ public abstract class RowCoderGenerator {
boolean hasNullableFields)
throws IOException {
checkState(value.getFieldCount() == value.getSchema().getFieldCount());
- checkState(encodingPosToIndex.length == value.getFieldCount());
+ checkState(
+ encodingPosToIndex.length == value.getFieldCount(),
+ "Unable to encode row. Expected %s values, but row has %s%s",
+ encodingPosToIndex.length,
+ value.getFieldCount(),
+ value.getSchema().getFieldNames());
// Encode the field count. This allows us to handle compatible schema changes.
VAR_INT_CODER.encode(value.getFieldCount(), outputStream);
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
index 744fbd0bcd4..be1c4613ad0 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
@@ -37,18 +37,24 @@ import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner.QueryParameters;
import org.apache.beam.sdk.extensions.sql.impl.rel.AbstractBeamCalcRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BeamBigQuerySqlDialect;
import org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BeamSqlUnparseContext;
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.SelectHelpers;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.RelOptCluster;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.RelTraitSet;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.RelNode;
@@ -64,7 +70,6 @@ import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.fun.SqlStdO
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -83,6 +88,9 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
private static final int MAX_PENDING_WINDOW = 32;
private final BeamSqlUnparseContext context;
+ private static final TupleTag<Row> rows = new TupleTag<Row>("output") {};
+ private static final TupleTag<Row> errors = new TupleTag<Row>("errors") {};
+
private static String columnName(int i) {
return "_" + i;
}
@@ -101,21 +109,36 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
@Override
public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
- return new Transform();
+ return buildPTransform(null);
+ }
+
+ @Override
+ public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform(
+ @Nullable PTransform<PCollection<Row>, ? extends POutput> errorsTransformer) {
+ return new Transform(errorsTransformer);
}
@AutoValue
abstract static class TimestampedFuture {
- private static TimestampedFuture create(Instant t, Future<Value> f) {
- return new AutoValue_BeamZetaSqlCalcRel_TimestampedFuture(t, f);
+ private static TimestampedFuture create(Instant t, Future<Value> f, Row r) {
+ return new AutoValue_BeamZetaSqlCalcRel_TimestampedFuture(t, f, r);
}
abstract Instant timestamp();
abstract Future<Value> future();
+
+ abstract Row row();
}
private class Transform extends PTransform<PCollectionList<Row>, PCollection<Row>> {
+
+ private final @Nullable PTransform<PCollection<Row>, ? extends POutput> errorsTransformer;
+
+ Transform(@Nullable PTransform<PCollection<Row>, ? extends POutput> errorsTransformer) {
+ this.errorsTransformer = errorsTransformer;
+ }
+
@Override
public PCollection<Row> expand(PCollectionList<Row> pinput) {
Preconditions.checkArgument(
@@ -135,9 +158,10 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
SqlStdOperatorTable.CASE, condition, rex, rexBuilder.makeNullLiteral(getRowType()));
}
+ final Schema outputSchema = CalciteUtils.toSchema(getRowType());
+
BeamSqlPipelineOptions options =
pinput.getPipeline().getOptions().as(BeamSqlPipelineOptions.class);
- Schema outputSchema = CalciteUtils.toSchema(getRowType());
CalcFn calcFn =
new CalcFn(
context.toSql(getProgram(), rex).toSqlString(DIALECT).getSql(),
@@ -147,7 +171,15 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
options.getZetaSqlDefaultTimezone(),
options.getVerifyRowValues());
- return upstream.apply(ParDo.of(calcFn)).setRowSchema(outputSchema);
+ PCollectionTuple tuple =
+ upstream.apply(ParDo.of(calcFn).withOutputTags(rows, TupleTagList.of(errors)));
+ tuple.get(errors).setRowSchema(calcFn.errorsSchema);
+
+ if (errorsTransformer != null) {
+ tuple.get(errors).apply(errorsTransformer);
+ }
+
+ return tuple.get(rows).setRowSchema(outputSchema);
}
}
@@ -173,6 +205,8 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
private final Schema outputSchema;
private final String defaultTimezone;
private final boolean verifyRowValues;
+
+ final Schema errorsSchema;
private final List<Integer> referencedColumns;
@FieldAccess("row")
@@ -205,6 +239,8 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
}
this.referencedColumns = columns.build();
this.fieldAccess = FieldAccessDescriptor.withFieldIds(this.referencedColumns);
+ Schema inputRowSchema = SelectHelpers.getOutputSchema(inputSchema, fieldAccess);
+ this.errorsSchema = BeamSqlRelUtils.getErrorRowSchema(inputRowSchema);
}
}
@@ -242,30 +278,39 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
@ProcessElement
public void processElement(
- @FieldAccess("row") Row row, @Timestamp Instant t, BoundedWindow w, OutputReceiver<Row> r)
+ @FieldAccess("row") Row row,
+ @Timestamp Instant t,
+ BoundedWindow w,
+ OutputReceiver<Row> r,
+ MultiOutputReceiver multiOutputReceiver)
throws InterruptedException {
- Map<String, Value> columns = new HashMap<>();
- for (int i : referencedColumns) {
- final Field field = inputSchema.getField(i);
- columns.put(
- columnName(i),
- ZetaSqlBeamTranslationUtils.toZetaSqlValue(
- row.getBaseValue(field.getName(), Object.class), field.getType()));
- }
-
- @NonNull
- Future<Value> valueFuture = checkArgumentNotNull(stream).execute(columns, nullParams);
@Nullable Queue<TimestampedFuture> pendingWindow = pending.get(w);
if (pendingWindow == null) {
pendingWindow = new ArrayDeque<>();
pending.put(w, pendingWindow);
}
- pendingWindow.add(TimestampedFuture.create(t, valueFuture));
+ try {
+ Map<String, Value> columns = new HashMap<>();
+ for (int i : referencedColumns) {
+ final Field field = inputSchema.getField(i);
+ columns.put(
+ columnName(i),
+ ZetaSqlBeamTranslationUtils.toZetaSqlValue(
+ row.getBaseValue(field.getName(), Object.class), field.getType()));
+ }
+ Future<Value> valueFuture = checkArgumentNotNull(stream).execute(columns, nullParams);
+ pendingWindow.add(TimestampedFuture.create(t, valueFuture, row));
+
+ } catch (UnsupportedOperationException | ArithmeticException | IllegalArgumentException e) {
+ multiOutputReceiver
+ .get(errors)
+ .output(Row.withSchema(errorsSchema).addValues(row, e.toString()).build());
+ }
while ((!pendingWindow.isEmpty() && pendingWindow.element().future().isDone())
|| pendingWindow.size() > MAX_PENDING_WINDOW) {
- outputRow(pendingWindow.remove(), r);
+ outputRow(pendingWindow.remove(), r, multiOutputReceiver.get(errors));
}
}
@@ -274,9 +319,12 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
checkArgumentNotNull(stream).flush();
for (Map.Entry<BoundedWindow, Queue<TimestampedFuture>> pendingWindow : pending.entrySet()) {
OutputReceiver<Row> rowOutputReciever =
- new OutputReceiverForFinishBundle(c, pendingWindow.getKey());
+ new OutputReceiverForFinishBundle(c, pendingWindow.getKey(), rows);
+ OutputReceiver<Row> errorOutputReciever =
+ new OutputReceiverForFinishBundle(c, pendingWindow.getKey(), errors);
+
for (TimestampedFuture timestampedFuture : pendingWindow.getValue()) {
- outputRow(timestampedFuture, rowOutputReciever);
+ outputRow(timestampedFuture, rowOutputReciever, errorOutputReciever);
}
}
}
@@ -288,9 +336,13 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
private final FinishBundleContext c;
private final BoundedWindow w;
- private OutputReceiverForFinishBundle(FinishBundleContext c, BoundedWindow w) {
+ private final TupleTag<Row> tag;
+
+ private OutputReceiverForFinishBundle(
+ FinishBundleContext c, BoundedWindow w, TupleTag<Row> tag) {
this.c = c;
this.w = w;
+ this.tag = tag;
}
@Override
@@ -300,11 +352,11 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
@Override
public void outputWithTimestamp(Row output, Instant timestamp) {
- c.output(output, timestamp, w);
+ c.output(tag, output, timestamp, w);
}
}
- private static RuntimeException extractException(ExecutionException e) {
+ private static RuntimeException extractException(Throwable e) {
try {
throw checkArgumentNotNull(e.getCause());
} catch (RuntimeException r) {
@@ -314,12 +366,18 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
}
}
- private void outputRow(TimestampedFuture c, OutputReceiver<Row> r) throws InterruptedException {
+ private void outputRow(
+ TimestampedFuture c, OutputReceiver<Row> r, OutputReceiver<Row> errorOutputReceiver)
+ throws InterruptedException {
final Value v;
try {
v = c.future().get();
} catch (ExecutionException e) {
- throw extractException(e);
+ errorOutputReceiver.outputWithTimestamp(
+ Row.withSchema(errorsSchema).addValues(c.row(), e.toString()).build(), c.timestamp());
+ return;
+ } catch (Throwable thr) {
+ throw extractException(thr);
}
if (!v.isNull()) {
Row row = ZetaSqlBeamTranslationUtils.toBeamRow(v, outputSchema, verifyRowValues);
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRelTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRelTest.java
index b490458df33..829a3b6e6c9 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRelTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRelTest.java
@@ -17,19 +17,25 @@
*/
package org.apache.beam.sdk.extensions.sql.zetasql;
+import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner.QueryParameters;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@@ -93,6 +99,70 @@ public class BeamZetaSqlCalcRelTest extends ZetaSqlTestBase {
pipeline.run().waitUntilFinish();
}
+ @Test
+ public void testErrorsInCalculation() throws IllegalAccessException {
+ String sql = "SELECT ts, Key*7777*7777*77777*7777777*7777777777 as num, Value FROM KeyValue";
+
+ PCollection<Row> rows = compile(sql);
+
+ final NodeGetter nodeGetter = new NodeGetter(rows);
+ pipeline.traverseTopologically(nodeGetter);
+
+ ParDo.MultiOutput<Row, Row> pardo =
+ (ParDo.MultiOutput<Row, Row>) nodeGetter.producer.getTransform();
+
+ PCollection<Row> errors =
+ (PCollection<Row>)
+ nodeGetter.producer.getOutputs().get(pardo.getAdditionalOutputTags().get(0));
+ Assert.assertEquals(2, errors.getSchema().getFieldCount());
+
+ PAssert.that(errors.apply(Count.globally())).containsInAnyOrder(2L);
+ PAssert.that(errors)
+ .satisfies(
+ (SerializableFunction<Iterable<Row>, Void>)
+ input -> {
+ Assert.assertEquals(
+ Lists.newArrayList(input).stream()
+ .map(r -> r.getRow("row").getInt64("Key"))
+ .collect(Collectors.toSet()),
+ Sets.newHashSet(14L, 15L));
+ return null;
+ });
+ pipeline.run().waitUntilFinish();
+ }
+
+ @Test
+ public void testErrorsInCalculationWithSelectedCols() throws IllegalAccessException {
+ String sql = "SELECT ts, Key*7777*7777*77777*7777777*7777777777 as num FROM KeyValue";
+
+ PCollection<Row> rows = compile(sql);
+
+ final NodeGetter nodeGetter = new NodeGetter(rows);
+ pipeline.traverseTopologically(nodeGetter);
+
+ ParDo.MultiOutput<Row, Row> pardo =
+ (ParDo.MultiOutput<Row, Row>) nodeGetter.producer.getTransform();
+
+ PCollection<Row> errors =
+ (PCollection<Row>)
+ nodeGetter.producer.getOutputs().get(pardo.getAdditionalOutputTags().get(0));
+ Assert.assertEquals(2, errors.getSchema().getFieldCount());
+
+ PAssert.that(errors.apply(Count.globally())).containsInAnyOrder(2L);
+ PAssert.that(errors)
+ .satisfies(
+ (SerializableFunction<Iterable<Row>, Void>)
+ input -> {
+ Assert.assertEquals(
+ Lists.newArrayList(input).stream()
+ .map(r -> r.getRow("row").getInt64("Key"))
+ .collect(Collectors.toSet()),
+ Sets.newHashSet(14L, 15L));
+ return null;
+ });
+ pipeline.run().waitUntilFinish();
+ }
+
@Test
public void testNoFieldAccess() throws IllegalAccessException {
String sql = "SELECT 1 FROM KeyValue";
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
index ec5ea89f97a..18bbae57a38 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
@@ -46,7 +46,10 @@ import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
@@ -3606,7 +3609,7 @@ public class ZetaSqlDialectSpecTest extends ZetaSqlTestBase {
}
@Test
- public void testSubstrWithLargeValueExpectException() {
+ public void testSubstrWithLargeValueExpectErrorsAreOutput() {
String sql = "SELECT substr(@p0, @p1, @p2)";
ImmutableMap<String, Value> params =
ImmutableMap.of(
@@ -3616,8 +3619,15 @@ public class ZetaSqlDialectSpecTest extends ZetaSqlTestBase {
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params);
+ beamRelNode.withErrorsTransformer(
+ new PTransform<PCollection<Row>, POutput>() {
+ @Override
+ public POutput expand(PCollection<Row> input) {
+ PAssert.that(input.apply(Count.globally())).containsInAnyOrder(1L);
+ return null;
+ }
+ });
BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
- thrown.expect(RuntimeException.class);
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}