You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/02 09:10:28 UTC

[flink] branch master updated: [FLINK-12937][table-planner-blink] Introduce join reorder planner rules in blink planner

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a6d72fe  [FLINK-12937][table-planner-blink] Introduce join reorder planner rules in blink planner
a6d72fe is described below

commit a6d72fed708e99404c96ffeca6ad70e08963e9cd
Author: godfreyhe <go...@163.com>
AuthorDate: Sat Jun 22 12:50:06 2019 +0800

    [FLINK-12937][table-planner-blink] Introduce join reorder planner rules in blink planner
    
    This closes #8832
---
 .../flink/table/api/PlannerConfigOptions.java      |   6 +
 .../plan/optimize/program/FlinkBatchProgram.scala  |  22 +-
 .../plan/optimize/program/FlinkStreamProgram.scala |  23 +-
 .../table/plan/rules/FlinkBatchRuleSets.scala      |  16 +
 .../table/plan/rules/FlinkStreamRuleSets.scala     |  16 +
 .../logical/RewriteMultiJoinConditionRule.scala    | 129 +++++
 .../table/plan/batch/sql/join/JoinReorderTest.xml  | 600 +++++++++++++++++++
 .../logical/RewriteMultiJoinConditionRuleTest.xml  | 318 +++++++++++
 .../table/plan/stream/sql/join/JoinReorderTest.xml | 635 +++++++++++++++++++++
 .../plan/batch/sql/join/JoinReorderTest.scala      |  25 +
 .../table/plan/common/JoinReorderTestBase.scala    | 233 ++++++++
 .../FlinkAggregateInnerJoinTransposeRuleTest.scala |   2 +-
 .../logical/FlinkJoinPushExpressionsRuleTest.scala |  10 +-
 .../RewriteMultiJoinConditionRuleTest.scala        | 153 +++++
 .../plan/stream/sql/join/JoinReorderTest.scala     |  25 +
 15 files changed, 2208 insertions(+), 5 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java
index 7b05df1..5e7a1a3 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java
@@ -137,4 +137,10 @@ public class PlannerConfigOptions {
 					.defaultValue(true)
 					.withDescription("Allow trying to push predicate down to a FilterableTableSource. " +
 						"the default value is true, means allow the attempt.");
+
+	public static final ConfigOption<Boolean> SQL_OPTIMIZER_JOIN_REORDER_ENABLED =
+			key("sql.optimizer.join-reorder.enabled")
+					.defaultValue(false)
+					.withDescription("Enables join reorder in optimizer cbo. Default is disabled.");
+
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala
index 090b4fb..e73fde6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.plan.optimize.program
 
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.api.PlannerConfigOptions
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.rules.FlinkBatchRuleSets
 
@@ -33,6 +34,7 @@ object FlinkBatchProgram {
   val DECORRELATE = "decorrelate"
   val DEFAULT_REWRITE = "default_rewrite"
   val PREDICATE_PUSHDOWN = "predicate_pushdown"
+  val JOIN_REORDER = "join_reorder"
   val JOIN_REWRITE = "join_rewrite"
   val WINDOW = "window"
   val LOGICAL = "logical"
@@ -120,7 +122,7 @@ object FlinkBatchProgram {
                 .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
                 .add(FlinkBatchRuleSets.FILTER_PREPARE_RULES)
                 .build(), "other predicate rewrite")
-            .setIterations(5).build())
+            .setIterations(5).build(), "predicate rewrite")
         .addProgram(
           FlinkHepRuleSetProgramBuilder.newBuilder
             .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
@@ -135,6 +137,24 @@ object FlinkBatchProgram {
             .build(), "prune empty after predicate push down")
         .build())
 
+    // join reorder
+    if (config.getBoolean(PlannerConfigOptions.SQL_OPTIMIZER_JOIN_REORDER_ENABLED)) {
+      chainedProgram.addLast(
+        JOIN_REORDER,
+        FlinkGroupProgramBuilder.newBuilder[BatchOptimizeContext]
+          .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder
+            .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
+            .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+            .add(FlinkBatchRuleSets.JOIN_REORDER_PERPARE_RULES)
+            .build(), "merge join into MultiJoin")
+          .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder
+            .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+            .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+            .add(FlinkBatchRuleSets.JOIN_REORDER_RULES)
+            .build(), "do join reorder")
+          .build())
+    }
+
     // join rewrite
     chainedProgram.addLast(
       JOIN_REWRITE,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala
index a5f1dac..b64ff31 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala
@@ -18,11 +18,13 @@
 
 package org.apache.flink.table.plan.optimize.program
 
-import org.apache.calcite.plan.hep.HepMatchOrder
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.api.PlannerConfigOptions
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.rules.{FlinkBatchRuleSets, FlinkStreamRuleSets}
 
+import org.apache.calcite.plan.hep.HepMatchOrder
+
 /**
   * Defines a sequence of programs to optimize for stream table plan.
   */
@@ -34,6 +36,7 @@ object FlinkStreamProgram {
   val TIME_INDICATOR = "time_indicator"
   val DEFAULT_REWRITE = "default_rewrite"
   val PREDICATE_PUSHDOWN = "predicate_pushdown"
+  val JOIN_REORDER = "join_reorder"
   val LOGICAL = "logical"
   val LOGICAL_REWRITE = "logical_rewrite"
   val PHYSICAL = "physical"
@@ -130,6 +133,24 @@ object FlinkStreamProgram {
             .build(), "prune empty after predicate push down")
         .build())
 
+    // join reorder
+    if (config.getBoolean(PlannerConfigOptions.SQL_OPTIMIZER_JOIN_REORDER_ENABLED)) {
+      chainedProgram.addLast(
+        JOIN_REORDER,
+        FlinkGroupProgramBuilder.newBuilder[StreamOptimizeContext]
+          .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder
+            .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
+            .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+            .add(FlinkStreamRuleSets.JOIN_REORDER_PERPARE_RULES)
+            .build(), "merge join into MultiJoin")
+          .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder
+            .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+            .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+            .add(FlinkStreamRuleSets.JOIN_REORDER_RULES)
+            .build(), "do join reorder")
+          .build())
+    }
+
     // optimize the logical plan
     chainedProgram.addLast(
       LOGICAL,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
index 75f1d5a..71fcb23 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
@@ -218,6 +218,22 @@ object FlinkBatchRuleSets {
       FILTER_RULES.asScala
     ).asJava)
 
