You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2020/11/24 02:59:02 UTC

[beam] branch master updated: Support ZetaSQL IN operator

This is an automated email from the ASF dual-hosted git repository.

robinyqiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6cc3a92  Support ZetaSQL IN operator
     new 8b4b78e  Merge pull request #13381 from robinyqiu/in
6cc3a92 is described below

commit 6cc3a92a61f3526bb7119d16ff29aa6f5036a24f
Author: Yueyang Qiu <ro...@gmail.com>
AuthorDate: Wed Nov 18 17:01:19 2020 -0800

    Support ZetaSQL IN operator
---
 .../provider/bigquery/BeamBigQuerySqlDialect.java  | 11 +++++++
 .../zetasql/SupportedZetaSqlBuiltinFunctions.java  |  4 +--
 .../translation/SqlOperatorMappingTable.java       |  1 +
 .../sql/zetasql/ZetaSqlDialectSpecTest.java        | 36 ++++++++++++++++++++++
 4 files changed, 50 insertions(+), 2 deletions(-)

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQuerySqlDialect.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQuerySqlDialect.java
index 367e92b3..7fa53f3 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQuerySqlDialect.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQuerySqlDialect.java
@@ -101,6 +101,7 @@ public class BeamBigQuerySqlDialect extends BigQuerySqlDialect {
           .put(DOUBLE_NAN_WRAPPER, "CAST('NaN' AS FLOAT64)")
           .build();
   public static final String NUMERIC_LITERAL_WRAPPER = "numeric_literal";
+  public static final String IN_ARRAY_OPERATOR = "$in_array";
 
   public BeamBigQuerySqlDialect(Context context) {
     super(context);
@@ -185,6 +186,9 @@ public class BeamBigQuerySqlDialect extends BigQuerySqlDialect {
         } else if (EXTRACT_FUNCTIONS.containsKey(funName)) {
           unparseExtractFunctions(writer, call, leftPrec, rightPrec);
           break;
+        } else if (IN_ARRAY_OPERATOR.equals(funName)) {
+          unparseInArrayOperator(writer, call, leftPrec, rightPrec);
+          break;
         } // fall through
       default:
         super.unparseCall(writer, call, leftPrec, rightPrec);
@@ -338,6 +342,13 @@ public class BeamBigQuerySqlDialect extends BigQuerySqlDialect {
     writer.endFunCall(frame);
   }
 
+  private void unparseInArrayOperator(SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) {
+    call.operand(0).unparse(writer, leftPrec, rightPrec);
+    writer.literal("IN UNNEST(");
+    call.operand(1).unparse(writer, leftPrec, rightPrec);
+    writer.literal(")");
+  }
+
   private TimeUnit validate(TimeUnit timeUnit) {
     switch (timeUnit) {
       case MICROSECOND:
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java
index d0e88ec..16471fe 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java
@@ -50,8 +50,8 @@ class SupportedZetaSqlBuiltinFunctions {
           FunctionSignatureId.FN_EQUAL, // $equal
           FunctionSignatureId.FN_STRING_LIKE, // $like
           FunctionSignatureId.FN_BYTE_LIKE, // $like
-          // FunctionSignatureId.FN_IN, // $in
-          // FunctionSignatureId.FN_IN_ARRAY, // $in_array
+          FunctionSignatureId.FN_IN, // $in
+          FunctionSignatureId.FN_IN_ARRAY, // $in_array
           // FunctionSignatureId.FN_BETWEEN, // $between
           FunctionSignatureId.FN_IS_NULL, // $is_null
           FunctionSignatureId.FN_IS_TRUE, // $is_true
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/SqlOperatorMappingTable.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/SqlOperatorMappingTable.java
index 8d1fcc9..58e6f81 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/SqlOperatorMappingTable.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/SqlOperatorMappingTable.java
@@ -46,6 +46,7 @@ class SqlOperatorMappingTable {
           .put("$less", SqlStdOperatorTable.LESS_THAN)
           .put("$less_or_equal", SqlStdOperatorTable.LESS_THAN_OR_EQUAL)
           .put("$like", SqlOperators.LIKE)
+          .put("$in", SqlStdOperatorTable.IN)
           .put("$is_null", SqlStdOperatorTable.IS_NULL)
           .put("$is_true", SqlStdOperatorTable.IS_TRUE)
           .put("$is_false", SqlStdOperatorTable.IS_FALSE)
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
index f3954c8..5336566 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
@@ -347,6 +347,42 @@ public class ZetaSqlDialectSpecTest extends ZetaSqlTestBase {
   }
 
   @Test
+  public void testIn() {
+    String sql = "SELECT 'b' IN ('a', 'b', 'c')";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build();
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(Schema.builder().addBooleanField("f_bool").build())
+                .addValues(true)
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testInArray() {
+    String sql = "SELECT 'b' IN UNNEST(['a', 'b', 'c'])";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build();
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(Schema.builder().addBooleanField("f_bool").build())
+                .addValues(true)
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
   public void testIsNotNull1() {
     String sql = "SELECT @p0 IS NOT NULL AS ColA";
     ImmutableMap<String, Value> params =