You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2020/05/08 05:15:29 UTC

[beam] branch master updated: [BEAM-9929] Support UNNEST(array_column) in ZetaSQL.

This is an automated email from the ASF dual-hosted git repository.

amaliujia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b130aa1  [BEAM-9929] Support UNNEST(array_column) in ZetaSQL.
     new 46f42d8  Merge pull request #11636 from amaliujia/rw-support_unnest_column
b130aa1 is described below

commit b130aa15cb6d090603793c5429cb6a9c651c0b61
Author: amaliujia <am...@163.com>
AuthorDate: Thu May 7 17:18:34 2020 -0700

    [BEAM-9929] Support UNNEST(array_column) in ZetaSQL.
---
 .../translation/ArrayScanColumnRefToUncollect.java | 90 ++++++++++++++++++++++
 ...a => ArrayScanLiteralToUncollectConverter.java} |  4 +-
 .../translation/QueryStatementConverter.java       |  3 +-
 .../sql/zetasql/ZetaSQLDialectSpecTest.java        | 20 +++++
 4 files changed, 114 insertions(+), 3 deletions(-)

diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanColumnRefToUncollect.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanColumnRefToUncollect.java
new file mode 100644
index 0000000..0a02a4a
--- /dev/null
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanColumnRefToUncollect.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.zetasql.translation;
+
+import com.google.zetasql.resolvedast.ResolvedNode;
+import com.google.zetasql.resolvedast.ResolvedNodes;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.CorrelationId;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.JoinRelType;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Uncollect;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.logical.LogicalCorrelate;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexInputRef;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.ImmutableBitSet;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+
+/**
+ * Converts an array scan whose array expression references an array column of its input scan (for
+ * example {@code FROM t, UNNEST(t.array_col)}) into a correlated uncollect.
+ *
+ * <p>The resulting plan is {@code LogicalCorrelate(input, Uncollect(Project(OneRow, $cor.col)))}:
+ * for each input row, the referenced array column is read through a correlation variable and then
+ * unnested.
+ */
+public class ArrayScanColumnRefToUncollect extends RelConverter<ResolvedNodes.ResolvedArrayScan> {
+  ArrayScanColumnRefToUncollect(ConversionContext context) {
+    super(context);
+  }
+
+  /**
+   * Accepts array scans that have an input scan, whose array expression is a plain column
+   * reference, and that carry no join expression.
+   */
+  @Override
+  public boolean canConvert(ResolvedNodes.ResolvedArrayScan zetaNode) {
+    return zetaNode.getInputScan() != null
+        && zetaNode.getArrayExpr() instanceof ResolvedNodes.ResolvedColumnRef
+        && zetaNode.getJoinExpr() == null;
+  }
+
+  /** Returns the input scan, which produces the referenced array column, as the only input. */
+  @Override
+  public List<ResolvedNode> getInputs(ResolvedNodes.ResolvedArrayScan zetaNode) {
+    return ImmutableList.of(zetaNode.getInputScan());
+  }
+
+  /**
+   * Builds {@code LogicalCorrelate(input, Uncollect(Project(OneRow, $cor.arrayCol)))}.
+   *
+   * @param zetaNode an array scan accepted by {@link #canConvert}
+   * @param inputs a single-element list holding the converted input scan
+   * @return an inner correlate of the input with the unnested array column
+   */
+  @Override
+  public RelNode convert(ResolvedNodes.ResolvedArrayScan zetaNode, List<RelNode> inputs) {
+    assert inputs.size() == 1;
+    RelNode input = inputs.get(0);
+    // canConvert() only accepts a ResolvedColumnRef; the cast assumes the expression converter
+    // maps such a reference to a RexInputRef over the input's row type.
+    RexInputRef columnRef =
+        (RexInputRef)
+            getExpressionConverter()
+                .convertRexNodeFromResolvedExpr(
+                    zetaNode.getArrayExpr(),
+                    zetaNode.getInputScan().getColumnList(),
+                    input.getRowType().getFieldList());
+
+    // Name the projected array column "<table name><column name>" of the element column.
+    String fieldName =
+        String.format(
+            "%s%s",
+            zetaNode.getElementColumn().getTableName(), zetaNode.getElementColumn().getName());
+
+    // Calcite expects correlation ids to be unique within a query, so take a fresh one from the
+    // cluster instead of a hard-coded id that every correlated UNNEST in the plan would share.
+    CorrelationId correlationId = getCluster().createCorrel();
+
+    // Project over a one-row relation that reads the array column of the current outer row
+    // through the correlation variable.
+    RelNode projectNode =
+        LogicalProject.create(
+            createOneRow(getCluster()),
+            Collections.singletonList(
+                getCluster()
+                    .getRexBuilder()
+                    .makeFieldAccess(
+                        getCluster().getRexBuilder().makeCorrel(input.getRowType(), correlationId),
+                        columnRef.getIndex())),
+            ImmutableList.of(fieldName));
+
+    // Add an ordinality (offset) column only when the scan has an array offset column.
+    boolean ordinality = (zetaNode.getArrayOffsetColumn() != null);
+    RelNode uncollect = Uncollect.create(projectNode.getTraitSet(), projectNode, ordinality);
+
+    // Only the referenced array column of the left input is required by the right side.
+    return LogicalCorrelate.create(
+        input,
+        uncollect,
+        correlationId,
+        ImmutableBitSet.of(columnRef.getIndex()),
+        JoinRelType.INNER);
+  }
+}
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToUncollectConverter.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanLiteralToUncollectConverter.java
similarity index 94%
rename from sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToUncollectConverter.java
rename to sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanLiteralToUncollectConverter.java
index 87d777ff..e138f60 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToUncollectConverter.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanLiteralToUncollectConverter.java
@@ -27,9 +27,9 @@ import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 
 /** Converts array scan that represents an array literal to uncollect. */
