You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2020/08/07 20:04:04 UTC

[beam] branch master updated: Support NULL query parameters in ZetaSQL and fix nullable ARRAY bug

This is an automated email from the ASF dual-hosted git repository.

robinyqiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 99e8c27  Support NULL query parameters in ZetaSQL and fix nullable ARRAY bug
     new 4c36792  Merge pull request #12483 from robinyqiu/param
99e8c27 is described below

commit 99e8c2732b31c9531b40a3669717080bbf0914b3
Author: Robin Qiu <ro...@google.com>
AuthorDate: Thu Aug 6 11:06:32 2020 -0700

    Support NULL query parameters in ZetaSQL and fix nullable ARRAY bug
---
 .../provider/bigquery/BeamSqlUnparseContext.java   | 31 ++++++++++++++++++++++
 .../extensions/sql/zetasql/BeamZetaSqlCalcRel.java | 29 ++++++++++++--------
 .../zetasql/ZetaSqlCalciteTranslationUtils.java    |  9 ++++---
 .../zetasql/translation/ExpressionConverter.java   | 13 ++++++++-
 .../LimitOffsetScanToLimitConverter.java           |  7 ++++-
 .../sql/zetasql/ZetaSqlDialectSpecTest.java        |  6 +----
 6 files changed, 74 insertions(+), 21 deletions(-)

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamSqlUnparseContext.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamSqlUnparseContext.java
index ca33864..ac65eb5 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamSqlUnparseContext.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamSqlUnparseContext.java
@@ -19,6 +19,8 @@ package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery;
 
 import static org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rel2sql.SqlImplementor.POS;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
 import java.util.function.IntFunction;
 import org.apache.beam.repackaged.core.org.apache.commons.lang3.text.translate.CharSequenceTranslator;
@@ -28,9 +30,12 @@ import org.apache.beam.repackaged.core.org.apache.commons.lang3.text.translate.L
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.ByteString;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.TimeUnitRange;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rel2sql.SqlImplementor;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexDynamicParam;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexProgram;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlDynamicParam;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlKind;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlLiteral;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlNode;
@@ -63,10 +68,16 @@ public class BeamSqlUnparseContext extends SqlImplementor.SimpleContext {
           // Unicode (only 4 hex digits)
           .with(JavaUnicodeEscaper.outsideOf(32, 0x7f));
 
+  private Map<String, RelDataType> nullParams = new HashMap<>();
+
   public BeamSqlUnparseContext(IntFunction<SqlNode> field) {
     super(BeamBigQuerySqlDialect.DEFAULT, field);
   }
 
+  public Map<String, RelDataType> getNullParams() {
+    return nullParams;
+  }
+
   @Override
   public SqlNode toSql(RexProgram program, RexNode rex) {
     if (rex.getKind().equals(SqlKind.LITERAL)) {
@@ -93,6 +104,12 @@ public class BeamSqlUnparseContext extends SqlImplementor.SimpleContext {
           return new ReplaceLiteral(literal, POS, "ISOWEEK");
         }
       }
+    } else if (rex.getKind().equals(SqlKind.DYNAMIC_PARAM)) {
+      final RexDynamicParam param = (RexDynamicParam) rex;
+      final int index = param.getIndex();
+      final String name = "null_param_" + index;
+      nullParams.put(name, param.getType());
+      return new NamedDynamicParam(index, POS, name);
     }
 
     return super.toSql(program, rex);
@@ -188,4 +205,18 @@ public class BeamSqlUnparseContext extends SqlImplementor.SimpleContext {
       return super.hashCode();
     }
   }
+
+  private static class NamedDynamicParam extends SqlDynamicParam {
+    private final String newName;
+
+    NamedDynamicParam(int index, SqlParserPos pos, String newName) {
+      super(index, pos);
+      this.newName = newName;
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+      writer.literal("@" + newName);
+    }
+  }
 }
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
index 70153bd..50c5336 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.extensions.sql.zetasql;
 import com.google.zetasql.AnalyzerOptions;
 import com.google.zetasql.PreparedExpression;
 import com.google.zetasql.Value;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.function.IntFunction;
