You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2020/05/08 05:15:29 UTC
[beam] branch master updated: [BEAM-9929] Support UNNEST(array_column) in ZetaSQL.
This is an automated email from the ASF dual-hosted git repository.
amaliujia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b130aa1 [BEAM-9929] Support UNNEST(array_column) in ZetaSQL.
new 46f42d8 Merge pull request #11636 from amaliujia/rw-support_unnest_column
b130aa1 is described below
commit b130aa15cb6d090603793c5429cb6a9c651c0b61
Author: amaliujia <am...@163.com>
AuthorDate: Thu May 7 17:18:34 2020 -0700
[BEAM-9929] Support UNNEST(array_column) in ZetaSQL.
---
.../translation/ArrayScanColumnRefToUncollect.java | 90 ++++++++++++++++++++++
...a => ArrayScanLiteralToUncollectConverter.java} | 4 +-
.../translation/QueryStatementConverter.java | 3 +-
.../sql/zetasql/ZetaSQLDialectSpecTest.java | 20 +++++
4 files changed, 114 insertions(+), 3 deletions(-)
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanColumnRefToUncollect.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanColumnRefToUncollect.java
new file mode 100644
index 0000000..0a02a4a
--- /dev/null
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanColumnRefToUncollect.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.zetasql.translation;
+
+import com.google.zetasql.resolvedast.ResolvedNode;
+import com.google.zetasql.resolvedast.ResolvedNodes;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.CorrelationId;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.JoinRelType;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Uncollect;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.logical.LogicalCorrelate;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexInputRef;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.ImmutableBitSet;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+
+/**
+ * Converts a {@code ResolvedArrayScan} whose array expression is a reference to an array column
+ * produced by its input scan (e.g. {@code FROM t, UNNEST(t.array_col) AS p}) into a Calcite
+ * {@link LogicalCorrelate} of that input with an {@link Uncollect}.
+ *
+ * <p>Only scans that have an input scan, a column-reference array expression and no join
+ * expression are accepted; other converters registered for {@code RESOLVED_ARRAY_SCAN} are
+ * expected to handle the remaining shapes.
+ */
+public class ArrayScanColumnRefToUncollect extends RelConverter<ResolvedNodes.ResolvedArrayScan> {
+
+  ArrayScanColumnRefToUncollect(ConversionContext context) {
+    super(context);
+  }
+
+  /**
+   * Returns true for scans of the form {@code <input>, UNNEST(<column ref>)}: an input scan is
+   * present, the array expression is a plain column reference, and there is no join expression.
+   */
+  @Override
+  public boolean canConvert(ResolvedNodes.ResolvedArrayScan zetaNode) {
+    return zetaNode.getInputScan() != null
+        && zetaNode.getArrayExpr() instanceof ResolvedNodes.ResolvedColumnRef
+        && zetaNode.getJoinExpr() == null;
+  }
+
+  /** The single input is the scan that produces the array column being unnested. */
+  @Override
+  public List<ResolvedNode> getInputs(ResolvedNodes.ResolvedArrayScan zetaNode) {
+    return ImmutableList.of(zetaNode.getInputScan());
+  }
+
+  /**
+   * Builds {@code Correlate(input, Uncollect(Project(OneRow, $cor.arrayColumn)))}.
+   *
+   * <p>For each row of {@code input}, the correlated project reads the array column from that row
+   * and the uncollect expands it into rows. An ordinality column is requested when the scan has
+   * an array offset column.
+   *
+   * @param zetaNode the array scan being converted
+   * @param inputs the already-converted input scan; expected to contain exactly one node
+   * @return a {@link LogicalCorrelate} that is LEFT when the scan is outer, otherwise INNER
+   */
+  @Override
+  public RelNode convert(ResolvedNodes.ResolvedArrayScan zetaNode, List<RelNode> inputs) {
+    assert inputs.size() == 1;
+    RelNode input = inputs.get(0);
+
+    // canConvert() guarantees the array expression is a ResolvedColumnRef, so it resolves to an
+    // input reference into the converted input's row type.
+    RexInputRef columnRef =
+        (RexInputRef)
+            getExpressionConverter()
+                .convertRexNodeFromResolvedExpr(
+                    zetaNode.getArrayExpr(),
+                    zetaNode.getInputScan().getColumnList(),
+                    input.getRowType().getFieldList());
+
+    String fieldName =
+        String.format(
+            "%s%s",
+            zetaNode.getElementColumn().getTableName(), zetaNode.getElementColumn().getName());
+
+    // Allocate a fresh correlation id from the cluster rather than hard-coding $cor0, so that
+    // several correlated UNNESTs planned in the same cluster do not share one variable.
+    CorrelationId correlationId = getCluster().createCorrel();
+
+    // Single-row project that reads the array column of the current outer row via the
+    // correlation variable.
+    RelNode projectNode =
+        LogicalProject.create(
+            createOneRow(getCluster()),
+            Collections.singletonList(
+                getCluster()
+                    .getRexBuilder()
+                    .makeFieldAccess(
+                        getCluster().getRexBuilder().makeCorrel(input.getRowType(), correlationId),
+                        columnRef.getIndex())),
+            ImmutableList.of(fieldName));
+
+    boolean ordinality = (zetaNode.getArrayOffsetColumn() != null);
+    RelNode uncollect = Uncollect.create(projectNode.getTraitSet(), projectNode, ordinality);
+
+    // An outer array scan (LEFT JOIN UNNEST(...)) must keep input rows whose array yields no
+    // elements; a plain INNER correlate would silently drop them.
+    // NOTE(review): confirm the Beam physical unnest rule accepts LEFT correlates; if it does not,
+    // planning fails loudly instead of returning wrong results.
+    JoinRelType joinType = zetaNode.getIsOuter() ? JoinRelType.LEFT : JoinRelType.INNER;
+
+    return LogicalCorrelate.create(
+        input, uncollect, correlationId, ImmutableBitSet.of(columnRef.getIndex()), joinType);
+  }
+}
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToUncollectConverter.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanLiteralToUncollectConverter.java
similarity index 94%
rename from sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToUncollectConverter.java
rename to sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanLiteralToUncollectConverter.java
index 87d777ff..e138f60 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToUncollectConverter.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanLiteralToUncollectConverter.java
@@ -27,9 +27,9 @@ import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
/** Converts array scan that represents an array literal to uncollect. */
-class ArrayScanToUncollectConverter extends RelConverter<ResolvedArrayScan> {
+class ArrayScanLiteralToUncollectConverter extends RelConverter<ResolvedArrayScan> {
- ArrayScanToUncollectConverter(ConversionContext context) {
+ ArrayScanLiteralToUncollectConverter(ConversionContext context) {
super(context);
}
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/QueryStatementConverter.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/QueryStatementConverter.java
index 5513482..389eac9 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/QueryStatementConverter.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/QueryStatementConverter.java
@@ -57,7 +57,8 @@ public class QueryStatementConverter extends RelConverter<ResolvedQueryStmt> {
ImmutableMultimap.<ResolvedNodeKind, RelConverter>builder()
.put(RESOLVED_AGGREGATE_SCAN, new AggregateScanConverter(context))
.put(RESOLVED_ARRAY_SCAN, new ArrayScanToJoinConverter(context))
- .put(RESOLVED_ARRAY_SCAN, new ArrayScanToUncollectConverter(context))
+ .put(RESOLVED_ARRAY_SCAN, new ArrayScanLiteralToUncollectConverter(context))
+ .put(RESOLVED_ARRAY_SCAN, new ArrayScanColumnRefToUncollect(context))
.put(RESOLVED_FILTER_SCAN, new FilterScanConverter(context))
.put(RESOLVED_JOIN_SCAN, new JoinScanConverter(context))
.put(RESOLVED_JOIN_SCAN, new JoinScanWithRefConverter(context))
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
index d02421b..5c382a4 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
@@ -3037,6 +3037,26 @@ public class ZetaSQLDialectSpecTest {
}
@Test
+  public void testUnnestArrayColumn() {
+    // Comma-join with UNNEST over a column: every element of int_array_col, across all rows of
+    // table_with_array_for_unnest, is expected as its own output row.
+    String sql =
+        "SELECT p FROM table_with_array_for_unnest, UNNEST(table_with_array_for_unnest.int_array_col) as p";
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    Schema expectedSchema = Schema.builder().addInt64Field("int_field").build();
+    PAssert.that(output)
+        .containsInAnyOrder(
+            Row.withSchema(expectedSchema).addValue(14L).build(),
+            Row.withSchema(expectedSchema).addValue(18L).build(),
+            Row.withSchema(expectedSchema).addValue(22L).build(),
+            Row.withSchema(expectedSchema).addValue(24L).build());
+
+    pipeline
+        .run()
+        .waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+ @Test
@Ignore("Seeing exception in Beam, need further investigation on the cause of this failed query.")
public void testNamedUNNESTJoin() {
String sql =