+  val JOIN_REORDER_PERPARE_RULES: RuleSet = RuleSets.ofList(
+    // merge join to MultiJoin
+    JoinToMultiJoinRule.INSTANCE,
+    // merge project to MultiJoin
+    ProjectMultiJoinMergeRule.INSTANCE,
+    // merge filter to MultiJoin
+    FilterMultiJoinMergeRule.INSTANCE
+  )
+
+  val JOIN_REORDER_RULES: RuleSet = RuleSets.ofList(
+    // equi-join predicates transfer
+    RewriteMultiJoinConditionRule.INSTANCE,
+    // join reorder
+    LoptOptimizeJoinRule.INSTANCE
+  )
+
   /**
     * RuleSet to do logical optimize.
     * This RuleSet is a sub-set of [[LOGICAL_OPT_RULES]].
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
index 0f8b219..ca94b8e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
@@ -201,6 +201,22 @@ object FlinkStreamRuleSets {
     ProjectSetOpTransposeRule.INSTANCE
   )
 
+  val JOIN_REORDER_PERPARE_RULES: RuleSet = RuleSets.ofList(
+    // merge project to MultiJoin
+    ProjectMultiJoinMergeRule.INSTANCE,
+    // merge filter to MultiJoin
+    FilterMultiJoinMergeRule.INSTANCE,
+    // merge join to MultiJoin
+    JoinToMultiJoinRule.INSTANCE
+  )
+
+  val JOIN_REORDER_RULES: RuleSet = RuleSets.ofList(
+    // equi-join predicates transfer
+    RewriteMultiJoinConditionRule.INSTANCE,
+    // join reorder
+    LoptOptimizeJoinRule.INSTANCE
+  )
+
   /**
     * RuleSet to do logical optimize.
     * This RuleSet is a sub-set of [[LOGICAL_OPT_RULES]].
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteMultiJoinConditionRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteMultiJoinConditionRule.scala
new file mode 100644
index 0000000..392e58b
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteMultiJoinConditionRule.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptUtil}
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.rules.MultiJoin
+import org.apache.calcite.rex.{RexCall, RexNode}
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.sql.fun.SqlStdOperatorTable._
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+
+/**
+  * Planner rule to apply transitive closure on [[MultiJoin]] for equi-join predicates.
+  *
+  * <p>e.g.
+  * MJ(A, B, C) ON A.a1=B.b1 AND B.b1=C.c1 &rarr;
+  * MJ(A, B, C) ON A.a1=B.b1 AND B.b1=C.c1 AND A.a1=C.c1
+  *
+  * The advantage of applying this rule is that it increases the choice of join reorder;
+  * at the same time, the disadvantage is that it will use more CPU for additional join predicates.
+  */
+class RewriteMultiJoinConditionRule extends RelOptRule(
+  operand(classOf[MultiJoin], any),
+  "RewriteMultiJoinConditionRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val multiJoin: MultiJoin = call.rel(0)
+    // currently only supported when every join in the MultiJoin is an INNER join
+    val isAllInnerJoin = multiJoin.getJoinTypes.forall(_ eq JoinRelType.INNER)
+    val (equiJoinFilters, _) = partitionJoinFilters(multiJoin)
+    !multiJoin.isFullOuterJoin && isAllInnerJoin && equiJoinFilters.size > 1
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val multiJoin: MultiJoin = call.rel(0)
+    val (equiJoinFilters, nonEquiJoinFilters) = partitionJoinFilters(multiJoin)
+    // there is no `equals` method in RexCall, so each operand is keyed by its `toString` value
+    val equiJoinFilterMap = mutable.HashMap[String, mutable.ListBuffer[RexNode]]()
+    equiJoinFilters.foreach {
+      case c: RexCall =>
+        require(c.isA(SqlKind.EQUALS))
+        val left = c.operands.head
+        val right = c.operands(1)
+        equiJoinFilterMap.getOrElseUpdate(left.toString, mutable.ListBuffer[RexNode]()) += right
+        equiJoinFilterMap.getOrElseUpdate(right.toString, mutable.ListBuffer[RexNode]()) += left
+    }
+
+    val candidateJoinFilters = equiJoinFilterMap.values.filter(_.size > 1)
+    if (candidateJoinFilters.isEmpty) {
+      // no operand is equated with two or more others, so no predicate can be derived
+      return
+    }
+
+    val newEquiJoinFilters = mutable.ListBuffer[RexNode](equiJoinFilters: _*)
+    def containEquiJoinFilter(joinFilter: RexNode): Boolean = {
+      newEquiJoinFilters.exists { f => f.toString.equals(joinFilter.toString) }
+    }
+
+    val rexBuilder = multiJoin.getCluster.getRexBuilder
+    candidateJoinFilters.foreach {
+      candidate => candidate.indices.foreach {
+        startIndex =>
+          val op1 = candidate(startIndex)
+          candidate.subList(startIndex + 1, candidate.size).foreach {
+            op2 =>
+              // `a = b` and `b = a` are equivalent: add the filter only if neither form exists
+              val newFilter1 = rexBuilder.makeCall(EQUALS, op1, op2)
+              val newFilter2 = rexBuilder.makeCall(EQUALS, op2, op1)
+              if (!containEquiJoinFilter(newFilter1) && !containEquiJoinFilter(newFilter2)) {
+                newEquiJoinFilters += newFilter1
+              }
+          }
+      }
+    }
+
+    if (newEquiJoinFilters.size == equiJoinFilters.size) {
+      // every derived filter was already present, so there is nothing to rewrite
+      return
+    }
+
+    val newJoinFilter = call.builder().and(newEquiJoinFilters.toList ::: nonEquiJoinFilters.toList)
+    val newMultiJoin =
+      new MultiJoin(
+        multiJoin.getCluster,
+        multiJoin.getInputs,
+        newJoinFilter,
+        multiJoin.getRowType,
+        multiJoin.isFullOuterJoin,
+        multiJoin.getOuterJoinConditions,
+        multiJoin.getJoinTypes,
+        multiJoin.getProjFields,
+        multiJoin.getJoinFieldRefCountsMap,
+        multiJoin.getPostJoinFilter)
+
+    call.transformTo(newMultiJoin)
+  }
+
+  /**
+    * Partitions the MultiJoin condition into equi-join filters and non-equi-join filters.
+    */
+  private def partitionJoinFilters(multiJoin: MultiJoin): (Seq[RexNode], Seq[RexNode]) = {
+    val joinFilters = RelOptUtil.conjunctions(multiJoin.getJoinFilter)
+    joinFilters.partition(f => f.isA(SqlKind.EQUALS))
+  }
+
+}
+
+object RewriteMultiJoinConditionRule {
+  val INSTANCE = new RewriteMultiJoinConditionRule // shared instance registered in the rule sets
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/JoinReorderTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/JoinReorderTest.xml
new file mode 100644
index 0000000..8fe210d
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/JoinReorderTest.xml
@@ -0,0 +1,600 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testAllFullOuterJoin">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1
+   FULL OUTER JOIN T2 ON a1 = a2
+   FULL OUTER JOIN T3 ON a1 = a3
+   FULL OUTER JOIN T4 ON a1 = a4
+   FULL OUTER JOIN T5 ON a4 = a5
+         ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[full])
+   :- LogicalJoin(condition=[=($0, $9)], joinType=[full])
+   :  :- LogicalJoin(condition=[=($0, $6)], joinType=[full])
+   :  :  :- LogicalJoin(condition=[=($0, $3)], joinType=[full])
+   :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+   :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+   :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+HashJoin(joinType=[FullOuterJoin], where=[=(a4, a5)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5], build=[right])
+:- Exchange(distribution=[hash[a4]])
+:  +- HashJoin(joinType=[FullOuterJoin], where=[=(a1, a4)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], build=[right])
+:     :- NestedLoopJoin(joinType=[FullOuterJoin], where=[=(a1, a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], build=[right])
+:     :  :- Exchange(distribution=[hash[a1]])
+:     :  :  +- HashJoin(joinType=[FullOuterJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2], build=[right])
+:     :  :     :- Exchange(distribution=[hash[a1]])
+:     :  :     :  +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+:     :  :     +- Exchange(distribution=[hash[a2]])
+:     :  :        +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+:     :  +- Exchange(distribution=[single])
+:     :     +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+:     +- Exchange(distribution=[hash[a4]])
+:        +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
++- Exchange(distribution=[hash[a5]])
+   +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testAllLeftOuterJoin">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1
+   LEFT OUTER JOIN T2 ON a1 = a2
+   LEFT OUTER JOIN T3 ON a2 = a3
+   LEFT OUTER JOIN T4 ON a1 = a4
+   LEFT OUTER JOIN T5 ON a4 = a5
+         ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[left])
+   :- LogicalJoin(condition=[=($0, $9)], joinType=[left])
+   :  :- LogicalJoin(condition=[=($3, $6)], joinType=[left])
+   :  :  :- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+   :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+   :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+   :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+HashJoin(joinType=[LeftOuterJoin], where=[=(a4, a5)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5], build=[right])
+:- Exchange(distribution=[hash[a4]])
+:  +- HashJoin(joinType=[LeftOuterJoin], where=[=(a1, a4)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])
+:     :- HashJoin(joinType=[LeftOuterJoin], where=[=(a2, a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], isBroadcast=[true], build=[right])
+:     :  :- HashJoin(joinType=[LeftOuterJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2], isBroadcast=[true], build=[right])
+:     :  :  :- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+:     :  :  +- Exchange(distribution=[broadcast])
+:     :  :     +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+:     :  +- Exchange(distribution=[broadcast])
+:     :     +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+:     +- Exchange(distribution=[broadcast])
+:        +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
++- Exchange(distribution=[hash[a5]])
+   +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testAllRightOuterJoin">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1
+   RIGHT OUTER JOIN T2 ON a1 = a2
+   RIGHT OUTER JOIN T3 ON a2 = a3
+   RIGHT OUTER JOIN T4 ON a1 = a4
+   RIGHT OUTER JOIN T5 ON a4 = a5
+         ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[right])
+   :- LogicalJoin(condition=[=($0, $9)], joinType=[right])
+   :  :- LogicalJoin(condition=[=($3, $6)], joinType=[right])
+   :  :  :- LogicalJoin(condition=[=($0, $3)], joinType=[right])
+   :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+   :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+   :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- HashJoin(joinType=[LeftOuterJoin], where=[=(a4, a5)], select=[a5, b5, c5, a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])
+   :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+   +- Exchange(distribution=[broadcast])
+      +- HashJoin(joinType=[RightOuterJoin], where=[=(a1, a4)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[left])
+         :- Exchange(distribution=[broadcast])
+         :  +- SortMergeJoin(joinType=[RightOuterJoin], where=[=(a2, a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3])
+         :     :- HashJoin(joinType=[RightOuterJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2], build=[right])
+         :     :  :- Exchange(distribution=[hash[a1]])
+         :     :  :  +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+         :     :  +- Exchange(distribution=[hash[a2]])
+         :     :     +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+         :     +- Exchange(distribution=[hash[a3]])
+         :        +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+         +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testBushyJoinCondition1">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1, T2, T3, T4, T5
+WHERE a1 = a2 AND a2 = a3 AND a1 = a4 AND a3 = a5
+         ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalFilter(condition=[AND(=($0, $3), =($3, $6), =($0, $9), =($6, $12))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :  :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+      :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+      :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+      :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- HashJoin(joinType=[InnerJoin], where=[=(a2, a4)], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])
+   :- NestedLoopJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3], build=[right])
+   :  :- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+   :  +- Exchange(distribution=[broadcast])
+   :     +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a5, a3)], select=[a5, b5, c5, a1, b1, c1, a3, b3, c3], build=[right])
+   :        :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+   :        +- Exchange(distribution=[broadcast])
+   :           +- Calc(select=[a1, b1, c1, a3, b3, c3], where=[=(a3, a1)])
+   :              +- HashJoin(joinType=[InnerJoin], where=[=(a3, a1)], select=[a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[right])
+   :                 :- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+   :                 +- Exchange(distribution=[broadcast])
+   :                    +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+   +- Exchange(distribution=[broadcast])
+      +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testBushyJoinCondition2">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1, T2, T3, T4, T5
+WHERE b1 = b2 AND b2 = b3 AND b1 = b4 AND b3 = b5
+         ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalFilter(condition=[AND(=($1, $4), =($4, $7), =($1, $10), =($7, $13))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :  :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+      :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+      :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+      :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- HashJoin(joinType=[InnerJoin], where=[=(b1, b3)], select=[a1, b1, c1, a5, b5, c5, a2, b2, c2, a4, b4, c4, a3, b3, c3], isBroadcast=[true], build=[right])
+   :- HashJoin(joinType=[InnerJoin], where=[=(b1, b4)], select=[a1, b1, c1, a5, b5, c5, a2, b2, c2, a4, b4, c4], build=[right])
+   :  :- Exchange(distribution=[hash[b1]])
+   :  :  +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+   :  +- Exchange(distribution=[hash[b4]])
+   :     +- HashJoin(joinType=[InnerJoin], where=[=(b5, b4)], select=[a5, b5, c5, a2, b2, c2, a4, b4, c4], isBroadcast=[true], build=[right])
+   :        :- Calc(select=[a5, b5, c5, a2, b2, c2], where=[=(b5, b2)])
+   :        :  +- HashJoin(joinType=[InnerJoin], where=[=(b5, b2)], select=[a5, b5, c5, a2, b2, c2], isBroadcast=[true], build=[right])
+   :        :     :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+   :        :     +- Exchange(distribution=[broadcast])
+   :        :        +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+   :        +- Exchange(distribution=[broadcast])
+   :           +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+   +- Exchange(distribution=[broadcast])
+      +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerAndFullOuterJoin">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1
+   JOIN T2 ON a1 = a2
+   FULL OUTER JOIN T3 ON a2 = a3
+   JOIN T4 ON a1 = a4
+   JOIN T5 ON a4 = a5
+         ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[inner])
+   :- LogicalJoin(condition=[=($0, $9)], joinType=[inner])
+   :  :- LogicalJoin(condition=[=($3, $6)], joinType=[full])
+   :  :  :- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+   :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+   :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+   :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- HashJoin(joinType=[InnerJoin], where=[=(a1, a4)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a5, b5, c5, a4, b4, c4], isBroadcast=[true], build=[right])
+   :- NestedLoopJoin(joinType=[FullOuterJoin], where=[=(a2, a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], build=[right])
+   :  :- Exchange(distribution=[single])
+   :  :  +- HashJoin(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2], isBroadcast=[true], build=[right])
+   :  :     :- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+   :  :     +- Exchange(distribution=[broadcast])
+   :  :        +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+   :  +- Exchange(distribution=[single])
+   :     +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+   +- Exchange(distribution=[broadcast])
+      +- Calc(select=[a5, b5, c5, a4, b4, c4], where=[=(a4, a5)])
+         +- HashJoin(joinType=[InnerJoin], where=[=(a4, a5)], select=[a5, b5, c5, a4, b4, c4], isBroadcast=[true], build=[right])
+            :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+            +- Exchange(distribution=[broadcast])
+               +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerAndLeftOuterJoin">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1
+   JOIN T2 ON a1 = a2
+   JOIN T3 ON a2 = a3
+   LEFT OUTER JOIN T4 ON a1 = a4
+   JOIN T5 ON a4 = a5
+         ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[inner])
+   :- LogicalJoin(condition=[=($0, $9)], joinType=[left])
+   :  :- LogicalJoin(condition=[=($3, $6)], joinType=[inner])
+   :  :  :- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+   :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+   :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+   :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- HashJoin(joinType=[InnerJoin], where=[=(a4, a5)], select=[a5, b5, c5, a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])
+   :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+   +- Exchange(distribution=[broadcast])
+      +- HashJoin(joinType=[LeftOuterJoin], where=[=(a1, a4)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])
+         :- HashJoin(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], isBroadcast=[true], build=[right])
+         :  :- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+         :  +- Exchange(distribution=[broadcast])
+         :     +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, b2, c2, a3, b3, c3], isBroadcast=[true], build=[right])
+         :        :- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+         :        +- Exchange(distribution=[broadcast])
+         :           +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+         +- Exchange(distribution=[broadcast])
+            +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerAndRightOuterJoin">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1
+   RIGHT OUTER JOIN T2 ON a1 = a2
+   JOIN T3 ON a2 = a3
+   JOIN T4 ON a1 = a4
+   JOIN T5 ON a4 = a5
+         ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[inner])
+   :- LogicalJoin(condition=[=($0, $9)], joinType=[inner])
+   :  :- LogicalJoin(condition=[=($3, $6)], joinType=[inner])
+   :  :  :- LogicalJoin(condition=[=($0, $3)], joinType=[right])
+   :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+   :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+   :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- NestedLoopJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a1, b1, c1, a2, b2, c2, a5, b5, c5, a4, b4, c4, a3, b3, c3], build=[right])
+   :- HashJoin(joinType=[InnerJoin], where=[=(a1, a4)], select=[a1, b1, c1, a2, b2, c2, a5, b5, c5, a4, b4, c4], isBroadcast=[true], build=[right])
+   :  :- HashJoin(joinType=[RightOuterJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2], build=[right])
+   :  :  :- Exchange(distribution=[hash[a1]])
+   :  :  :  +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+   :  :  +- Exchange(distribution=[hash[a2]])
+   :  :     +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+   :  +- Exchange(distribution=[broadcast])
+   :     +- Calc(select=[a5, b5, c5, a4, b4, c4], where=[=(a4, a5)])
+   :        +- HashJoin(joinType=[InnerJoin], where=[=(a4, a5)], select=[a5, b5, c5, a4, b4, c4], isBroadcast=[true], build=[right])
+   :           :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+   :           +- Exchange(distribution=[broadcast])
+   :              +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+   +- Exchange(distribution=[broadcast])
+      +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinWithFilter">
+    <Resource name="sql">
+      <![CDATA[
+WITH V1 AS (SELECT * FROM T1 JOIN T2 ON a1 = a2 WHERE b1 * b2 > 10),
+     V2 AS (SELECT * FROM V1 JOIN T3 ON a2 = a3 WHERE b1 * b3 < 2000),
+     V3 AS (SELECT * FROM T4 JOIN V2 ON a3 = a4 WHERE b2 + b4 > 100)
+
+SELECT * FROM V3, T5 WHERE a4 = a5 AND b5 < 15
+         ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a4=[$0], b4=[$1], c4=[$2], a1=[$3], b1=[$4], c1=[$5], a2=[$6], b2=[$7], c2=[$8], a3=[$9], b3=[$10], c3=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalFilter(condition=[AND(=($0, $12), <($13, 15))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalProject(a4=[$0], b4=[$1], c4=[$2], a1=[$3], b1=[$4], c1=[$5], a2=[$6], b2=[$7], c2=[$8], a3=[$9], b3=[$10], c3=[$11])
+      :  +- LogicalFilter(condition=[>(+($7, $1), 100)])
+      :     +- LogicalJoin(condition=[=($9, $0)], joinType=[inner])
+      :        :- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+      :        +- LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8])
+      :           +- LogicalFilter(condition=[<(*($1, $7), 2000)])
+      :              +- LogicalJoin(condition=[=($3, $6)], joinType=[inner])
+      :                 :- LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5])
+      :                 :  +- LogicalFilter(condition=[>(*($1, $4), 10)])
+      :                 :     +- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+      :                 :        :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+      :                 :        +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+      :                 +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a4, b4, c4, a1, b1, c1, a2, b2, c2, a3, b3, c3, a5, b5, c5])
++- HashJoin(joinType=[InnerJoin], where=[AND(>(+(b2, b4), 100), =(a2, a4))], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])
+   :- NestedLoopJoin(joinType=[InnerJoin], where=[AND(>(*(b1, b2), 10), =(a2, a3))], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3], build=[right])
+   :  :- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+   :  +- Exchange(distribution=[broadcast])
+   :     +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a5, a3)], select=[a5, b5, c5, a1, b1, c1, a3, b3, c3], build=[right])
+   :        :- Calc(select=[a5, b5, c5], where=[<(b5, 15)])
+   :        :  +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+   :        +- Exchange(distribution=[broadcast])
+   :           +- Calc(select=[a1, b1, c1, a3, b3, c3], where=[=(a3, a1)])
+   :              +- HashJoin(joinType=[InnerJoin], where=[AND(=(a3, a1), <(*(b1, b3), 2000))], select=[a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[right])
+   :                 :- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+   :                 +- Exchange(distribution=[broadcast])
+   :                    +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+   +- Exchange(distribution=[broadcast])
+      +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinWithProject">
+    <Resource name="sql">
+      <![CDATA[
+WITH V1 AS (SELECT b1, a1, a2, c2 FROM T1 JOIN T2 ON a1 = a2),
+     V2 AS (SELECT a3, b1, a1, c2, c3 FROM V1 JOIN T3 ON a2 = a3),
+     V3 AS (SELECT a3, b1, a1, c2, c3, a4, b4 FROM T4 JOIN V2 ON a1 = a4)
+
+SELECT * FROM V3, T5 where a4 = a5
+         ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a3=[$0], b1=[$1], a1=[$2], c2=[$3], c3=[$4], a4=[$5], b4=[$6], a5=[$7], b5=[$8], c5=[$9])
++- LogicalFilter(condition=[=($5, $7)])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalProject(a3=[$3], b1=[$4], a1=[$5], c2=[$6], c3=[$7], a4=[$0], b4=[$1])
+      :  +- LogicalJoin(condition=[=($5, $0)], joinType=[inner])
+      :     :- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+      :     +- LogicalProject(a3=[$4], b1=[$0], a1=[$1], c2=[$3], c3=[$6])
+      :        +- LogicalJoin(condition=[=($2, $4)], joinType=[inner])
+      :           :- LogicalProject(b1=[$1], a1=[$0], a2=[$3], c2=[$5])
+      :           :  +- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+      :           :     :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+      :           :     +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+      :           +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a3, b1, a1, c2, c3, a4, b4, a5, b5, c5])
++- HashJoin(joinType=[InnerJoin], where=[=(a4, a5)], select=[a5, b5, c5, a3, b1, a1, c2, c3, a4, b4], isBroadcast=[true], build=[right])
+   :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+   +- Exchange(distribution=[broadcast])
+      +- HashJoin(joinType=[InnerJoin], where=[=(a1, a4)], select=[a3, b1, a1, c2, c3, a4, b4], isBroadcast=[true], build=[right])
+         :- Calc(select=[a3, b1, a1, c2, c3])
+         :  +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[b1, a1, a2, c2, a3, c3], isBroadcast=[true], build=[right])
+         :     :- Calc(select=[b1, a1, a2, c2])
+         :     :  +- HashJoin(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, a2, c2], isBroadcast=[true], build=[right])
+         :     :     :- Calc(select=[a1, b1])
+         :     :     :  +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+         :     :     +- Exchange(distribution=[broadcast])
+         :     :        +- Calc(select=[a2, c2])
+         :     :           +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+         :     +- Exchange(distribution=[broadcast])
+         :        +- Calc(select=[a3, c3])
+         :           +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+         +- Exchange(distribution=[broadcast])
+            +- Calc(select=[a4, b4])
+               +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testStarJoinCondition1">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1, T2, T3, T4, T5
+WHERE a1 = a2 AND a1 = a3 AND a1 = a4 AND a1 = a5
+         ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalFilter(condition=[AND(=($0, $3), =($0, $6), =($0, $9), =($0, $12))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :  :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+      :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+      :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+      :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- HashJoin(joinType=[InnerJoin], where=[=(a2, a4)], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])
+   :- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[right])
+   :  :- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+   :  +- Exchange(distribution=[broadcast])
+   :     +- HashJoin(joinType=[InnerJoin], where=[=(a5, a3)], select=[a5, b5, c5, a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[right])
+   :        :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+   :        +- Exchange(distribution=[broadcast])
+   :           +- HashJoin(joinType=[InnerJoin], where=[=(a1, a3)], select=[a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[right])
+   :              :- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+   :              +- Exchange(distribution=[broadcast])
+   :                 +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+   +- Exchange(distribution=[broadcast])
+      +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testStarJoinCondition2">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1, T2, T3, T4, T5
+WHERE b1 = b2 AND b1 = b3 AND b1 = b4 AND b1 = b5
+         ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalFilter(condition=[AND(=($1, $4), =($1, $7), =($1, $10), =($1, $13))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :  :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+      :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+      :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+      :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- HashJoin(joinType=[InnerJoin], where=[=(b1, b3)], select=[a1, b1, c1, a5, b5, c5, a2, b2, c2, a4, b4, c4, a3, b3, c3], isBroadcast=[true], build=[right])
+   :- HashJoin(joinType=[InnerJoin], where=[=(b1, b4)], select=[a1, b1, c1, a5, b5, c5, a2, b2, c2, a4, b4, c4], build=[right])
+   :  :- Exchange(distribution=[hash[b1]])
+   :  :  +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+   :  +- Exchange(distribution=[hash[b4]])
+   :     +- HashJoin(joinType=[InnerJoin], where=[=(b5, b4)], select=[a5, b5, c5, a2, b2, c2, a4, b4, c4], isBroadcast=[true], build=[right])
+   :        :- Calc(select=[a5, b5, c5, a2, b2, c2], where=[=(b5, b2)])
+   :        :  +- HashJoin(joinType=[InnerJoin], where=[=(b5, b2)], select=[a5, b5, c5, a2, b2, c2], isBroadcast=[true], build=[right])
+   :        :     :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+   :        :     +- Exchange(distribution=[broadcast])
+   :        :        +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+   :        +- Exchange(distribution=[broadcast])
+   :           +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+   +- Exchange(distribution=[broadcast])
+      +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testWithoutColumnStats">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1, T2, T3, T4, T5
+WHERE c1 = c2 AND c1 = c3 AND c2 = c4 AND c1 = c5
+         ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalFilter(condition=[AND(=($2, $5), =($2, $8), =($5, $11), =($2, $14))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :  :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+      :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+      :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+      :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- HashJoin(joinType=[InnerJoin], where=[=(c1, c2)], select=[a1, b1, c1, a2, b2, c2, a5, b5, c5, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])
+   :- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+   +- Exchange(distribution=[broadcast])
+      +- HashJoin(joinType=[InnerJoin], where=[=(c2, c5)], select=[a2, b2, c2, a5, b5, c5, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])
+         :- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+         +- Exchange(distribution=[broadcast])
+            +- HashJoin(joinType=[InnerJoin], where=[=(c3, c4)], select=[a5, b5, c5, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])
+               :- Calc(select=[a5, b5, c5, a3, b3, c3], where=[=(c5, c3)])
+               :  +- HashJoin(joinType=[InnerJoin], where=[=(c5, c3)], select=[a5, b5, c5, a3, b3, c3], isBroadcast=[true], build=[right])
+               :     :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+               :     +- Exchange(distribution=[broadcast])
+               :        +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+               +- Exchange(distribution=[broadcast])
+                  +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/RewriteMultiJoinConditionRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/RewriteMultiJoinConditionRuleTest.xml
new file mode 100644
index 0000000..29dd20a
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/RewriteMultiJoinConditionRuleTest.xml
@@ -0,0 +1,318 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testMultiJoin_FullJoin1">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM A FULL OUTER JOIN B ON a1 = b1]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3])
++- LogicalJoin(condition=[=($0, $2)], joinType=[full])
+   :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[true], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[{0, 1}, {0, 1}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiJoin_FullJoin2">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM A FULL OUTER JOIN B ON a1 = b1 FULL OUTER JOIN C ON a1 = c1]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3], c1=[$4], c2=[$5])
++- LogicalJoin(condition=[=($0, $4)], joinType=[full])
+   :- LogicalJoin(condition=[=($0, $2)], joinType=[full])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+MultiJoin(joinFilter=[=($0, $4)], isFullOuterJoin=[true], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[{0, 1, 2, 3}, {0, 1}]])
+:- MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[true], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[ALL, ALL]])
+:  :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+:  +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiJoin_InnerJoin1">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM A, B WHERE a1 = b1]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3])
++- LogicalFilter(condition=[=($0, $2)])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[false], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[{0, 1}, {0, 1}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiJoin_InnerJoin2">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM A, B, C WHERE a1 = b1 AND a1 = c1]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3], c1=[$4], c2=[$5])
++- LogicalFilter(condition=[AND(=($0, $2), =($0, $4))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+      :  +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+MultiJoin(joinFilter=[AND(=($0, $4), =($0, $2), =($4, $2))], isFullOuterJoin=[false], joinTypes=[[INNER, INNER, INNER]], outerJoinConditions=[[NULL, NULL, NULL]], projFields=[[{0, 1}, {0, 1}, {0, 1}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+:- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiJoin_InnerJoin3">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM A, B, C, D WHERE a1 = b1 AND b1 = c1 AND c1 = d1]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3], c1=[$4], c2=[$5], d1=[$6], d2=[$7])
++- LogicalFilter(condition=[AND(=($0, $2), =($2, $4), =($4, $6))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :  :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+      :  :  +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+      :  +- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, D, source: [TestTableSource(d1, d2)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+MultiJoin(joinFilter=[AND(=($4, $6), =($2, $4), =($0, $2), =($6, $2), =($4, $0), =($6, $0))], isFullOuterJoin=[false], joinTypes=[[INNER, INNER, INNER, INNER]], outerJoinConditions=[[NULL, NULL, NULL, NULL]], projFields=[[{0, 1}, {0, 1}, {0, 1}, {0, 1}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+:- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+:- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, D, source: [TestTableSource(d1, d2)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiJoin_InnerJoin4">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM A, B, C WHERE a1 = b1 AND a1 > c1]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3], c1=[$4], c2=[$5])
++- LogicalFilter(condition=[AND(=($0, $2), >($0, $4))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+      :  +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+MultiJoin(joinFilter=[AND(>($0, $4), =($0, $2))], isFullOuterJoin=[false], joinTypes=[[INNER, INNER, INNER]], outerJoinConditions=[[NULL, NULL, NULL]], projFields=[[{0, 1}, {0, 1}, {0, 1}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+:- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiJoin_InnerJoin5">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM A, B, C WHERE a1 + 1 = b1 AND a1 + 1 = c1]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3], c1=[$4], c2=[$5])
++- LogicalFilter(condition=[AND(=(+($0, 1), $2), =(+($0, 1), $4))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+      :  +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+MultiJoin(joinFilter=[AND(=(+($0, 1), $4), =(+($0, 1), $2), =($4, $2))], isFullOuterJoin=[false], joinTypes=[[INNER, INNER, INNER]], outerJoinConditions=[[NULL, NULL, NULL]], projFields=[[{0, 1}, {0, 1}, {0, 1}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+:- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiJoin_LeftJoin1">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM A LEFT JOIN B ON a1 = b1]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3])
++- LogicalJoin(condition=[=($0, $2)], joinType=[left])
+   :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[INNER, LEFT]], outerJoinConditions=[[NULL, =($0, $2)]], projFields=[[{0, 1}, {0, 1}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiJoin_LeftJoin2">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM A JOIN B ON a1 = b1 LEFT JOIN C ON b1 = c1]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3], c1=[$4], c2=[$5])
++- LogicalJoin(condition=[=($2, $4)], joinType=[left])
+   :- LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[false], joinTypes=[[INNER, INNER, LEFT]], outerJoinConditions=[[NULL, NULL, =($2, $4)]], projFields=[[{0, 1}, {0, 1}, {0, 1}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+:- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiJoin_LeftJoin3">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM A LEFT JOIN B ON a1 = b1 JOIN C ON a1 = c1]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3], c1=[$4], c2=[$5])
++- LogicalJoin(condition=[=($0, $4)], joinType=[inner])
+   :- LogicalJoin(condition=[=($0, $2)], joinType=[left])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+MultiJoin(joinFilter=[=($0, $4)], isFullOuterJoin=[false], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[{0, 1, 2, 3}, {0, 1}]])
+:- MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[INNER, LEFT]], outerJoinConditions=[[NULL, =($0, $2)]], projFields=[[ALL, ALL]])
+:  :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+:  +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiJoin_RightJoin1">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM A RIGHT JOIN B ON a1 = b1]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3])
++- LogicalJoin(condition=[=($0, $2)], joinType=[right])
+   :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($0, $2), NULL]], projFields=[[{0, 1}, {0, 1}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiJoin_RightJoin2">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM A JOIN B ON a1 = b1 RIGHT JOIN C ON b1 = c1]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3], c1=[$4], c2=[$5])
++- LogicalJoin(condition=[=($2, $4)], joinType=[right])
+   :- LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($2, $4), NULL]], projFields=[[{0, 1, 2, 3}, {0, 1}]])
+:- MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[false], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[ALL, ALL]])
+:  :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+:  +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiJoin_RightJoin3">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM A RIGHT JOIN B ON a1 = b1 JOIN C ON a1 = c1]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3], c1=[$4], c2=[$5])
++- LogicalJoin(condition=[=($0, $4)], joinType=[inner])
+   :- LogicalJoin(condition=[=($0, $2)], joinType=[right])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+MultiJoin(joinFilter=[=($0, $4)], isFullOuterJoin=[false], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[{0, 1, 2, 3}, {0, 1}]])
+:- MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($0, $2), NULL]], projFields=[[ALL, ALL]])
+:  :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]])
+:  +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/JoinReorderTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/JoinReorderTest.xml
new file mode 100644
index 0000000..b05c7a0
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/JoinReorderTest.xml
@@ -0,0 +1,635 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testAllFullOuterJoin">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1
+   FULL OUTER JOIN T2 ON a1 = a2
+   FULL OUTER JOIN T3 ON a1 = a3
+   FULL OUTER JOIN T4 ON a1 = a4
+   FULL OUTER JOIN T5 ON a4 = a5
+         ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[full])
+   :- LogicalJoin(condition=[=($0, $9)], joinType=[full])
+   :  :- LogicalJoin(condition=[=($0, $6)], joinType=[full])
+   :  :  :- LogicalJoin(condition=[=($0, $3)], joinType=[full])
+   :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+   :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+   :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Join(joinType=[FullOuterJoin], where=[=(a4, a5)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+:- Exchange(distribution=[hash[a4]])
+:  +- Join(joinType=[FullOuterJoin], where=[=(a1, a4)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+:     :- Exchange(distribution=[hash[a1]])
+:     :  +- Join(joinType=[FullOuterJoin], where=[=(a1, a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+:     :     :- Exchange(distribution=[hash[a1]])
+:     :     :  +- Join(joinType=[FullOuterJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+:     :     :     :- Exchange(distribution=[hash[a1]])
+:     :     :     :  +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+:     :     :     +- Exchange(distribution=[hash[a2]])
+:     :     :        +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+:     :     +- Exchange(distribution=[hash[a3]])
+:     :        +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+:     +- Exchange(distribution=[hash[a4]])
+:        +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
++- Exchange(distribution=[hash[a5]])
+   +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testAllLeftOuterJoin">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1
+   LEFT OUTER JOIN T2 ON a1 = a2
+   LEFT OUTER JOIN T3 ON a2 = a3
+   LEFT OUTER JOIN T4 ON a1 = a4
+   LEFT OUTER JOIN T5 ON a4 = a5
+         ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[left])
+   :- LogicalJoin(condition=[=($0, $9)], joinType=[left])
+   :  :- LogicalJoin(condition=[=($3, $6)], joinType=[left])
+   :  :  :- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+   :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+   :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+   :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Join(joinType=[LeftOuterJoin], where=[=(a4, a5)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+:- Exchange(distribution=[hash[a4]])
+:  +- Join(joinType=[LeftOuterJoin], where=[=(a1, a4)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+:     :- Exchange(distribution=[hash[a1]])
+:     :  +- Join(joinType=[LeftOuterJoin], where=[=(a2, a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+:     :     :- Exchange(distribution=[hash[a2]])
+:     :     :  +- Join(joinType=[LeftOuterJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+:     :     :     :- Exchange(distribution=[hash[a1]])
+:     :     :     :  +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+:     :     :     +- Exchange(distribution=[hash[a2]])
+:     :     :        +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+:     :     +- Exchange(distribution=[hash[a3]])
+:     :        +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+:     +- Exchange(distribution=[hash[a4]])
+:        +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
++- Exchange(distribution=[hash[a5]])
+   +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testAllRightOuterJoin">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1
+   RIGHT OUTER JOIN T2 ON a1 = a2
+   RIGHT OUTER JOIN T3 ON a2 = a3
+   RIGHT OUTER JOIN T4 ON a1 = a4
+   RIGHT OUTER JOIN T5 ON a4 = a5
+         ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[right])
+   :- LogicalJoin(condition=[=($0, $9)], joinType=[right])
+   :  :- LogicalJoin(condition=[=($3, $6)], joinType=[right])
+   :  :  :- LogicalJoin(condition=[=($0, $3)], joinType=[right])
+   :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+   :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+   :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- Join(joinType=[LeftOuterJoin], where=[=(a4, a5)], select=[a5, b5, c5, a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[hash[a5]])
+   :  +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+   +- Exchange(distribution=[hash[a4]])
+      +- Join(joinType=[RightOuterJoin], where=[=(a1, a4)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+         :- Exchange(distribution=[hash[a1]])
+         :  +- Join(joinType=[RightOuterJoin], where=[=(a2, a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+         :     :- Exchange(distribution=[hash[a2]])
+         :     :  +- Join(joinType=[RightOuterJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+         :     :     :- Exchange(distribution=[hash[a1]])
+         :     :     :  +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+         :     :     +- Exchange(distribution=[hash[a2]])
+         :     :        +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+         :     +- Exchange(distribution=[hash[a3]])
+         :        +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+         +- Exchange(distribution=[hash[a4]])
+            +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testBushyJoinCondition1">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1, T2, T3, T4, T5
+WHERE a1 = a2 AND a2 = a3 AND a1 = a4 AND a3 = a5
+         ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalFilter(condition=[AND(=($0, $3), =($3, $6), =($0, $9), =($6, $12))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :  :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+      :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+      :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+      :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- Join(joinType=[InnerJoin], where=[AND(=(a1, a4), =(a4, a2), =(a3, a4), =(a5, a4))], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[hash[a1, a2, a3, a5]])
+   :  +- Join(joinType=[InnerJoin], where=[AND(=(a2, a3), =(a1, a2), =(a5, a2))], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :     :- Exchange(distribution=[hash[a2, a2, a2]])
+   :     :  +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+   :     +- Exchange(distribution=[hash[a3, a1, a5]])
+   :        +- Join(joinType=[InnerJoin], where=[AND(=(a3, a5), =(a1, a5))], select=[a5, b5, c5, a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :           :- Exchange(distribution=[hash[a5, a5]])
+   :           :  +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+   :           +- Exchange(distribution=[hash[a3, a1]])
+   :              +- Join(joinType=[InnerJoin], where=[=(a3, a1)], select=[a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :                 :- Exchange(distribution=[hash[a1]])
+   :                 :  +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+   :                 +- Exchange(distribution=[hash[a3]])
+   :                    +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+   +- Exchange(distribution=[hash[a4, a4, a4, a4]])
+      +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testBushyJoinCondition2">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1, T2, T3, T4, T5
+WHERE b1 = b2 AND b2 = b3 AND b1 = b4 AND b3 = b5
+         ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalFilter(condition=[AND(=($1, $4), =($4, $7), =($1, $10), =($7, $13))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :  :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+      :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+      :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+      :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- Join(joinType=[InnerJoin], where=[AND(=(b3, b5), =(b2, b3), =(b3, b1), =(b4, b3))], select=[a1, b1, c1, a5, b5, c5, a2, b2, c2, a4, b4, c4, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[hash[b5, b2, b1, b4]])
+   :  +- Join(joinType=[InnerJoin], where=[AND(=(b1, b4), =(b1, b2), =(b5, b1))], select=[a1, b1, c1, a5, b5, c5, a2, b2, c2, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :     :- Exchange(distribution=[hash[b1, b1, b1]])
+   :     :  +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+   :     +- Exchange(distribution=[hash[b4, b2, b5]])
+   :        +- Join(joinType=[InnerJoin], where=[AND(=(b4, b2), =(b5, b4))], select=[a5, b5, c5, a2, b2, c2, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :           :- Exchange(distribution=[hash[b2, b5]])
+   :           :  +- Join(joinType=[InnerJoin], where=[=(b5, b2)], select=[a5, b5, c5, a2, b2, c2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :           :     :- Exchange(distribution=[hash[b5]])
+   :           :     :  +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+   :           :     +- Exchange(distribution=[hash[b2]])
+   :           :        +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+   :           +- Exchange(distribution=[hash[b4, b4]])
+   :              +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+   +- Exchange(distribution=[hash[b3, b3, b3, b3]])
+      +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerAndFullOuterJoin">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1
+   JOIN T2 ON a1 = a2
+   FULL OUTER JOIN T3 ON a2 = a3
+   JOIN T4 ON a1 = a4
+   JOIN T5 ON a4 = a5
+         ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[inner])
+   :- LogicalJoin(condition=[=($0, $9)], joinType=[inner])
+   :  :- LogicalJoin(condition=[=($3, $6)], joinType=[full])
+   :  :  :- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+   :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+   :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+   :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- Join(joinType=[InnerJoin], where=[AND(=(a1, a4), =(a5, a1))], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a5, b5, c5, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[hash[a1, a1]])
+   :  +- Join(joinType=[FullOuterJoin], where=[=(a2, a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :     :- Exchange(distribution=[hash[a2]])
+   :     :  +- Join(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :     :     :- Exchange(distribution=[hash[a1]])
+   :     :     :  +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+   :     :     +- Exchange(distribution=[hash[a2]])
+   :     :        +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+   :     +- Exchange(distribution=[hash[a3]])
+   :        +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+   +- Exchange(distribution=[hash[a4, a5]])
+      +- Join(joinType=[InnerJoin], where=[=(a4, a5)], select=[a5, b5, c5, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+         :- Exchange(distribution=[hash[a5]])
+         :  +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+         +- Exchange(distribution=[hash[a4]])
+            +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerAndLeftOuterJoin">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1
+   JOIN T2 ON a1 = a2
+   JOIN T3 ON a2 = a3
+   LEFT OUTER JOIN T4 ON a1 = a4
+   JOIN T5 ON a4 = a5
+         ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[inner])
+   :- LogicalJoin(condition=[=($0, $9)], joinType=[left])
+   :  :- LogicalJoin(condition=[=($3, $6)], joinType=[inner])
+   :  :  :- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+   :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+   :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+   :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- Join(joinType=[InnerJoin], where=[=(a4, a5)], select=[a5, b5, c5, a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[hash[a5]])
+   :  +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+   +- Exchange(distribution=[hash[a4]])
+      +- Join(joinType=[LeftOuterJoin], where=[=(a1, a4)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+         :- Exchange(distribution=[hash[a1]])
+         :  +- Join(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+         :     :- Exchange(distribution=[hash[a1]])
+         :     :  +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+         :     +- Exchange(distribution=[hash[a2]])
+         :        +- Join(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, b2, c2, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+         :           :- Exchange(distribution=[hash[a2]])
+         :           :  +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+         :           +- Exchange(distribution=[hash[a3]])
+         :              +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+         +- Exchange(distribution=[hash[a4]])
+            +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerAndRightOuterJoin">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1
+   RIGHT OUTER JOIN T2 ON a1 = a2
+   JOIN T3 ON a2 = a3
+   JOIN T4 ON a1 = a4
+   JOIN T5 ON a4 = a5
+         ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[inner])
+   :- LogicalJoin(condition=[=($0, $9)], joinType=[inner])
+   :  :- LogicalJoin(condition=[=($3, $6)], joinType=[inner])
+   :  :  :- LogicalJoin(condition=[=($0, $3)], joinType=[right])
+   :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+   :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+   :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- Join(joinType=[InnerJoin], where=[=(a2, a3)], select=[a1, b1, c1, a2, b2, c2, a5, b5, c5, a4, b4, c4, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[hash[a2]])
+   :  +- Join(joinType=[InnerJoin], where=[AND(=(a1, a4), =(a5, a1))], select=[a1, b1, c1, a2, b2, c2, a5, b5, c5, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :     :- Exchange(distribution=[hash[a1, a1]])
+   :     :  +- Join(joinType=[RightOuterJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :     :     :- Exchange(distribution=[hash[a1]])
+   :     :     :  +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+   :     :     +- Exchange(distribution=[hash[a2]])
+   :     :        +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+   :     +- Exchange(distribution=[hash[a4, a5]])
+   :        +- Join(joinType=[InnerJoin], where=[=(a4, a5)], select=[a5, b5, c5, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :           :- Exchange(distribution=[hash[a5]])
+   :           :  +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+   :           +- Exchange(distribution=[hash[a4]])
+   :              +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+   +- Exchange(distribution=[hash[a3]])
+      +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinWithFilter">
+    <Resource name="sql">
+      <![CDATA[
+WITH V1 AS (SELECT * FROM T1 JOIN T2 ON a1 = a2 WHERE b1 * b2 > 10),
+     V2 AS (SELECT * FROM V1 JOIN T3 ON a2 = a3 WHERE b1 * b3 < 2000),
+     V3 AS (SELECT * FROM T4 JOIN V2 ON a3 = a4 WHERE b2 + b4 > 100)
+
+SELECT * FROM V3, T5 WHERE a4 = a5 AND b5 < 15
+         ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a4=[$0], b4=[$1], c4=[$2], a1=[$3], b1=[$4], c1=[$5], a2=[$6], b2=[$7], c2=[$8], a3=[$9], b3=[$10], c3=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalFilter(condition=[AND(=($0, $12), <($13, 15))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalProject(a4=[$0], b4=[$1], c4=[$2], a1=[$3], b1=[$4], c1=[$5], a2=[$6], b2=[$7], c2=[$8], a3=[$9], b3=[$10], c3=[$11])
+      :  +- LogicalFilter(condition=[>(+($7, $1), 100)])
+      :     +- LogicalJoin(condition=[=($9, $0)], joinType=[inner])
+      :        :- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+      :        +- LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8])
+      :           +- LogicalFilter(condition=[<(*($1, $7), 2000)])
+      :              +- LogicalJoin(condition=[=($3, $6)], joinType=[inner])
+      :                 :- LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5])
+      :                 :  +- LogicalFilter(condition=[>(*($1, $4), 10)])
+      :                 :     +- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+      :                 :        :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+      :                 :        +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+      :                 +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a4, b4, c4, a1, b1, c1, a2, b2, c2, a3, b3, c3, a5, b5, c5])
++- Join(joinType=[InnerJoin], where=[AND(=(a4, a5), =(a3, a4), =(a4, a2), =(a1, a4), >(+(b2, b4), 100))], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[hash[a5, a3, a2, a1]])
+   :  +- Join(joinType=[InnerJoin], where=[AND(=(a2, a3), =(a1, a2), =(a2, a5), >(*(b1, b2), 10))], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :     :- Exchange(distribution=[hash[a2, a2, a2]])
+   :     :  +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+   :     +- Exchange(distribution=[hash[a3, a1, a5]])
+   :        +- Join(joinType=[InnerJoin], where=[AND(=(a5, a3), =(a1, a5))], select=[a5, b5, c5, a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :           :- Exchange(distribution=[hash[a5, a5]])
+   :           :  +- Calc(select=[a5, b5, c5], where=[<(b5, 15)])
+   :           :     +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+   :           +- Exchange(distribution=[hash[a3, a1]])
+   :              +- Join(joinType=[InnerJoin], where=[AND(=(a3, a1), <(*(b1, b3), 2000))], select=[a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :                 :- Exchange(distribution=[hash[a1]])
+   :                 :  +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+   :                 +- Exchange(distribution=[hash[a3]])
+   :                    +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+   +- Exchange(distribution=[hash[a4, a4, a4, a4]])
+      +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinWithProject">
+    <Resource name="sql">
+      <![CDATA[
+WITH V1 AS (SELECT b1, a1, a2, c2 FROM T1 JOIN T2 ON a1 = a2),
+     V2 AS (SELECT a3, b1, a1, c2, c3 FROM V1 JOIN T3 ON a2 = a3),
+     V3 AS (SELECT a3, b1, a1, c2, c3, a4, b4 FROM T4 JOIN V2 ON a1 = a4)
+
+SELECT * FROM V3, T5 where a4 = a5
+         ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a3=[$0], b1=[$1], a1=[$2], c2=[$3], c3=[$4], a4=[$5], b4=[$6], a5=[$7], b5=[$8], c5=[$9])
++- LogicalFilter(condition=[=($5, $7)])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalProject(a3=[$3], b1=[$4], a1=[$5], c2=[$6], c3=[$7], a4=[$0], b4=[$1])
+      :  +- LogicalJoin(condition=[=($5, $0)], joinType=[inner])
+      :     :- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+      :     +- LogicalProject(a3=[$4], b1=[$0], a1=[$1], c2=[$3], c3=[$6])
+      :        +- LogicalJoin(condition=[=($2, $4)], joinType=[inner])
+      :           :- LogicalProject(b1=[$1], a1=[$0], a2=[$3], c2=[$5])
+      :           :  +- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+      :           :     :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+      :           :     +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+      :           +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a3, b1, a1, c2, c3, a4, b4, a5, b5, c5])
++- Join(joinType=[InnerJoin], where=[=(a4, a5)], select=[a5, b5, c5, a3, b1, a1, c2, c3, a4, b4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[hash[a5]])
+   :  +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+   +- Exchange(distribution=[hash[a4]])
+      +- Join(joinType=[InnerJoin], where=[=(a1, a4)], select=[a3, b1, a1, c2, c3, a4, b4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+         :- Exchange(distribution=[hash[a1]])
+         :  +- Calc(select=[a3, b1, a1, c2, c3])
+         :     +- Join(joinType=[InnerJoin], where=[=(a2, a3)], select=[b1, a1, a2, c2, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+         :        :- Exchange(distribution=[hash[a2]])
+         :        :  +- Calc(select=[b1, a1, a2, c2])
+         :        :     +- Join(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, a2, c2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+         :        :        :- Exchange(distribution=[hash[a1]])
+         :        :        :  +- Calc(select=[a1, b1])
+         :        :        :     +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+         :        :        +- Exchange(distribution=[hash[a2]])
+         :        :           +- Calc(select=[a2, c2])
+         :        :              +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+         :        +- Exchange(distribution=[hash[a3]])
+         :           +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+         +- Exchange(distribution=[hash[a4]])
+            +- Calc(select=[a4, b4])
+               +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testStarJoinCondition1">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1, T2, T3, T4, T5
+WHERE a1 = a2 AND a1 = a3 AND a1 = a4 AND a1 = a5
+         ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalFilter(condition=[AND(=($0, $3), =($0, $6), =($0, $9), =($0, $12))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :  :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+      :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+      :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+      :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- Join(joinType=[InnerJoin], where=[AND(=(a1, a4), =(a5, a4), =(a4, a3), =(a4, a2))], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[hash[a1, a5, a3, a2]])
+   :  +- Join(joinType=[InnerJoin], where=[AND(=(a1, a2), =(a5, a2), =(a3, a2))], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :     :- Exchange(distribution=[hash[a2, a2, a2]])
+   :     :  +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+   :     +- Exchange(distribution=[hash[a1, a5, a3]])
+   :        +- Join(joinType=[InnerJoin], where=[AND(=(a1, a5), =(a5, a3))], select=[a5, b5, c5, a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :           :- Exchange(distribution=[hash[a5, a5]])
+   :           :  +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+   :           +- Exchange(distribution=[hash[a1, a3]])
+   :              +- Join(joinType=[InnerJoin], where=[=(a1, a3)], select=[a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :                 :- Exchange(distribution=[hash[a1]])
+   :                 :  +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+   :                 +- Exchange(distribution=[hash[a3]])
+   :                    +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+   +- Exchange(distribution=[hash[a4, a4, a4, a4]])
+      +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testStarJoinCondition2">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1, T2, T3, T4, T5
+WHERE b1 = b2 AND b1 = b3 AND b1 = b4 AND b1 = b5
+         ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalFilter(condition=[AND(=($1, $4), =($1, $7), =($1, $10), =($1, $13))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :  :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+      :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+      :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+      :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- Join(joinType=[InnerJoin], where=[AND(=(b1, b3), =(b5, b3), =(b4, b3), =(b3, b2))], select=[a1, b1, c1, a5, b5, c5, a2, b2, c2, a4, b4, c4, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[hash[b1, b5, b4, b2]])
+   :  +- Join(joinType=[InnerJoin], where=[AND(=(b1, b5), =(b1, b4), =(b1, b2))], select=[a1, b1, c1, a5, b5, c5, a2, b2, c2, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :     :- Exchange(distribution=[hash[b1, b1, b1]])
+   :     :  +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+   :     +- Exchange(distribution=[hash[b5, b4, b2]])
+   :        +- Join(joinType=[InnerJoin], where=[AND(=(b5, b4), =(b4, b2))], select=[a5, b5, c5, a2, b2, c2, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :           :- Exchange(distribution=[hash[b5, b2]])
+   :           :  +- Join(joinType=[InnerJoin], where=[=(b5, b2)], select=[a5, b5, c5, a2, b2, c2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :           :     :- Exchange(distribution=[hash[b5]])
+   :           :     :  +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+   :           :     +- Exchange(distribution=[hash[b2]])
+   :           :        +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+   :           +- Exchange(distribution=[hash[b4, b4]])
+   :              +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+   +- Exchange(distribution=[hash[b3, b3, b3, b3]])
+      +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testWithoutColumnStats">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1, T2, T3, T4, T5
+WHERE c1 = c2 AND c1 = c3 AND c2 = c4 AND c1 = c5
+         ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalFilter(condition=[AND(=($2, $5), =($2, $8), =($5, $11), =($2, $14))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :  :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+      :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+      :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+      :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- Join(joinType=[InnerJoin], where=[AND(=(c1, c5), =(c1, c3), =(c1, c2), =(c4, c1))], select=[a1, b1, c1, a2, b2, c2, a5, b5, c5, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[hash[c1, c1, c1, c1]])
+   :  +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+   +- Exchange(distribution=[hash[c5, c3, c2, c4]])
+      +- Join(joinType=[InnerJoin], where=[AND(=(c2, c4), =(c5, c2), =(c3, c2))], select=[a2, b2, c2, a5, b5, c5, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+         :- Exchange(distribution=[hash[c2, c2, c2]])
+         :  +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+         +- Exchange(distribution=[hash[c4, c5, c3]])
+            +- Join(joinType=[InnerJoin], where=[AND(=(c4, c5), =(c4, c3))], select=[a5, b5, c5, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+               :- Exchange(distribution=[hash[c5, c3]])
+               :  +- Join(joinType=[InnerJoin], where=[=(c5, c3)], select=[a5, b5, c5, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+               :     :- Exchange(distribution=[hash[c5]])
+               :     :  +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+               :     +- Exchange(distribution=[hash[c3]])
+               :        +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+               +- Exchange(distribution=[hash[c4, c4]])
+                  +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/JoinReorderTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/JoinReorderTest.scala
new file mode 100644
index 0000000..cb31143
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/JoinReorderTest.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.batch.sql.join
+
+import org.apache.flink.table.plan.common.JoinReorderTestBase
+import org.apache.flink.table.util.TableTestUtil
+
+class JoinReorderTest extends JoinReorderTestBase { // runs the shared tests on the batch util
+  override protected def getTableTestUtil: TableTestUtil = batchTestUtil()
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/common/JoinReorderTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/common/JoinReorderTestBase.scala
new file mode 100644
index 0000000..88883a7
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/common/JoinReorderTestBase.scala
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.common
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{PlannerConfigOptions, Types}
+import org.apache.flink.table.plan.stats.{ColumnStats, FlinkStatistic, TableStats}
+import org.apache.flink.table.util.{TableTestBase, TableTestUtil}
+
+import org.junit.{Before, Test}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Plan tests for join reordering that are shared by the batch and stream planners.
+  *
+  * Subclasses choose the planner through [[getTableTestUtil]]. Five tables T1..T5 are
+  * registered with different row counts; only their a and b columns get column stats.
+  */
+abstract class JoinReorderTestBase extends TableTestBase {
+
+  protected val util: TableTestUtil = getTableTestUtil
+
+  /** Planner-specific test util; invoked once per instance to initialise [[util]]. */
+  protected def getTableTestUtil: TableTestUtil
+
+  @Before
+  def setup(): Unit = {
+    val types = Array[TypeInformation[_]](Types.INT, Types.LONG, Types.STRING)
+
+    // registers T<i>(a<i>, b<i>, c<i>) with a row count and stats for a<i> and b<i> only
+    def register(i: Int, rowCount: Long, aStats: ColumnStats, bStats: ColumnStats): Unit = {
+      val stats = new TableStats(rowCount, Map(s"a$i" -> aStats, s"b$i" -> bStats))
+      util.addTableSource(s"T$i", types, Array(s"a$i", s"b$i", s"c$i"),
+        FlinkStatistic.builder().tableStats(stats).build())
+    }
+
+    register(1, 1000000L,
+      new ColumnStats(1000000L, 0L, 4.0, 4, null, null),
+      new ColumnStats(10L, 0L, 8.0, 8, null, null))
+    register(2, 10000L,
+      new ColumnStats(100L, 0L, 4.0, 4, null, null),
+      new ColumnStats(5000L, 0L, 8.0, 8, null, null))
+    register(3, 10L,
+      new ColumnStats(5L, 0L, 4.0, 4, null, null),
+      new ColumnStats(2L, 0L, 8.0, 8, null, null))
+    register(4, 100L,
+      new ColumnStats(100L, 0L, 4.0, 4, null, null),
+      new ColumnStats(20L, 0L, 8.0, 8, null, null))
+    register(5, 500000L,
+      new ColumnStats(200000L, 0L, 4.0, 4, null, null),
+      new ColumnStats(200L, 0L, 8.0, 8, null, null))
+
+    util.getTableEnv.getConfig.getConf.setBoolean(
+      PlannerConfigOptions.SQL_OPTIMIZER_JOIN_REORDER_ENABLED, true)
+  }
+
+  /** Star join: T1 is joined to every other table on the a columns. */
+  @Test
+  def testStarJoinCondition1(): Unit = {
+    util.verifyPlan(
+      """
+         |SELECT * FROM T1, T2, T3, T4, T5
+         |WHERE a1 = a2 AND a1 = a3 AND a1 = a4 AND a1 = a5
+         """.stripMargin)
+  }
+
+  /** Star join: T1 is joined to every other table on the b columns. */
+  @Test
+  def testStarJoinCondition2(): Unit = {
+    util.verifyPlan(
+      """
+         |SELECT * FROM T1, T2, T3, T4, T5
+         |WHERE b1 = b2 AND b1 = b3 AND b1 = b4 AND b1 = b5
+         """.stripMargin)
+  }
+
+  /** Branching join graph on the a columns (T1-T2-T3-T5 plus T1-T4). */
+  @Test
+  def testBushyJoinCondition1(): Unit = {
+    util.verifyPlan(
+      """
+         |SELECT * FROM T1, T2, T3, T4, T5
+         |WHERE a1 = a2 AND a2 = a3 AND a1 = a4 AND a3 = a5
+         """.stripMargin)
+  }
+
+  /** Branching join graph on the b columns (T1-T2-T3-T5 plus T1-T4). */
+  @Test
+  def testBushyJoinCondition2(): Unit = {
+    util.verifyPlan(
+      """
+         |SELECT * FROM T1, T2, T3, T4, T5
+         |WHERE b1 = b2 AND b2 = b3 AND b1 = b4 AND b3 = b5
+         """.stripMargin)
+  }
+
+  /** Joins on the c columns, for which setup() registers no column stats. */
+  @Test
+  def testWithoutColumnStats(): Unit = {
+    util.verifyPlan(
+      """
+         |SELECT * FROM T1, T2, T3, T4, T5
+         |WHERE c1 = c2 AND c1 = c3 AND c2 = c4 AND c1 = c5
+         """.stripMargin)
+  }
+
+  /** Joins nested in views that each project a subset of the joined columns. */
+  @Test
+  def testJoinWithProject(): Unit = {
+    // cannot be reordered for now
+    util.verifyPlan(
+      """
+         |WITH V1 AS (SELECT b1, a1, a2, c2 FROM T1 JOIN T2 ON a1 = a2),
+         |     V2 AS (SELECT a3, b1, a1, c2, c3 FROM V1 JOIN T3 ON a2 = a3),
+         |     V3 AS (SELECT a3, b1, a1, c2, c3, a4, b4 FROM T4 JOIN V2 ON a1 = a4)
+         |
+         |SELECT * FROM V3, T5 where a4 = a5
+         """.stripMargin)
+  }
+
+  /** Joins nested in views that each add a WHERE filter on top of the join. */
+  @Test
+  def testJoinWithFilter(): Unit = {
+    util.verifyPlan(
+      """
+         |WITH V1 AS (SELECT * FROM T1 JOIN T2 ON a1 = a2 WHERE b1 * b2 > 10),
+         |     V2 AS (SELECT * FROM V1 JOIN T3 ON a2 = a3 WHERE b1 * b3 < 2000),
+         |     V3 AS (SELECT * FROM T4 JOIN V2 ON a3 = a4 WHERE b2 + b4 > 100)
+         |
+         |SELECT * FROM V3, T5 WHERE a4 = a5 AND b5 < 15
+         """.stripMargin)
+  }
+
+  /** Inner joins with a single LEFT OUTER JOIN (on T4) in the middle of the chain. */
+  @Test
+  def testInnerAndLeftOuterJoin(): Unit = {
+    // T1, T2, T3 can reorder
+    util.verifyPlan(
+      """
+         |SELECT * FROM T1
+         |   JOIN T2 ON a1 = a2
+         |   JOIN T3 ON a2 = a3
+         |   LEFT OUTER JOIN T4 ON a1 = a4
+         |   JOIN T5 ON a4 = a5
+         """.stripMargin)
+  }
+
+  /** A leading RIGHT OUTER JOIN (T1 with T2) followed by inner joins. */
+  @Test
+  def testInnerAndRightOuterJoin(): Unit = {
+    // T3, T4, T5 can reorder
+    util.verifyPlan(
+      """
+         |SELECT * FROM T1
+         |   RIGHT OUTER JOIN T2 ON a1 = a2
+         |   JOIN T3 ON a2 = a3
+         |   JOIN T4 ON a1 = a4
+         |   JOIN T5 ON a4 = a5
+         """.stripMargin)
+  }
+
+  /** Inner joins with a single FULL OUTER JOIN (on T3) in the middle of the chain. */
+  @Test
+  def testInnerAndFullOuterJoin(): Unit = {
+    util.verifyPlan(
+      """
+         |SELECT * FROM T1
+         |   JOIN T2 ON a1 = a2
+         |   FULL OUTER JOIN T3 ON a2 = a3
+         |   JOIN T4 ON a1 = a4
+         |   JOIN T5 ON a4 = a5
+         """.stripMargin)
+  }
+
+  /** Chain made up of LEFT OUTER JOINs only. */
+  @Test
+  def testAllLeftOuterJoin(): Unit = {
+    // cannot be reordered
+    util.verifyPlan(
+      """
+         |SELECT * FROM T1
+         |   LEFT OUTER JOIN T2 ON a1 = a2
+         |   LEFT OUTER JOIN T3 ON a2 = a3
+         |   LEFT OUTER JOIN T4 ON a1 = a4
+         |   LEFT OUTER JOIN T5 ON a4 = a5
+         """.stripMargin)
+  }
+
+  /** Chain made up of RIGHT OUTER JOINs only. */
+  @Test
+  def testAllRightOuterJoin(): Unit = {
+    // cannot be reordered
+    util.verifyPlan(
+      """
+         |SELECT * FROM T1
+         |   RIGHT OUTER JOIN T2 ON a1 = a2
+         |   RIGHT OUTER JOIN T3 ON a2 = a3
+         |   RIGHT OUTER JOIN T4 ON a1 = a4
+         |   RIGHT OUTER JOIN T5 ON a4 = a5
+         """.stripMargin)
+  }
+
+  /** Chain made up of FULL OUTER JOINs only. */
+  @Test
+  def testAllFullOuterJoin(): Unit = {
+    // cannot be reordered
+    util.verifyPlan(
+      """
+         |SELECT * FROM T1
+         |   FULL OUTER JOIN T2 ON a1 = a2
+         |   FULL OUTER JOIN T3 ON a1 = a3
+         |   FULL OUTER JOIN T4 ON a1 = a4
+         |   FULL OUTER JOIN T5 ON a4 = a5
+         """.stripMargin)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkAggregateInnerJoinTransposeRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkAggregateInnerJoinTransposeRuleTest.scala
index ea096e3..6d3b3b6 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkAggregateInnerJoinTransposeRuleTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkAggregateInnerJoinTransposeRuleTest.scala
@@ -50,7 +50,7 @@ class FlinkAggregateInnerJoinTransposeRuleTest extends TableTestBase {
             .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
             .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
             .add(RuleSets.ofList(AggregateReduceGroupingRule.INSTANCE
-            )).build(), "reduce unless grouping")
+            )).build(), "reduce useless grouping")
         .addProgram(
           FlinkHepRuleSetProgramBuilder.newBuilder
             .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkJoinPushExpressionsRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkJoinPushExpressionsRuleTest.scala
index 3ad9faa..1fa6622 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkJoinPushExpressionsRuleTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkJoinPushExpressionsRuleTest.scala
@@ -25,6 +25,7 @@ import org.apache.flink.table.plan.rules.FlinkBatchRuleSets
 import org.apache.flink.table.util.TableTestBase
 
 import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.tools.RuleSets
 import org.junit.{Before, Test}
 
 /**
@@ -38,11 +39,16 @@ class FlinkJoinPushExpressionsRuleTest extends TableTestBase {
   def setup(): Unit = {
     val programs = new FlinkChainedProgram[BatchOptimizeContext]()
     programs.addLast(
-      "FilterSimplifyExpressions",
+      "rules",
       FlinkHepRuleSetProgramBuilder.newBuilder
         .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
         .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
-        .add(FlinkBatchRuleSets.SEMI_JOIN_RULES)
+        .add(RuleSets.ofList(
+          SimplifyFilterConditionRule.EXTENDED,
+          FlinkRewriteSubQueryRule.FILTER,
+          FlinkSubQueryRemoveRule.FILTER,
+          JoinConditionTypeCoerceRule.INSTANCE,
+          FlinkJoinPushExpressionsRule.INSTANCE))
         .build()
     )
     val calciteConfig = CalciteConfig.createBuilder(util.tableEnv.getConfig.getCalciteConfig)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/RewriteMultiJoinConditionRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/RewriteMultiJoinConditionRuleTest.scala
new file mode 100644
index 0000000..156c2ba
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/RewriteMultiJoinConditionRuleTest.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.calcite.CalciteConfig
+import org.apache.flink.table.plan.optimize.program.{BatchOptimizeContext, FlinkChainedProgram, FlinkGroupProgramBuilder, FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE}
+import org.apache.flink.table.util.TableTestBase
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.rel.rules.{FilterMultiJoinMergeRule, JoinToMultiJoinRule, ProjectMultiJoinMergeRule}
+import org.apache.calcite.tools.RuleSets
+import org.junit.{Before, Test}
+
+/**
+  * Plan tests for [[RewriteMultiJoinConditionRule]]. The batch program is replaced by a
+  * group of three HEP rule sets: push filters into joins, merge joins into a MultiJoin,
+  * and the rule under test.
+  */
+class RewriteMultiJoinConditionRuleTest extends TableTestBase {
+  private val util = batchTestUtil()
+
+  @Before
+  def setup(): Unit = {
+    val stages = FlinkGroupProgramBuilder.newBuilder[BatchOptimizeContext]
+      .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder
+        .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
+        .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+        .add(RuleSets.ofList(FlinkFilterJoinRule.FILTER_ON_JOIN, FlinkFilterJoinRule.JOIN))
+        .build(), "push filter into join")
+      .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder
+        .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
+        .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+        .add(RuleSets.ofList(
+          ProjectMultiJoinMergeRule.INSTANCE,
+          FilterMultiJoinMergeRule.INSTANCE,
+          JoinToMultiJoinRule.INSTANCE))
+        .build(), "merge join to MultiJoin")
+      .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder
+        .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+        .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+        .add(RuleSets.ofList(RewriteMultiJoinConditionRule.INSTANCE))
+        .build(), "RewriteMultiJoinConditionRule")
+      .build()
+    val chained = new FlinkChainedProgram[BatchOptimizeContext]()
+    chained.addLast("rules", stages)
+
+    val calciteConfig = CalciteConfig.createBuilder(util.tableEnv.getConfig.getCalciteConfig)
+      .replaceBatchProgram(chained)
+      .build()
+    util.tableEnv.config.setCalciteConfig(calciteConfig)
+
+    util.addTableSource[(Int, Long)]("A", 'a1, 'a2)
+    util.addTableSource[(Int, Long)]("B", 'b1, 'b2)
+    util.addTableSource[(Int, Long)]("C", 'c1, 'c2)
+    util.addTableSource[(Int, Long)]("D", 'd1, 'd2)
+  }
+
+  /** Two-way inner join with a single equality. */
+  @Test
+  def testMultiJoin_InnerJoin1(): Unit = {
+    util.verifyPlan("SELECT * FROM A, B WHERE a1 = b1")
+  }
+
+  /** Three-way inner join where a1 is equated with both b1 and c1. */
+  @Test
+  def testMultiJoin_InnerJoin2(): Unit = {
+    util.verifyPlan("SELECT * FROM A, B, C WHERE a1 = b1 AND a1 = c1")
+  }
+
+  /** Four-way inner join linked by the equality chain a1 = b1, b1 = c1, c1 = d1. */
+  @Test
+  def testMultiJoin_InnerJoin3(): Unit = {
+    util.verifyPlan("SELECT * FROM A, B, C, D WHERE a1 = b1 AND b1 = c1 AND c1 = d1")
+  }
+
+  /** Three-way inner join mixing an equality with a greater-than predicate. */
+  @Test
+  def testMultiJoin_InnerJoin4(): Unit = {
+    // non-equi join condition
+    util.verifyPlan("SELECT * FROM A, B, C WHERE a1 = b1 AND a1 > c1")
+  }
+
+  /** Equalities whose left side is the expression a1 + 1 instead of a plain column. */
+  @Test
+  def testMultiJoin_InnerJoin5(): Unit = {
+    util.verifyPlan("SELECT * FROM A, B, C WHERE a1 + 1 = b1 AND a1 + 1 = c1")
+  }
+
+  /** Single LEFT JOIN. */
+  @Test
+  def testMultiJoin_LeftJoin1(): Unit = {
+    util.verifyPlan("SELECT * FROM A LEFT JOIN B ON a1 = b1")
+  }
+
+  /** Inner join followed by a LEFT JOIN. */
+  @Test
+  def testMultiJoin_LeftJoin2(): Unit = {
+    util.verifyPlan("SELECT * FROM A JOIN B ON a1 = b1 LEFT JOIN C ON b1 = c1")
+  }
+
+  /** LEFT JOIN followed by an inner join. */
+  @Test
+  def testMultiJoin_LeftJoin3(): Unit = {
+    util.verifyPlan("SELECT * FROM A LEFT JOIN B ON a1 = b1 JOIN C ON a1 = c1")
+  }
+
+  /** Single RIGHT JOIN. */
+  @Test
+  def testMultiJoin_RightJoin1(): Unit = {
+    util.verifyPlan("SELECT * FROM A RIGHT JOIN B ON a1 = b1")
+  }
+
+  /** Inner join followed by a RIGHT JOIN. */
+  @Test
+  def testMultiJoin_RightJoin2(): Unit = {
+    util.verifyPlan("SELECT * FROM A JOIN B ON a1 = b1 RIGHT JOIN C ON b1 = c1")
+  }
+
+  /** RIGHT JOIN followed by an inner join. */
+  @Test
+  def testMultiJoin_RightJoin3(): Unit = {
+    util.verifyPlan("SELECT * FROM A RIGHT JOIN B ON a1 = b1 JOIN C ON a1 = c1")
+  }
+
+  /** Single FULL OUTER JOIN. */
+  @Test
+  def testMultiJoin_FullJoin1(): Unit = {
+    util.verifyPlan("SELECT * FROM A FULL OUTER JOIN B ON a1 = b1")
+  }
+
+  /** Two chained FULL OUTER JOINs. */
+  @Test
+  def testMultiJoin_FullJoin2(): Unit = {
+    util.verifyPlan("SELECT * FROM A FULL OUTER JOIN B ON a1 = b1 FULL OUTER JOIN C ON a1 = c1")
+  }
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/JoinReorderTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/JoinReorderTest.scala
new file mode 100644
index 0000000..2a34791
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/JoinReorderTest.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.stream.sql.join
+
+import org.apache.flink.table.plan.common.JoinReorderTestBase
+import org.apache.flink.table.util.TableTestUtil
+
+class JoinReorderTest extends JoinReorderTestBase { // runs the shared tests on the stream util
+  override protected def getTableTestUtil: TableTestUtil = streamTestUtil()
+}