You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/02 09:10:28 UTC
[flink] branch master updated: [FLINK-12937][table-planner-blink]
Introduce join reorder planner rules in blink planner
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a6d72fe [FLINK-12937][table-planner-blink] Introduce join reorder planner rules in blink planner
a6d72fe is described below
commit a6d72fed708e99404c96ffeca6ad70e08963e9cd
Author: godfreyhe <go...@163.com>
AuthorDate: Sat Jun 22 12:50:06 2019 +0800
[FLINK-12937][table-planner-blink] Introduce join reorder planner rules in blink planner
This closes #8832
---
.../flink/table/api/PlannerConfigOptions.java | 6 +
.../plan/optimize/program/FlinkBatchProgram.scala | 22 +-
.../plan/optimize/program/FlinkStreamProgram.scala | 23 +-
.../table/plan/rules/FlinkBatchRuleSets.scala | 16 +
.../table/plan/rules/FlinkStreamRuleSets.scala | 16 +
.../logical/RewriteMultiJoinConditionRule.scala | 129 +++++
.../table/plan/batch/sql/join/JoinReorderTest.xml | 600 +++++++++++++++++++
.../logical/RewriteMultiJoinConditionRuleTest.xml | 318 +++++++++++
.../table/plan/stream/sql/join/JoinReorderTest.xml | 635 +++++++++++++++++++++
.../plan/batch/sql/join/JoinReorderTest.scala | 25 +
.../table/plan/common/JoinReorderTestBase.scala | 233 ++++++++
.../FlinkAggregateInnerJoinTransposeRuleTest.scala | 2 +-
.../logical/FlinkJoinPushExpressionsRuleTest.scala | 10 +-
.../RewriteMultiJoinConditionRuleTest.scala | 153 +++++
.../plan/stream/sql/join/JoinReorderTest.scala | 25 +
15 files changed, 2208 insertions(+), 5 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java
index 7b05df1..5e7a1a3 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java
@@ -137,4 +137,10 @@ public class PlannerConfigOptions {
.defaultValue(true)
.withDescription("Allow trying to push predicate down to a FilterableTableSource. " +
"the default value is true, means allow the attempt.");
+
+ /**
+ * Boolean switch for join reorder in the optimizer; the default value is false (disabled).
+ * FlinkBatchProgram and FlinkStreamProgram only add their "join_reorder" program
+ * to the optimization chain when this option is set to true.
+ */
+ public static final ConfigOption<Boolean> SQL_OPTIMIZER_JOIN_REORDER_ENABLED =
+ key("sql.optimizer.join-reorder.enabled")
+ .defaultValue(false)
+ .withDescription("Enables join reorder in optimizer cbo. Default is disabled.");
+
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala
index 090b4fb..e73fde6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala
@@ -19,6 +19,7 @@
package org.apache.flink.table.plan.optimize.program
import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.api.PlannerConfigOptions
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.rules.FlinkBatchRuleSets
@@ -33,6 +34,7 @@ object FlinkBatchProgram {
val DECORRELATE = "decorrelate"
val DEFAULT_REWRITE = "default_rewrite"
val PREDICATE_PUSHDOWN = "predicate_pushdown"
+ val JOIN_REORDER = "join_reorder"
val JOIN_REWRITE = "join_rewrite"
val WINDOW = "window"
val LOGICAL = "logical"
@@ -120,7 +122,7 @@ object FlinkBatchProgram {
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
.add(FlinkBatchRuleSets.FILTER_PREPARE_RULES)
.build(), "other predicate rewrite")
- .setIterations(5).build())
+ .setIterations(5).build(), "predicate rewrite")
.addProgram(
FlinkHepRuleSetProgramBuilder.newBuilder
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
@@ -135,6 +137,24 @@ object FlinkBatchProgram {
.build(), "prune empty after predicate push down")
.build())
+ // join reorder
+ if (config.getBoolean(PlannerConfigOptions.SQL_OPTIMIZER_JOIN_REORDER_ENABLED)) {
+ chainedProgram.addLast(
+ JOIN_REORDER,
+ FlinkGroupProgramBuilder.newBuilder[BatchOptimizeContext]
+ .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder
+ .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
+ .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .add(FlinkBatchRuleSets.JOIN_REORDER_PERPARE_RULES)
+ .build(), "merge join into MultiJoin")
+ .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder
+ .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+ .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .add(FlinkBatchRuleSets.JOIN_REORDER_RULES)
+ .build(), "do join reorder")
+ .build())
+ }
+
// join rewrite
chainedProgram.addLast(
JOIN_REWRITE,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala
index a5f1dac..b64ff31 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala
@@ -18,11 +18,13 @@
package org.apache.flink.table.plan.optimize.program
-import org.apache.calcite.plan.hep.HepMatchOrder
import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.api.PlannerConfigOptions
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.rules.{FlinkBatchRuleSets, FlinkStreamRuleSets}
+import org.apache.calcite.plan.hep.HepMatchOrder
+
/**
* Defines a sequence of programs to optimize for stream table plan.
*/
@@ -34,6 +36,7 @@ object FlinkStreamProgram {
val TIME_INDICATOR = "time_indicator"
val DEFAULT_REWRITE = "default_rewrite"
val PREDICATE_PUSHDOWN = "predicate_pushdown"
+ val JOIN_REORDER = "join_reorder"
val LOGICAL = "logical"
val LOGICAL_REWRITE = "logical_rewrite"
val PHYSICAL = "physical"
@@ -130,6 +133,24 @@ object FlinkStreamProgram {
.build(), "prune empty after predicate push down")
.build())
+ // join reorder
+ if (config.getBoolean(PlannerConfigOptions.SQL_OPTIMIZER_JOIN_REORDER_ENABLED)) {
+ chainedProgram.addLast(
+ JOIN_REORDER,
+ FlinkGroupProgramBuilder.newBuilder[StreamOptimizeContext]
+ .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder
+ .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
+ .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .add(FlinkStreamRuleSets.JOIN_REORDER_PERPARE_RULES)
+ .build(), "merge join into MultiJoin")
+ .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder
+ .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+ .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .add(FlinkStreamRuleSets.JOIN_REORDER_RULES)
+ .build(), "do join reorder")
+ .build())
+ }
+
// optimize the logical plan
chainedProgram.addLast(
LOGICAL,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
index 75f1d5a..71fcb23 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
@@ -218,6 +218,22 @@ object FlinkBatchRuleSets {
FILTER_RULES.asScala
).asJava)
+ /**
+ * RuleSet that prepares a plan for join reorder by merging joins, projects and filters
+ * into a MultiJoin. FlinkBatchProgram runs it as the "merge join into MultiJoin" step
+ * of the join_reorder program, using RULE_COLLECTION execution.
+ *
+ * NOTE(review): "PERPARE" looks like a misspelling of "PREPARE"; a rename would also have
+ * to update the reference in FlinkBatchProgram.
+ */
+ val JOIN_REORDER_PERPARE_RULES: RuleSet = RuleSets.ofList(
+ // merge join to MultiJoin
+ JoinToMultiJoinRule.INSTANCE,
+ // merge project to MultiJoin
+ ProjectMultiJoinMergeRule.INSTANCE,
+ // merge filter to MultiJoin
+ FilterMultiJoinMergeRule.INSTANCE
+ )
+
+ /**
+ * RuleSet that performs the join reorder itself. FlinkBatchProgram runs it as the
+ * "do join reorder" step with RULE_SEQUENCE execution, so the order of the entries
+ * below is significant: the condition rewrite is listed before the reorder rule.
+ */
+ val JOIN_REORDER_RULES: RuleSet = RuleSets.ofList(
+ // equi-join predicates transfer
+ RewriteMultiJoinConditionRule.INSTANCE,
+ // join reorder
+ LoptOptimizeJoinRule.INSTANCE
+ )
+
/**
* RuleSet to do logical optimize.
* This RuleSet is a sub-set of [[LOGICAL_OPT_RULES]].
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
index 0f8b219..ca94b8e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
@@ -201,6 +201,22 @@ object FlinkStreamRuleSets {
ProjectSetOpTransposeRule.INSTANCE
)
+ /**
+ * RuleSet that prepares a plan for join reorder by merging projects, filters and joins
+ * into a MultiJoin. FlinkStreamProgram runs it as the "merge join into MultiJoin" step
+ * of the join_reorder program, using RULE_COLLECTION execution.
+ *
+ * NOTE(review): "PERPARE" looks like a misspelling of "PREPARE"; a rename would also have
+ * to update the reference in FlinkStreamProgram.
+ */
+ val JOIN_REORDER_PERPARE_RULES: RuleSet = RuleSets.ofList(
+ // merge project to MultiJoin
+ ProjectMultiJoinMergeRule.INSTANCE,
+ // merge filter to MultiJoin
+ FilterMultiJoinMergeRule.INSTANCE,
+ // merge join to MultiJoin
+ JoinToMultiJoinRule.INSTANCE
+ )
+
+ /**
+ * RuleSet that performs the join reorder itself. FlinkStreamProgram runs it as the
+ * "do join reorder" step with RULE_SEQUENCE execution, so the order of the entries
+ * below is significant: the condition rewrite is listed before the reorder rule.
+ */
+ val JOIN_REORDER_RULES: RuleSet = RuleSets.ofList(
+ // equi-join predicates transfer
+ RewriteMultiJoinConditionRule.INSTANCE,
+ // join reorder
+ LoptOptimizeJoinRule.INSTANCE
+ )
+
/**
* RuleSet to do logical optimize.
* This RuleSet is a sub-set of [[LOGICAL_OPT_RULES]].
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteMultiJoinConditionRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteMultiJoinConditionRule.scala
new file mode 100644
index 0000000..392e58b
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteMultiJoinConditionRule.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptUtil}
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.rules.MultiJoin
+import org.apache.calcite.rex.{RexCall, RexNode}
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.sql.fun.SqlStdOperatorTable._
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+
+/**
+  * Planner rule to apply transitive closure on [[MultiJoin]] for equi-join predicates.
+  *
+  * <p>e.g.
+  * MJ(A, B, C) ON A.a1=B.b1 AND B.b1=C.c1 →
+  * MJ(A, B, C) ON A.a1=B.b1 AND B.b1=C.c1 AND A.a1=C.c1
+  *
+  * <p>A single application only derives equalities between operands that are both compared
+  * with one common operand; it does not follow longer chains of equalities by itself.
+  *
+  * The advantage of applying this rule is that it increases the choice of join reorder;
+  * at the same time, the disadvantage is that it will use more CPU for additional join predicates.
+  */
+class RewriteMultiJoinConditionRule extends RelOptRule(
+  operand(classOf[MultiJoin], any),
+  "RewriteMultiJoinConditionRule") {
+
+  /**
+    * Matches a [[MultiJoin]] that is not a full outer join, whose join types are all INNER
+    * and whose join filter contains more than one top-level EQUALS conjunct.
+    */
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val multiJoin: MultiJoin = call.rel(0)
+    // currently only a MultiJoin whose join types are all INNER is supported
+    val isAllInnerJoin = multiJoin.getJoinTypes.forall(_ eq JoinRelType.INNER)
+    // cheap checks first: the join filter is only decomposed when they pass
+    !multiJoin.isFullOuterJoin && isAllInnerJoin && {
+      val (equiJoinFilters, _) = partitionJoinFilters(multiJoin)
+      equiJoinFilters.size > 1
+    }
+  }
+
+  /**
+    * Derives additional equi-join filters and, if at least one new filter was found,
+    * replaces the [[MultiJoin]] with a copy whose join filter is the conjunction of the
+    * original equi filters, the derived ones and the original non-equi filters (in that order).
+    */
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val multiJoin: MultiJoin = call.rel(0)
+    val (equiJoinFilters, nonEquiJoinFilters) = partitionJoinFilters(multiJoin)
+    // there is no `equals` method in RexCall, so the key of this map should be String.
+    // Each operand (by its string form) maps to all operands it is directly compared with.
+    val equiJoinFilterMap = mutable.HashMap[String, mutable.ListBuffer[RexNode]]()
+    equiJoinFilters.foreach {
+      case c: RexCall =>
+        require(c.isA(SqlKind.EQUALS))
+        val left = c.operands.head
+        val right = c.operands(1)
+        equiJoinFilterMap.getOrElseUpdate(left.toString, mutable.ListBuffer[RexNode]()) += right
+        equiJoinFilterMap.getOrElseUpdate(right.toString, mutable.ListBuffer[RexNode]()) += left
+    }
+
+    val newEquiJoinFilters = mutable.ListBuffer[RexNode](equiJoinFilters: _*)
+    // string forms of every filter in `newEquiJoinFilters`, kept in sync with the buffer so
+    // that the duplicate check is a set lookup instead of a scan over all filters
+    val knownFilterDigests = mutable.HashSet[String](equiJoinFilters.map(_.toString): _*)
+
+    val rexBuilder = multiJoin.getCluster.getRexBuilder
+    // only operands that are compared with at least two other operands can yield new filters
+    equiJoinFilterMap.values.filter(_.size > 1).foreach { candidate =>
+      val operands = candidate.toIndexedSeq
+      for (i <- operands.indices; j <- i + 1 until operands.size) {
+        val op1 = operands(i)
+        val op2 = operands(j)
+        // the same equality listed twice puts an operand into a candidate list twice;
+        // pairing it with itself would only produce a useless `x = x` predicate
+        if (op1.toString != op2.toString) {
+          // `a = b` and `b = a` are the same
+          val newFilter1 = rexBuilder.makeCall(EQUALS, op1, op2)
+          val newFilter2 = rexBuilder.makeCall(EQUALS, op2, op1)
+          val alreadyKnown = knownFilterDigests.contains(newFilter1.toString) ||
+            knownFilterDigests.contains(newFilter2.toString)
+          if (!alreadyKnown) {
+            newEquiJoinFilters += newFilter1
+            knownFilterDigests += newFilter1.toString
+          }
+        }
+      }
+    }
+
+    // transform only when at least one new join filter was derived
+    if (newEquiJoinFilters.size > equiJoinFilters.size) {
+      val newJoinFilter =
+        call.builder().and(newEquiJoinFilters.toList ::: nonEquiJoinFilters.toList)
+      val newMultiJoin =
+        new MultiJoin(
+          multiJoin.getCluster,
+          multiJoin.getInputs,
+          newJoinFilter,
+          multiJoin.getRowType,
+          multiJoin.isFullOuterJoin,
+          multiJoin.getOuterJoinConditions,
+          multiJoin.getJoinTypes,
+          multiJoin.getProjFields,
+          multiJoin.getJoinFieldRefCountsMap,
+          multiJoin.getPostJoinFilter)
+
+      call.transformTo(newMultiJoin)
+    }
+  }
+
+  /**
+    * Partitions MultiJoin condition in equi join filters and non-equi join filters.
+    *
+    * @param multiJoin the MultiJoin whose join filter is split into its conjunctions
+    * @return a pair of (conjuncts of kind EQUALS, all remaining conjuncts)
+    */
+  private def partitionJoinFilters(multiJoin: MultiJoin): (Seq[RexNode], Seq[RexNode]) = {
+    val joinFilters = RelOptUtil.conjunctions(multiJoin.getJoinFilter)
+    joinFilters.partition(f => f.isA(SqlKind.EQUALS))
+  }
+
+}
+
+/** Companion object exposing the shared instance of [[RewriteMultiJoinConditionRule]]. */
+object RewriteMultiJoinConditionRule {
+  val INSTANCE: RewriteMultiJoinConditionRule = new RewriteMultiJoinConditionRule()
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/JoinReorderTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/JoinReorderTest.xml
new file mode 100644
index 0000000..8fe210d
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/JoinReorderTest.xml
@@ -0,0 +1,600 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testAllFullOuterJoin">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1
+ FULL OUTER JOIN T2 ON a1 = a2
+ FULL OUTER JOIN T3 ON a1 = a3
+ FULL OUTER JOIN T4 ON a1 = a4
+ FULL OUTER JOIN T5 ON a4 = a5
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[full])
+ :- LogicalJoin(condition=[=($0, $9)], joinType=[full])
+ : :- LogicalJoin(condition=[=($0, $6)], joinType=[full])
+ : : :- LogicalJoin(condition=[=($0, $3)], joinType=[full])
+ : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+HashJoin(joinType=[FullOuterJoin], where=[=(a4, a5)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5], build=[right])
+:- Exchange(distribution=[hash[a4]])
+: +- HashJoin(joinType=[FullOuterJoin], where=[=(a1, a4)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], build=[right])
+: :- NestedLoopJoin(joinType=[FullOuterJoin], where=[=(a1, a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], build=[right])
+: : :- Exchange(distribution=[hash[a1]])
+: : : +- HashJoin(joinType=[FullOuterJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2], build=[right])
+: : : :- Exchange(distribution=[hash[a1]])
+: : : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+: : : +- Exchange(distribution=[hash[a2]])
+: : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+: : +- Exchange(distribution=[single])
+: : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+: +- Exchange(distribution=[hash[a4]])
+: +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
++- Exchange(distribution=[hash[a5]])
+ +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testAllLeftOuterJoin">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1
+ LEFT OUTER JOIN T2 ON a1 = a2
+ LEFT OUTER JOIN T3 ON a2 = a3
+ LEFT OUTER JOIN T4 ON a1 = a4
+ LEFT OUTER JOIN T5 ON a4 = a5
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[left])
+ :- LogicalJoin(condition=[=($0, $9)], joinType=[left])
+ : :- LogicalJoin(condition=[=($3, $6)], joinType=[left])
+ : : :- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+ : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+HashJoin(joinType=[LeftOuterJoin], where=[=(a4, a5)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5], build=[right])
+:- Exchange(distribution=[hash[a4]])
+: +- HashJoin(joinType=[LeftOuterJoin], where=[=(a1, a4)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])
+: :- HashJoin(joinType=[LeftOuterJoin], where=[=(a2, a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], isBroadcast=[true], build=[right])
+: : :- HashJoin(joinType=[LeftOuterJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2], isBroadcast=[true], build=[right])
+: : : :- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+: : : +- Exchange(distribution=[broadcast])
+: : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+: : +- Exchange(distribution=[broadcast])
+: : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+: +- Exchange(distribution=[broadcast])
+: +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
++- Exchange(distribution=[hash[a5]])
+ +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testAllRightOuterJoin">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1
+ RIGHT OUTER JOIN T2 ON a1 = a2
+ RIGHT OUTER JOIN T3 ON a2 = a3
+ RIGHT OUTER JOIN T4 ON a1 = a4
+ RIGHT OUTER JOIN T5 ON a4 = a5
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[right])
+ :- LogicalJoin(condition=[=($0, $9)], joinType=[right])
+ : :- LogicalJoin(condition=[=($3, $6)], joinType=[right])
+ : : :- LogicalJoin(condition=[=($0, $3)], joinType=[right])
+ : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- HashJoin(joinType=[LeftOuterJoin], where=[=(a4, a5)], select=[a5, b5, c5, a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])
+ :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ +- Exchange(distribution=[broadcast])
+ +- HashJoin(joinType=[RightOuterJoin], where=[=(a1, a4)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[left])
+ :- Exchange(distribution=[broadcast])
+ : +- SortMergeJoin(joinType=[RightOuterJoin], where=[=(a2, a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3])
+ : :- HashJoin(joinType=[RightOuterJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2], build=[right])
+ : : :- Exchange(distribution=[hash[a1]])
+ : : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ : : +- Exchange(distribution=[hash[a2]])
+ : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ : +- Exchange(distribution=[hash[a3]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+ +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testBushyJoinCondition1">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1, T2, T3, T4, T5
+WHERE a1 = a2 AND a2 = a3 AND a1 = a4 AND a3 = a5
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalFilter(condition=[AND(=($0, $3), =($3, $6), =($0, $9), =($6, $12))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalJoin(condition=[true], joinType=[inner])
+ : :- LogicalJoin(condition=[true], joinType=[inner])
+ : : :- LogicalJoin(condition=[true], joinType=[inner])
+ : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- HashJoin(joinType=[InnerJoin], where=[=(a2, a4)], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])
+ :- NestedLoopJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3], build=[right])
+ : :- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ : +- Exchange(distribution=[broadcast])
+ : +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a5, a3)], select=[a5, b5, c5, a1, b1, c1, a3, b3, c3], build=[right])
+ : :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ : +- Exchange(distribution=[broadcast])
+ : +- Calc(select=[a1, b1, c1, a3, b3, c3], where=[=(a3, a1)])
+ : +- HashJoin(joinType=[InnerJoin], where=[=(a3, a1)], select=[a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[right])
+ : :- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ : +- Exchange(distribution=[broadcast])
+ : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+ +- Exchange(distribution=[broadcast])
+ +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testBushyJoinCondition2">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1, T2, T3, T4, T5
+WHERE b1 = b2 AND b2 = b3 AND b1 = b4 AND b3 = b5
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalFilter(condition=[AND(=($1, $4), =($4, $7), =($1, $10), =($7, $13))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalJoin(condition=[true], joinType=[inner])
+ : :- LogicalJoin(condition=[true], joinType=[inner])
+ : : :- LogicalJoin(condition=[true], joinType=[inner])
+ : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- HashJoin(joinType=[InnerJoin], where=[=(b1, b3)], select=[a1, b1, c1, a5, b5, c5, a2, b2, c2, a4, b4, c4, a3, b3, c3], isBroadcast=[true], build=[right])
+ :- HashJoin(joinType=[InnerJoin], where=[=(b1, b4)], select=[a1, b1, c1, a5, b5, c5, a2, b2, c2, a4, b4, c4], build=[right])
+ : :- Exchange(distribution=[hash[b1]])
+ : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ : +- Exchange(distribution=[hash[b4]])
+ : +- HashJoin(joinType=[InnerJoin], where=[=(b5, b4)], select=[a5, b5, c5, a2, b2, c2, a4, b4, c4], isBroadcast=[true], build=[right])
+ : :- Calc(select=[a5, b5, c5, a2, b2, c2], where=[=(b5, b2)])
+ : : +- HashJoin(joinType=[InnerJoin], where=[=(b5, b2)], select=[a5, b5, c5, a2, b2, c2], isBroadcast=[true], build=[right])
+ : : :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ : : +- Exchange(distribution=[broadcast])
+ : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ : +- Exchange(distribution=[broadcast])
+ : +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+ +- Exchange(distribution=[broadcast])
+ +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerAndFullOuterJoin">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1
+ JOIN T2 ON a1 = a2
+ FULL OUTER JOIN T3 ON a2 = a3
+ JOIN T4 ON a1 = a4
+ JOIN T5 ON a4 = a5
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[inner])
+ :- LogicalJoin(condition=[=($0, $9)], joinType=[inner])
+ : :- LogicalJoin(condition=[=($3, $6)], joinType=[full])
+ : : :- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+ : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- HashJoin(joinType=[InnerJoin], where=[=(a1, a4)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a5, b5, c5, a4, b4, c4], isBroadcast=[true], build=[right])
+ :- NestedLoopJoin(joinType=[FullOuterJoin], where=[=(a2, a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], build=[right])
+ : :- Exchange(distribution=[single])
+ : : +- HashJoin(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2], isBroadcast=[true], build=[right])
+ : : :- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ : : +- Exchange(distribution=[broadcast])
+ : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ : +- Exchange(distribution=[single])
+ : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+ +- Exchange(distribution=[broadcast])
+ +- Calc(select=[a5, b5, c5, a4, b4, c4], where=[=(a4, a5)])
+ +- HashJoin(joinType=[InnerJoin], where=[=(a4, a5)], select=[a5, b5, c5, a4, b4, c4], isBroadcast=[true], build=[right])
+ :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ +- Exchange(distribution=[broadcast])
+ +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerAndLeftOuterJoin">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1
+ JOIN T2 ON a1 = a2
+ JOIN T3 ON a2 = a3
+ LEFT OUTER JOIN T4 ON a1 = a4
+ JOIN T5 ON a4 = a5
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[inner])
+ :- LogicalJoin(condition=[=($0, $9)], joinType=[left])
+ : :- LogicalJoin(condition=[=($3, $6)], joinType=[inner])
+ : : :- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+ : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- HashJoin(joinType=[InnerJoin], where=[=(a4, a5)], select=[a5, b5, c5, a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])
+ :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ +- Exchange(distribution=[broadcast])
+ +- HashJoin(joinType=[LeftOuterJoin], where=[=(a1, a4)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])
+ :- HashJoin(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], isBroadcast=[true], build=[right])
+ : :- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ : +- Exchange(distribution=[broadcast])
+ : +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, b2, c2, a3, b3, c3], isBroadcast=[true], build=[right])
+ : :- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ : +- Exchange(distribution=[broadcast])
+ : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+ +- Exchange(distribution=[broadcast])
+ +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerAndRightOuterJoin">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1
+ RIGHT OUTER JOIN T2 ON a1 = a2
+ JOIN T3 ON a2 = a3
+ JOIN T4 ON a1 = a4
+ JOIN T5 ON a4 = a5
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[inner])
+ :- LogicalJoin(condition=[=($0, $9)], joinType=[inner])
+ : :- LogicalJoin(condition=[=($3, $6)], joinType=[inner])
+ : : :- LogicalJoin(condition=[=($0, $3)], joinType=[right])
+ : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- NestedLoopJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a1, b1, c1, a2, b2, c2, a5, b5, c5, a4, b4, c4, a3, b3, c3], build=[right])
+ :- HashJoin(joinType=[InnerJoin], where=[=(a1, a4)], select=[a1, b1, c1, a2, b2, c2, a5, b5, c5, a4, b4, c4], isBroadcast=[true], build=[right])
+ : :- HashJoin(joinType=[RightOuterJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2], build=[right])
+ : : :- Exchange(distribution=[hash[a1]])
+ : : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ : : +- Exchange(distribution=[hash[a2]])
+ : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ : +- Exchange(distribution=[broadcast])
+ : +- Calc(select=[a5, b5, c5, a4, b4, c4], where=[=(a4, a5)])
+ : +- HashJoin(joinType=[InnerJoin], where=[=(a4, a5)], select=[a5, b5, c5, a4, b4, c4], isBroadcast=[true], build=[right])
+ : :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ : +- Exchange(distribution=[broadcast])
+ : +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+ +- Exchange(distribution=[broadcast])
+ +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testJoinWithFilter">
+ <Resource name="sql">
+ <![CDATA[
+WITH V1 AS (SELECT * FROM T1 JOIN T2 ON a1 = a2 WHERE b1 * b2 > 10),
+ V2 AS (SELECT * FROM V1 JOIN T3 ON a2 = a3 WHERE b1 * b3 < 2000),
+ V3 AS (SELECT * FROM T4 JOIN V2 ON a3 = a4 WHERE b2 + b4 > 100)
+
+SELECT * FROM V3, T5 WHERE a4 = a5 AND b5 < 15
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a4=[$0], b4=[$1], c4=[$2], a1=[$3], b1=[$4], c1=[$5], a2=[$6], b2=[$7], c2=[$8], a3=[$9], b3=[$10], c3=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalFilter(condition=[AND(=($0, $12), <($13, 15))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalProject(a4=[$0], b4=[$1], c4=[$2], a1=[$3], b1=[$4], c1=[$5], a2=[$6], b2=[$7], c2=[$8], a3=[$9], b3=[$10], c3=[$11])
+ : +- LogicalFilter(condition=[>(+($7, $1), 100)])
+ : +- LogicalJoin(condition=[=($9, $0)], joinType=[inner])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+ : +- LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8])
+ : +- LogicalFilter(condition=[<(*($1, $7), 2000)])
+ : +- LogicalJoin(condition=[=($3, $6)], joinType=[inner])
+ : :- LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5])
+ : : +- LogicalFilter(condition=[>(*($1, $4), 10)])
+ : : +- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+ : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a4, b4, c4, a1, b1, c1, a2, b2, c2, a3, b3, c3, a5, b5, c5])
++- HashJoin(joinType=[InnerJoin], where=[AND(>(+(b2, b4), 100), =(a2, a4))], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])
+ :- NestedLoopJoin(joinType=[InnerJoin], where=[AND(>(*(b1, b2), 10), =(a2, a3))], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3], build=[right])
+ : :- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ : +- Exchange(distribution=[broadcast])
+ : +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a5, a3)], select=[a5, b5, c5, a1, b1, c1, a3, b3, c3], build=[right])
+ : :- Calc(select=[a5, b5, c5], where=[<(b5, 15)])
+ : : +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ : +- Exchange(distribution=[broadcast])
+ : +- Calc(select=[a1, b1, c1, a3, b3, c3], where=[=(a3, a1)])
+ : +- HashJoin(joinType=[InnerJoin], where=[AND(=(a3, a1), <(*(b1, b3), 2000))], select=[a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[right])
+ : :- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ : +- Exchange(distribution=[broadcast])
+ : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+ +- Exchange(distribution=[broadcast])
+ +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testJoinWithProject">
+ <Resource name="sql">
+ <![CDATA[
+WITH V1 AS (SELECT b1, a1, a2, c2 FROM T1 JOIN T2 ON a1 = a2),
+ V2 AS (SELECT a3, b1, a1, c2, c3 FROM V1 JOIN T3 ON a2 = a3),
+ V3 AS (SELECT a3, b1, a1, c2, c3, a4, b4 FROM T4 JOIN V2 ON a1 = a4)
+
+SELECT * FROM V3, T5 where a4 = a5
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a3=[$0], b1=[$1], a1=[$2], c2=[$3], c3=[$4], a4=[$5], b4=[$6], a5=[$7], b5=[$8], c5=[$9])
++- LogicalFilter(condition=[=($5, $7)])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalProject(a3=[$3], b1=[$4], a1=[$5], c2=[$6], c3=[$7], a4=[$0], b4=[$1])
+ : +- LogicalJoin(condition=[=($5, $0)], joinType=[inner])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+ : +- LogicalProject(a3=[$4], b1=[$0], a1=[$1], c2=[$3], c3=[$6])
+ : +- LogicalJoin(condition=[=($2, $4)], joinType=[inner])
+ : :- LogicalProject(b1=[$1], a1=[$0], a2=[$3], c2=[$5])
+ : : +- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+ : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a3, b1, a1, c2, c3, a4, b4, a5, b5, c5])
++- HashJoin(joinType=[InnerJoin], where=[=(a4, a5)], select=[a5, b5, c5, a3, b1, a1, c2, c3, a4, b4], isBroadcast=[true], build=[right])
+ :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ +- Exchange(distribution=[broadcast])
+ +- HashJoin(joinType=[InnerJoin], where=[=(a1, a4)], select=[a3, b1, a1, c2, c3, a4, b4], isBroadcast=[true], build=[right])
+ :- Calc(select=[a3, b1, a1, c2, c3])
+ : +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[b1, a1, a2, c2, a3, c3], isBroadcast=[true], build=[right])
+ : :- Calc(select=[b1, a1, a2, c2])
+ : : +- HashJoin(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, a2, c2], isBroadcast=[true], build=[right])
+ : : :- Calc(select=[a1, b1])
+ : : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ : : +- Exchange(distribution=[broadcast])
+ : : +- Calc(select=[a2, c2])
+ : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ : +- Exchange(distribution=[broadcast])
+ : +- Calc(select=[a3, c3])
+ : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+ +- Exchange(distribution=[broadcast])
+ +- Calc(select=[a4, b4])
+ +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testStarJoinCondition1">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1, T2, T3, T4, T5
+WHERE a1 = a2 AND a1 = a3 AND a1 = a4 AND a1 = a5
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalFilter(condition=[AND(=($0, $3), =($0, $6), =($0, $9), =($0, $12))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalJoin(condition=[true], joinType=[inner])
+ : :- LogicalJoin(condition=[true], joinType=[inner])
+ : : :- LogicalJoin(condition=[true], joinType=[inner])
+ : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- HashJoin(joinType=[InnerJoin], where=[=(a2, a4)], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])
+ :- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[right])
+ : :- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ : +- Exchange(distribution=[broadcast])
+ : +- HashJoin(joinType=[InnerJoin], where=[=(a5, a3)], select=[a5, b5, c5, a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[right])
+ : :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ : +- Exchange(distribution=[broadcast])
+ : +- HashJoin(joinType=[InnerJoin], where=[=(a1, a3)], select=[a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[right])
+ : :- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ : +- Exchange(distribution=[broadcast])
+ : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+ +- Exchange(distribution=[broadcast])
+ +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testStarJoinCondition2">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1, T2, T3, T4, T5
+WHERE b1 = b2 AND b1 = b3 AND b1 = b4 AND b1 = b5
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalFilter(condition=[AND(=($1, $4), =($1, $7), =($1, $10), =($1, $13))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalJoin(condition=[true], joinType=[inner])
+ : :- LogicalJoin(condition=[true], joinType=[inner])
+ : : :- LogicalJoin(condition=[true], joinType=[inner])
+ : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- HashJoin(joinType=[InnerJoin], where=[=(b1, b3)], select=[a1, b1, c1, a5, b5, c5, a2, b2, c2, a4, b4, c4, a3, b3, c3], isBroadcast=[true], build=[right])
+ :- HashJoin(joinType=[InnerJoin], where=[=(b1, b4)], select=[a1, b1, c1, a5, b5, c5, a2, b2, c2, a4, b4, c4], build=[right])
+ : :- Exchange(distribution=[hash[b1]])
+ : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ : +- Exchange(distribution=[hash[b4]])
+ : +- HashJoin(joinType=[InnerJoin], where=[=(b5, b4)], select=[a5, b5, c5, a2, b2, c2, a4, b4, c4], isBroadcast=[true], build=[right])
+ : :- Calc(select=[a5, b5, c5, a2, b2, c2], where=[=(b5, b2)])
+ : : +- HashJoin(joinType=[InnerJoin], where=[=(b5, b2)], select=[a5, b5, c5, a2, b2, c2], isBroadcast=[true], build=[right])
+ : : :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ : : +- Exchange(distribution=[broadcast])
+ : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ : +- Exchange(distribution=[broadcast])
+ : +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+ +- Exchange(distribution=[broadcast])
+ +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testWithoutColumnStats">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1, T2, T3, T4, T5
+WHERE c1 = c2 AND c1 = c3 AND c2 = c4 AND c1 = c5
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalFilter(condition=[AND(=($2, $5), =($2, $8), =($5, $11), =($2, $14))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalJoin(condition=[true], joinType=[inner])
+ : :- LogicalJoin(condition=[true], joinType=[inner])
+ : : :- LogicalJoin(condition=[true], joinType=[inner])
+ : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- HashJoin(joinType=[InnerJoin], where=[=(c1, c2)], select=[a1, b1, c1, a2, b2, c2, a5, b5, c5, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])
+ :- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ +- Exchange(distribution=[broadcast])
+ +- HashJoin(joinType=[InnerJoin], where=[=(c2, c5)], select=[a2, b2, c2, a5, b5, c5, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])
+ :- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ +- Exchange(distribution=[broadcast])
+ +- HashJoin(joinType=[InnerJoin], where=[=(c3, c4)], select=[a5, b5, c5, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])
+ :- Calc(select=[a5, b5, c5, a3, b3, c3], where=[=(c5, c3)])
+ : +- HashJoin(joinType=[InnerJoin], where=[=(c5, c3)], select=[a5, b5, c5, a3, b3, c3], isBroadcast=[true], build=[right])
+ : :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ : +- Exchange(distribution=[broadcast])
+ : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+ +- Exchange(distribution=[broadcast])
+ +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/RewriteMultiJoinConditionRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/RewriteMultiJoinConditionRuleTest.xml
new file mode 100644
index 0000000..29dd20a
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/RewriteMultiJoinConditionRuleTest.xml
@@ -0,0 +1,318 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testMultiJoin_FullJoin1">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM A FULL OUTER JOIN B ON a1 = b1]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3])
++- LogicalJoin(condition=[=($0, $2)], joinType=[full])
+ :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[true], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[{0, 1}, {0, 1}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMultiJoin_FullJoin2">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM A FULL OUTER JOIN B ON a1 = b1 FULL OUTER JOIN C ON a1 = c1]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3], c1=[$4], c2=[$5])
++- LogicalJoin(condition=[=($0, $4)], joinType=[full])
+ :- LogicalJoin(condition=[=($0, $2)], joinType=[full])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+MultiJoin(joinFilter=[=($0, $4)], isFullOuterJoin=[true], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[{0, 1, 2, 3}, {0, 1}]])
+:- MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[true], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[ALL, ALL]])
+: :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+: +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMultiJoin_InnerJoin1">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM A, B WHERE a1 = b1]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3])
++- LogicalFilter(condition=[=($0, $2)])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[false], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[{0, 1}, {0, 1}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMultiJoin_InnerJoin2">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM A, B, C WHERE a1 = b1 AND a1 = c1]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3], c1=[$4], c2=[$5])
++- LogicalFilter(condition=[AND(=($0, $2), =($0, $4))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalJoin(condition=[true], joinType=[inner])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+MultiJoin(joinFilter=[AND(=($0, $4), =($0, $2), =($4, $2))], isFullOuterJoin=[false], joinTypes=[[INNER, INNER, INNER]], outerJoinConditions=[[NULL, NULL, NULL]], projFields=[[{0, 1}, {0, 1}, {0, 1}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+:- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMultiJoin_InnerJoin3">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM A, B, C, D WHERE a1 = b1 AND b1 = c1 AND c1 = d1]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3], c1=[$4], c2=[$5], d1=[$6], d2=[$7])
++- LogicalFilter(condition=[AND(=($0, $2), =($2, $4), =($4, $6))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalJoin(condition=[true], joinType=[inner])
+ : :- LogicalJoin(condition=[true], joinType=[inner])
+ : : :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, D, source: [TestTableSource(d1, d2)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+MultiJoin(joinFilter=[AND(=($4, $6), =($2, $4), =($0, $2), =($6, $2), =($4, $0), =($6, $0))], isFullOuterJoin=[false], joinTypes=[[INNER, INNER, INNER, INNER]], outerJoinConditions=[[NULL, NULL, NULL, NULL]], projFields=[[{0, 1}, {0, 1}, {0, 1}, {0, 1}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+:- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+:- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, D, source: [TestTableSource(d1, d2)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMultiJoin_InnerJoin4">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM A, B, C WHERE a1 = b1 AND a1 > c1]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3], c1=[$4], c2=[$5])
++- LogicalFilter(condition=[AND(=($0, $2), >($0, $4))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalJoin(condition=[true], joinType=[inner])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+MultiJoin(joinFilter=[AND(>($0, $4), =($0, $2))], isFullOuterJoin=[false], joinTypes=[[INNER, INNER, INNER]], outerJoinConditions=[[NULL, NULL, NULL]], projFields=[[{0, 1}, {0, 1}, {0, 1}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+:- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMultiJoin_InnerJoin5">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM A, B, C WHERE a1 + 1 = b1 AND a1 + 1 = c1]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3], c1=[$4], c2=[$5])
++- LogicalFilter(condition=[AND(=(+($0, 1), $2), =(+($0, 1), $4))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalJoin(condition=[true], joinType=[inner])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+MultiJoin(joinFilter=[AND(=(+($0, 1), $4), =(+($0, 1), $2), =($4, $2))], isFullOuterJoin=[false], joinTypes=[[INNER, INNER, INNER]], outerJoinConditions=[[NULL, NULL, NULL]], projFields=[[{0, 1}, {0, 1}, {0, 1}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+:- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMultiJoin_LeftJoin1">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM A LEFT JOIN B ON a1 = b1]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3])
++- LogicalJoin(condition=[=($0, $2)], joinType=[left])
+ :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[INNER, LEFT]], outerJoinConditions=[[NULL, =($0, $2)]], projFields=[[{0, 1}, {0, 1}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMultiJoin_LeftJoin2">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM A JOIN B ON a1 = b1 LEFT JOIN C ON b1 = c1]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3], c1=[$4], c2=[$5])
++- LogicalJoin(condition=[=($2, $4)], joinType=[left])
+ :- LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[false], joinTypes=[[INNER, INNER, LEFT]], outerJoinConditions=[[NULL, NULL, =($2, $4)]], projFields=[[{0, 1}, {0, 1}, {0, 1}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+:- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMultiJoin_LeftJoin3">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM A LEFT JOIN B ON a1 = b1 JOIN C ON a1 = c1]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3], c1=[$4], c2=[$5])
++- LogicalJoin(condition=[=($0, $4)], joinType=[inner])
+ :- LogicalJoin(condition=[=($0, $2)], joinType=[left])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+MultiJoin(joinFilter=[=($0, $4)], isFullOuterJoin=[false], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[{0, 1, 2, 3}, {0, 1}]])
+:- MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[INNER, LEFT]], outerJoinConditions=[[NULL, =($0, $2)]], projFields=[[ALL, ALL]])
+: :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+: +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMultiJoin_RightJoin1">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM A RIGHT JOIN B ON a1 = b1]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3])
++- LogicalJoin(condition=[=($0, $2)], joinType=[right])
+ :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($0, $2), NULL]], projFields=[[{0, 1}, {0, 1}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMultiJoin_RightJoin2">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM A JOIN B ON a1 = b1 RIGHT JOIN C ON b1 = c1]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3], c1=[$4], c2=[$5])
++- LogicalJoin(condition=[=($2, $4)], joinType=[right])
+ :- LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($2, $4), NULL]], projFields=[[{0, 1, 2, 3}, {0, 1}]])
+:- MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[false], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[ALL, ALL]])
+: :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+: +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMultiJoin_RightJoin3">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM A RIGHT JOIN B ON a1 = b1 JOIN C ON a1 = c1]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3], c1=[$4], c2=[$5])
++- LogicalJoin(condition=[=($0, $4)], joinType=[inner])
+ :- LogicalJoin(condition=[=($0, $2)], joinType=[right])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+MultiJoin(joinFilter=[=($0, $4)], isFullOuterJoin=[false], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[{0, 1, 2, 3}, {0, 1}]])
+:- MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($0, $2), NULL]], projFields=[[ALL, ALL]])
+: :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+: +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/JoinReorderTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/JoinReorderTest.xml
new file mode 100644
index 0000000..b05c7a0
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/JoinReorderTest.xml
@@ -0,0 +1,635 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testAllFullOuterJoin">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1
+ FULL OUTER JOIN T2 ON a1 = a2
+ FULL OUTER JOIN T3 ON a1 = a3
+ FULL OUTER JOIN T4 ON a1 = a4
+ FULL OUTER JOIN T5 ON a4 = a5
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[full])
+ :- LogicalJoin(condition=[=($0, $9)], joinType=[full])
+ : :- LogicalJoin(condition=[=($0, $6)], joinType=[full])
+ : : :- LogicalJoin(condition=[=($0, $3)], joinType=[full])
+ : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Join(joinType=[FullOuterJoin], where=[=(a4, a5)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+:- Exchange(distribution=[hash[a4]])
+: +- Join(joinType=[FullOuterJoin], where=[=(a1, a4)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+: :- Exchange(distribution=[hash[a1]])
+: : +- Join(joinType=[FullOuterJoin], where=[=(a1, a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+: : :- Exchange(distribution=[hash[a1]])
+: : : +- Join(joinType=[FullOuterJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+: : : :- Exchange(distribution=[hash[a1]])
+: : : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+: : : +- Exchange(distribution=[hash[a2]])
+: : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+: : +- Exchange(distribution=[hash[a3]])
+: : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+: +- Exchange(distribution=[hash[a4]])
+: +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
++- Exchange(distribution=[hash[a5]])
+ +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testAllLeftOuterJoin">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1
+ LEFT OUTER JOIN T2 ON a1 = a2
+ LEFT OUTER JOIN T3 ON a2 = a3
+ LEFT OUTER JOIN T4 ON a1 = a4
+ LEFT OUTER JOIN T5 ON a4 = a5
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[left])
+ :- LogicalJoin(condition=[=($0, $9)], joinType=[left])
+ : :- LogicalJoin(condition=[=($3, $6)], joinType=[left])
+ : : :- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+ : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Join(joinType=[LeftOuterJoin], where=[=(a4, a5)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+:- Exchange(distribution=[hash[a4]])
+: +- Join(joinType=[LeftOuterJoin], where=[=(a1, a4)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+: :- Exchange(distribution=[hash[a1]])
+: : +- Join(joinType=[LeftOuterJoin], where=[=(a2, a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+: : :- Exchange(distribution=[hash[a2]])
+: : : +- Join(joinType=[LeftOuterJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+: : : :- Exchange(distribution=[hash[a1]])
+: : : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+: : : +- Exchange(distribution=[hash[a2]])
+: : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+: : +- Exchange(distribution=[hash[a3]])
+: : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+: +- Exchange(distribution=[hash[a4]])
+: +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
++- Exchange(distribution=[hash[a5]])
+ +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testAllRightOuterJoin">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1
+ RIGHT OUTER JOIN T2 ON a1 = a2
+ RIGHT OUTER JOIN T3 ON a2 = a3
+ RIGHT OUTER JOIN T4 ON a1 = a4
+ RIGHT OUTER JOIN T5 ON a4 = a5
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[right])
+ :- LogicalJoin(condition=[=($0, $9)], joinType=[right])
+ : :- LogicalJoin(condition=[=($3, $6)], joinType=[right])
+ : : :- LogicalJoin(condition=[=($0, $3)], joinType=[right])
+ : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- Join(joinType=[LeftOuterJoin], where=[=(a4, a5)], select=[a5, b5, c5, a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[a5]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ +- Exchange(distribution=[hash[a4]])
+ +- Join(joinType=[RightOuterJoin], where=[=(a1, a4)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[a1]])
+ : +- Join(joinType=[RightOuterJoin], where=[=(a2, a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : :- Exchange(distribution=[hash[a2]])
+ : : +- Join(joinType=[RightOuterJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : : :- Exchange(distribution=[hash[a1]])
+ : : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ : : +- Exchange(distribution=[hash[a2]])
+ : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ : +- Exchange(distribution=[hash[a3]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+ +- Exchange(distribution=[hash[a4]])
+ +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testBushyJoinCondition1">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1, T2, T3, T4, T5
+WHERE a1 = a2 AND a2 = a3 AND a1 = a4 AND a3 = a5
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalFilter(condition=[AND(=($0, $3), =($3, $6), =($0, $9), =($6, $12))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalJoin(condition=[true], joinType=[inner])
+ : :- LogicalJoin(condition=[true], joinType=[inner])
+ : : :- LogicalJoin(condition=[true], joinType=[inner])
+ : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- Join(joinType=[InnerJoin], where=[AND(=(a1, a4), =(a4, a2), =(a3, a4), =(a5, a4))], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[a1, a2, a3, a5]])
+ : +- Join(joinType=[InnerJoin], where=[AND(=(a2, a3), =(a1, a2), =(a5, a2))], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : :- Exchange(distribution=[hash[a2, a2, a2]])
+ : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ : +- Exchange(distribution=[hash[a3, a1, a5]])
+ : +- Join(joinType=[InnerJoin], where=[AND(=(a3, a5), =(a1, a5))], select=[a5, b5, c5, a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : :- Exchange(distribution=[hash[a5, a5]])
+ : : +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ : +- Exchange(distribution=[hash[a3, a1]])
+ : +- Join(joinType=[InnerJoin], where=[=(a3, a1)], select=[a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : :- Exchange(distribution=[hash[a1]])
+ : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ : +- Exchange(distribution=[hash[a3]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+ +- Exchange(distribution=[hash[a4, a4, a4, a4]])
+ +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testBushyJoinCondition2">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1, T2, T3, T4, T5
+WHERE b1 = b2 AND b2 = b3 AND b1 = b4 AND b3 = b5
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalFilter(condition=[AND(=($1, $4), =($4, $7), =($1, $10), =($7, $13))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalJoin(condition=[true], joinType=[inner])
+ : :- LogicalJoin(condition=[true], joinType=[inner])
+ : : :- LogicalJoin(condition=[true], joinType=[inner])
+ : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- Join(joinType=[InnerJoin], where=[AND(=(b3, b5), =(b2, b3), =(b3, b1), =(b4, b3))], select=[a1, b1, c1, a5, b5, c5, a2, b2, c2, a4, b4, c4, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[b5, b2, b1, b4]])
+ : +- Join(joinType=[InnerJoin], where=[AND(=(b1, b4), =(b1, b2), =(b5, b1))], select=[a1, b1, c1, a5, b5, c5, a2, b2, c2, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : :- Exchange(distribution=[hash[b1, b1, b1]])
+ : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ : +- Exchange(distribution=[hash[b4, b2, b5]])
+ : +- Join(joinType=[InnerJoin], where=[AND(=(b4, b2), =(b5, b4))], select=[a5, b5, c5, a2, b2, c2, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : :- Exchange(distribution=[hash[b2, b5]])
+ : : +- Join(joinType=[InnerJoin], where=[=(b5, b2)], select=[a5, b5, c5, a2, b2, c2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : : :- Exchange(distribution=[hash[b5]])
+ : : : +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ : : +- Exchange(distribution=[hash[b2]])
+ : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ : +- Exchange(distribution=[hash[b4, b4]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+ +- Exchange(distribution=[hash[b3, b3, b3, b3]])
+ +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerAndFullOuterJoin">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1
+ JOIN T2 ON a1 = a2
+ FULL OUTER JOIN T3 ON a2 = a3
+ JOIN T4 ON a1 = a4
+ JOIN T5 ON a4 = a5
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[inner])
+ :- LogicalJoin(condition=[=($0, $9)], joinType=[inner])
+ : :- LogicalJoin(condition=[=($3, $6)], joinType=[full])
+ : : :- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+ : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- Join(joinType=[InnerJoin], where=[AND(=(a1, a4), =(a5, a1))], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a5, b5, c5, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[a1, a1]])
+ : +- Join(joinType=[FullOuterJoin], where=[=(a2, a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : :- Exchange(distribution=[hash[a2]])
+ : : +- Join(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : : :- Exchange(distribution=[hash[a1]])
+ : : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ : : +- Exchange(distribution=[hash[a2]])
+ : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ : +- Exchange(distribution=[hash[a3]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+ +- Exchange(distribution=[hash[a4, a5]])
+ +- Join(joinType=[InnerJoin], where=[=(a4, a5)], select=[a5, b5, c5, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[a5]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ +- Exchange(distribution=[hash[a4]])
+ +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerAndLeftOuterJoin">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1
+ JOIN T2 ON a1 = a2
+ JOIN T3 ON a2 = a3
+ LEFT OUTER JOIN T4 ON a1 = a4
+ JOIN T5 ON a4 = a5
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[inner])
+ :- LogicalJoin(condition=[=($0, $9)], joinType=[left])
+ : :- LogicalJoin(condition=[=($3, $6)], joinType=[inner])
+ : : :- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+ : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- Join(joinType=[InnerJoin], where=[=(a4, a5)], select=[a5, b5, c5, a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[a5]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ +- Exchange(distribution=[hash[a4]])
+ +- Join(joinType=[LeftOuterJoin], where=[=(a1, a4)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[a1]])
+ : +- Join(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : :- Exchange(distribution=[hash[a1]])
+ : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ : +- Exchange(distribution=[hash[a2]])
+ : +- Join(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, b2, c2, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : :- Exchange(distribution=[hash[a2]])
+ : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ : +- Exchange(distribution=[hash[a3]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+ +- Exchange(distribution=[hash[a4]])
+ +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerAndRightOuterJoin">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1
+ RIGHT OUTER JOIN T2 ON a1 = a2
+ JOIN T3 ON a2 = a3
+ JOIN T4 ON a1 = a4
+ JOIN T5 ON a4 = a5
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[inner])
+ :- LogicalJoin(condition=[=($0, $9)], joinType=[inner])
+ : :- LogicalJoin(condition=[=($3, $6)], joinType=[inner])
+ : : :- LogicalJoin(condition=[=($0, $3)], joinType=[right])
+ : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- Join(joinType=[InnerJoin], where=[=(a2, a3)], select=[a1, b1, c1, a2, b2, c2, a5, b5, c5, a4, b4, c4, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[a2]])
+ : +- Join(joinType=[InnerJoin], where=[AND(=(a1, a4), =(a5, a1))], select=[a1, b1, c1, a2, b2, c2, a5, b5, c5, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : :- Exchange(distribution=[hash[a1, a1]])
+ : : +- Join(joinType=[RightOuterJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : : :- Exchange(distribution=[hash[a1]])
+ : : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ : : +- Exchange(distribution=[hash[a2]])
+ : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ : +- Exchange(distribution=[hash[a4, a5]])
+ : +- Join(joinType=[InnerJoin], where=[=(a4, a5)], select=[a5, b5, c5, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : :- Exchange(distribution=[hash[a5]])
+ : : +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ : +- Exchange(distribution=[hash[a4]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+ +- Exchange(distribution=[hash[a3]])
+ +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testJoinWithFilter">
+ <Resource name="sql">
+ <![CDATA[
+WITH V1 AS (SELECT * FROM T1 JOIN T2 ON a1 = a2 WHERE b1 * b2 > 10),
+ V2 AS (SELECT * FROM V1 JOIN T3 ON a2 = a3 WHERE b1 * b3 < 2000),
+ V3 AS (SELECT * FROM T4 JOIN V2 ON a3 = a4 WHERE b2 + b4 > 100)
+
+SELECT * FROM V3, T5 WHERE a4 = a5 AND b5 < 15
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a4=[$0], b4=[$1], c4=[$2], a1=[$3], b1=[$4], c1=[$5], a2=[$6], b2=[$7], c2=[$8], a3=[$9], b3=[$10], c3=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalFilter(condition=[AND(=($0, $12), <($13, 15))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalProject(a4=[$0], b4=[$1], c4=[$2], a1=[$3], b1=[$4], c1=[$5], a2=[$6], b2=[$7], c2=[$8], a3=[$9], b3=[$10], c3=[$11])
+ : +- LogicalFilter(condition=[>(+($7, $1), 100)])
+ : +- LogicalJoin(condition=[=($9, $0)], joinType=[inner])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+ : +- LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8])
+ : +- LogicalFilter(condition=[<(*($1, $7), 2000)])
+ : +- LogicalJoin(condition=[=($3, $6)], joinType=[inner])
+ : :- LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5])
+ : : +- LogicalFilter(condition=[>(*($1, $4), 10)])
+ : : +- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+ : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a4, b4, c4, a1, b1, c1, a2, b2, c2, a3, b3, c3, a5, b5, c5])
++- Join(joinType=[InnerJoin], where=[AND(=(a4, a5), =(a3, a4), =(a4, a2), =(a1, a4), >(+(b2, b4), 100))], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[a5, a3, a2, a1]])
+ : +- Join(joinType=[InnerJoin], where=[AND(=(a2, a3), =(a1, a2), =(a2, a5), >(*(b1, b2), 10))], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : :- Exchange(distribution=[hash[a2, a2, a2]])
+ : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ : +- Exchange(distribution=[hash[a3, a1, a5]])
+ : +- Join(joinType=[InnerJoin], where=[AND(=(a5, a3), =(a1, a5))], select=[a5, b5, c5, a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : :- Exchange(distribution=[hash[a5, a5]])
+ : : +- Calc(select=[a5, b5, c5], where=[<(b5, 15)])
+ : : +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ : +- Exchange(distribution=[hash[a3, a1]])
+ : +- Join(joinType=[InnerJoin], where=[AND(=(a3, a1), <(*(b1, b3), 2000))], select=[a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : :- Exchange(distribution=[hash[a1]])
+ : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ : +- Exchange(distribution=[hash[a3]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+ +- Exchange(distribution=[hash[a4, a4, a4, a4]])
+ +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testJoinWithProject">
+ <Resource name="sql">
+ <![CDATA[
+WITH V1 AS (SELECT b1, a1, a2, c2 FROM T1 JOIN T2 ON a1 = a2),
+ V2 AS (SELECT a3, b1, a1, c2, c3 FROM V1 JOIN T3 ON a2 = a3),
+ V3 AS (SELECT a3, b1, a1, c2, c3, a4, b4 FROM T4 JOIN V2 ON a1 = a4)
+
+SELECT * FROM V3, T5 where a4 = a5
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a3=[$0], b1=[$1], a1=[$2], c2=[$3], c3=[$4], a4=[$5], b4=[$6], a5=[$7], b5=[$8], c5=[$9])
++- LogicalFilter(condition=[=($5, $7)])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalProject(a3=[$3], b1=[$4], a1=[$5], c2=[$6], c3=[$7], a4=[$0], b4=[$1])
+ : +- LogicalJoin(condition=[=($5, $0)], joinType=[inner])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+ : +- LogicalProject(a3=[$4], b1=[$0], a1=[$1], c2=[$3], c3=[$6])
+ : +- LogicalJoin(condition=[=($2, $4)], joinType=[inner])
+ : :- LogicalProject(b1=[$1], a1=[$0], a2=[$3], c2=[$5])
+ : : +- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+ : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a3, b1, a1, c2, c3, a4, b4, a5, b5, c5])
++- Join(joinType=[InnerJoin], where=[=(a4, a5)], select=[a5, b5, c5, a3, b1, a1, c2, c3, a4, b4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[a5]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ +- Exchange(distribution=[hash[a4]])
+ +- Join(joinType=[InnerJoin], where=[=(a1, a4)], select=[a3, b1, a1, c2, c3, a4, b4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[a1]])
+ : +- Calc(select=[a3, b1, a1, c2, c3])
+ : +- Join(joinType=[InnerJoin], where=[=(a2, a3)], select=[b1, a1, a2, c2, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : :- Exchange(distribution=[hash[a2]])
+ : : +- Calc(select=[b1, a1, a2, c2])
+ : : +- Join(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, a2, c2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : : :- Exchange(distribution=[hash[a1]])
+ : : : +- Calc(select=[a1, b1])
+ : : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ : : +- Exchange(distribution=[hash[a2]])
+ : : +- Calc(select=[a2, c2])
+ : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ : +- Exchange(distribution=[hash[a3]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+ +- Exchange(distribution=[hash[a4]])
+ +- Calc(select=[a4, b4])
+ +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testStarJoinCondition1">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1, T2, T3, T4, T5
+WHERE a1 = a2 AND a1 = a3 AND a1 = a4 AND a1 = a5
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalFilter(condition=[AND(=($0, $3), =($0, $6), =($0, $9), =($0, $12))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalJoin(condition=[true], joinType=[inner])
+ : :- LogicalJoin(condition=[true], joinType=[inner])
+ : : :- LogicalJoin(condition=[true], joinType=[inner])
+ : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- Join(joinType=[InnerJoin], where=[AND(=(a1, a4), =(a5, a4), =(a4, a3), =(a4, a2))], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[a1, a5, a3, a2]])
+ : +- Join(joinType=[InnerJoin], where=[AND(=(a1, a2), =(a5, a2), =(a3, a2))], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : :- Exchange(distribution=[hash[a2, a2, a2]])
+ : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ : +- Exchange(distribution=[hash[a1, a5, a3]])
+ : +- Join(joinType=[InnerJoin], where=[AND(=(a1, a5), =(a5, a3))], select=[a5, b5, c5, a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : :- Exchange(distribution=[hash[a5, a5]])
+ : : +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ : +- Exchange(distribution=[hash[a1, a3]])
+ : +- Join(joinType=[InnerJoin], where=[=(a1, a3)], select=[a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : :- Exchange(distribution=[hash[a1]])
+ : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ : +- Exchange(distribution=[hash[a3]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+ +- Exchange(distribution=[hash[a4, a4, a4, a4]])
+ +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testStarJoinCondition2">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1, T2, T3, T4, T5
+WHERE b1 = b2 AND b1 = b3 AND b1 = b4 AND b1 = b5
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalFilter(condition=[AND(=($1, $4), =($1, $7), =($1, $10), =($1, $13))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalJoin(condition=[true], joinType=[inner])
+ : :- LogicalJoin(condition=[true], joinType=[inner])
+ : : :- LogicalJoin(condition=[true], joinType=[inner])
+ : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- Join(joinType=[InnerJoin], where=[AND(=(b1, b3), =(b5, b3), =(b4, b3), =(b3, b2))], select=[a1, b1, c1, a5, b5, c5, a2, b2, c2, a4, b4, c4, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[b1, b5, b4, b2]])
+ : +- Join(joinType=[InnerJoin], where=[AND(=(b1, b5), =(b1, b4), =(b1, b2))], select=[a1, b1, c1, a5, b5, c5, a2, b2, c2, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : :- Exchange(distribution=[hash[b1, b1, b1]])
+ : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ : +- Exchange(distribution=[hash[b5, b4, b2]])
+ : +- Join(joinType=[InnerJoin], where=[AND(=(b5, b4), =(b4, b2))], select=[a5, b5, c5, a2, b2, c2, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : :- Exchange(distribution=[hash[b5, b2]])
+ : : +- Join(joinType=[InnerJoin], where=[=(b5, b2)], select=[a5, b5, c5, a2, b2, c2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : : :- Exchange(distribution=[hash[b5]])
+ : : : +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ : : +- Exchange(distribution=[hash[b2]])
+ : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ : +- Exchange(distribution=[hash[b4, b4]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+ +- Exchange(distribution=[hash[b3, b3, b3, b3]])
+ +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testWithoutColumnStats">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1, T2, T3, T4, T5
+WHERE c1 = c2 AND c1 = c3 AND c2 = c4 AND c1 = c5
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalFilter(condition=[AND(=($2, $5), =($2, $8), =($5, $11), =($2, $14))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalJoin(condition=[true], joinType=[inner])
+ : :- LogicalJoin(condition=[true], joinType=[inner])
+ : : :- LogicalJoin(condition=[true], joinType=[inner])
+ : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- Join(joinType=[InnerJoin], where=[AND(=(c1, c5), =(c1, c3), =(c1, c2), =(c4, c1))], select=[a1, b1, c1, a2, b2, c2, a5, b5, c5, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[c1, c1, c1, c1]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ +- Exchange(distribution=[hash[c5, c3, c2, c4]])
+ +- Join(joinType=[InnerJoin], where=[AND(=(c2, c4), =(c5, c2), =(c3, c2))], select=[a2, b2, c2, a5, b5, c5, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[c2, c2, c2]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ +- Exchange(distribution=[hash[c4, c5, c3]])
+ +- Join(joinType=[InnerJoin], where=[AND(=(c4, c5), =(c4, c3))], select=[a5, b5, c5, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[c5, c3]])
+ : +- Join(joinType=[InnerJoin], where=[=(c5, c3)], select=[a5, b5, c5, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : :- Exchange(distribution=[hash[c5]])
+ : : +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ : +- Exchange(distribution=[hash[c3]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+ +- Exchange(distribution=[hash[c4, c4]])
+ +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/JoinReorderTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/JoinReorderTest.scala
new file mode 100644
index 0000000..cb31143
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/JoinReorderTest.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.batch.sql.join
+
+import org.apache.flink.table.plan.common.JoinReorderTestBase
+import org.apache.flink.table.util.TableTestUtil
+
+/**
+ * Runs the tests inherited from [[JoinReorderTestBase]] against the util created by
+ * `batchTestUtil()`.
+ */
+class JoinReorderTest extends JoinReorderTestBase {
+
+  /** Supplies the util that the inherited tests use. */
+  override protected def getTableTestUtil: TableTestUtil = {
+    batchTestUtil()
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/common/JoinReorderTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/common/JoinReorderTestBase.scala
new file mode 100644
index 0000000..88883a7
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/common/JoinReorderTestBase.scala
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.common
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{PlannerConfigOptions, Types}
+import org.apache.flink.table.plan.stats.{ColumnStats, FlinkStatistic, TableStats}
+import org.apache.flink.table.util.{TableTestBase, TableTestUtil}
+
+import org.junit.{Before, Test}
+
+import scala.collection.JavaConversions._
+
+/**
+ * Base class for plan tests of join reordering.
+ *
+ * Concrete subclasses provide the [[TableTestUtil]] through `getTableTestUtil`. Every test
+ * passes a multi-way join query over the tables registered in `setup()` to `util.verifyPlan`.
+ */
+abstract class JoinReorderTestBase extends TableTestBase {
+
+ // Initialized by calling the abstract `getTableTestUtil` while this base class is being
+ // constructed, so an override must not depend on fields of the subclass: they are not
+ // initialized yet at that point.
+ protected val util: TableTestUtil = getTableTestUtil
+
+ /** Returns the [[TableTestUtil]] that the tests of this class run against. */
+ protected def getTableTestUtil: TableTestUtil
+
+ /**
+ * Registers the table sources T1 to T5 and enables
+ * `PlannerConfigOptions.SQL_OPTIMIZER_JOIN_REORDER_ENABLED`.
+ *
+ * Each table is declared with the types (INT, LONG, STRING) and the field names (aN, bN, cN).
+ * Its statistic carries column stats for the `a` and `b` fields only; no column stats are
+ * registered for the `c` fields.
+ */
+ @Before
+ def setup(): Unit = {
+ val types = Array[TypeInformation[_]](Types.INT, Types.LONG, Types.STRING)
+
+ // NOTE(review): the first TableStats argument is presumably the row count, and the
+ // ColumnStats arguments are presumably (ndv, nullCount, avgLen, maxLen, max, min) --
+ // confirm against the TableStats / ColumnStats constructors.
+ util.addTableSource("T1", types, Array("a1", "b1", "c1"), FlinkStatistic.builder()
+ .tableStats(new TableStats(1000000L, Map(
+ "a1" -> new ColumnStats(1000000L, 0L, 4.0, 4, null, null),
+ "b1" -> new ColumnStats(10L, 0L, 8.0, 8, null, null)
+ ))).build())
+
+ util.addTableSource("T2", types, Array("a2", "b2", "c2"), FlinkStatistic.builder()
+ .tableStats(new TableStats(10000L, Map(
+ "a2" -> new ColumnStats(100L, 0L, 4.0, 4, null, null),
+ "b2" -> new ColumnStats(5000L, 0L, 8.0, 8, null, null)
+ ))).build())
+
+ util.addTableSource("T3", types, Array("a3", "b3", "c3"), FlinkStatistic.builder()
+ .tableStats(new TableStats(10L, Map(
+ "a3" -> new ColumnStats(5L, 0L, 4.0, 4, null, null),
+ "b3" -> new ColumnStats(2L, 0L, 8.0, 8, null, null)
+ ))).build())
+
+ util.addTableSource("T4", types, Array("a4", "b4", "c4"), FlinkStatistic.builder()
+ .tableStats(new TableStats(100L, Map(
+ "a4" -> new ColumnStats(100L, 0L, 4.0, 4, null, null),
+ "b4" -> new ColumnStats(20L, 0L, 8.0, 8, null, null)
+ ))).build())
+
+ util.addTableSource("T5", types, Array("a5", "b5", "c5"), FlinkStatistic.builder()
+ .tableStats(new TableStats(500000L, Map(
+ "a5" -> new ColumnStats(200000L, 0L, 4.0, 4, null, null),
+ "b5" -> new ColumnStats(200L, 0L, 8.0, 8, null, null)
+ ))).build())
+
+ // Join reordering is switched on explicitly for every test of this class.
+ util.getTableEnv.getConfig.getConf.setBoolean(
+ PlannerConfigOptions.SQL_OPTIMIZER_JOIN_REORDER_ENABLED, true)
+ }
+
+ /** Star-shaped equi-join: every condition compares a1 with the `a` field of another table. */
+ @Test
+ def testStarJoinCondition1(): Unit = {
+ val sql =
+ s"""
+ |SELECT * FROM T1, T2, T3, T4, T5
+ |WHERE a1 = a2 AND a1 = a3 AND a1 = a4 AND a1 = a5
+ """.stripMargin
+ util.verifyPlan(sql)
+ }
+
+ /** Star-shaped equi-join: every condition compares b1 with the `b` field of another table. */
+ @Test
+ def testStarJoinCondition2(): Unit = {
+ val sql =
+ s"""
+ |SELECT * FROM T1, T2, T3, T4, T5
+ |WHERE b1 = b2 AND b1 = b3 AND b1 = b4 AND b1 = b5
+ """.stripMargin
+ util.verifyPlan(sql)
+ }
+
+ /** Equi-join conditions on the `a` fields that pair T1-T2, T2-T3, T1-T4 and T3-T5. */
+ @Test
+ def testBushyJoinCondition1(): Unit = {
+ val sql =
+ s"""
+ |SELECT * FROM T1, T2, T3, T4, T5
+ |WHERE a1 = a2 AND a2 = a3 AND a1 = a4 AND a3 = a5
+ """.stripMargin
+ util.verifyPlan(sql)
+ }
+
+ /** Equi-join conditions on the `b` fields that pair T1-T2, T2-T3, T1-T4 and T3-T5. */
+ @Test
+ def testBushyJoinCondition2(): Unit = {
+ val sql =
+ s"""
+ |SELECT * FROM T1, T2, T3, T4, T5
+ |WHERE b1 = b2 AND b2 = b3 AND b1 = b4 AND b3 = b5
+ """.stripMargin
+ util.verifyPlan(sql)
+ }
+
+ /** Joins on the `c` fields, for which `setup()` registers no column stats. */
+ @Test
+ def testWithoutColumnStats(): Unit = {
+ val sql =
+ s"""
+ |SELECT * FROM T1, T2, T3, T4, T5
+ |WHERE c1 = c2 AND c1 = c3 AND c2 = c4 AND c1 = c5
+ """.stripMargin
+ util.verifyPlan(sql)
+ }
+
+ /** Inner joins nested in WITH views, each of which selects a subset of the joined fields. */
+ @Test
+ def testJoinWithProject(): Unit = {
+ val sql =
+ s"""
+ |WITH V1 AS (SELECT b1, a1, a2, c2 FROM T1 JOIN T2 ON a1 = a2),
+ | V2 AS (SELECT a3, b1, a1, c2, c3 FROM V1 JOIN T3 ON a2 = a3),
+ | V3 AS (SELECT a3, b1, a1, c2, c3, a4, b4 FROM T4 JOIN V2 ON a1 = a4)
+ |
+ |SELECT * FROM V3, T5 where a4 = a5
+ """.stripMargin
+ // cannot be reordered for now
+ util.verifyPlan(sql)
+ }
+
+ /**
+ * Inner joins nested in WITH views, each of which adds a filter over `b` fields of its two
+ * join inputs; the outer query additionally filters on b5.
+ */
+ @Test
+ def testJoinWithFilter(): Unit = {
+ val sql =
+ s"""
+ |WITH V1 AS (SELECT * FROM T1 JOIN T2 ON a1 = a2 WHERE b1 * b2 > 10),
+ | V2 AS (SELECT * FROM V1 JOIN T3 ON a2 = a3 WHERE b1 * b3 < 2000),
+ | V3 AS (SELECT * FROM T4 JOIN V2 ON a3 = a4 WHERE b2 + b4 > 100)
+ |
+ |SELECT * FROM V3, T5 WHERE a4 = a5 AND b5 < 15
+ """.stripMargin
+ util.verifyPlan(sql)
+ }
+
+ /** Inner joins mixed with a single LEFT OUTER JOIN (the join with T4). */
+ @Test
+ def testInnerAndLeftOuterJoin(): Unit = {
+ val sql =
+ s"""
+ |SELECT * FROM T1
+ | JOIN T2 ON a1 = a2
+ | JOIN T3 ON a2 = a3
+ | LEFT OUTER JOIN T4 ON a1 = a4
+ | JOIN T5 ON a4 = a5
+ """.stripMargin
+ // T1, T2, T3 can be reordered
+ util.verifyPlan(sql)
+ }
+
+ /** Inner joins mixed with a single RIGHT OUTER JOIN (the join of T1 with T2). */
+ @Test
+ def testInnerAndRightOuterJoin(): Unit = {
+ val sql =
+ s"""
+ |SELECT * FROM T1
+ | RIGHT OUTER JOIN T2 ON a1 = a2
+ | JOIN T3 ON a2 = a3
+ | JOIN T4 ON a1 = a4
+ | JOIN T5 ON a4 = a5
+ """.stripMargin
+ // T3, T4, T5 can be reordered
+ util.verifyPlan(sql)
+ }
+
+ /** Inner joins mixed with a single FULL OUTER JOIN (the join with T3). */
+ @Test
+ def testInnerAndFullOuterJoin(): Unit = {
+ val sql =
+ s"""
+ |SELECT * FROM T1
+ | JOIN T2 ON a1 = a2
+ | FULL OUTER JOIN T3 ON a2 = a3
+ | JOIN T4 ON a1 = a4
+ | JOIN T5 ON a4 = a5
+ """.stripMargin
+ util.verifyPlan(sql)
+ }
+
+ /** Query in which all four joins are LEFT OUTER JOINs. */
+ @Test
+ def testAllLeftOuterJoin(): Unit = {
+ val sql =
+ s"""
+ |SELECT * FROM T1
+ | LEFT OUTER JOIN T2 ON a1 = a2
+ | LEFT OUTER JOIN T3 ON a2 = a3
+ | LEFT OUTER JOIN T4 ON a1 = a4
+ | LEFT OUTER JOIN T5 ON a4 = a5
+ """.stripMargin
+ // cannot be reordered
+ util.verifyPlan(sql)
+ }
+
+ /** Query in which all four joins are RIGHT OUTER JOINs. */
+ @Test
+ def testAllRightOuterJoin(): Unit = {
+ val sql =
+ s"""
+ |SELECT * FROM T1
+ | RIGHT OUTER JOIN T2 ON a1 = a2
+ | RIGHT OUTER JOIN T3 ON a2 = a3
+ | RIGHT OUTER JOIN T4 ON a1 = a4
+ | RIGHT OUTER JOIN T5 ON a4 = a5
+ """.stripMargin
+ // cannot be reordered
+ util.verifyPlan(sql)
+ }
+
+ /** Query in which all four joins are FULL OUTER JOINs. */
+ @Test
+ def testAllFullOuterJoin(): Unit = {
+ val sql =
+ s"""
+ |SELECT * FROM T1
+ | FULL OUTER JOIN T2 ON a1 = a2
+ | FULL OUTER JOIN T3 ON a1 = a3
+ | FULL OUTER JOIN T4 ON a1 = a4
+ | FULL OUTER JOIN T5 ON a4 = a5
+ """.stripMargin
+ // cannot be reordered
+ util.verifyPlan(sql)
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkAggregateInnerJoinTransposeRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkAggregateInnerJoinTransposeRuleTest.scala
index ea096e3..6d3b3b6 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkAggregateInnerJoinTransposeRuleTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkAggregateInnerJoinTransposeRuleTest.scala
@@ -50,7 +50,7 @@ class FlinkAggregateInnerJoinTransposeRuleTest extends TableTestBase {
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
.add(RuleSets.ofList(AggregateReduceGroupingRule.INSTANCE
- )).build(), "reduce unless grouping")
+ )).build(), "reduce useless grouping")
.addProgram(
FlinkHepRuleSetProgramBuilder.newBuilder
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkJoinPushExpressionsRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkJoinPushExpressionsRuleTest.scala
index 3ad9faa..1fa6622 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkJoinPushExpressionsRuleTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkJoinPushExpressionsRuleTest.scala
@@ -25,6 +25,7 @@ import org.apache.flink.table.plan.rules.FlinkBatchRuleSets
import org.apache.flink.table.util.TableTestBase
import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.tools.RuleSets
import org.junit.{Before, Test}
/**
@@ -38,11 +39,16 @@ class FlinkJoinPushExpressionsRuleTest extends TableTestBase {
def setup(): Unit = {
val programs = new FlinkChainedProgram[BatchOptimizeContext]()
programs.addLast(
- "FilterSimplifyExpressions",
+ "rules",
FlinkHepRuleSetProgramBuilder.newBuilder
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkBatchRuleSets.SEMI_JOIN_RULES)
+ .add(RuleSets.ofList(
+ SimplifyFilterConditionRule.EXTENDED,
+ FlinkRewriteSubQueryRule.FILTER,
+ FlinkSubQueryRemoveRule.FILTER,
+ JoinConditionTypeCoerceRule.INSTANCE,
+ FlinkJoinPushExpressionsRule.INSTANCE))
.build()
)
val calciteConfig = CalciteConfig.createBuilder(util.tableEnv.getConfig.getCalciteConfig)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/RewriteMultiJoinConditionRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/RewriteMultiJoinConditionRuleTest.scala
new file mode 100644
index 0000000..156c2ba
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/RewriteMultiJoinConditionRuleTest.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.calcite.CalciteConfig
+import org.apache.flink.table.plan.optimize.program.{BatchOptimizeContext, FlinkChainedProgram, FlinkGroupProgramBuilder, FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE}
+import org.apache.flink.table.util.TableTestBase
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.rel.rules.{FilterMultiJoinMergeRule, JoinToMultiJoinRule, ProjectMultiJoinMergeRule}
+import org.apache.calcite.tools.RuleSets
+import org.junit.{Before, Test}
+
+/**
+ * Plan tests for [[RewriteMultiJoinConditionRule]] over inner, left, right and full outer joins.
+ */
+class RewriteMultiJoinConditionRuleTest extends TableTestBase {
+ private val util = batchTestUtil()
+
+ @Before
+ def setup(): Unit = { // installs a three-stage HEP program and registers tables A-D
+ val program = new FlinkChainedProgram[BatchOptimizeContext]()
+ program.addLast(
+ "rules",
+ FlinkGroupProgramBuilder.newBuilder[BatchOptimizeContext]
+ .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder
+ .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
+ .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .add(RuleSets.ofList(
+ FlinkFilterJoinRule.FILTER_ON_JOIN,
+ FlinkFilterJoinRule.JOIN))
+ .build(), "push filter into join")
+ .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder
+ .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
+ .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .add(RuleSets.ofList(
+ ProjectMultiJoinMergeRule.INSTANCE,
+ FilterMultiJoinMergeRule.INSTANCE,
+ JoinToMultiJoinRule.INSTANCE))
+ .build(), "merge join to MultiJoin")
+ .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder
+ .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+ .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .add(RuleSets.ofList(RewriteMultiJoinConditionRule.INSTANCE)) // rule under test
+ .build(), "RewriteMultiJoinConditionRule")
+ .build())
+
+ val builder = CalciteConfig.createBuilder(util.tableEnv.getConfig.getCalciteConfig)
+ .replaceBatchProgram(program)
+ util.tableEnv.config.setCalciteConfig(builder.build())
+
+ util.addTableSource[(Int, Long)]("A", 'a1, 'a2)
+ util.addTableSource[(Int, Long)]("B", 'b1, 'b2)
+ util.addTableSource[(Int, Long)]("C", 'c1, 'c2)
+ util.addTableSource[(Int, Long)]("D", 'd1, 'd2)
+ }
+
+ @Test
+ def testMultiJoin_InnerJoin1(): Unit = {
+ val sqlQuery = "SELECT * FROM A, B WHERE a1 = b1"
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testMultiJoin_InnerJoin2(): Unit = { // a1 is equated with both b1 and c1
+ val sqlQuery = "SELECT * FROM A, B, C WHERE a1 = b1 AND a1 = c1"
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testMultiJoin_InnerJoin3(): Unit = { // chain of equalities: a1 = b1, b1 = c1, c1 = d1
+ val sqlQuery = "SELECT * FROM A, B, C, D WHERE a1 = b1 AND b1 = c1 AND c1 = d1"
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testMultiJoin_InnerJoin4(): Unit = {
+ // one equi-join condition (a1 = b1) plus one non-equi condition (a1 > c1)
+ val sqlQuery = "SELECT * FROM A, B, C WHERE a1 = b1 AND a1 > c1"
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testMultiJoin_InnerJoin5(): Unit = { // equality operands include the expression a1 + 1
+ val sqlQuery = "SELECT * FROM A, B, C WHERE a1 + 1 = b1 AND a1 + 1 = c1"
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testMultiJoin_LeftJoin1(): Unit = {
+ val sqlQuery = "SELECT * FROM A LEFT JOIN B ON a1 = b1"
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testMultiJoin_LeftJoin2(): Unit = { // inner join, then left join
+ val sqlQuery = "SELECT * FROM A JOIN B ON a1 = b1 LEFT JOIN C ON b1 = c1"
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testMultiJoin_LeftJoin3(): Unit = { // left join, then inner join
+ val sqlQuery = "SELECT * FROM A LEFT JOIN B ON a1 = b1 JOIN C ON a1 = c1"
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testMultiJoin_RightJoin1(): Unit = {
+ val sqlQuery = "SELECT * FROM A RIGHT JOIN B ON a1 = b1"
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testMultiJoin_RightJoin2(): Unit = { // inner join, then right join
+ val sqlQuery = "SELECT * FROM A JOIN B ON a1 = b1 RIGHT JOIN C ON b1 = c1"
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testMultiJoin_RightJoin3(): Unit = { // right join, then inner join
+ val sqlQuery = "SELECT * FROM A RIGHT JOIN B ON a1 = b1 JOIN C ON a1 = c1"
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testMultiJoin_FullJoin1(): Unit = {
+ val sqlQuery = "SELECT * FROM A FULL OUTER JOIN B ON a1 = b1"
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testMultiJoin_FullJoin2(): Unit = { // two consecutive full outer joins
+ val sqlQuery = "SELECT * FROM A FULL OUTER JOIN B ON a1 = b1 FULL OUTER JOIN C ON a1 = c1"
+ util.verifyPlan(sqlQuery)
+ }
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/JoinReorderTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/JoinReorderTest.scala
new file mode 100644
index 0000000..2a34791
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/JoinReorderTest.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.stream.sql.join
+
+import org.apache.flink.table.plan.common.JoinReorderTestBase
+import org.apache.flink.table.util.TableTestUtil
+
+class JoinReorderTest extends JoinReorderTestBase { // stream variant: uses streamTestUtil()
+ override protected def getTableTestUtil: TableTestUtil = streamTestUtil()
+}