You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by ro...@apache.org on 2020/08/21 23:32:27 UTC

[beam] branch master updated: Fix bug in JoinScanWithRefConverter

This is an automated email from the ASF dual-hosted git repository.

robinyqiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b57aab0  Fix bug in JoinScanWithRefConverter
     new 0ade7bd  Merge pull request #12648 from robinyqiu/with-join
b57aab0 is described below

commit b57aab02add161bce86d0f776adcb28e25b98719
Author: Yueyang Qiu <ro...@gmail.com>
AuthorDate: Thu Aug 20 14:55:29 2020 -0700

    Fix bug in JoinScanWithRefConverter
---
 .../zetasql/translation/ExpressionConverter.java   |  97 --------------------
 .../sql/zetasql/translation/JoinScanConverter.java |   4 +-
 .../translation/JoinScanWithRefConverter.java      | 100 ---------------------
 .../translation/QueryStatementConverter.java       |   1 -
 .../sql/zetasql/ZetaSqlDialectSpecTest.java        |  34 +++++++
 5 files changed, 35 insertions(+), 201 deletions(-)

diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
index d8020cb..bf8d531 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
@@ -59,7 +59,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.beam.sdk.annotations.Internal;
@@ -360,80 +359,6 @@ public class ExpressionConverter {
     return ret;
   }
 
