You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink and is not preserved in this plain-text version.
Posted to commits@beam.apache.org by ro...@apache.org on 2020/08/07 20:04:04 UTC
[beam] branch master updated: Support NULL query parameters in ZetaSQL and fix nullable ARRAY bug
This is an automated email from the ASF dual-hosted git repository.
robinyqiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 99e8c27 Support NULL query parameters in ZetaSQL and fix nullable ARRAY bug
new 4c36792 Merge pull request #12483 from robinyqiu/param
99e8c27 is described below
commit 99e8c2732b31c9531b40a3669717080bbf0914b3
Author: Robin Qiu <ro...@google.com>
AuthorDate: Thu Aug 6 11:06:32 2020 -0700
Support NULL query parameters in ZetaSQL and fix nullable ARRAY bug
---
.../provider/bigquery/BeamSqlUnparseContext.java | 31 ++++++++++++++++++++++
.../extensions/sql/zetasql/BeamZetaSqlCalcRel.java | 29 ++++++++++++--------
.../zetasql/ZetaSqlCalciteTranslationUtils.java | 9 ++++---
.../zetasql/translation/ExpressionConverter.java | 13 ++++++++-
.../LimitOffsetScanToLimitConverter.java | 7 ++++-
.../sql/zetasql/ZetaSqlDialectSpecTest.java | 6 +----
6 files changed, 74 insertions(+), 21 deletions(-)
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamSqlUnparseContext.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamSqlUnparseContext.java
index ca33864..ac65eb5 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamSqlUnparseContext.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamSqlUnparseContext.java
@@ -19,6 +19,8 @@ package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery;
import static org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rel2sql.SqlImplementor.POS;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Objects;
import java.util.function.IntFunction;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.text.translate.CharSequenceTranslator;
@@ -28,9 +30,12 @@ import org.apache.beam.repackaged.core.org.apache.commons.lang3.text.translate.L
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.ByteString;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.TimeUnitRange;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rel2sql.SqlImplementor;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexDynamicParam;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexProgram;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlDynamicParam;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlKind;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlLiteral;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlNode;
@@ -63,10 +68,16 @@ public class BeamSqlUnparseContext extends SqlImplementor.SimpleContext {
// Unicode (only 4 hex digits)
.with(JavaUnicodeEscaper.outsideOf(32, 0x7f));
+ private Map<String, RelDataType> nullParams = new HashMap<>();
+
public BeamSqlUnparseContext(IntFunction<SqlNode> field) {
super(BeamBigQuerySqlDialect.DEFAULT, field);
}
+ public Map<String, RelDataType> getNullParams() {
+ return nullParams;
+ }
+
@Override
public SqlNode toSql(RexProgram program, RexNode rex) {
if (rex.getKind().equals(SqlKind.LITERAL)) {
@@ -93,6 +104,12 @@ public class BeamSqlUnparseContext extends SqlImplementor.SimpleContext {
return new ReplaceLiteral(literal, POS, "ISOWEEK");
}
}
+ } else if (rex.getKind().equals(SqlKind.DYNAMIC_PARAM)) {
+ final RexDynamicParam param = (RexDynamicParam) rex;
+ final int index = param.getIndex();
+ final String name = "null_param_" + index;
+ nullParams.put(name, param.getType());
+ return new NamedDynamicParam(index, POS, name);
}
return super.toSql(program, rex);
@@ -188,4 +205,18 @@ public class BeamSqlUnparseContext extends SqlImplementor.SimpleContext {
return super.hashCode();
}
}
+
+ private static class NamedDynamicParam extends SqlDynamicParam {
+ private final String newName;
+
+ NamedDynamicParam(int index, SqlParserPos pos, String newName) {
+ super(index, pos);
+ this.newName = newName;
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+ writer.literal("@" + newName);
+ }
+ }
}
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
index 70153bd..50c5336 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.extensions.sql.zetasql;
import com.google.zetasql.AnalyzerOptions;
import com.google.zetasql.PreparedExpression;
import com.google.zetasql.Value;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntFunction;
@@ -42,7 +41,7 @@ import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptClus
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Calc;
-import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rel2sql.SqlImplementor;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexBuilder;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexProgram;
@@ -61,7 +60,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditio
public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
private static final SqlDialect DIALECT = BeamBigQuerySqlDialect.DEFAULT;
- private final SqlImplementor.Context context;
+ private final BeamSqlUnparseContext context;
private static String columnName(int i) {
return "_" + i;
@@ -110,6 +109,7 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
CalcFn calcFn =
new CalcFn(
context.toSql(getProgram(), rex).toSqlString(DIALECT).getSql(),
+ createNullParams(context.getNullParams()),
upstream.getSchema(),
outputSchema,
options.getZetaSqlDefaultTimezone(),
@@ -122,12 +122,23 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
}
}
+ private static Map<String, Value> createNullParams(Map<String, RelDataType> input) {
+ Map<String, Value> result = new HashMap<>();
+ for (Map.Entry<String, RelDataType> entry : input.entrySet()) {
+ result.put(
+ entry.getKey(),
+ Value.createNullValue(ZetaSqlCalciteTranslationUtils.toZetaType(entry.getValue())));
+ }
+ return result;
+ }
+
/**
* {@code CalcFn} is the executor for a {@link BeamZetaSqlCalcRel} step. The implementation is
* based on the {@code ZetaSQL} expression evaluator.
*/
private static class CalcFn extends DoFn<Row, Row> {
private final String sql;
+ private final Map<String, Value> nullParams;
private final Schema inputSchema;
private final Schema outputSchema;
private final String defaultTimezone;
@@ -136,11 +147,13 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
CalcFn(
String sql,
+ Map<String, Value> nullParams,
Schema inputSchema,
Schema outputSchema,
String defaultTimezone,
boolean verifyRowValues) {
this.sql = sql;
+ this.nullParams = nullParams;
this.inputSchema = inputSchema;
this.outputSchema = outputSchema;
this.defaultTimezone = defaultTimezone;
@@ -149,10 +162,8 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
@Setup
public void setup() {
- // TODO[BEAM-9182]: support parameters in expression evaluation
- // Query parameters are not set because they have already been substituted.
AnalyzerOptions options =
- SqlAnalyzer.getAnalyzerOptions(QueryParameters.ofNone(), defaultTimezone);
+ SqlAnalyzer.getAnalyzerOptions(QueryParameters.ofNamed(nullParams), defaultTimezone);
for (int i = 0; i < inputSchema.getFieldCount(); i++) {
options.addExpressionColumn(
columnName(i),
@@ -175,11 +186,7 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
row.getBaseValue(i, Object.class), inputSchema.getField(i).getType()));
}
- // TODO[BEAM-9182]: support parameters in expression evaluation
- // The map is empty because parameters in the query string have already been substituted.
- Map<String, Value> params = Collections.emptyMap();
-
- Value v = exp.execute(columns, params);
+ Value v = exp.execute(columns, nullParams);
if (!v.isNull()) {
Row outputRow =
ZetaSqlBeamTranslationUtils.zetaSqlStructValueToBeamRow(
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
index 81bc142..df7f4e2 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
@@ -136,9 +136,12 @@ public final class ZetaSqlCalciteTranslationUtils {
// -1 cardinality means unlimited array size.
// TODO: is unlimited array size right for general case?
// TODO: whether isNullable should be ArrayType's nullablity (not its element type's?)
- return rexBuilder
- .getTypeFactory()
- .createArrayType(toRelDataType(rexBuilder, arrayType.getElementType(), isNullable), -1);
+ return nullable(
+ rexBuilder,
+ rexBuilder
+ .getTypeFactory()
+ .createArrayType(toRelDataType(rexBuilder, arrayType.getElementType(), isNullable), -1),
+ isNullable);
}
private static List<String> toNameList(List<StructField> fields) {
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
index fd5651f..d0b4112 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
@@ -193,6 +193,7 @@ public class ExpressionConverter {
private final RelOptCluster cluster;
private final QueryParameters queryParams;
+ private int nullParamCount = 0;
private final Map<String, ResolvedCreateFunctionStmt> userDefinedFunctions;
public ExpressionConverter(
@@ -1217,7 +1218,17 @@ public class ExpressionConverter {
throw new IllegalArgumentException("Found unexpected parameter " + parameter);
}
Preconditions.checkState(parameter.getType().equals(value.getType()));
- return convertValueToRexNode(value.getType(), value);
+ if (value.isNull()) {
+ // In some cases NULL parameter cannot be substituted with NULL literal
+ // Therefore we create a dynamic parameter placeholder here for each NULL parameter
+ return rexBuilder()
+ .makeDynamicParam(
+ ZetaSqlCalciteTranslationUtils.toRelDataType(rexBuilder(), value.getType(), true),
+ nullParamCount++);
+ } else {
+ // Substitute non-NULL parameter with literal
+ return convertValueToRexNode(value.getType(), value);
+ }
}
private RexNode convertResolvedArgumentRef(
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/LimitOffsetScanToLimitConverter.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/LimitOffsetScanToLimitConverter.java
index 3052d6f..b58c40a 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/LimitOffsetScanToLimitConverter.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/LimitOffsetScanToLimitConverter.java
@@ -26,6 +26,7 @@ import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelCollatio
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelCollations;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexDynamicParam;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
@@ -64,7 +65,11 @@ class LimitOffsetScanToLimitConverter extends RelConverter<ResolvedLimitOffsetSc
input.getRowType().getFieldList(),
ImmutableMap.of());
- if (RexLiteral.isNullLiteral(offset) || RexLiteral.isNullLiteral(fetch)) {
+ // offset or fetch being RexDynamicParam means it is NULL (the only param supported currently)
+ if (offset instanceof RexDynamicParam
+ || RexLiteral.isNullLiteral(offset)
+ || fetch instanceof RexDynamicParam
+ || RexLiteral.isNullLiteral(fetch)) {
throw new UnsupportedOperationException("Limit requires non-null count and offset");
}
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
index d148012..9a67da6 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
@@ -252,7 +252,6 @@ public class ZetaSqlDialectSpecTest extends ZetaSqlTestBase {
}
@Test
- @Ignore("[BEAM-9182] NULL parameters do not work in BeamZetaSqlCalcRel")
public void testEQ1() {
String sql = "SELECT @p0 = @p1 AS ColA";
@@ -294,7 +293,6 @@ public class ZetaSqlDialectSpecTest extends ZetaSqlTestBase {
}
@Test
- @Ignore("[BEAM-9182] NULL parameters do not work in BeamZetaSqlCalcRel")
public void testEQ3() {
String sql = "SELECT @p0 = @p1 AS ColA";
@@ -523,7 +521,6 @@ public class ZetaSqlDialectSpecTest extends ZetaSqlTestBase {
}
@Test
- @Ignore("[BEAM-9182] NULL parameters do not work in BeamZetaSqlCalcRel")
public void testNullIfCoercion() {
String sql = "SELECT NULLIF(@p0, @p1) AS ColA";
ImmutableMap<String, Value> params =
@@ -733,9 +730,8 @@ public class ZetaSqlDialectSpecTest extends ZetaSqlTestBase {
}
@Test
- @Ignore("[BEAM-9182] NULL parameters do not work in BeamZetaSqlCalcRel")
public void testLikeNullPattern() {
- String sql = "SELECT @p0 LIKE @p1 AS ColA";
+ String sql = "SELECT @p0 LIKE @p1 AS ColA";
ImmutableMap<String, Value> params =
ImmutableMap.of(
"p0",