@@ -42,7 +41,7 @@ import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptClus
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Calc;
-import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rel2sql.SqlImplementor;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexBuilder;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexProgram;
@@ -61,7 +60,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditio
 public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
 
   private static final SqlDialect DIALECT = BeamBigQuerySqlDialect.DEFAULT;
-  private final SqlImplementor.Context context;
+  private final BeamSqlUnparseContext context;
 
   private static String columnName(int i) {
     return "_" + i;
@@ -110,6 +109,7 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
       CalcFn calcFn =
           new CalcFn(
               context.toSql(getProgram(), rex).toSqlString(DIALECT).getSql(),
+              createNullParams(context.getNullParams()),
               upstream.getSchema(),
               outputSchema,
               options.getZetaSqlDefaultTimezone(),
@@ -122,12 +122,23 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
     }
   }
 
+  private static Map<String, Value> createNullParams(Map<String, RelDataType> input) {
+    Map<String, Value> result = new HashMap<>();
+    for (Map.Entry<String, RelDataType> entry : input.entrySet()) {
+      result.put(
+          entry.getKey(),
+          Value.createNullValue(ZetaSqlCalciteTranslationUtils.toZetaType(entry.getValue())));
+    }
+    return result;
+  }
+
   /**
    * {@code CalcFn} is the executor for a {@link BeamZetaSqlCalcRel} step. The implementation is
    * based on the {@code ZetaSQL} expression evaluator.
    */
   private static class CalcFn extends DoFn<Row, Row> {
     private final String sql;
+    private final Map<String, Value> nullParams;
     private final Schema inputSchema;
     private final Schema outputSchema;
     private final String defaultTimezone;
@@ -136,11 +147,13 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
 
     CalcFn(
         String sql,
+        Map<String, Value> nullParams,
         Schema inputSchema,
         Schema outputSchema,
         String defaultTimezone,
         boolean verifyRowValues) {
       this.sql = sql;
+      this.nullParams = nullParams;
       this.inputSchema = inputSchema;
       this.outputSchema = outputSchema;
       this.defaultTimezone = defaultTimezone;
@@ -149,10 +162,8 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
 
     @Setup
     public void setup() {
-      // TODO[BEAM-9182]: support parameters in expression evaluation
-      // Query parameters are not set because they have already been substituted.
       AnalyzerOptions options =
-          SqlAnalyzer.getAnalyzerOptions(QueryParameters.ofNone(), defaultTimezone);
+          SqlAnalyzer.getAnalyzerOptions(QueryParameters.ofNamed(nullParams), defaultTimezone);
       for (int i = 0; i < inputSchema.getFieldCount(); i++) {
         options.addExpressionColumn(
             columnName(i),
@@ -175,11 +186,7 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
                 row.getBaseValue(i, Object.class), inputSchema.getField(i).getType()));
       }
 
-      // TODO[BEAM-9182]: support parameters in expression evaluation
-      // The map is empty because parameters in the query string have already been substituted.
-      Map<String, Value> params = Collections.emptyMap();
-
-      Value v = exp.execute(columns, params);
+      Value v = exp.execute(columns, nullParams);
       if (!v.isNull()) {
         Row outputRow =
             ZetaSqlBeamTranslationUtils.zetaSqlStructValueToBeamRow(
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
index 81bc142..df7f4e2 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
@@ -136,9 +136,12 @@ public final class ZetaSqlCalciteTranslationUtils {
     // -1 cardinality means unlimited array size.
     // TODO: is unlimited array size right for general case?
     // TODO: whether isNullable should be ArrayType's nullablity (not its element type's?)
-    return rexBuilder
-        .getTypeFactory()
-        .createArrayType(toRelDataType(rexBuilder, arrayType.getElementType(), isNullable), -1);
+    return nullable(
+        rexBuilder,
+        rexBuilder
+            .getTypeFactory()
+            .createArrayType(toRelDataType(rexBuilder, arrayType.getElementType(), isNullable), -1),
+        isNullable);
   }
 
   private static List<String> toNameList(List<StructField> fields) {
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
index fd5651f..d0b4112 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
@@ -193,6 +193,7 @@ public class ExpressionConverter {
 
   private final RelOptCluster cluster;
   private final QueryParameters queryParams;
+  private int nullParamCount = 0;
   private final Map<String, ResolvedCreateFunctionStmt> userDefinedFunctions;
 
   public ExpressionConverter(
@@ -1217,7 +1218,17 @@ public class ExpressionConverter {
         throw new IllegalArgumentException("Found unexpected parameter " + parameter);
     }
     Preconditions.checkState(parameter.getType().equals(value.getType()));
-    return convertValueToRexNode(value.getType(), value);
+    if (value.isNull()) {
+      // In some cases a NULL parameter cannot be substituted with a NULL literal, so we create
+      // a dynamic parameter placeholder here for each NULL parameter instead.
+      return rexBuilder()
+          .makeDynamicParam(
+              ZetaSqlCalciteTranslationUtils.toRelDataType(rexBuilder(), value.getType(), true),
+              nullParamCount++);
+    } else {
+      // Substitute non-NULL parameter with literal
+      return convertValueToRexNode(value.getType(), value);
+    }
   }
 
   private RexNode convertResolvedArgumentRef(
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/LimitOffsetScanToLimitConverter.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/LimitOffsetScanToLimitConverter.java
index 3052d6f..b58c40a 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/LimitOffsetScanToLimitConverter.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/LimitOffsetScanToLimitConverter.java
@@ -26,6 +26,7 @@ import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelCollatio
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelCollations;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexDynamicParam;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
@@ -64,7 +65,11 @@ class LimitOffsetScanToLimitConverter extends RelConverter<ResolvedLimitOffsetSc
                 input.getRowType().getFieldList(),
                 ImmutableMap.of());
 
-    if (RexLiteral.isNullLiteral(offset) || RexLiteral.isNullLiteral(fetch)) {
+    // An offset or fetch that is a RexDynamicParam is NULL (the only param currently supported)
+    if (offset instanceof RexDynamicParam
+        || RexLiteral.isNullLiteral(offset)
+        || fetch instanceof RexDynamicParam
+        || RexLiteral.isNullLiteral(fetch)) {
       throw new UnsupportedOperationException("Limit requires non-null count and offset");
     }
 
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
index d148012..9a67da6 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
@@ -252,7 +252,6 @@ public class ZetaSqlDialectSpecTest extends ZetaSqlTestBase {
   }
 
   @Test
-  @Ignore("[BEAM-9182] NULL parameters do not work in BeamZetaSqlCalcRel")
   public void testEQ1() {
     String sql = "SELECT @p0 = @p1 AS ColA";
 
@@ -294,7 +293,6 @@ public class ZetaSqlDialectSpecTest extends ZetaSqlTestBase {
   }
 
   @Test
-  @Ignore("[BEAM-9182] NULL parameters do not work in BeamZetaSqlCalcRel")
   public void testEQ3() {
     String sql = "SELECT @p0 = @p1 AS ColA";
 
@@ -523,7 +521,6 @@ public class ZetaSqlDialectSpecTest extends ZetaSqlTestBase {
   }
 
   @Test
-  @Ignore("[BEAM-9182] NULL parameters do not work in BeamZetaSqlCalcRel")
   public void testNullIfCoercion() {
     String sql = "SELECT NULLIF(@p0, @p1) AS ColA";
     ImmutableMap<String, Value> params =
@@ -733,9 +730,8 @@ public class ZetaSqlDialectSpecTest extends ZetaSqlTestBase {
   }
 
   @Test
-  @Ignore("[BEAM-9182] NULL parameters do not work in BeamZetaSqlCalcRel")
   public void testLikeNullPattern() {
-    String sql = "SELECT @p0 LIKE  @p1 AS ColA";
+    String sql = "SELECT @p0 LIKE @p1 AS ColA";
     ImmutableMap<String, Value> params =
         ImmutableMap.of(
             "p0",