-  /** Extract the RexNode from expression with ref scan. */
-  public RexNode convertRexNodeFromResolvedExprWithRefScan(
-      ResolvedExpr expr,
-      List<ResolvedColumn> refScanLeftColumnList,
-      List<RelDataTypeField> leftFieldList,
-      List<ResolvedColumn> originalLeftColumnList,
-      List<ResolvedColumn> refScanRightColumnList,
-      List<RelDataTypeField> rightFieldList,
-      List<ResolvedColumn> originalRightColumnList) {
-    switch (expr.nodeKind()) {
-      case RESOLVED_LITERAL:
-        return convertResolvedLiteral((ResolvedLiteral) expr);
-      case RESOLVED_COLUMN_REF:
-        ResolvedColumnRef columnRef = (ResolvedColumnRef) expr;
-        // first look for column ref on the left side
-        Optional<RexNode> colRexNode =
-            convertRexNodeFromResolvedColumnRefWithRefScan(
-                columnRef, refScanLeftColumnList, originalLeftColumnList, leftFieldList);
-
-        if (colRexNode.isPresent()) {
-          return colRexNode.get();
-        }
-
-        // if not found there look on the right
-        colRexNode =
-            convertRexNodeFromResolvedColumnRefWithRefScan(
-                columnRef, refScanRightColumnList, originalRightColumnList, rightFieldList);
-        if (colRexNode.isPresent()) {
-          return colRexNode.get();
-        }
-
-        throw new IllegalArgumentException(
-            String.format(
-                "Could not find column reference %s in %s or %s",
-                columnRef, refScanLeftColumnList, refScanRightColumnList));
-      case RESOLVED_FUNCTION_CALL:
-        // JOIN only support equal join.
-        ResolvedFunctionCall resolvedFunctionCall = (ResolvedFunctionCall) expr;
-        List<RexNode> operands = new ArrayList<>();
-
-        for (ResolvedExpr resolvedExpr : resolvedFunctionCall.getArgumentList()) {
-          operands.add(
-              convertRexNodeFromResolvedExprWithRefScan(
-                  resolvedExpr,
-                  refScanLeftColumnList,
-                  leftFieldList,
-                  originalLeftColumnList,
-                  refScanRightColumnList,
-                  rightFieldList,
-                  originalRightColumnList));
-        }
-
-        SqlOperator op =
-            SqlOperatorMappingTable.ZETASQL_FUNCTION_TO_CALCITE_SQL_OPERATOR.get(
-                resolvedFunctionCall.getFunction().getName());
-        return rexBuilder().makeCall(op, operands);
-      case RESOLVED_CAST:
-        ResolvedCast resolvedCast = (ResolvedCast) expr;
-        return convertResolvedCast(
-            resolvedCast,
-            convertRexNodeFromResolvedExprWithRefScan(
-                resolvedCast.getExpr(),
-                refScanLeftColumnList,
-                leftFieldList,
-                originalLeftColumnList,
-                refScanRightColumnList,
-                rightFieldList,
-                originalRightColumnList));
-      default:
-        throw new UnsupportedOperationException(
-            "Does not support expr node kind " + expr.nodeKind());
-    }
-  }
-
   private RexNode convertRexNodeFromComputedColumnWithFieldList(
       ResolvedComputedColumn column,
       List<ResolvedColumn> columnList,
@@ -897,28 +822,6 @@ public class ExpressionConverter {
         || (fromType.equals(TYPE_TIMESTAMP) && toType.equals(TYPE_STRING));
   }
 
-  private Optional<RexNode> convertRexNodeFromResolvedColumnRefWithRefScan(
-      ResolvedColumnRef columnRef,
-      List<ResolvedColumn> refScanColumnList,
-      List<ResolvedColumn> originalColumnList,
-      List<RelDataTypeField> fieldList) {
-
-    for (int i = 0; i < refScanColumnList.size(); i++) {
-      if (refScanColumnList.get(i).getId() == columnRef.getColumn().getId()) {
-        boolean nullable = fieldList.get(i).getType().isNullable();
-        int off = (int) originalColumnList.get(i).getId() - 1;
-        return Optional.of(
-            rexBuilder()
-                .makeInputRef(
-                    ZetaSqlCalciteTranslationUtils.toCalciteType(
-                        columnRef.getType(), nullable, rexBuilder()),
-                    off));
-      }
-    }
-
-    return Optional.empty();
-  }
-
   private RexNode convertResolvedParameter(ResolvedParameter parameter) {
     Value value;
     switch (queryParams.getKind()) {
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/JoinScanConverter.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/JoinScanConverter.java
index 0185589..0f94206 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/JoinScanConverter.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/JoinScanConverter.java
@@ -21,7 +21,6 @@ import com.google.zetasql.resolvedast.ResolvedColumn;
 import com.google.zetasql.resolvedast.ResolvedJoinScanEnums.JoinType;
 import com.google.zetasql.resolvedast.ResolvedNode;
 import com.google.zetasql.resolvedast.ResolvedNodes.ResolvedJoinScan;
-import com.google.zetasql.resolvedast.ResolvedNodes.ResolvedWithRefScan;
 import java.util.List;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.JoinRelType;
@@ -52,8 +51,7 @@ class JoinScanConverter extends RelConverter<ResolvedJoinScan> {
 
   @Override
   public boolean canConvert(ResolvedJoinScan zetaNode) {
-    return !(zetaNode.getLeftScan() instanceof ResolvedWithRefScan)
-        && !(zetaNode.getRightScan() instanceof ResolvedWithRefScan);
+    return true;
   }
 
   @Override
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/JoinScanWithRefConverter.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/JoinScanWithRefConverter.java
deleted file mode 100644
index 461d64f..0000000
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/JoinScanWithRefConverter.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.zetasql.translation;
-
-import static org.apache.beam.sdk.extensions.sql.zetasql.translation.JoinScanConverter.convertResolvedJoinType;
-
-import com.google.zetasql.resolvedast.ResolvedColumn;
-import com.google.zetasql.resolvedast.ResolvedNode;
-import com.google.zetasql.resolvedast.ResolvedNodes.ResolvedJoinScan;
-import com.google.zetasql.resolvedast.ResolvedNodes.ResolvedScan;
-import com.google.zetasql.resolvedast.ResolvedNodes.ResolvedWithRefScan;
-import java.util.List;
-import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
-import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.logical.LogicalJoin;
-import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
-
-/** Converts joins where at least one of the inputs is a WITH subquery. */
-class JoinScanWithRefConverter extends RelConverter<ResolvedJoinScan> {
-
-  JoinScanWithRefConverter(ConversionContext context) {
-    super(context);
-  }
-
-  /** This is a special logic due to re-indexed column reference in WithScan. */
-  @Override
-  public boolean canConvert(ResolvedJoinScan zetaNode) {
-    return zetaNode.getLeftScan() instanceof ResolvedWithRefScan
-        || zetaNode.getRightScan() instanceof ResolvedWithRefScan;
-  }
-
-  @Override
-  public List<ResolvedNode> getInputs(ResolvedJoinScan zetaNode) {
-    return ImmutableList.of(zetaNode.getLeftScan(), zetaNode.getRightScan());
-  }
-
-  @Override
-  public RelNode convert(ResolvedJoinScan zetaNode, List<RelNode> inputs) {
-    RelNode calciteLeftInput = inputs.get(0);
-    RelNode calciteRightInput = inputs.get(1);
-
-    List<ResolvedColumn> zetaLeftColumnList = getColumnsForScan(zetaNode.getLeftScan());
-    List<ResolvedColumn> zetaRightColumnList = getColumnsForScan(zetaNode.getRightScan());
-
-    final RexNode condition;
-    if (zetaNode.getJoinExpr() == null) {
-      condition = getExpressionConverter().trueLiteral();
-    } else {
-      condition =
-          getExpressionConverter()
-              .convertRexNodeFromResolvedExprWithRefScan(
-                  zetaNode.getJoinExpr(),
-                  zetaNode.getLeftScan().getColumnList(),
-                  calciteLeftInput.getRowType().getFieldList(),
-                  zetaLeftColumnList,
-                  zetaNode.getRightScan().getColumnList(),
-                  calciteRightInput.getRowType().getFieldList(),
-                  zetaRightColumnList);
-    }
-
-    return LogicalJoin.create(
-        calciteLeftInput,
-        calciteRightInput,
-        condition,
-        ImmutableSet.of(),
-        convertResolvedJoinType(zetaNode.getJoinType()));
-  }
-
-  /**
-   * WithRefScan doesn't have columns in it, it only references a WITH query by name, we have to
-   * look up the actual query node in the context by that name.
-   *
-   * <p>The context has a map of WITH queries populated when the inputs to this JOIN are parsed.
-   */
-  private List<ResolvedColumn> getColumnsForScan(ResolvedScan resolvedScan) {
-    return resolvedScan instanceof ResolvedWithRefScan
-        ? getTrait()
-            .withEntries
-            .get(((ResolvedWithRefScan) resolvedScan).getWithQueryName())
-            .getWithSubquery()
-            .getColumnList()
-        : resolvedScan.getColumnList();
-  }
-}
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/QueryStatementConverter.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/QueryStatementConverter.java
index 7a5964d..3913b39 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/QueryStatementConverter.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/QueryStatementConverter.java
@@ -62,7 +62,6 @@ public class QueryStatementConverter extends RelConverter<ResolvedQueryStmt> {
             .put(RESOLVED_ARRAY_SCAN, new ArrayScanColumnRefToUncollect(context))
             .put(RESOLVED_FILTER_SCAN, new FilterScanConverter(context))
             .put(RESOLVED_JOIN_SCAN, new JoinScanConverter(context))
-            .put(RESOLVED_JOIN_SCAN, new JoinScanWithRefConverter(context))
             .put(RESOLVED_LIMIT_OFFSET_SCAN, new LimitOffsetScanToLimitConverter(context))
             .put(RESOLVED_LIMIT_OFFSET_SCAN, new LimitOffsetScanToOrderByLimitConverter(context))
             .put(RESOLVED_ORDER_BY_SCAN, new OrderByScanUnsupportedConverter(context))
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
index 9a67da6..a802e57 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
@@ -2901,6 +2901,40 @@ public class ZetaSqlDialectSpecTest extends ZetaSqlTestBase {
   }
 
   @Test
+  public void testWithQuerySeven() {
+    String sql =
+        "WITH t1 AS (select 1 AS k), t2 AS (select 1 AS k), t3 AS (select 1 AS k) "
+            + "SELECT COUNT(*) "
+            + "FROM t1 JOIN t3 USING (k)";
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(Schema.builder().addInt64Field("count").build()).addValues(1L).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testWithQueryEight() {
+    String sql =
+        "WITH T AS (SELECT k, 'hello' AS s FROM UNNEST([1, 2, 3]) k) "
+            + "SELECT COUNT(*) "
+            + "FROM T t1 JOIN T t2 USING (k)";
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(Schema.builder().addInt64Field("count").build()).addValues(3L).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
   public void testUNNESTLiteral() {
     String sql = "SELECT * FROM UNNEST(ARRAY<STRING>['foo', 'bar']);";
     ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);