You are viewing a plain-text version of this content. The link to its canonical version is not preserved in this plain-text rendering.
Posted to commits@beam.apache.org by ro...@apache.org on 2020/08/21 23:32:27 UTC
[beam] branch master updated: Fix bug in JoinScanWithRefConverter
This is an automated email from the ASF dual-hosted git repository.
robinyqiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b57aab0 Fix bug in JoinScanWithRefConverter
new 0ade7bd Merge pull request #12648 from robinyqiu/with-join
b57aab0 is described below
commit b57aab02add161bce86d0f776adcb28e25b98719
Author: Yueyang Qiu <ro...@gmail.com>
AuthorDate: Thu Aug 20 14:55:29 2020 -0700
Fix bug in JoinScanWithRefConverter
---
.../zetasql/translation/ExpressionConverter.java | 97 --------------------
.../sql/zetasql/translation/JoinScanConverter.java | 4 +-
.../translation/JoinScanWithRefConverter.java | 100 ---------------------
.../translation/QueryStatementConverter.java | 1 -
.../sql/zetasql/ZetaSqlDialectSpecTest.java | 34 +++++++
5 files changed, 35 insertions(+), 201 deletions(-)
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
index d8020cb..bf8d531 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
@@ -59,7 +59,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.beam.sdk.annotations.Internal;
@@ -360,80 +359,6 @@ public class ExpressionConverter {
return ret;
}
- /** Extract the RexNode from expression with ref scan. */
- public RexNode convertRexNodeFromResolvedExprWithRefScan(
- ResolvedExpr expr,
- List<ResolvedColumn> refScanLeftColumnList,
- List<RelDataTypeField> leftFieldList,
- List<ResolvedColumn> originalLeftColumnList,
- List<ResolvedColumn> refScanRightColumnList,
- List<RelDataTypeField> rightFieldList,
- List<ResolvedColumn> originalRightColumnList) {
- switch (expr.nodeKind()) {
- case RESOLVED_LITERAL:
- return convertResolvedLiteral((ResolvedLiteral) expr);
- case RESOLVED_COLUMN_REF:
- ResolvedColumnRef columnRef = (ResolvedColumnRef) expr;
- // first look for column ref on the left side
- Optional<RexNode> colRexNode =
- convertRexNodeFromResolvedColumnRefWithRefScan(
- columnRef, refScanLeftColumnList, originalLeftColumnList, leftFieldList);
-
- if (colRexNode.isPresent()) {
- return colRexNode.get();
- }
-
- // if not found there look on the right
- colRexNode =
- convertRexNodeFromResolvedColumnRefWithRefScan(
- columnRef, refScanRightColumnList, originalRightColumnList, rightFieldList);
- if (colRexNode.isPresent()) {
- return colRexNode.get();
- }
-
- throw new IllegalArgumentException(
- String.format(
- "Could not find column reference %s in %s or %s",
- columnRef, refScanLeftColumnList, refScanRightColumnList));
- case RESOLVED_FUNCTION_CALL:
- // JOIN only support equal join.
- ResolvedFunctionCall resolvedFunctionCall = (ResolvedFunctionCall) expr;
- List<RexNode> operands = new ArrayList<>();
-
- for (ResolvedExpr resolvedExpr : resolvedFunctionCall.getArgumentList()) {
- operands.add(
- convertRexNodeFromResolvedExprWithRefScan(
- resolvedExpr,
- refScanLeftColumnList,
- leftFieldList,
- originalLeftColumnList,
- refScanRightColumnList,
- rightFieldList,
- originalRightColumnList));
- }
-
- SqlOperator op =
- SqlOperatorMappingTable.ZETASQL_FUNCTION_TO_CALCITE_SQL_OPERATOR.get(
- resolvedFunctionCall.getFunction().getName());
- return rexBuilder().makeCall(op, operands);
- case RESOLVED_CAST:
- ResolvedCast resolvedCast = (ResolvedCast) expr;
- return convertResolvedCast(
- resolvedCast,
- convertRexNodeFromResolvedExprWithRefScan(
- resolvedCast.getExpr(),
- refScanLeftColumnList,
- leftFieldList,
- originalLeftColumnList,
- refScanRightColumnList,
- rightFieldList,
- originalRightColumnList));
- default:
- throw new UnsupportedOperationException(
- "Does not support expr node kind " + expr.nodeKind());
- }
- }
-
private RexNode convertRexNodeFromComputedColumnWithFieldList(
ResolvedComputedColumn column,
List<ResolvedColumn> columnList,
@@ -897,28 +822,6 @@ public class ExpressionConverter {
|| (fromType.equals(TYPE_TIMESTAMP) && toType.equals(TYPE_STRING));
}
- private Optional<RexNode> convertRexNodeFromResolvedColumnRefWithRefScan(
- ResolvedColumnRef columnRef,
- List<ResolvedColumn> refScanColumnList,
- List<ResolvedColumn> originalColumnList,
- List<RelDataTypeField> fieldList) {
-
- for (int i = 0; i < refScanColumnList.size(); i++) {
- if (refScanColumnList.get(i).getId() == columnRef.getColumn().getId()) {
- boolean nullable = fieldList.get(i).getType().isNullable();
- int off = (int) originalColumnList.get(i).getId() - 1;
- return Optional.of(
- rexBuilder()
- .makeInputRef(
- ZetaSqlCalciteTranslationUtils.toCalciteType(
- columnRef.getType(), nullable, rexBuilder()),
- off));
- }
- }
-
- return Optional.empty();
- }
-
private RexNode convertResolvedParameter(ResolvedParameter parameter) {
Value value;
switch (queryParams.getKind()) {
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/JoinScanConverter.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/JoinScanConverter.java
index 0185589..0f94206 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/JoinScanConverter.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/JoinScanConverter.java
@@ -21,7 +21,6 @@ import com.google.zetasql.resolvedast.ResolvedColumn;
import com.google.zetasql.resolvedast.ResolvedJoinScanEnums.JoinType;
import com.google.zetasql.resolvedast.ResolvedNode;
import com.google.zetasql.resolvedast.ResolvedNodes.ResolvedJoinScan;
-import com.google.zetasql.resolvedast.ResolvedNodes.ResolvedWithRefScan;
import java.util.List;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.JoinRelType;
@@ -52,8 +51,7 @@ class JoinScanConverter extends RelConverter<ResolvedJoinScan> {
@Override
public boolean canConvert(ResolvedJoinScan zetaNode) {
- return !(zetaNode.getLeftScan() instanceof ResolvedWithRefScan)
- && !(zetaNode.getRightScan() instanceof ResolvedWithRefScan);
+ return true;
}
@Override
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/JoinScanWithRefConverter.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/JoinScanWithRefConverter.java
deleted file mode 100644
index 461d64f..0000000
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/JoinScanWithRefConverter.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.zetasql.translation;
-
-import static org.apache.beam.sdk.extensions.sql.zetasql.translation.JoinScanConverter.convertResolvedJoinType;
-
-import com.google.zetasql.resolvedast.ResolvedColumn;
-import com.google.zetasql.resolvedast.ResolvedNode;
-import com.google.zetasql.resolvedast.ResolvedNodes.ResolvedJoinScan;
-import com.google.zetasql.resolvedast.ResolvedNodes.ResolvedScan;
-import com.google.zetasql.resolvedast.ResolvedNodes.ResolvedWithRefScan;
-import java.util.List;
-import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
-import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.logical.LogicalJoin;
-import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
-
-/** Converts joins where at least one of the inputs is a WITH subquery. */
-class JoinScanWithRefConverter extends RelConverter<ResolvedJoinScan> {
-
- JoinScanWithRefConverter(ConversionContext context) {
- super(context);
- }
-
- /** This is a special logic due to re-indexed column reference in WithScan. */
- @Override
- public boolean canConvert(ResolvedJoinScan zetaNode) {
- return zetaNode.getLeftScan() instanceof ResolvedWithRefScan
- || zetaNode.getRightScan() instanceof ResolvedWithRefScan;
- }
-
- @Override
- public List<ResolvedNode> getInputs(ResolvedJoinScan zetaNode) {
- return ImmutableList.of(zetaNode.getLeftScan(), zetaNode.getRightScan());
- }
-
- @Override
- public RelNode convert(ResolvedJoinScan zetaNode, List<RelNode> inputs) {
- RelNode calciteLeftInput = inputs.get(0);
- RelNode calciteRightInput = inputs.get(1);
-
- List<ResolvedColumn> zetaLeftColumnList = getColumnsForScan(zetaNode.getLeftScan());
- List<ResolvedColumn> zetaRightColumnList = getColumnsForScan(zetaNode.getRightScan());
-
- final RexNode condition;
- if (zetaNode.getJoinExpr() == null) {
- condition = getExpressionConverter().trueLiteral();
- } else {
- condition =
- getExpressionConverter()
- .convertRexNodeFromResolvedExprWithRefScan(
- zetaNode.getJoinExpr(),
- zetaNode.getLeftScan().getColumnList(),
- calciteLeftInput.getRowType().getFieldList(),
- zetaLeftColumnList,
- zetaNode.getRightScan().getColumnList(),
- calciteRightInput.getRowType().getFieldList(),
- zetaRightColumnList);
- }
-
- return LogicalJoin.create(
- calciteLeftInput,
- calciteRightInput,
- condition,
- ImmutableSet.of(),
- convertResolvedJoinType(zetaNode.getJoinType()));
- }
-
- /**
- * WithRefScan doesn't have columns in it, it only references a WITH query by name, we have to
- * look up the actual query node in the context by that name.
- *
- * <p>The context has a map of WITH queries populated when the inputs to this JOIN are parsed.
- */
- private List<ResolvedColumn> getColumnsForScan(ResolvedScan resolvedScan) {
- return resolvedScan instanceof ResolvedWithRefScan
- ? getTrait()
- .withEntries
- .get(((ResolvedWithRefScan) resolvedScan).getWithQueryName())
- .getWithSubquery()
- .getColumnList()
- : resolvedScan.getColumnList();
- }
-}
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/QueryStatementConverter.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/QueryStatementConverter.java
index 7a5964d..3913b39 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/QueryStatementConverter.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/QueryStatementConverter.java
@@ -62,7 +62,6 @@ public class QueryStatementConverter extends RelConverter<ResolvedQueryStmt> {
.put(RESOLVED_ARRAY_SCAN, new ArrayScanColumnRefToUncollect(context))
.put(RESOLVED_FILTER_SCAN, new FilterScanConverter(context))
.put(RESOLVED_JOIN_SCAN, new JoinScanConverter(context))
- .put(RESOLVED_JOIN_SCAN, new JoinScanWithRefConverter(context))
.put(RESOLVED_LIMIT_OFFSET_SCAN, new LimitOffsetScanToLimitConverter(context))
.put(RESOLVED_LIMIT_OFFSET_SCAN, new LimitOffsetScanToOrderByLimitConverter(context))
.put(RESOLVED_ORDER_BY_SCAN, new OrderByScanUnsupportedConverter(context))
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
index 9a67da6..a802e57 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
@@ -2901,6 +2901,40 @@ public class ZetaSqlDialectSpecTest extends ZetaSqlTestBase {
}
@Test
+ public void testWithQuerySeven() {
+ String sql =
+ "WITH t1 AS (select 1 AS k), t2 AS (select 1 AS k), t3 AS (select 1 AS k) "
+ + "SELECT COUNT(*) "
+ + "FROM t1 JOIN t3 USING (k)";
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+
+ PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+ PAssert.that(stream)
+ .containsInAnyOrder(
+ Row.withSchema(Schema.builder().addInt64Field("count").build()).addValues(1L).build());
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+ }
+
+ @Test
+ public void testWithQueryEight() {
+ String sql =
+ "WITH T AS (SELECT k, 'hello' AS s FROM UNNEST([1, 2, 3]) k) "
+ + "SELECT COUNT(*) "
+ + "FROM T t1 JOIN T t2 USING (k)";
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+
+ PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+ PAssert.that(stream)
+ .containsInAnyOrder(
+ Row.withSchema(Schema.builder().addInt64Field("count").build()).addValues(3L).build());
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+ }
+
+ @Test
public void testUNNESTLiteral() {
String sql = "SELECT * FROM UNNEST(ARRAY<STRING>['foo', 'bar']);";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);