You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/11 00:32:52 UTC
[flink] 01/02: [FLINK-13076] [table-planner-blink] Bump Calcite
dependency to 1.20.0 in blink planner
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 42b04b01976206b6fc7ebcb58871b627463171f0
Author: godfreyhe <go...@163.com>
AuthorDate: Wed Jul 3 16:53:19 2019 +0800
[FLINK-13076] [table-planner-blink] Bump Calcite dependency to 1.20.0 in blink planner
---
flink-table/flink-table-planner-blink/pom.xml | 17 +-
.../java/org/apache/calcite/rel/core/Join.java | 341 ---------------------
.../org/apache/calcite/rel/core/JoinRelType.java | 184 -----------
.../apache/calcite/sql2rel/RelDecorrelator.java | 79 +++--
.../catalog/FunctionCatalogOperatorTable.java | 4 +-
.../table/functions/sql/FlinkSqlOperatorTable.java | 30 +-
.../plan/rules/logical/FlinkFilterJoinRule.java | 14 +-
.../plan/rules/logical/SubQueryDecorrelator.java | 5 -
.../table/calcite/FlinkLogicalRelFactories.scala | 8 +-
.../table/codegen/CorrelateCodeGenerator.scala | 12 +-
.../plan/metadata/FlinkRelMdColumnUniqueness.scala | 6 +-
.../nodes/calcite/LogicalWindowAggregate.scala | 3 +-
.../table/plan/nodes/calcite/WindowAggregate.scala | 1 -
.../plan/nodes/common/CommonPhysicalJoin.scala | 5 +-
.../plan/nodes/logical/FlinkLogicalAggregate.scala | 11 +-
.../plan/nodes/logical/FlinkLogicalCorrelate.scala | 9 +-
.../logical/FlinkLogicalTableFunctionScan.scala | 10 +-
.../logical/FlinkLogicalWindowAggregate.scala | 3 +-
.../nodes/physical/batch/BatchExecCorrelate.scala | 8 +-
.../physical/stream/StreamExecCorrelate.scala | 6 +-
.../plan/optimize/program/FlinkBatchProgram.scala | 1 -
.../plan/optimize/program/FlinkStreamProgram.scala | 1 -
...gicalCorrelateToJoinFromTemporalTableRule.scala | 49 ++-
.../physical/stream/StreamExecCorrelateRule.scala | 13 +-
.../flink/table/plan/util/FlinkRelOptUtil.scala | 194 +-----------
.../table/plan/batch/sql/SetOperatorsTest.xml | 4 +-
.../table/plan/batch/sql/SubplanReuseTest.xml | 4 +-
.../table/plan/batch/sql/join/LookupJoinTest.xml | 84 ++---
.../ReplaceIntersectWithSemiJoinRuleTest.xml | 6 +-
.../logical/ReplaceMinusWithAntiJoinRuleTest.xml | 6 +-
.../SubqueryCorrelateVariablesValidationTest.xml | 102 ++++++
.../table/plan/stream/sql/SetOperatorsTest.xml | 4 +-
.../table/plan/stream/sql/SubplanReuseTest.xml | 4 +-
.../table/plan/stream/sql/join/LookupJoinTest.xml | 56 ++--
.../validation/ScalarFunctionsValidationTest.scala | 6 +-
.../table/plan/batch/sql/SubplanReuseTest.scala | 3 +-
.../plan/metadata/FlinkRelMdHandlerTestBase.scala | 4 +-
.../logical/subquery/SubQueryAntiJoinTest.scala | 14 +-
.../logical/subquery/SubQuerySemiJoinTest.scala | 12 +-
.../SubqueryCorrelateVariablesValidationTest.scala | 7 +-
.../table/plan/stream/sql/SubplanReuseTest.scala | 3 +-
.../plan/stream/sql/join/LookupJoinTest.scala | 4 +-
.../batch/sql/agg/PruneAggregateCallITCase.scala | 2 +-
flink-table/flink-table-runtime-blink/pom.xml | 6 +-
44 files changed, 390 insertions(+), 955 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/pom.xml b/flink-table/flink-table-planner-blink/pom.xml
index 461d63e..b0ba76f 100644
--- a/flink-table/flink-table-planner-blink/pom.xml
+++ b/flink-table/flink-table-planner-blink/pom.xml
@@ -150,17 +150,13 @@ under the License.
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<!-- When updating the Calcite version, make sure to update the dependency exclusions -->
- <version>1.19.0</version>
+ <version>1.20.0</version>
<exclusions>
<!--
+ "mvn dependency:tree" as of Calcite 1.20:
- Dependencies that are not needed for how we use Calcite right now.
-
- "mvn dependency:tree" as of Calcite 1.19:
-
- [INFO] +- org.apache.calcite:calcite-core:jar:1.19.0:compile
- [INFO] | +- org.apache.calcite.avatica:avatica-core:jar:1.13.0:compile
- [INFO] | +- org.apache.calcite:calcite-linq4j:jar:1.19.0:compile
+ [INFO] +- org.apache.calcite:calcite-core:jar:1.20.0:compile
+ [INFO] | +- org.apache.calcite:calcite-linq4j:jar:1.20.0:compile
[INFO] | +- org.apache.commons:commons-lang3:jar:3.3.2:compile
[INFO] | +- com.fasterxml.jackson.core:jackson-core:jar:2.9.6:compile
[INFO] | +- com.fasterxml.jackson.core:jackson-annotations:jar:2.9.6:compile
@@ -168,6 +164,7 @@ under the License.
[INFO] | +- com.google.guava:guava:jar:19.0:compile
[INFO] | \- com.jayway.jsonpath:json-path:jar:2.4.0:compile
+ Dependencies that are not needed for how we use Calcite right now.
-->
<exclusion>
<groupId>org.apache.calcite.avatica</groupId>
@@ -205,6 +202,10 @@ under the License.
<groupId>net.hydromatic</groupId>
<artifactId>aggdesigner-algorithm</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ </exclusion>
</exclusions>
</dependency>
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rel/core/Join.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rel/core/Join.java
deleted file mode 100644
index 657efbf..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rel/core/Join.java
+++ /dev/null
@@ -1,341 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.rel.core;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import org.apache.calcite.rel.RelNode.Context;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptCost;
-import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.BiRel;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelWriter;
-import org.apache.calcite.rel.metadata.RelMdUtil;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rex.RexChecker;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexShuttle;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.sql.validate.SqlValidatorUtil;
-import org.apache.calcite.util.Litmus;
-import org.apache.calcite.util.Util;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.Set;
-
-/**
- * This class is copied from https://github.com/apache/calcite/pull/1157 to supports SEMI/ANTI join.
- * NOTES: This file should be deleted when upgrading to a new calcite version
- * which contains CALCITE-2969.
- */
-
-/**
- * Relational expression that combines two relational expressions according to
- * some condition.
- *
- * <p>Each output row has columns from the left and right inputs.
- * The set of output rows is a subset of the cartesian product of the two
- * inputs; precisely which subset depends on the join condition.
- */
-public abstract class Join extends BiRel {
- //~ Instance fields --------------------------------------------------------
-
- protected final RexNode condition;
- protected final ImmutableSet<CorrelationId> variablesSet;
-
- /**
- * Values must be of enumeration {@link JoinRelType}, except that
- * {@link JoinRelType#RIGHT} is disallowed.
- */
- protected final JoinRelType joinType;
-
- protected final JoinInfo joinInfo;
-
- //~ Constructors -----------------------------------------------------------
-
- // Next time we need to change the constructor of Join, let's change the
- // "Set<String> variablesStopped" parameter to
- // "Set<CorrelationId> variablesSet". At that point we would deprecate
- // RelNode.getVariablesStopped().
-
- /**
- * Creates a Join.
- *
- * <p>Note: We plan to change the {@code variablesStopped} parameter to
- * {@code Set<CorrelationId> variablesSet}
- * {@link org.apache.calcite.util.Bug#upgrade(String) before version 2.0},
- * because {@link #getVariablesSet()}
- * is preferred over {@link #getVariablesStopped()}.
- * This constructor is not deprecated, for now, because maintaining overloaded
- * constructors in multiple sub-classes would be onerous.
- *
- * @param cluster Cluster
- * @param traitSet Trait set
- * @param left Left input
- * @param right Right input
- * @param condition Join condition
- * @param joinType Join type
- * @param variablesSet Set variables that are set by the
- * LHS and used by the RHS and are not available to
- * nodes above this Join in the tree
- */
- protected Join(
- RelOptCluster cluster,
- RelTraitSet traitSet,
- RelNode left,
- RelNode right,
- RexNode condition,
- Set<CorrelationId> variablesSet,
- JoinRelType joinType) {
- super(cluster, traitSet, left, right);
- this.condition = Objects.requireNonNull(condition);
- this.variablesSet = ImmutableSet.copyOf(variablesSet);
- this.joinType = Objects.requireNonNull(joinType);
- this.joinInfo = JoinInfo.of(left, right, condition);
- }
-
- @Deprecated // to be removed before 2.0
- protected Join(
- RelOptCluster cluster,
- RelTraitSet traitSet,
- RelNode left,
- RelNode right,
- RexNode condition,
- JoinRelType joinType,
- Set<String> variablesStopped) {
- this(cluster, traitSet, left, right, condition,
- CorrelationId.setOf(variablesStopped), joinType);
- }
-
- //~ Methods ----------------------------------------------------------------
-
- @Override public List<RexNode> getChildExps() {
- return ImmutableList.of(condition);
- }
-
- @Override public RelNode accept(RexShuttle shuttle) {
- RexNode condition = shuttle.apply(this.condition);
- if (this.condition == condition) {
- return this;
- }
- return copy(traitSet, condition, left, right, joinType, isSemiJoinDone());
- }
-
- public RexNode getCondition() {
- return condition;
- }
-
- public JoinRelType getJoinType() {
- return joinType;
- }
-
- @Override public boolean isValid(Litmus litmus, Context context) {
- if (!super.isValid(litmus, context)) {
- return false;
- }
- if (getRowType().getFieldCount()
- != getSystemFieldList().size()
- + left.getRowType().getFieldCount()
- + (joinType.projectsRight() ? right.getRowType().getFieldCount() : 0)) {
- return litmus.fail("field count mismatch");
- }
- if (condition != null) {
- if (condition.getType().getSqlTypeName() != SqlTypeName.BOOLEAN) {
- return litmus.fail("condition must be boolean: {}",
- condition.getType());
- }
- // The input to the condition is a row type consisting of system
- // fields, left fields, and right fields. Very similar to the
- // output row type, except that fields have not yet been made due
- // due to outer joins.
- RexChecker checker =
- new RexChecker(
- getCluster().getTypeFactory().builder()
- .addAll(getSystemFieldList())
- .addAll(getLeft().getRowType().getFieldList())
- .addAll(getRight().getRowType().getFieldList())
- .build(),
- context, litmus);
- condition.accept(checker);
- if (checker.getFailureCount() > 0) {
- return litmus.fail(checker.getFailureCount()
- + " failures in condition " + condition);
- }
- }
- return litmus.succeed();
- }
-
- @Override public RelOptCost computeSelfCost(RelOptPlanner planner,
- RelMetadataQuery mq) {
- // Maybe we should remove this for semi-join ?
- if (!joinType.projectsRight()) {
- // REVIEW jvs 9-Apr-2006: Just for now...
- return planner.getCostFactory().makeTinyCost();
- }
- double rowCount = mq.getRowCount(this);
- return planner.getCostFactory().makeCost(rowCount, 0, 0);
- }
-
- /** @deprecated Use {@link RelMdUtil#getJoinRowCount(RelMetadataQuery, Join, RexNode)}. */
- @Deprecated // to be removed before 2.0
- public static double estimateJoinedRows(
- Join joinRel,
- RexNode condition) {
- final RelMetadataQuery mq = RelMetadataQuery.instance();
- return Util.first(RelMdUtil.getJoinRowCount(mq, joinRel, condition), 1D);
- }
-
- @Override public double estimateRowCount(RelMetadataQuery mq) {
- return Util.first(RelMdUtil.getJoinRowCount(mq, this, condition), 1D);
- }
-
- @Override public Set<CorrelationId> getVariablesSet() {
- return variablesSet;
- }
-
- @Override public RelWriter explainTerms(RelWriter pw) {
- return super.explainTerms(pw)
- .item("condition", condition)
- .item("joinType", joinType.lowerName)
- .itemIf(
- "systemFields",
- getSystemFieldList(),
- !getSystemFieldList().isEmpty());
- }
-
- @Override protected RelDataType deriveRowType() {
- assert getSystemFieldList() != null;
- RelDataType leftType = left.getRowType();
- RelDataType rightType = right.getRowType();
- RelDataTypeFactory typeFactory = getCluster().getTypeFactory();
- switch (joinType) {
- case LEFT:
- rightType = typeFactory.createTypeWithNullability(rightType, true);
- break;
- case RIGHT:
- leftType = typeFactory.createTypeWithNullability(leftType, true);
- break;
- case FULL:
- leftType = typeFactory.createTypeWithNullability(leftType, true);
- rightType = typeFactory.createTypeWithNullability(rightType, true);
- break;
- case SEMI:
- case ANTI:
- rightType = null;
- default:
- break;
- }
- return createJoinType(typeFactory, leftType, rightType, null, getSystemFieldList());
- }
-
- /**
- * Returns whether this LogicalJoin has already spawned a
- * {@code SemiJoin} via
- * {@link org.apache.calcite.rel.rules.JoinAddRedundantSemiJoinRule}.
- *
- * <p>The base implementation returns false.</p>
- *
- * @return whether this join has already spawned a semi join
- */
- public boolean isSemiJoinDone() {
- return false;
- }
-
- /**
- * Returns whether this Join is a semijoin.
- *
- * @return true if this Join's join type is semi.
- */
- public boolean isSemiJoin() {
- return joinType == JoinRelType.SEMI;
- }
-
- /**
- * Returns a list of system fields that will be prefixed to
- * output row type.
- *
- * @return list of system fields
- */
- public List<RelDataTypeField> getSystemFieldList() {
- return Collections.emptyList();
- }
-
- @Deprecated // to be removed before 2.0
- public static RelDataType deriveJoinRowType(
- RelDataType leftType,
- RelDataType rightType,
- JoinRelType joinType,
- RelDataTypeFactory typeFactory,
- List<String> fieldNameList,
- List<RelDataTypeField> systemFieldList) {
- return SqlValidatorUtil.deriveJoinRowType(leftType, rightType, joinType,
- typeFactory, fieldNameList, systemFieldList);
- }
-
- @Deprecated // to be removed before 2.0
- public static RelDataType createJoinType(
- RelDataTypeFactory typeFactory,
- RelDataType leftType,
- RelDataType rightType,
- List<String> fieldNameList,
- List<RelDataTypeField> systemFieldList) {
- return SqlValidatorUtil.createJoinType(typeFactory, leftType, rightType,
- fieldNameList, systemFieldList);
- }
-
- @Override public final Join copy(RelTraitSet traitSet, List<RelNode> inputs) {
- assert inputs.size() == 2;
- return copy(traitSet, getCondition(), inputs.get(0), inputs.get(1),
- joinType, isSemiJoinDone());
- }
-
- /**
- * Creates a copy of this join, overriding condition, system fields and
- * inputs.
- *
- * <p>General contract as {@link RelNode#copy}.
- *
- * @param traitSet Traits
- * @param conditionExpr Condition
- * @param left Left input
- * @param right Right input
- * @param joinType Join type
- * @param semiJoinDone Whether this join has been translated to a
- * semi-join
- * @return Copy of this join
- */
- public abstract Join copy(RelTraitSet traitSet, RexNode conditionExpr,
- RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone);
-
- /**
- * Analyzes the join condition.
- *
- * @return Analyzed join condition
- */
- public JoinInfo analyzeCondition() {
- return joinInfo;
- }
-}
-
-// End Join.java
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rel/core/JoinRelType.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rel/core/JoinRelType.java
deleted file mode 100644
index 8e69d15..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rel/core/JoinRelType.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.rel.core;
-
-import org.apache.calcite.linq4j.CorrelateJoinType;
-
-import java.util.Locale;
-
-/**
- * This class is copied from https://github.com/apache/calcite/pull/1157 to supports SEMI/ANTI join.
- * NOTES: This file should be deleted when upgrading to a new calcite version
- * which contains CALCITE-2969.
- */
-
-/**
- * Enumeration of join types.
- */
-public enum JoinRelType {
- /**
- * Inner join.
- */
- INNER,
-
- /**
- * Left-outer join.
- */
- LEFT,
-
- /**
- * Right-outer join.
- */
- RIGHT,
-
- /**
- * Full-outer join.
- */
- FULL,
-
- /**
- * Semi-join.
- *
- * <p>For example, {@code EMP semi-join DEPT} finds all {@code EMP} records
- * that have a corresponding {@code DEPT} record:
- *
- * <blockquote><pre>
- * SELECT * FROM EMP
- * WHERE EXISTS (SELECT 1 FROM DEPT
- * WHERE DEPT.DEPTNO = EMP.DEPTNO)</pre>
- * </blockquote>
- */
- SEMI,
-
- /**
- * Anti-join.
- *
- * <p>For example, {@code EMP anti-join DEPT} finds all {@code EMP} records
- * that do not have a corresponding {@code DEPT} record:
- *
- * <blockquote><pre>
- * SELECT * FROM EMP
- * WHERE NOT EXISTS (SELECT 1 FROM DEPT
- * WHERE DEPT.DEPTNO = EMP.DEPTNO)</pre>
- * </blockquote>
- */
- ANTI;
-
- /** Lower-case name. */
- public final String lowerName = name().toLowerCase(Locale.ROOT);
-
- /**
- * Returns whether a join of this type may generate NULL values on the
- * right-hand side.
- */
- public boolean generatesNullsOnRight() {
- return (this == LEFT) || (this == FULL);
- }
-
- /**
- * Returns whether a join of this type may generate NULL values on the
- * left-hand side.
- */
- public boolean generatesNullsOnLeft() {
- return (this == RIGHT) || (this == FULL);
- }
-
- /**
- * Swaps left to right, and vice versa.
- */
- public JoinRelType swap() {
- switch (this) {
- case LEFT:
- return RIGHT;
- case RIGHT:
- return LEFT;
- default:
- return this;
- }
- }
-
- /** Returns whether this join type generates nulls on side #{@code i}. */
- public boolean generatesNullsOn(int i) {
- switch (i) {
- case 0:
- return generatesNullsOnLeft();
- case 1:
- return generatesNullsOnRight();
- default:
- throw new IllegalArgumentException("invalid: " + i);
- }
- }
-
- /** Returns a join type similar to this but that does not generate nulls on
- * the left. */
- public JoinRelType cancelNullsOnLeft() {
- switch (this) {
- case RIGHT:
- return INNER;
- case FULL:
- return LEFT;
- default:
- return this;
- }
- }
-
- /** Returns a join type similar to this but that does not generate nulls on
- * the right. */
- public JoinRelType cancelNullsOnRight() {
- switch (this) {
- case LEFT:
- return INNER;
- case FULL:
- return RIGHT;
- default:
- return this;
- }
- }
-
- /** Transform this JoinRelType to CorrelateJoinType. **/
- public CorrelateJoinType toLinq4j() {
- switch (this) {
- case INNER:
- return CorrelateJoinType.INNER;
- case LEFT:
- return CorrelateJoinType.LEFT;
- case SEMI:
- return CorrelateJoinType.SEMI;
- case ANTI:
- return CorrelateJoinType.ANTI;
- }
- throw new IllegalStateException(
- "Unable to convert " + this + " to CorrelateJoinType");
- }
-
- public boolean projectsRight() {
- switch (this) {
- case INNER:
- case LEFT:
- case RIGHT:
- case FULL:
- return true;
- case SEMI:
- case ANTI:
- return false;
- }
- throw new IllegalStateException(
- "Unable to convert " + this + " to JoinRelType");
- }
-}
-
-// End JoinRelType.java
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
index cef2de0..d260514 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
@@ -79,7 +79,6 @@ import org.apache.calcite.rex.RexShuttle;
import org.apache.calcite.rex.RexSubQuery;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.rex.RexVisitorImpl;
-import org.apache.calcite.sql.SemiJoinType;
import org.apache.calcite.sql.SqlExplainFormat;
import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.sql.SqlFunction;
@@ -119,7 +118,7 @@ import java.util.TreeMap;
/**
* This class is copied from Apache Calcite except that it supports SEMI/ANTI join.
- * NOTES: This file should be deleted when upgrading to a new calcite version which contains CALCITE-2969.
+ * NOTES: This file should be deleted when CALCITE-3169 and CALCITE-3170 are fixed.
*/
/**
@@ -263,6 +262,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
// has been rewritten; apply rules post-decorrelation
final HepProgram program2 = HepProgram.builder()
.addRuleInstance(
+ // use FilterJoinRule instead of FlinkFilterJoinRule while CALCITE-3170 is fixed
new FlinkFilterJoinRule.FlinkFilterIntoJoinRule(
true, f,
FlinkFilterJoinRule.TRUE_PREDICATE))
@@ -474,8 +474,11 @@ public class RelDecorrelator implements ReflectiveVisitor {
}
final RelNode newInput = frame.r;
+ // aggregate outputs mapping: group keys and aggregates
+ final Map<Integer, Integer> outputMap = new HashMap<>();
+
// map from newInput
- Map<Integer, Integer> mapNewInputToProjOutputs = new HashMap<>();
+ final Map<Integer, Integer> mapNewInputToProjOutputs = new HashMap<>();
final int oldGroupKeyCount = rel.getGroupSet().cardinality();
// Project projects the original expressions,
@@ -497,6 +500,9 @@ public class RelDecorrelator implements ReflectiveVisitor {
omittedConstants.put(i, constant);
continue;
}
+
+ // add mapping of group keys.
+ outputMap.put(i, newPos);
int newInputPos = frame.oldToNewOutputs.get(i);
projects.add(RexInputRef.of2(newInputPos, newInputOutput));
mapNewInputToProjOutputs.put(newInputPos, newPos);
@@ -600,7 +606,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
// The old to new output position mapping will be the same as that
// of newProject, plus any aggregates that the oldAgg produces.
- combinedMap.put(
+ outputMap.put(
oldInputOutputFieldCount + i,
newInputOutputFieldCount + i);
}
@@ -612,15 +618,37 @@ public class RelDecorrelator implements ReflectiveVisitor {
final List<RexNode> postProjects = new ArrayList<>(relBuilder.fields());
for (Map.Entry<Integer, RexLiteral> entry
: omittedConstants.descendingMap().entrySet()) {
- postProjects.add(entry.getKey() + frame.corDefOutputs.size(),
- entry.getValue());
+ int index = entry.getKey() + frame.corDefOutputs.size();
+ postProjects.add(index, entry.getValue());
+ // Shift the outputs whose index equals with or bigger than the added index
+ // with 1 offset.
+ shiftMapping(outputMap, index, 1);
+ // Then add the constant key mapping.
+ outputMap.put(entry.getKey(), index);
}
relBuilder.project(postProjects);
}
// Aggregate does not change input ordering so corVars will be
// located at the same position as the input newProject.
- return register(rel, relBuilder.build(), combinedMap, corDefOutputs);
+ return register(rel, relBuilder.build(), outputMap, corDefOutputs);
+ }
+
+ /**
+ * Shift the mapping to fixed offset from the {@code startIndex}.
+ * @param mapping the original mapping
+ * @param startIndex any output whose index equals with or bigger than the starting index
+ * would be shift
+ * @param offset shift offset
+ */
+ private static void shiftMapping(Map<Integer, Integer> mapping, int startIndex, int offset) {
+ for (Map.Entry<Integer, Integer> entry : mapping.entrySet()) {
+ if (entry.getValue() >= startIndex) {
+ mapping.put(entry.getKey(), entry.getValue() + offset);
+ } else {
+ mapping.put(entry.getKey(), entry.getValue());
+ }
+ }
}
public Frame getInvoke(RelNode r, RelNode parent) {
@@ -1162,7 +1190,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
RexUtil.composeConjunction(relBuilder.getRexBuilder(), conditions);
RelNode newJoin =
LogicalJoin.create(leftFrame.r, rightFrame.r, condition,
- ImmutableSet.of(), toJoinRelType(rel.getJoinType()));
+ ImmutableSet.of(), rel.getJoinType());
return register(rel, newJoin, mapOldToNewOutputs, corDefOutputs);
}
@@ -1173,6 +1201,14 @@ public class RelDecorrelator implements ReflectiveVisitor {
* @param rel Join
*/
public Frame decorrelateRel(LogicalJoin rel) {
+ // For SEMI/ANTI join decorrelate it's input directly,
+ // because the correlate variables can only be propagated from
+ // the left side, which is not supported yet.
+ if (!rel.getJoinType().projectsRight()) {
+ // fix CALCITE-3169
+ return decorrelateRel((RelNode) rel);
+ }
+
//
// Rewrite logic:
//
@@ -1180,10 +1216,6 @@ public class RelDecorrelator implements ReflectiveVisitor {
// 2. map output positions and produce corVars if any.
//
- if (!rel.getJoinType().projectsRight()) {
- return decorrelateRel((RelNode) rel);
- }
-
final RelNode oldLeft = rel.getInput(0);
final RelNode oldRight = rel.getInput(1);
@@ -1335,21 +1367,6 @@ public class RelDecorrelator implements ReflectiveVisitor {
.build();
}
- private JoinRelType toJoinRelType(SemiJoinType semiJoinType) {
- switch (semiJoinType) {
- case INNER:
- return JoinRelType.INNER;
- case LEFT:
- return JoinRelType.LEFT;
- case SEMI:
- return JoinRelType.SEMI;
- case ANTI:
- return JoinRelType.ANTI;
- default:
- throw new IllegalArgumentException("Unsupported type: " + semiJoinType);
- }
- }
-
/**
* Pulls a {@link Project} above a {@link Correlate} from its RHS input.
* Enforces nullability for join output.
@@ -1365,7 +1382,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
LogicalProject project,
Set<Integer> isCount) {
final RelNode left = correlate.getLeft();
- final JoinRelType joinType = toJoinRelType(correlate.getJoinType());
+ final JoinRelType joinType = correlate.getJoinType();
// now create the new project
final List<Pair<RexNode, String>> newProjects = new ArrayList<>();
@@ -1867,7 +1884,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
// Aggregate (groupby (0) single_value())
// Project-A (may reference corVar)
// rightInput
- final JoinRelType joinType = toJoinRelType(correlate.getJoinType());
+ final JoinRelType joinType = correlate.getJoinType();
// corRel.getCondition was here, however Correlate was updated so it
// never includes a join condition. The code was not modified for brevity.
@@ -2077,7 +2094,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
return;
}
- final JoinRelType joinType = toJoinRelType(correlate.getJoinType());
+ final JoinRelType joinType = correlate.getJoinType();
// corRel.getCondition was here, however Correlate was updated so it
// never includes a join condition. The code was not modified for brevity.
RexNode joinCond = rexBuilder.makeLiteral(true);
@@ -2482,7 +2499,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
return;
}
- JoinRelType joinType = toJoinRelType(correlate.getJoinType());
+ JoinRelType joinType = correlate.getJoinType();
// corRel.getCondition was here, however Correlate was updated so it
// never includes a join condition. The code was not modified for brevity.
RexNode joinCond = relBuilder.literal(true);
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/FunctionCatalogOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/FunctionCatalogOperatorTable.java
index bd8306f..dc5b918 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/FunctionCatalogOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/FunctionCatalogOperatorTable.java
@@ -33,6 +33,7 @@ import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.SqlSyntax;
+import org.apache.calcite.sql.validate.SqlNameMatcher;
import java.util.List;
import java.util.Optional;
@@ -58,7 +59,8 @@ public class FunctionCatalogOperatorTable implements SqlOperatorTable {
SqlIdentifier opName,
SqlFunctionCategory category,
SqlSyntax syntax,
- List<SqlOperator> operatorList) {
+ List<SqlOperator> operatorList,
+ SqlNameMatcher nameMatcher) {
if (!opName.isSimple()) {
return;
}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java
index 12af928..d3d2e5a 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java
@@ -29,9 +29,11 @@ import org.apache.calcite.sql.SqlBinaryOperator;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlGroupedWindowFunction;
+import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.SqlSyntax;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.InferTypes;
import org.apache.calcite.sql.type.OperandTypes;
@@ -42,6 +44,8 @@ import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.type.SqlTypeTransforms;
import org.apache.calcite.sql.util.ReflectiveSqlOperatorTable;
import org.apache.calcite.sql.validate.SqlMonotonicity;
+import org.apache.calcite.sql.validate.SqlNameMatcher;
+import org.apache.calcite.sql.validate.SqlNameMatchers;
import java.util.Arrays;
import java.util.List;
@@ -89,6 +93,17 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
});
}
+ @Override
+ public void lookupOperatorOverloads(
+ SqlIdentifier opName,
+ SqlFunctionCategory category,
+ SqlSyntax syntax,
+ List<SqlOperator> operatorList,
+ SqlNameMatcher nameMatcher) {
+ // set caseSensitive=false to make sure the behavior is same with before.
+ super.lookupOperatorOverloads(opName, category, syntax, operatorList, SqlNameMatchers.withCaseSensitive(false));
+ }
+
// -----------------------------------------------------------------------------
// Flink specific built-in scalar SQL functions
// -----------------------------------------------------------------------------
@@ -1131,7 +1146,6 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
public static final SqlFunction CURRENT_TIMESTAMP = SqlStdOperatorTable.CURRENT_TIMESTAMP;
public static final SqlFunction CURRENT_DATE = SqlStdOperatorTable.CURRENT_DATE;
public static final SqlFunction CAST = SqlStdOperatorTable.CAST;
- public static final SqlFunction QUARTER = SqlStdOperatorTable.QUARTER;
public static final SqlOperator SCALAR_QUERY = SqlStdOperatorTable.SCALAR_QUERY;
public static final SqlOperator EXISTS = SqlStdOperatorTable.EXISTS;
public static final SqlFunction SIN = SqlStdOperatorTable.SIN;
@@ -1148,9 +1162,21 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
public static final SqlFunction PI = SqlStdOperatorTable.PI;
public static final SqlFunction RAND = SqlStdOperatorTable.RAND;
public static final SqlFunction RAND_INTEGER = SqlStdOperatorTable.RAND_INTEGER;
+ public static final SqlFunction TRUNCATE = SqlStdOperatorTable.TRUNCATE;
+
+ // TIME FUNCTIONS
+ public static final SqlFunction YEAR = SqlStdOperatorTable.YEAR;
+ public static final SqlFunction QUARTER = SqlStdOperatorTable.QUARTER;
+ public static final SqlFunction MONTH = SqlStdOperatorTable.MONTH;
+ public static final SqlFunction WEEK = SqlStdOperatorTable.WEEK;
+ public static final SqlFunction HOUR = SqlStdOperatorTable.HOUR;
+ public static final SqlFunction MINUTE = SqlStdOperatorTable.MINUTE;
+ public static final SqlFunction SECOND = SqlStdOperatorTable.SECOND;
+ public static final SqlFunction DAYOFYEAR = SqlStdOperatorTable.DAYOFYEAR;
+ public static final SqlFunction DAYOFMONTH = SqlStdOperatorTable.DAYOFMONTH;
+ public static final SqlFunction DAYOFWEEK = SqlStdOperatorTable.DAYOFWEEK;
public static final SqlFunction TIMESTAMP_ADD = SqlStdOperatorTable.TIMESTAMP_ADD;
public static final SqlFunction TIMESTAMP_DIFF = SqlStdOperatorTable.TIMESTAMP_DIFF;
- public static final SqlFunction TRUNCATE = SqlStdOperatorTable.TRUNCATE;
// MATCH_RECOGNIZE
public static final SqlFunction FIRST = SqlStdOperatorTable.FIRST;
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRule.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRule.java
index 3b593de..3513ba7 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRule.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRule.java
@@ -18,8 +18,6 @@
package org.apache.flink.table.plan.rules.logical;
-import org.apache.flink.table.plan.util.FlinkRelOptUtil;
-
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptRuleOperand;
@@ -42,12 +40,10 @@ import java.util.Iterator;
import java.util.List;
import java.util.Objects;
-import static org.apache.calcite.plan.RelOptUtil.conjunctions;
-
/**
* This rules is copied from Calcite's {@link org.apache.calcite.rel.rules.FilterJoinRule}.
+ * NOTES: This file should be deleted when CALCITE-3170 is fixed.
* Modification:
- * - Use `FlinkRelOptUtil.classifyFilters` to support SEMI/ANTI join
* - Handles the ON condition of anti-join can not be pushed down
*/
@@ -140,7 +136,7 @@ public abstract class FlinkFilterJoinRule extends RelOptRule {
final List<RexNode> aboveFilters =
filter != null
- ? conjunctions(filter.getCondition())
+ ? RelOptUtil.conjunctions(filter.getCondition())
: new ArrayList<>();
final com.google.common.collect.ImmutableList<RexNode> origAboveFilters =
com.google.common.collect.ImmutableList.copyOf(aboveFilters);
@@ -150,7 +146,7 @@ public abstract class FlinkFilterJoinRule extends RelOptRule {
if (smart
&& !origAboveFilters.isEmpty()
&& join.getJoinType() != JoinRelType.INNER) {
- joinType = FlinkRelOptUtil.simplifyJoin(join, origAboveFilters, joinType);
+ joinType = RelOptUtil.simplifyJoin(join, origAboveFilters, joinType);
}
final List<RexNode> leftFilters = new ArrayList<>();
@@ -166,7 +162,7 @@ public abstract class FlinkFilterJoinRule extends RelOptRule {
// filters. They can be pushed down if they are not on the NULL
// generating side.
boolean filterPushed = false;
- if (FlinkRelOptUtil.classifyFilters(
+ if (RelOptUtil.classifyFilters(
join,
aboveFilters,
joinType,
@@ -198,7 +194,7 @@ public abstract class FlinkFilterJoinRule extends RelOptRule {
// pushed down if it does not affect the non-matching set, i.e. it is
// not on the side which is preserved.
// A ON clause filter of anti-join can not be pushed down.
- if (!isAntiJoin && FlinkRelOptUtil.classifyFilters(
+ if (!isAntiJoin && RelOptUtil.classifyFilters(
join,
joinFilters,
joinType,
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/SubQueryDecorrelator.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/SubQueryDecorrelator.java
index a404e3f..2847a41 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/SubQueryDecorrelator.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/SubQueryDecorrelator.java
@@ -29,7 +29,6 @@ import org.apache.calcite.plan.hep.HepRelVertex;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttleImpl;
-import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.core.Project;
@@ -563,10 +562,6 @@ public class SubQueryDecorrelator extends RelShuttleImpl {
* @param rel Aggregate to rewrite
*/
public Frame decorrelateRel(LogicalAggregate rel) {
- if (rel.getGroupType() != Aggregate.Group.SIMPLE) {
- throw new AssertionError(Bug.CALCITE_461_FIXED);
- }
-
// Aggregate itself should not reference corVars.
assert !cm.mapRefRelToCorRef.containsKey(rel);
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkLogicalRelFactories.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkLogicalRelFactories.scala
index c60bb12..11b0681 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkLogicalRelFactories.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkLogicalRelFactories.scala
@@ -32,8 +32,8 @@ import org.apache.calcite.rel.core._
import org.apache.calcite.rel.logical._
import org.apache.calcite.rel.{RelCollation, RelNode}
import org.apache.calcite.rex._
+import org.apache.calcite.sql.SqlKind
import org.apache.calcite.sql.SqlKind.{EXCEPT, INTERSECT, UNION}
-import org.apache.calcite.sql.{SemiJoinType, SqlKind}
import org.apache.calcite.tools.{RelBuilder, RelBuilderFactory}
import org.apache.calcite.util.ImmutableBitSet
@@ -139,14 +139,12 @@ object FlinkLogicalRelFactories {
* Implementation of [[AggregateFactory]] that returns a [[FlinkLogicalAggregate]].
*/
class AggregateFactoryImpl extends AggregateFactory {
- @SuppressWarnings(Array("deprecation"))
def createAggregate(
input: RelNode,
- indicator: Boolean,
groupSet: ImmutableBitSet,
groupSets: ImmutableList[ImmutableBitSet],
aggCalls: util.List[AggregateCall]): RelNode = {
- FlinkLogicalAggregate.create(input, indicator, groupSet, groupSets, aggCalls)
+ FlinkLogicalAggregate.create(input, groupSet, groupSets, aggCalls)
}
}
@@ -206,7 +204,7 @@ object FlinkLogicalRelFactories {
right: RelNode,
correlationId: CorrelationId,
requiredColumns: ImmutableBitSet,
- joinType: SemiJoinType): RelNode = {
+ joinType: JoinRelType): RelNode = {
FlinkLogicalCorrelate.create(left, right, correlationId, requiredColumns, joinType)
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CorrelateCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CorrelateCodeGenerator.scala
index 7091846..89f0a4c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CorrelateCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CorrelateCodeGenerator.scala
@@ -42,8 +42,8 @@ import org.apache.flink.table.types.{DataType, PlannerTypeUtils}
import org.apache.flink.table.typeutils.BaseRowTypeInfo
import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
import org.apache.calcite.rex._
-import org.apache.calcite.sql.SemiJoinType
import scala.collection.JavaConversions._
@@ -58,7 +58,7 @@ object CorrelateCodeGenerator {
scan: FlinkLogicalTableFunctionScan,
condition: Option[RexNode],
outDataType: RelDataType,
- joinType: SemiJoinType,
+ joinType: JoinRelType,
parallelism: Int,
retainHeader: Boolean,
expression: (RexNode, List[String], Option[List[RexNode]]) => String,
@@ -153,7 +153,7 @@ object CorrelateCodeGenerator {
swallowInputOnly: Boolean = false,
udtfType: LogicalType,
returnType: RowType,
- joinType: SemiJoinType,
+ joinType: JoinRelType,
rexCall: RexCall,
pojoFieldMapping: Option[Array[Int]],
ruleDescription: String,
@@ -192,7 +192,7 @@ object CorrelateCodeGenerator {
|""".stripMargin
// 3. left join
- if (joinType == SemiJoinType.LEFT) {
+ if (joinType == JoinRelType.LEFT) {
if (swallowInputOnly) {
// and the returned row table function is empty, collect a null
val nullRowTerm = CodeGenUtils.newName("nullRow")
@@ -267,8 +267,8 @@ object CorrelateCodeGenerator {
|""".stripMargin
}
- } else if (joinType != SemiJoinType.INNER) {
- throw new TableException(s"Unsupported SemiJoinType: $joinType for correlate join.")
+ } else if (joinType != JoinRelType.INNER) {
+ throw new TableException(s"Unsupported JoinRelType: $joinType for correlate join.")
}
val genOperator = OperatorCodeGenerator.generateOneInputStreamOperator[BaseRow, BaseRow](
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala
index 7993d67..0e5dcd6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala
@@ -40,7 +40,6 @@ import org.apache.calcite.rel.core._
import org.apache.calcite.rel.metadata._
import org.apache.calcite.rel.{RelNode, SingleRel}
import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
-import org.apache.calcite.sql.SemiJoinType._
import org.apache.calcite.sql.SqlKind
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.util.{Bug, BuiltInMethod, ImmutableBitSet, Util}
@@ -583,8 +582,9 @@ class FlinkRelMdColumnUniqueness private extends MetadataHandler[BuiltInMetadata
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBoolean = {
rel.getJoinType match {
- case ANTI | SEMI => mq.areColumnsUnique(rel.getLeft, columns, ignoreNulls)
- case LEFT | INNER =>
+ case JoinRelType.ANTI | JoinRelType.SEMI =>
+ mq.areColumnsUnique(rel.getLeft, columns, ignoreNulls)
+ case JoinRelType.LEFT | JoinRelType.INNER =>
val left = rel.getLeft
val right = rel.getRight
val leftFieldCount = left.getRowType.getFieldCount
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalWindowAggregate.scala
index be8288e..b96faf4 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalWindowAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalWindowAggregate.scala
@@ -42,7 +42,6 @@ final class LogicalWindowAggregate(
override def copy(
traitSet: RelTraitSet,
input: RelNode,
- indicator: Boolean,
groupSet: ImmutableBitSet,
groupSets: util.List[ImmutableBitSet],
aggCalls: util.List[AggregateCall]): Aggregate = {
@@ -74,7 +73,7 @@ object LogicalWindowAggregate {
window: LogicalWindow,
namedProperties: Seq[PlannerNamedWindowProperty],
agg: Aggregate): LogicalWindowAggregate = {
- require(!agg.indicator && (agg.getGroupType == Group.SIMPLE))
+ require(agg.getGroupType == Group.SIMPLE)
val cluster: RelOptCluster = agg.getCluster
val traitSet: RelTraitSet = cluster.traitSetOf(Convention.NONE)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WindowAggregate.scala
index e206b65..433590b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WindowAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WindowAggregate.scala
@@ -48,7 +48,6 @@ abstract class WindowAggregate(
cluster,
traitSet,
child,
- false,
groupSet,
ImmutableList.of(groupSet),
aggCalls) {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonPhysicalJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonPhysicalJoin.scala
index 65fec34..4a0c303 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonPhysicalJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonPhysicalJoin.scala
@@ -64,11 +64,10 @@ abstract class CommonPhysicalJoin(
lazy val inputRowType: RelDataType = joinType match {
case JoinRelType.SEMI | JoinRelType.ANTI =>
// Combines inputs' RowType, the result is different from SEMI/ANTI Join's RowType.
- SqlValidatorUtil.deriveJoinRowType(
+ SqlValidatorUtil.createJoinType(
+ getCluster.getTypeFactory,
getLeft.getRowType,
getRight.getRowType,
- getJoinType,
- getCluster.getTypeFactory,
null,
Collections.emptyList[RelDataTypeField]
)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala
index 70fc662..52d3c41 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala
@@ -43,13 +43,12 @@ class FlinkLogicalAggregate(
cluster: RelOptCluster,
traitSet: RelTraitSet,
child: RelNode,
- indicator: Boolean,
groupSet: ImmutableBitSet,
groupSets: util.List[ImmutableBitSet],
aggCalls: util.List[AggregateCall],
/* flag indicating whether to skip SplitAggregateRule */
var partialFinalType: PartialFinalType = PartialFinalType.NONE)
- extends Aggregate(cluster, traitSet, child, indicator, groupSet, groupSets, aggCalls)
+ extends Aggregate(cluster, traitSet, child, groupSet, groupSets, aggCalls)
with FlinkLogicalRel {
def setPartialFinalType(partialFinalType: PartialFinalType): Unit = {
@@ -59,12 +58,11 @@ class FlinkLogicalAggregate(
override def copy(
traitSet: RelTraitSet,
input: RelNode,
- indicator: Boolean,
groupSet: ImmutableBitSet,
groupSets: util.List[ImmutableBitSet],
aggCalls: util.List[AggregateCall]): Aggregate = {
new FlinkLogicalAggregate(
- cluster, traitSet, input, indicator, groupSet, groupSets, aggCalls, partialFinalType)
+ cluster, traitSet, input, groupSet, groupSets, aggCalls, partialFinalType)
}
override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
@@ -113,7 +111,6 @@ private class FlinkLogicalAggregateBatchConverter
val newInput = RelOptRule.convert(agg.getInput, FlinkConventions.LOGICAL)
FlinkLogicalAggregate.create(
newInput,
- agg.indicator,
agg.getGroupSet,
agg.getGroupSets,
agg.getAggCallList)
@@ -143,7 +140,6 @@ private class FlinkLogicalAggregateStreamConverter
val newInput = RelOptRule.convert(agg.getInput, FlinkConventions.LOGICAL)
FlinkLogicalAggregate.create(
newInput,
- agg.indicator,
agg.getGroupSet,
agg.getGroupSets,
agg.getAggCallList)
@@ -156,12 +152,11 @@ object FlinkLogicalAggregate {
def create(
input: RelNode,
- indicator: Boolean,
groupSet: ImmutableBitSet,
groupSets: util.List[ImmutableBitSet],
aggCalls: util.List[AggregateCall]): FlinkLogicalAggregate = {
val cluster = input.getCluster
val traitSet = cluster.traitSetOf(FlinkConventions.LOGICAL).simplify()
- new FlinkLogicalAggregate(cluster,traitSet, input, indicator, groupSet, groupSets, aggCalls)
+ new FlinkLogicalAggregate(cluster,traitSet, input, groupSet, groupSets, aggCalls)
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCorrelate.scala
index 54bf96c..3c06542 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCorrelate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCorrelate.scala
@@ -23,9 +23,8 @@ import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRule, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.core.{Correlate, CorrelationId}
+import org.apache.calcite.rel.core.{Correlate, CorrelationId, JoinRelType}
import org.apache.calcite.rel.logical.LogicalCorrelate
-import org.apache.calcite.sql.SemiJoinType
import org.apache.calcite.util.ImmutableBitSet
/**
@@ -39,7 +38,7 @@ class FlinkLogicalCorrelate(
right: RelNode,
correlationId: CorrelationId,
requiredColumns: ImmutableBitSet,
- joinType: SemiJoinType)
+ joinType: JoinRelType)
extends Correlate(cluster, traitSet, left, right, correlationId, requiredColumns, joinType)
with FlinkLogicalRel {
@@ -49,7 +48,7 @@ class FlinkLogicalCorrelate(
right: RelNode,
correlationId: CorrelationId,
requiredColumns: ImmutableBitSet,
- joinType: SemiJoinType): Correlate = {
+ joinType: JoinRelType): Correlate = {
new FlinkLogicalCorrelate(
cluster,
@@ -91,7 +90,7 @@ object FlinkLogicalCorrelate {
right: RelNode,
correlationId: CorrelationId,
requiredColumns: ImmutableBitSet,
- joinType: SemiJoinType): FlinkLogicalCorrelate = {
+ joinType: JoinRelType): FlinkLogicalCorrelate = {
val cluster = left.getCluster
val traitSet = cluster.traitSetOf(FlinkConventions.LOGICAL).simplify()
new FlinkLogicalCorrelate(
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
index 2b96e44..41074bf 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
@@ -18,20 +18,20 @@
package org.apache.flink.table.plan.nodes.logical
+import org.apache.flink.table.functions.TemporalTableFunction
+import org.apache.flink.table.functions.utils.TableSqlFunction
import org.apache.flink.table.plan.nodes.FlinkConventions
+
import com.google.common.collect.ImmutableList
import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.core.TableFunctionScan
+import org.apache.calcite.rel.core.{JoinRelType, TableFunctionScan}
import org.apache.calcite.rel.logical.LogicalTableFunctionScan
import org.apache.calcite.rel.metadata.RelColumnMapping
import org.apache.calcite.rex.{RexCall, RexLiteral, RexNode, RexUtil}
-import org.apache.calcite.sql.SemiJoinType
import org.apache.calcite.util.ImmutableBitSet
-import org.apache.flink.table.functions.TemporalTableFunction
-import org.apache.flink.table.functions.utils.TableSqlFunction
import java.lang.reflect.Type
import java.util
@@ -162,7 +162,7 @@ class FlinkLogicalTableFunctionScanConverter
newScan,
cluster.createCorrel(), // a dummy CorrelationId
ImmutableBitSet.of(),
- SemiJoinType.INNER)
+ JoinRelType.INNER)
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
index ebea49c..383cdd1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
@@ -50,7 +50,6 @@ class FlinkLogicalWindowAggregate(
override def copy(
traitSet: RelTraitSet,
input: RelNode,
- indicator: Boolean,
groupSet: ImmutableBitSet,
groupSets: util.List[ImmutableBitSet],
aggCalls: util.List[AggregateCall]): Aggregate = {
@@ -99,7 +98,7 @@ class FlinkLogicalWindowAggregateConverter
override def convert(rel: RelNode): RelNode = {
val agg = rel.asInstanceOf[LogicalWindowAggregate]
- require(!agg.indicator && (agg.getGroupType == Group.SIMPLE))
+ require(agg.getGroupType == Group.SIMPLE)
val newInput = RelOptRule.convert(agg.getInput, FlinkConventions.LOGICAL)
val traitSet = newInput.getCluster.traitSet().replace(FlinkConventions.LOGICAL).simplify()
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecCorrelate.scala
index cd792fa..8e3c759 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecCorrelate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecCorrelate.scala
@@ -30,10 +30,10 @@ import org.apache.flink.table.planner.BatchPlanner
import org.apache.calcite.plan.{RelOptCluster, RelOptRule, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.Correlate
+import org.apache.calcite.rel.core.{Correlate, JoinRelType}
import org.apache.calcite.rel.{RelCollationTraitDef, RelDistribution, RelFieldCollation, RelNode, RelWriter, SingleRel}
import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode, RexProgram}
-import org.apache.calcite.sql.{SemiJoinType, SqlKind}
+import org.apache.calcite.sql.SqlKind
import org.apache.calcite.util.mapping.{Mapping, MappingType, Mappings}
import java.util
@@ -51,12 +51,12 @@ class BatchExecCorrelate(
condition: Option[RexNode],
projectProgram: Option[RexProgram],
outputRowType: RelDataType,
- joinType: SemiJoinType)
+ joinType: JoinRelType)
extends SingleRel(cluster, traitSet, inputRel)
with BatchPhysicalRel
with BatchExecNode[BaseRow] {
- require(joinType == SemiJoinType.INNER || joinType == SemiJoinType.LEFT)
+ require(joinType == JoinRelType.INNER || joinType == JoinRelType.LEFT)
override def deriveRowType(): RelDataType = outputRowType
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecCorrelate.scala
index 9b28572..d3324ac 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecCorrelate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecCorrelate.scala
@@ -29,9 +29,9 @@ import org.apache.flink.table.runtime.AbstractProcessStreamOperator
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
import org.apache.calcite.rex.{RexCall, RexNode, RexProgram}
-import org.apache.calcite.sql.SemiJoinType
import java.util
@@ -48,12 +48,12 @@ class StreamExecCorrelate(
scan: FlinkLogicalTableFunctionScan,
condition: Option[RexNode],
outputRowType: RelDataType,
- joinType: SemiJoinType)
+ joinType: JoinRelType)
extends SingleRel(cluster, traitSet, inputRel)
with StreamPhysicalRel
with StreamExecNode[BaseRow] {
- require(joinType == SemiJoinType.INNER || joinType == SemiJoinType.LEFT)
+ require(joinType == JoinRelType.INNER || joinType == JoinRelType.LEFT)
override def producesUpdates: Boolean = false
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala
index 96da7b4..edc15e3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala
@@ -74,7 +74,6 @@ object FlinkBatchProgram {
)
// rewrite special temporal join plan
- // TODO remove this program after upgraded to CALCITE-1.20.0 (CALCITE-2004 is fixed)
chainedProgram.addLast(
TEMPORAL_JOIN_REWRITE,
FlinkGroupProgramBuilder.newBuilder[BatchOptimizeContext]
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala
index 1020a69..7fac4cd 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala
@@ -74,7 +74,6 @@ object FlinkStreamProgram {
.build())
// rewrite special temporal join plan
- // TODO remove this program after upgraded to CALCITE-1.20.0 (CALCITE-2004 is fixed)
chainedProgram.addLast(
TEMPORAL_JOIN_REWRITE,
FlinkGroupProgramBuilder.newBuilder[StreamOptimizeContext]
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
index 27fc011..38f677e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
@@ -17,10 +17,11 @@
*/
package org.apache.flink.table.plan.rules.logical
-import org.apache.calcite.plan.RelOptRule.{any, operand, some}
+import org.apache.calcite.plan.RelOptRule.{any, operand}
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.logical.{LogicalCorrelate, LogicalFilter, LogicalSnapshot}
+import org.apache.calcite.rex.{RexCorrelVariable, RexFieldAccess, RexInputRef, RexNode, RexShuttle}
/**
* The initial temporal table join (FOR SYSTEM_TIME AS OF) is a Correlate, rewrite it into a Join
@@ -28,29 +29,51 @@ import org.apache.calcite.rel.logical.{LogicalCorrelate, LogicalFilter, LogicalS
* [[org.apache.flink.table.plan.nodes.physical.stream.StreamExecLookupJoin]] in physical and
* might be translated into
* [[org.apache.flink.table.plan.nodes.physical.stream.StreamExecTemporalJoin]] in the future.
+ *
+ * TODO supports `true` join condition
*/
class LogicalCorrelateToJoinFromTemporalTableRule
extends RelOptRule(
- operand(classOf[LogicalFilter],
- operand(classOf[LogicalCorrelate], some(
- operand(classOf[RelNode], any()),
- operand(classOf[LogicalSnapshot], any())))),
+ operand(classOf[LogicalCorrelate],
+ operand(classOf[RelNode], any()),
+ operand(classOf[LogicalFilter],
+ operand(classOf[LogicalSnapshot], any()))),
"LogicalCorrelateToJoinFromTemporalTableRule") {
override def onMatch(call: RelOptRuleCall): Unit = {
- val filterOnCorrelate: LogicalFilter = call.rel(0)
- val correlate: LogicalCorrelate = call.rel(1)
- val leftNode: RelNode = call.rel(2)
+ val correlate: LogicalCorrelate = call.rel(0)
+ val leftInput: RelNode = call.rel(1)
+ val filter: LogicalFilter = call.rel(2)
val snapshot: LogicalSnapshot = call.rel[LogicalSnapshot](3)
+ val leftRowType = leftInput.getRowType
+ val condition = filter.getCondition
+ val joinCondition = condition.accept(new RexShuttle() {
+ // change correlate variable expression to normal RexInputRef (which is from left side)
+ override def visitFieldAccess(fieldAccess: RexFieldAccess): RexNode = {
+ fieldAccess.getReferenceExpr match {
+ case corVar: RexCorrelVariable =>
+ require(correlate.getCorrelationId.equals(corVar.id))
+ val index = leftRowType.getFieldList.indexOf(fieldAccess.getField)
+ RexInputRef.of(index, leftRowType)
+ case _ => super.visitFieldAccess(fieldAccess)
+ }
+ }
+
+ // update the field index from right side
+ override def visitInputRef(inputRef: RexInputRef): RexNode = {
+ val rightIndex = leftRowType.getFieldCount + inputRef.getIndex
+ new RexInputRef(rightIndex, inputRef.getType)
+ }
+ })
+
val builder = call.builder()
- builder.push(leftNode)
+ builder.push(leftInput)
builder.push(snapshot)
- builder.join(
- correlate.getJoinType.toJoinType,
- filterOnCorrelate.getCondition)
+ builder.join(correlate.getJoinType, joinCondition)
- call.transformTo(builder.build())
+ val rel = builder.build()
+ call.transformTo(rel)
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecCorrelateRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecCorrelateRule.scala
index 5b6456b..cf748f1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecCorrelateRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecCorrelateRule.scala
@@ -36,8 +36,8 @@ class StreamExecCorrelateRule
"StreamExecCorrelateRule") {
override def matches(call: RelOptRuleCall): Boolean = {
- val join: FlinkLogicalCorrelate = call.rel(0).asInstanceOf[FlinkLogicalCorrelate]
- val right = join.getRight.asInstanceOf[RelSubset].getOriginal
+ val correlate: FlinkLogicalCorrelate = call.rel(0)
+ val right = correlate.getRight.asInstanceOf[RelSubset].getOriginal
// find only calc and table function
def findTableFunction(calc: FlinkLogicalCalc): Boolean = {
@@ -59,10 +59,11 @@ class StreamExecCorrelateRule
}
override def convert(rel: RelNode): RelNode = {
- val join: FlinkLogicalCorrelate = rel.asInstanceOf[FlinkLogicalCorrelate]
+ val correlate = rel.asInstanceOf[FlinkLogicalCorrelate]
val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
- val convInput: RelNode = RelOptRule.convert(join.getInput(0), FlinkConventions.STREAM_PHYSICAL)
- val right: RelNode = join.getInput(1)
+ val convInput: RelNode = RelOptRule.convert(
+ correlate.getInput(0), FlinkConventions.STREAM_PHYSICAL)
+ val right: RelNode = correlate.getInput(1)
def getTableScan(calc: FlinkLogicalCalc): RelNode = {
@@ -115,7 +116,7 @@ class StreamExecCorrelateRule
scan,
condition,
rel.getRowType,
- join.getJoinType)
+ correlate.getJoinType)
}
}
convertToCorrelate(right, None)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelOptUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelOptUtil.scala
index 1ac0f30..de71cfc 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelOptUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelOptUtil.scala
@@ -23,10 +23,9 @@ import org.apache.flink.table.plan.`trait`.{MiniBatchInterval, MiniBatchMode}
import org.apache.flink.table.plan.metadata.SelectivityEstimator
import org.apache.flink.table.{JBoolean, JByte, JDouble, JFloat, JLong, JShort}
-import com.google.common.collect.{ImmutableList, Lists}
+import com.google.common.collect.Lists
import org.apache.calcite.config.NullCollation
-import org.apache.calcite.plan.RelOptUtil.InputFinder
-import org.apache.calcite.plan.{RelOptUtil, Strong}
+import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.rel.RelFieldCollation.{Direction, NullDirection}
import org.apache.calcite.rel.`type`.RelDataTypeField
import org.apache.calcite.rel.core.{Join, JoinRelType}
@@ -189,196 +188,9 @@ object FlinkRelOptUtil {
}
/**
- * Simplifies outer joins if filter above would reject nulls.
- *
- * NOTES: This method should be deleted when upgrading to a new calcite version
- * which contains CALCITE-2969.
- *
- * @param joinRel Join
- * @param aboveFilters Filters from above
- * @param joinType Join type, can not be inner join
- */
- def simplifyJoin(
- joinRel: RelNode,
- aboveFilters: ImmutableList[RexNode],
- joinType: JoinRelType): JoinRelType = {
- // No need to simplify if only first input output.
- if (!joinType.projectsRight()) {
- return joinType
- }
- val nTotalFields = joinRel.getRowType.getFieldCount
- val nSysFields = 0
- val nFieldsLeft = joinRel.getInputs.get(0).getRowType.getFieldCount
- val nFieldsRight = joinRel.getInputs.get(1).getRowType.getFieldCount
- assert(nTotalFields == nSysFields + nFieldsLeft + nFieldsRight)
-
- // set the reference bitmaps for the left and right children
- val leftBitmap = ImmutableBitSet.range(nSysFields, nSysFields + nFieldsLeft)
- val rightBitmap = ImmutableBitSet.range(nSysFields + nFieldsLeft, nTotalFields)
-
- var result = joinType
- for (filter <- aboveFilters) {
- if (joinType.generatesNullsOnLeft && Strong.isNotTrue(filter, leftBitmap)) {
- result = result.cancelNullsOnLeft
- }
- if (joinType.generatesNullsOnRight && Strong.isNotTrue(filter, rightBitmap)) {
- result = result.cancelNullsOnRight
- }
- if (joinType eq JoinRelType.INNER) {
- return result
- }
- }
- result
- }
-
- /**
- * Classifies filters according to where they should be processed. They
- * either stay where they are, are pushed to the join (if they originated
- * from above the join), or are pushed to one of the children. Filters that
- * are pushed are added to list passed in as input parameters.
- *
- * NOTES: This method should be deleted when upgrading to a new calcite version
- * which contains CALCITE-2969.
- *
- * @param joinRel join node
- * @param filters filters to be classified
- * @param joinType join type
- * @param pushInto whether filters can be pushed into the ON clause
- * @param pushLeft true if filters can be pushed to the left
- * @param pushRight true if filters can be pushed to the right
- * @param joinFilters list of filters to push to the join
- * @param leftFilters list of filters to push to the left child
- * @param rightFilters list of filters to push to the right child
- * @return whether at least one filter was pushed
- */
- def classifyFilters(
- joinRel: RelNode,
- filters: util.List[RexNode],
- joinType: JoinRelType,
- pushInto: Boolean,
- pushLeft: Boolean,
- pushRight: Boolean,
- joinFilters: util.List[RexNode],
- leftFilters: util.List[RexNode],
- rightFilters: util.List[RexNode]): Boolean = {
- val rexBuilder = joinRel.getCluster.getRexBuilder
- val joinFields = joinRel.getRowType.getFieldList
- val nTotalFields = joinFields.size
- val nSysFields = 0 // joinRel.getSystemFieldList().size();
- val leftFields = joinRel.getInputs.get(0).getRowType.getFieldList
- val nFieldsLeft = leftFields.size
- val rightFields = joinRel.getInputs.get(1).getRowType.getFieldList
- val nFieldsRight = rightFields.size
-
- assert(nTotalFields == (if (joinType.projectsRight()) {
- nSysFields + nFieldsLeft + nFieldsRight
- } else {
- // SEMI/ANTI
- nSysFields + nFieldsLeft
- }))
-
- // set the reference bitmaps for the left and right children
- val leftBitmap = ImmutableBitSet.range(nSysFields, nSysFields + nFieldsLeft)
- val rightBitmap = ImmutableBitSet.range(nSysFields + nFieldsLeft, nTotalFields)
-
- val filtersToRemove = new util.ArrayList[RexNode]
-
- filters.foreach { filter =>
- val inputFinder = InputFinder.analyze(filter)
- val inputBits = inputFinder.inputBitSet.build
- // REVIEW - are there any expressions that need special handling
- // and therefore cannot be pushed?
- // filters can be pushed to the left child if the left child
- // does not generate NULLs and the only columns referenced in
- // the filter originate from the left child
- if (pushLeft && leftBitmap.contains(inputBits)) {
- // ignore filters that always evaluate to true
- if (!filter.isAlwaysTrue) {
- // adjust the field references in the filter to reflect
- // that fields in the left now shift over by the number
- // of system fields
- val shiftedFilter = shiftFilter(
- nSysFields,
- nSysFields + nFieldsLeft,
- -nSysFields,
- rexBuilder,
- joinFields,
- nTotalFields,
- leftFields,
- filter)
- leftFilters.add(shiftedFilter)
- }
- filtersToRemove.add(filter)
-
- // filters can be pushed to the right child if the right child
- // does not generate NULLs and the only columns referenced in
- // the filter originate from the right child
- } else if (pushRight && rightBitmap.contains(inputBits)) {
- if (!filter.isAlwaysTrue) {
- // that fields in the right now shift over to the left;
- // since we never push filters to a NULL generating
- // child, the types of the source should match the dest
- // so we don't need to explicitly pass the destination
- // fields to RexInputConverter
- val shiftedFilter = shiftFilter(
- nSysFields + nFieldsLeft,
- nTotalFields,
- -(nSysFields + nFieldsLeft),
- rexBuilder,
- joinFields,
- nTotalFields,
- rightFields,
- filter)
- rightFilters.add(shiftedFilter)
- }
- filtersToRemove.add(filter)
- } else {
- // If the filter can't be pushed to either child and the join
- // is an inner join, push them to the join if they originated
- // from above the join
- if ((joinType eq JoinRelType.INNER) && pushInto) {
- if (!joinFilters.contains(filter)) {
- joinFilters.add(filter)
- }
- filtersToRemove.add(filter)
- }
- }
- }
- // Remove filters after the loop, to prevent concurrent modification.
- if (!filtersToRemove.isEmpty) {
- filters.removeAll(filtersToRemove)
- }
- // Did anything change?
- !filtersToRemove.isEmpty
- }
-
- private def shiftFilter(
- start: Int,
- end: Int,
- offset: Int,
- rexBuilder: RexBuilder,
- joinFields: util.List[RelDataTypeField],
- nTotalFields: Int,
- rightFields: util.List[RelDataTypeField],
- filter: RexNode): RexNode = {
- val adjustments = new Array[Int](nTotalFields)
- (start until end).foreach {
- i => adjustments(i) = offset
- }
- filter.accept(
- new RelOptUtil.RexInputConverter(
- rexBuilder,
- joinFields,
- rightFields,
- adjustments)
- )
- }
-
- /**
* Pushes down expressions in "equal" join condition.
*
- * NOTES: This method should be deleted when upgrading to a new calcite version
- * which contains CALCITE-2969.
+ * NOTES: This method should be deleted when CALCITE-3171 is fixed.
*
* <p>For example, given
* "emp JOIN dept ON emp.deptno + 1 = dept.deptno", adds a project above
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.xml
index a6746ff..c514ad8 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.xml
@@ -32,7 +32,7 @@ LogicalIntersect(all=[false])
<Resource name="planAfter">
<![CDATA[
HashAggregate(isMerge=[false], groupBy=[c], select=[c])
-+- HashJoin(joinType=[LeftSemiJoin], where=[OR(=(c, f), AND(IS NULL(c), IS NULL(f)))], select=[c], build=[left])
++- HashJoin(joinType=[LeftSemiJoin], where=[IS NOT DISTINCT FROM(c, f)], select=[c], build=[left])
:- Exchange(distribution=[hash[c]])
: +- Calc(select=[c])
: +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
@@ -127,7 +127,7 @@ LogicalMinus(all=[false])
<Resource name="planAfter">
<![CDATA[
HashAggregate(isMerge=[false], groupBy=[c], select=[c])
-+- HashJoin(joinType=[LeftAntiJoin], where=[OR(=(c, f), AND(IS NULL(c), IS NULL(f)))], select=[c], build=[left])
++- HashJoin(joinType=[LeftAntiJoin], where=[IS NOT DISTINCT FROM(c, f)], select=[c], build=[left])
:- Exchange(distribution=[hash[c]])
: +- Calc(select=[c])
: +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.xml
index b0e8f8b..141cbdf 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.xml
@@ -1090,11 +1090,11 @@ LogicalIntersect(all=[false])
</Resource>
<Resource name="planAfter">
<![CDATA[
-NestedLoopJoin(joinType=[LeftSemiJoin], where=[OR(=(random, random0), AND(IS NULL(random), IS NULL(random0)))], select=[random], build=[right])
+NestedLoopJoin(joinType=[LeftSemiJoin], where=[IS NOT DISTINCT FROM(random, random0)], select=[random], build=[right])
:- SortAggregate(isMerge=[false], groupBy=[random], select=[random])
: +- Sort(orderBy=[random ASC])
: +- Exchange(distribution=[hash[random]])
-: +- NestedLoopJoin(joinType=[LeftSemiJoin], where=[OR(=(random, random0), AND(IS NULL(random), IS NULL(random0)))], select=[random], build=[right])
+: +- NestedLoopJoin(joinType=[LeftSemiJoin], where=[IS NOT DISTINCT FROM(random, random0)], select=[random], build=[right])
: :- Exchange(distribution=[any], exchange_mode=[BATCH])
: : +- Calc(select=[random], reuse_id=[1])
: : +- SortLimit(orderBy=[EXPR$1 ASC], offset=[0], fetch=[1], global=[true])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml
index e281fb4..0377621 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml
@@ -38,11 +38,11 @@ GROUP BY b
LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)], EXPR$2=[SUM($2)], EXPR$3=[SUM($3)])
+- LogicalProject(b=[$0], a=[$1], c=[$2], d=[$3])
+- LogicalFilter(condition=[>($7, 10)])
- +- LogicalFilter(condition=[=($1, $5)])
- +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{4}])
- :- LogicalProject(b=[$1], a=[$0], c=[$2], d=[$3], proctime=[PROCTIME()])
- : +- LogicalAggregate(group=[{0, 1}], c=[SUM($2)], d=[SUM($3)])
- : +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+ +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{1, 4}])
+ :- LogicalProject(b=[$1], a=[$0], c=[$2], d=[$3], proctime=[PROCTIME()])
+ : +- LogicalAggregate(group=[{0, 1}], c=[SUM($2)], d=[SUM($3)])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+ +- LogicalFilter(condition=[=($cor0.a, $0)])
+- LogicalSnapshot(period=[$cor0.proctime])
+- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
]]>
@@ -69,10 +69,10 @@ HashAggregate(isMerge=[true], groupBy=[b], select=[b, Final_COUNT(count$0) AS EX
<Resource name="planBefore">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], id=[$4], name=[$5], age=[$6])
-+- LogicalFilter(condition=[=($0, $4)])
- +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
- :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()])
- : +- LogicalTableScan(table=[[default_catalog, default_database, T0]])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+ :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T0]])
+ +- LogicalFilter(condition=[=($cor0.a, $0)])
+- LogicalSnapshot(period=[$cor0.proctime])
+- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
]]>
@@ -98,10 +98,10 @@ WHERE T.c > 1000
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], id=[$4], name=[$5], age=[$6])
+- LogicalFilter(condition=[>($2, 1000)])
- +- LogicalFilter(condition=[AND(=($0, $4), =($6, 10))])
- +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
- :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()])
- : +- LogicalTableScan(table=[[default_catalog, default_database, T0]])
+ +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+ :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T0]])
+ +- LogicalFilter(condition=[AND(=($cor0.a, $0), =($2, 10))])
+- LogicalSnapshot(period=[$cor0.proctime])
+- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
]]>
@@ -122,12 +122,12 @@ Calc(select=[a, b, c, proctime, id, name, CAST(10) AS age])
<Resource name="planBefore">
<![CDATA[
LogicalProject(a=[$0], b=[$1], proctime=[$2], id=[$3], name=[$4], age=[$5])
-+- LogicalFilter(condition=[=($0, $3)])
- +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{2}])
- :- LogicalProject(a=[$0], b=[$1], proctime=[$3])
- : +- LogicalFilter(condition=[>($2, 1000)])
- : +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()])
- : +- LogicalTableScan(table=[[default_catalog, default_database, T0]])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 2}])
+ :- LogicalProject(a=[$0], b=[$1], proctime=[$3])
+ : +- LogicalFilter(condition=[>($2, 1000)])
+ : +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T0]])
+ +- LogicalFilter(condition=[=($cor0.a, $0)])
+- LogicalSnapshot(period=[$cor0.proctime])
+- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
]]>
@@ -147,10 +147,10 @@ LookupJoin(table=[TestTemporalTable(id, name, age)], joinType=[InnerJoin], async
<Resource name="planBefore">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], id=[$4], name=[$5], age=[$6])
-+- LogicalFilter(condition=[=($0, $4)])
- +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{3}])
- :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()])
- : +- LogicalTableScan(table=[[default_catalog, default_database, T0]])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 3}])
+ :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T0]])
+ +- LogicalFilter(condition=[=($cor0.a, $0)])
+- LogicalSnapshot(period=[$cor0.proctime])
+- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
]]>
@@ -175,10 +175,10 @@ ON T.a = D.id
<Resource name="planBefore">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], id=[$4])
-+- LogicalFilter(condition=[=($0, $4)])
- +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
- :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()])
- : +- LogicalTableScan(table=[[default_catalog, default_database, T0]])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+ :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T0]])
+ +- LogicalFilter(condition=[=($cor0.a, $0)])
+- LogicalSnapshot(period=[$cor0.proctime])
+- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
]]>
@@ -213,11 +213,11 @@ GROUP BY b
LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)], EXPR$2=[SUM($2)], EXPR$3=[SUM($3)])
+- LogicalProject(b=[$0], a=[$1], c=[$2], d=[$3])
+- LogicalFilter(condition=[>($7, 10)])
- +- LogicalFilter(condition=[=($1, $5)])
- +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{4}])
- :- LogicalProject(b=[$1], a=[$0], c=[$2], d=[$3], proctime=[PROCTIME()])
- : +- LogicalAggregate(group=[{0, 1}], c=[SUM($2)], d=[SUM($3)])
- : +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+ +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{1, 4}])
+ :- LogicalProject(b=[$1], a=[$0], c=[$2], d=[$3], proctime=[PROCTIME()])
+ : +- LogicalAggregate(group=[{0, 1}], c=[SUM($2)], d=[SUM($3)])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+ +- LogicalFilter(condition=[=($cor0.a, $0)])
+- LogicalSnapshot(period=[$cor0.proctime])
+- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
]]>
@@ -274,20 +274,20 @@ LogicalProject(EXPR$0=[$2], EXPR$1=[$3], EXPR$2=[$4])
+- LogicalJoin(condition=[true], joinType=[inner])
:- LogicalProject(b=[$0], a=[$1], c=[$2], d=[$3], proctime=[$4], id=[$5], name=[$6], age=[$7])
: +- LogicalFilter(condition=[>($7, 10)])
- : +- LogicalFilter(condition=[=($1, $5)])
- : +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{4}])
- : :- LogicalProject(b=[$1], a=[$0], c=[$2], d=[$3], proctime=[PROCTIME()])
- : : +- LogicalAggregate(group=[{0, 1}], c=[SUM($2)], d=[SUM($3)])
- : : +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+ : +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{1, 4}])
+ : :- LogicalProject(b=[$1], a=[$0], c=[$2], d=[$3], proctime=[PROCTIME()])
+ : : +- LogicalAggregate(group=[{0, 1}], c=[SUM($2)], d=[SUM($3)])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+ : +- LogicalFilter(condition=[=($cor0.a, $0)])
: +- LogicalSnapshot(period=[$cor0.proctime])
: +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
+- LogicalProject(a=[$5], b=[$0])
+- LogicalFilter(condition=[>($7, 10)])
- +- LogicalFilter(condition=[=($1, $5)])
- +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{4}])
- :- LogicalProject(b=[$1], a=[$0], c=[$2], d=[$3], proctime=[PROCTIME()])
- : +- LogicalAggregate(group=[{0, 1}], c=[SUM($2)], d=[SUM($3)])
- : +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+ +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{1, 4}])
+ :- LogicalProject(b=[$1], a=[$0], c=[$2], d=[$3], proctime=[PROCTIME()])
+ : +- LogicalAggregate(group=[{0, 1}], c=[SUM($2)], d=[SUM($3)])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+ +- LogicalFilter(condition=[=($cor1.a, $0)])
+- LogicalSnapshot(period=[$cor1.proctime])
+- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceIntersectWithSemiJoinRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceIntersectWithSemiJoinRuleTest.xml
index 1c3b540..e1e5afd 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceIntersectWithSemiJoinRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceIntersectWithSemiJoinRuleTest.xml
@@ -32,7 +32,7 @@ LogicalIntersect(all=[false])
<Resource name="planAfter">
<![CDATA[
LogicalAggregate(group=[{0}])
-+- LogicalJoin(condition=[OR(=($0, $1), AND(IS NULL($0), IS NULL($1)))], joinType=[semi])
++- LogicalJoin(condition=[IS NOT DISTINCT FROM($0, $1)], joinType=[semi])
:- LogicalProject(c=[$2])
: +- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]])
+- LogicalProject(f=[$2])
@@ -57,7 +57,7 @@ LogicalIntersect(all=[false])
<Resource name="planAfter">
<![CDATA[
LogicalAggregate(group=[{0}])
-+- LogicalJoin(condition=[OR(=($0, $1), AND(IS NULL($0), IS NULL($1)))], joinType=[semi])
++- LogicalJoin(condition=[IS NOT DISTINCT FROM($0, $1)], joinType=[semi])
:- LogicalProject(c=[$2])
: +- LogicalFilter(condition=[=(1, 0)])
: +- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]])
@@ -83,7 +83,7 @@ LogicalIntersect(all=[false])
<Resource name="planAfter">
<![CDATA[
LogicalAggregate(group=[{0}])
-+- LogicalJoin(condition=[OR(=($0, $1), AND(IS NULL($0), IS NULL($1)))], joinType=[semi])
++- LogicalJoin(condition=[IS NOT DISTINCT FROM($0, $1)], joinType=[semi])
:- LogicalProject(c=[$2])
: +- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]])
+- LogicalProject(f=[$2])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceMinusWithAntiJoinRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceMinusWithAntiJoinRuleTest.xml
index f000d5c..4470529 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceMinusWithAntiJoinRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceMinusWithAntiJoinRuleTest.xml
@@ -32,7 +32,7 @@ LogicalMinus(all=[false])
<Resource name="planAfter">
<![CDATA[
LogicalAggregate(group=[{0}])
-+- LogicalJoin(condition=[OR(=($0, $1), AND(IS NULL($0), IS NULL($1)))], joinType=[anti])
++- LogicalJoin(condition=[IS NOT DISTINCT FROM($0, $1)], joinType=[anti])
:- LogicalProject(c=[$2])
: +- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]])
+- LogicalProject(f=[$2])
@@ -57,7 +57,7 @@ LogicalMinus(all=[false])
<Resource name="planAfter">
<![CDATA[
LogicalAggregate(group=[{0}])
-+- LogicalJoin(condition=[OR(=($0, $1), AND(IS NULL($0), IS NULL($1)))], joinType=[anti])
++- LogicalJoin(condition=[IS NOT DISTINCT FROM($0, $1)], joinType=[anti])
:- LogicalProject(c=[$2])
: +- LogicalFilter(condition=[=(1, 0)])
: +- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]])
@@ -83,7 +83,7 @@ LogicalMinus(all=[false])
<Resource name="planAfter">
<![CDATA[
LogicalAggregate(group=[{0}])
-+- LogicalJoin(condition=[OR(=($0, $1), AND(IS NULL($0), IS NULL($1)))], joinType=[anti])
++- LogicalJoin(condition=[IS NOT DISTINCT FROM($0, $1)], joinType=[anti])
:- LogicalProject(c=[$2])
: +- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]])
+- LogicalProject(f=[$2])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/subquery/SubqueryCorrelateVariablesValidationTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/subquery/SubqueryCorrelateVariablesValidationTest.xml
new file mode 100644
index 0000000..9316828
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/subquery/SubqueryCorrelateVariablesValidationTest.xml
@@ -0,0 +1,102 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testWithProjectProjectCorrelate">
+ <Resource name="sql">
+ <![CDATA[
+SELECT (SELECT min(t1.t1d) FROM t3 WHERE t3.t3a = 'test') min_t1d
+FROM t1
+WHERE t1a = 'test'
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(min_t1d=[$SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[MIN($0)])
+ LogicalProject($f0=[$cor0.t1d])
+ LogicalFilter(condition=[=($0, _UTF-16LE'test')])
+ LogicalTableScan(table=[[default_catalog, default_database, t3, source: [TestTableSource(t3a, t3b, t3c, t3d, t3e, t3f, t3g, t3h, t3i)]]])
+})])
++- LogicalFilter(condition=[=($0, _UTF-16LE'test')])
+ +- LogicalTableScan(table=[[default_catalog, default_database, t1, source: [TestTableSource(t1a, t1b, t1c, t1d, t1e, t1f, t1g, t1h, t1i)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(min_t1d=[$10])
++- LogicalJoin(condition=[=($3, $9)], joinType=[left])
+ :- LogicalFilter(condition=[=($0, _UTF-16LE'test')])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, t1, source: [TestTableSource(t1a, t1b, t1c, t1d, t1e, t1f, t1g, t1h, t1i)]]])
+ +- LogicalAggregate(group=[{0}], EXPR$0=[MIN($1)])
+ +- LogicalProject(t1d=[$9], $f0=[$9])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalFilter(condition=[=($0, _UTF-16LE'test')])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, t3, source: [TestTableSource(t3a, t3b, t3c, t3d, t3e, t3f, t3g, t3h, t3i)]]])
+ +- LogicalAggregate(group=[{0}])
+ +- LogicalProject(t1d=[$3])
+ +- LogicalFilter(condition=[=($0, _UTF-16LE'test')])
+ +- LogicalTableScan(table=[[default_catalog, default_database, t1, source: [TestTableSource(t1a, t1b, t1c, t1d, t1e, t1f, t1g, t1h, t1i)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testWithProjectFilterCorrelate">
+ <Resource name="sql">
+ <![CDATA[
+SELECT (SELECT min(t3d) FROM t3 WHERE t3.t3a = t1.t1a) min_t3d,
+ (SELECT max(t2h) FROM t2 WHERE t2.t2a = t1.t1a) max_t2h
+FROM t1
+ WHERE t1a = 'test'
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(min_t3d=[$SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[MIN($0)])
+ LogicalProject(t3d=[$3])
+ LogicalFilter(condition=[=($0, $cor0.t1a)])
+ LogicalTableScan(table=[[default_catalog, default_database, t3, source: [TestTableSource(t3a, t3b, t3c, t3d, t3e, t3f, t3g, t3h, t3i)]]])
+})], max_t2h=[$SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[MAX($0)])
+ LogicalProject(t2h=[$7])
+ LogicalFilter(condition=[=($0, $cor1.t1a)])
+ LogicalTableScan(table=[[default_catalog, default_database, t2, source: [TestTableSource(t2a, t2b, t2c, t2d, t2e, t2f, t2g, t2h, t2i)]]])
+})])
++- LogicalFilter(condition=[=($0, _UTF-16LE'test')])
+ +- LogicalTableScan(table=[[default_catalog, default_database, t1, source: [TestTableSource(t1a, t1b, t1c, t1d, t1e, t1f, t1g, t1h, t1i)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(min_t3d=[$9], max_t2h=[$11])
++- LogicalJoin(condition=[=($0, $10)], joinType=[left])
+ :- LogicalProject(t1a=[$0], t1b=[$1], t1c=[$2], t1d=[$3], t1e=[$4], t1f=[$5], t1g=[$6], t1h=[$7], t1i=[$8], EXPR$0=[$10])
+ : +- LogicalJoin(condition=[=($0, $9)], joinType=[left])
+ : :- LogicalFilter(condition=[=($0, _UTF-16LE'test')])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, t1, source: [TestTableSource(t1a, t1b, t1c, t1d, t1e, t1f, t1g, t1h, t1i)]]])
+ : +- LogicalAggregate(group=[{0}], EXPR$0=[MIN($1)])
+ : +- LogicalProject(t3a=[$0], t3d=[$3])
+ : +- LogicalFilter(condition=[IS NOT NULL($0)])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, t3, source: [TestTableSource(t3a, t3b, t3c, t3d, t3e, t3f, t3g, t3h, t3i)]]])
+ +- LogicalAggregate(group=[{0}], EXPR$0=[MAX($1)])
+ +- LogicalProject(t2a=[$0], t2h=[$7])
+ +- LogicalFilter(condition=[IS NOT NULL($0)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, t2, source: [TestTableSource(t2a, t2b, t2c, t2d, t2e, t2f, t2g, t2h, t2i)]]])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SetOperatorsTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SetOperatorsTest.xml
index 8094d28..66a6422 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SetOperatorsTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SetOperatorsTest.xml
@@ -33,7 +33,7 @@ LogicalIntersect(all=[false])
<![CDATA[
GroupAggregate(groupBy=[c], select=[c])
+- Exchange(distribution=[hash[c]])
- +- Join(joinType=[LeftSemiJoin], where=[OR(=(c, f), AND(IS NULL(c), IS NULL(f)))], select=[c], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ +- Join(joinType=[LeftSemiJoin], where=[IS NOT DISTINCT FROM(c, f)], select=[c], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[c]])
: +- Calc(select=[c])
: +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
@@ -128,7 +128,7 @@ LogicalMinus(all=[false])
<![CDATA[
GroupAggregate(groupBy=[c], select=[c])
+- Exchange(distribution=[hash[c]])
- +- Join(joinType=[LeftAntiJoin], where=[OR(=(c, f), AND(IS NULL(c), IS NULL(f)))], select=[c], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ +- Join(joinType=[LeftAntiJoin], where=[IS NOT DISTINCT FROM(c, f)], select=[c], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[c]])
: +- Calc(select=[c])
: +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.xml
index bfe8327..11df2ea 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.xml
@@ -668,11 +668,11 @@ LogicalIntersect(all=[false])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Join(joinType=[LeftSemiJoin], where=[OR(=(random, random0), AND(IS NULL(random), IS NULL(random0)))], select=[random], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[NoUniqueKey])
+Join(joinType=[LeftSemiJoin], where=[IS NOT DISTINCT FROM(random, random0)], select=[random], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[random]])
: +- GroupAggregate(groupBy=[random], select=[random])
: +- Exchange(distribution=[hash[random]])
-: +- Join(joinType=[LeftSemiJoin], where=[OR(=(random, random0), AND(IS NULL(random), IS NULL(random0)))], select=[random], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+: +- Join(joinType=[LeftSemiJoin], where=[IS NOT DISTINCT FROM(random, random0)], select=[random], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
: :- Exchange(distribution=[hash[random]], reuse_id=[1])
: : +- Calc(select=[random])
: : +- SortLimit(orderBy=[EXPR$1 ASC], offset=[0], fetch=[1])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.xml
index a734771..8e56ae1 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.xml
@@ -29,9 +29,9 @@ WHERE cast(D.name as bigint) > 1000
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
+- LogicalFilter(condition=[>(CAST($6):BIGINT, 1000)])
- +- LogicalFilter(condition=[AND(=($0, $5), =($7, 10))])
- +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
- :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalFilter(condition=[AND(=($cor0.a, $0), =($2, 10))])
+- LogicalSnapshot(period=[$cor0.proctime])
+- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
]]>
@@ -66,11 +66,11 @@ GROUP BY b
LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)], EXPR$2=[SUM($2)], EXPR$3=[SUM($3)])
+- LogicalProject(b=[$0], a=[$1], c=[$2], d=[$3])
+- LogicalFilter(condition=[>($7, 10)])
- +- LogicalFilter(condition=[=($1, $5)])
- +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{4}])
- :- LogicalProject(b=[$1], a=[$0], c=[$2], d=[$3], proc=[PROCTIME()])
- : +- LogicalAggregate(group=[{0, 1}], c=[SUM($2)], d=[SUM($3)])
- : +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+ +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{1, 4}])
+ :- LogicalProject(b=[$1], a=[$0], c=[$2], d=[$3], proc=[PROCTIME()])
+ : +- LogicalAggregate(group=[{0, 1}], c=[SUM($2)], d=[SUM($3)])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+ +- LogicalFilter(condition=[=($cor0.a, $0)])
+- LogicalSnapshot(period=[$cor0.proc])
+- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
]]>
@@ -95,9 +95,9 @@ GroupAggregate(groupBy=[b], select=[b, COUNT_RETRACT(a) AS EXPR$1, SUM_RETRACT(c
<Resource name="planBefore">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
-+- LogicalFilter(condition=[=($0, $5)])
- +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
- :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalFilter(condition=[=($cor0.a, $0)])
+- LogicalSnapshot(period=[$cor0.proctime])
+- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
]]>
@@ -123,9 +123,9 @@ WHERE T.c > 1000
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
+- LogicalFilter(condition=[>($2, 1000)])
- +- LogicalFilter(condition=[AND(=($0, $5), =($7, 10))])
- +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
- :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalFilter(condition=[AND(=($cor0.a, $0), =($2, 10))])
+- LogicalSnapshot(period=[$cor0.proctime])
+- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
]]>
@@ -152,9 +152,9 @@ WHERE T.c > 1000
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
+- LogicalFilter(condition=[>($2, 1000)])
- +- LogicalFilter(condition=[AND(=($0, $5), =($7, 10), =($6, _UTF-16LE'AAA'))])
- +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
- :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalFilter(condition=[AND(=($cor0.a, $0), =($2, 10), =($1, _UTF-16LE'AAA'))])
+- LogicalSnapshot(period=[$cor0.proctime])
+- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
]]>
@@ -175,11 +175,11 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, C
<Resource name="planBefore">
<![CDATA[
LogicalProject(a=[$0], b=[$1], proctime=[$2], id=[$3], name=[$4], age=[$5])
-+- LogicalFilter(condition=[=($0, $3)])
- +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{2}])
- :- LogicalProject(a=[$0], b=[$1], proctime=[$3])
- : +- LogicalFilter(condition=[>($2, 1000)])
- : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 2}])
+ :- LogicalProject(a=[$0], b=[$1], proctime=[$3])
+ : +- LogicalFilter(condition=[>($2, 1000)])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalFilter(condition=[=($cor0.a, $0)])
+- LogicalSnapshot(period=[$cor0.proctime])
+- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
]]>
@@ -200,9 +200,9 @@ Calc(select=[a, b, PROCTIME_MATERIALIZE(proctime) AS proctime, id, name, age])
<Resource name="planBefore">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
-+- LogicalFilter(condition=[=($0, $5)])
- +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{3}])
- :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 3}])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalFilter(condition=[=($cor0.a, $0)])
+- LogicalSnapshot(period=[$cor0.proctime])
+- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
]]>
@@ -227,9 +227,9 @@ ON T.a = D.id
<Resource name="planBefore">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5])
-+- LogicalFilter(condition=[=($0, $5)])
- +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
- :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalFilter(condition=[=($cor0.a, $0)])
+- LogicalSnapshot(period=[$cor0.proctime])
+- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala
index b6c2b48..bb43ecc 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala
@@ -55,14 +55,14 @@ class ScalarFunctionsValidationTest extends ScalarTypesTestBase {
@Test
def testTimestampAddWithWrongTimestampInterval(): Unit = {
thrown.expect(classOf[SqlParserException])
- testSqlApi("TIMESTAMPADD(XXX, 1, timestamp '2016-02-24'))", "2016-06-16")
+ testSqlApi("TIMESTAMPADD(XXX, 1, timestamp '2016-02-24')", "2016-06-16")
}
@Test
def testTimestampAddWithWrongTimestampFormat(): Unit = {
thrown.expect(classOf[SqlParserException])
- thrown.expectMessage("Illegal TIMESTAMP literal '2016-02-24'")
- testSqlApi("TIMESTAMPADD(YEAR, 1, timestamp '2016-02-24'))", "2016-06-16")
+ thrown.expectMessage("Illegal TIMESTAMP literal '2016/02/24'")
+ testSqlApi("TIMESTAMPADD(YEAR, 1, timestamp '2016/02/24')", "2016-06-16")
}
@Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.scala
index 9ba36e9..7169f7a 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.scala
@@ -58,8 +58,7 @@ class SubplanReuseTest extends TableTestBase {
util.verifyPlanNotExpected(sqlQuery, "Reused")
}
- @Test(expected = classOf[AssertionError])
- // TODO after CALCITE-3020 fixed, remove expected exception
+ @Test
def testSubplanReuseWithDifferentRowType(): Unit = {
util.tableEnv.getConfig.getConf.setBoolean(
OptimizerConfigOptions.SQL_OPTIMIZER_REUSE_TABLE_SOURCE_ENABLED, false)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala
index 111cf9f..8d7c348 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -74,7 +74,7 @@ class FlinkRelMdHandlerTestBase {
val tableConfig = new TableConfig()
val rootSchema: SchemaPlus = MetadataTestUtil.initRootSchema()
- // TODO batch RelNode and stream RelNode should have different PlanningConfigurationBuilder
+ // TODO batch RelNode and stream RelNode should have different PlannerContext
// and RelOptCluster due to they have different trait definitions.
val plannerContext: PlannerContext =
new PlannerContext(
@@ -728,7 +728,6 @@ class FlinkRelMdHandlerTestBase {
cluster,
flinkLogicalTraits,
studentFlinkLogicalScan,
- logicalAgg.indicator,
logicalAgg.getGroupSet,
logicalAgg.getGroupSets,
logicalAgg.getAggCallList
@@ -878,7 +877,6 @@ class FlinkRelMdHandlerTestBase {
cluster,
flinkLogicalTraits,
studentFlinkLogicalScan,
- logicalAggWithAuxGroup.indicator,
logicalAggWithAuxGroup.getGroupSet,
logicalAggWithAuxGroup.getGroupSets,
logicalAggWithAuxGroup.getAggCallList
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubQueryAntiJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubQueryAntiJoinTest.scala
index dd5c042..71dc9a4e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubQueryAntiJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubQueryAntiJoinTest.scala
@@ -213,8 +213,8 @@ class SubQueryAntiJoinTest extends SubQueryTestBase {
def testNotInWithUncorrelatedOnWhere_Case7(): Unit = {
util.addTableSource[(Int)]("t1", 'i)
- thrown.expect(classOf[TableException])
- thrown.expectMessage("unexpected correlate variable $cor0 in the plan")
+ // TODO some bugs in SubQueryRemoveRule
+ thrown.expect(classOf[RuntimeException])
// TODO Calcite does not support project with correlated expressions.
val sqlQuery = "SELECT b FROM l WHERE " +
@@ -725,8 +725,10 @@ class SubQueryAntiJoinTest extends SubQueryTestBase {
def testNotInNotExists3(): Unit = {
util.addTableSource[(Int, Long, String)]("t2", 'l, 'm, 'n)
- thrown.expect(classOf[TableException])
- thrown.expectMessage("unexpected correlate variable $cor0 in the plan")
+ // TODO some bugs in SubQueryRemoveRule
+ // the result RelNode (LogicalJoin(condition=[=($1, $11)], joinType=[left]))
+ // after SubQueryRemoveRule is unexpected
+ thrown.expect(classOf[RuntimeException])
// TODO Calcite does not support project with correlated expressions.
val sqlQuery = "SELECT c FROM l WHERE (" +
@@ -752,8 +754,8 @@ class SubQueryAntiJoinTest extends SubQueryTestBase {
def testInNotInExistsNotExists2(): Unit = {
util.addTableSource[(Int, Long, String)]("t2", 'l, 'm, 'n)
- thrown.expect(classOf[TableException])
- thrown.expectMessage("unexpected correlate variable $cor0 in the plan")
+ // TODO some bugs in SubQueryRemoveRule
+ thrown.expect(classOf[RuntimeException])
// TODO Calcite does not support project with correlated expressions.
val sqlQuery = "SELECT c FROM l WHERE (" +
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubQuerySemiJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubQuerySemiJoinTest.scala
index c1225f6..944eb02 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubQuerySemiJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubQuerySemiJoinTest.scala
@@ -208,9 +208,8 @@ class SubQuerySemiJoinTest extends SubQueryTestBase {
util.addTableSource[(Int)]("t1", 'i)
util.addTableSource[(Int)]("t2", 'j)
- thrown.expect(classOf[TableException])
- // correlate variable id is unstable, ignore here
- thrown.expectMessage("unexpected correlate variable $cor")
+ // TODO some bugs in SubQueryRemoveRule
+ thrown.expect(classOf[RuntimeException])
// TODO Calcite does not support project with correlated expressions.
val sqlQuery = "SELECT b FROM l WHERE" +
@@ -1659,9 +1658,10 @@ class SubQuerySemiJoinTest extends SubQueryTestBase {
def testInExists3(): Unit = {
util.addTableSource[(Int, Long, String)]("t2", 'l, 'm, 'n)
- thrown.expect(classOf[TableException])
- // correlate variable id is unstable, ignore here
- thrown.expectMessage("unexpected correlate variable $cor")
+ // TODO some bugs in SubQueryRemoveRule
+ // the result RelNode (LogicalJoin(condition=[=($1, $8)], joinType=[left]))
+ // after SubQueryRemoveRule is unexpected
+ thrown.expect(classOf[RuntimeException])
// TODO Calcite does not support project with correlated expressions.
val sqlQuery = "SELECT c FROM l WHERE (" +
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubqueryCorrelateVariablesValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubqueryCorrelateVariablesValidationTest.scala
index 70e09aa..f3b07cb 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubqueryCorrelateVariablesValidationTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubqueryCorrelateVariablesValidationTest.scala
@@ -35,7 +35,7 @@ class SubqueryCorrelateVariablesValidationTest extends SubQueryTestBase {
util.addTableSource[(String, Short, Int, Long, Float, Double, BigDecimal, Timestamp, Date)](
"t3", 't3a, 't3b, 't3c, 't3d, 't3e, 't3f, 't3g, 't3h, 't3i)
- @Test(expected = classOf[RuntimeException])
+ @Test
def testWithProjectProjectCorrelate(): Unit = {
val sqlQuery =
"""
@@ -46,7 +46,7 @@ class SubqueryCorrelateVariablesValidationTest extends SubQueryTestBase {
util.verifyPlan(sqlQuery)
}
- @Test(expected = classOf[RuntimeException])
+ @Test
def testWithProjectFilterCorrelate(): Unit = {
val sqlQuery =
"""
@@ -111,7 +111,8 @@ class SubqueryCorrelateVariablesValidationTest extends SubQueryTestBase {
util.verifyPlan(sqlQuery)
}
- @Test(expected = classOf[RuntimeException])
+ @Test(expected = classOf[AssertionError])
+ // TODO some bugs in RelDecorrelator.AdjustProjectForCountAggregateRule
def testWithProjectCaseWhenCorrelate(): Unit = {
val sqlQuery =
"""
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.scala
index 2fa8d39..8a455b7 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.scala
@@ -57,8 +57,7 @@ class SubplanReuseTest extends TableTestBase {
util.verifyPlanNotExpected(sqlQuery, "Reused")
}
- @Test(expected = classOf[AssertionError])
- // TODO after CALCITE-3020 fixed, remove expected exception
+ @Test
def testSubplanReuseWithDifferentRowType(): Unit = {
util.tableEnv.getConfig.getConf.setBoolean(
OptimizerConfigOptions.SQL_OPTIMIZER_REUSE_TABLE_SOURCE_ENABLED, false)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
index c3a9e2d..1294ff4 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
@@ -77,8 +77,8 @@ class LookupJoinTest extends TableTestBase with Serializable {
expectExceptionThrown(
"SELECT * FROM MyTable AS T RIGHT JOIN temporalTest " +
"FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id",
- "Unsupported join type for semi-join RIGHT",
- classOf[IllegalArgumentException]
+ "Correlate has invalid join type RIGHT",
+ classOf[AssertionError]
)
// only support join on raw key of right table
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/PruneAggregateCallITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/PruneAggregateCallITCase.scala
index cd55c6a..00a8b76 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/PruneAggregateCallITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/PruneAggregateCallITCase.scala
@@ -46,7 +46,7 @@ class PruneAggregateCallITCase extends BatchTestBase {
checkResult(
"""
|SELECT c, a FROM
- | (SELECT a, c, COUNT(b) as c, SUM(b) as s FROM MyTable GROUP BY a, c) t
+ | (SELECT a, c, COUNT(b) as cnt, SUM(b) as s FROM MyTable GROUP BY a, c) t
|WHERE s > 1
""".stripMargin,
Seq(row("Hello world", 3), row("Hello", 2))
diff --git a/flink-table/flink-table-runtime-blink/pom.xml b/flink-table/flink-table-runtime-blink/pom.xml
index aeea8c1..f407bac 100644
--- a/flink-table/flink-table-runtime-blink/pom.xml
+++ b/flink-table/flink-table-runtime-blink/pom.xml
@@ -72,7 +72,7 @@ under the License.
<groupId>org.apache.calcite.avatica</groupId>
<artifactId>avatica-core</artifactId>
<!-- When updating the Calcite version, make sure to update the version and dependency exclusions -->
- <version>1.13.0</version>
+ <version>1.15.0</version>
<exclusions>
<!--
@@ -81,9 +81,9 @@ under the License.
We exclude all the dependencies of Avatica because currently we only use
TimeUnit, TimeUnitRange and SqlDateTimeUtils which only dependent JDK.
- "mvn dependency:tree" as of Avatica 1.13:
+ "mvn dependency:tree" as of Avatica 1.15:
- [INFO] +- org.apache.calcite.avatica:avatica-core:jar:1.13.0:compile
+ [INFO] +- org.apache.calcite.avatica:avatica-core:jar:1.15.0:compile
-->
<exclusion>