-class ArrayScanToUncollectConverter extends RelConverter<ResolvedArrayScan> {
+class ArrayScanLiteralToUncollectConverter extends RelConverter<ResolvedArrayScan> {
 
-  ArrayScanToUncollectConverter(ConversionContext context) {
+  ArrayScanLiteralToUncollectConverter(ConversionContext context) {
     super(context);
   }
 
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/QueryStatementConverter.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/QueryStatementConverter.java
index 5513482..389eac9 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/QueryStatementConverter.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/QueryStatementConverter.java
@@ -57,7 +57,8 @@ public class QueryStatementConverter extends RelConverter<ResolvedQueryStmt> {
         ImmutableMultimap.<ResolvedNodeKind, RelConverter>builder()
             .put(RESOLVED_AGGREGATE_SCAN, new AggregateScanConverter(context))
             .put(RESOLVED_ARRAY_SCAN, new ArrayScanToJoinConverter(context))
-            .put(RESOLVED_ARRAY_SCAN, new ArrayScanToUncollectConverter(context))
+            .put(RESOLVED_ARRAY_SCAN, new ArrayScanLiteralToUncollectConverter(context))
+            .put(RESOLVED_ARRAY_SCAN, new ArrayScanColumnRefToUncollect(context))
             .put(RESOLVED_FILTER_SCAN, new FilterScanConverter(context))
             .put(RESOLVED_JOIN_SCAN, new JoinScanConverter(context))
             .put(RESOLVED_JOIN_SCAN, new JoinScanWithRefConverter(context))
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
index d02421b..5c382a4 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
@@ -3037,6 +3037,26 @@ public class ZetaSQLDialectSpecTest {
   }
 
   @Test
+  public void testUnnestArrayColumn() {
+    // UNNEST of a column reference: expect one row per element of int_array_col in the fixture.
+    String query =
+        "SELECT p FROM table_with_array_for_unnest, "
+            + "UNNEST(table_with_array_for_unnest.int_array_col) as p";
+    BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
+    PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, plan);
+
+    Schema expectedSchema = Schema.builder().addInt64Field("int_field").build();
+    PAssert.that(output)
+        .containsInAnyOrder(
+            Row.withSchema(expectedSchema).addValue(14L).build(),
+            Row.withSchema(expectedSchema).addValue(18L).build(),
+            Row.withSchema(expectedSchema).addValue(22L).build(),
+            Row.withSchema(expectedSchema).addValue(24L).build());
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
   @Ignore("Seeing exception in Beam, need further investigation on the cause of this failed query.")
   public void testNamedUNNESTJoin() {
     String sql =