You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2020/11/24 02:59:02 UTC
[beam] branch master updated: Support ZetaSQL IN operator
This is an automated email from the ASF dual-hosted git repository.
robinyqiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6cc3a92 Support ZetaSQL IN operator
new 8b4b78e Merge pull request #13381 from robinyqiu/in
6cc3a92 is described below
commit 6cc3a92a61f3526bb7119d16ff29aa6f5036a24f
Author: Yueyang Qiu <ro...@gmail.com>
AuthorDate: Wed Nov 18 17:01:19 2020 -0800
Support ZetaSQL IN operator
---
.../provider/bigquery/BeamBigQuerySqlDialect.java | 11 +++++++
.../zetasql/SupportedZetaSqlBuiltinFunctions.java | 4 +--
.../translation/SqlOperatorMappingTable.java | 1 +
.../sql/zetasql/ZetaSqlDialectSpecTest.java | 36 ++++++++++++++++++++++
4 files changed, 50 insertions(+), 2 deletions(-)
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQuerySqlDialect.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQuerySqlDialect.java
index 367e92b3..7fa53f3 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQuerySqlDialect.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQuerySqlDialect.java
@@ -101,6 +101,7 @@ public class BeamBigQuerySqlDialect extends BigQuerySqlDialect {
.put(DOUBLE_NAN_WRAPPER, "CAST('NaN' AS FLOAT64)")
.build();
public static final String NUMERIC_LITERAL_WRAPPER = "numeric_literal";
+ public static final String IN_ARRAY_OPERATOR = "$in_array";
public BeamBigQuerySqlDialect(Context context) {
super(context);
@@ -185,6 +186,9 @@ public class BeamBigQuerySqlDialect extends BigQuerySqlDialect {
} else if (EXTRACT_FUNCTIONS.containsKey(funName)) {
unparseExtractFunctions(writer, call, leftPrec, rightPrec);
break;
+ } else if (IN_ARRAY_OPERATOR.equals(funName)) {
+ unparseInArrayOperator(writer, call, leftPrec, rightPrec);
+ break;
} // fall through
default:
super.unparseCall(writer, call, leftPrec, rightPrec);
@@ -338,6 +342,13 @@ public class BeamBigQuerySqlDialect extends BigQuerySqlDialect {
writer.endFunCall(frame);
}
+ private void unparseInArrayOperator(SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) {
+ call.operand(0).unparse(writer, leftPrec, rightPrec);
+ writer.literal("IN UNNEST(");
+ call.operand(1).unparse(writer, leftPrec, rightPrec);
+ writer.literal(")");
+ }
+
private TimeUnit validate(TimeUnit timeUnit) {
switch (timeUnit) {
case MICROSECOND:
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java
index d0e88ec..16471fe 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java
@@ -50,8 +50,8 @@ class SupportedZetaSqlBuiltinFunctions {
FunctionSignatureId.FN_EQUAL, // $equal
FunctionSignatureId.FN_STRING_LIKE, // $like
FunctionSignatureId.FN_BYTE_LIKE, // $like
- // FunctionSignatureId.FN_IN, // $in
- // FunctionSignatureId.FN_IN_ARRAY, // $in_array
+ FunctionSignatureId.FN_IN, // $in
+ FunctionSignatureId.FN_IN_ARRAY, // $in_array
// FunctionSignatureId.FN_BETWEEN, // $between
FunctionSignatureId.FN_IS_NULL, // $is_null
FunctionSignatureId.FN_IS_TRUE, // $is_true
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/SqlOperatorMappingTable.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/SqlOperatorMappingTable.java
index 8d1fcc9..58e6f81 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/SqlOperatorMappingTable.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/SqlOperatorMappingTable.java
@@ -46,6 +46,7 @@ class SqlOperatorMappingTable {
.put("$less", SqlStdOperatorTable.LESS_THAN)
.put("$less_or_equal", SqlStdOperatorTable.LESS_THAN_OR_EQUAL)
.put("$like", SqlOperators.LIKE)
+ .put("$in", SqlStdOperatorTable.IN)
.put("$is_null", SqlStdOperatorTable.IS_NULL)
.put("$is_true", SqlStdOperatorTable.IS_TRUE)
.put("$is_false", SqlStdOperatorTable.IS_FALSE)
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
index f3954c8..5336566 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
@@ -347,6 +347,42 @@ public class ZetaSqlDialectSpecTest extends ZetaSqlTestBase {
}
@Test
+ public void testIn() {
+ String sql = "SELECT 'b' IN ('a', 'b', 'c')";
+
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+ PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+ final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build();
+
+ PAssert.that(stream)
+ .containsInAnyOrder(
+ Row.withSchema(Schema.builder().addBooleanField("f_bool").build())
+ .addValues(true)
+ .build());
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+ }
+
+ @Test
+ public void testInArray() {
+ String sql = "SELECT 'b' IN UNNEST(['a', 'b', 'c'])";
+
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+ PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+ final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build();
+
+ PAssert.that(stream)
+ .containsInAnyOrder(
+ Row.withSchema(Schema.builder().addBooleanField("f_bool").build())
+ .addValues(true)
+ .build());
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+ }
+
+ @Test
public void testIsNotNull1() {
String sql = "SELECT @p0 IS NOT NULL AS ColA";
ImmutableMap<String, Value> params =