You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2022/12/27 06:38:53 UTC

[flink] branch release-1.16 updated: [FLINK-30270][table-planner] Fix FlinkJoinToMultiJoinRule incorrectly combines Left/Right outer join to MultiJoin error

This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.16 by this push:
     new ca42695dba5 [FLINK-30270][table-planner] Fix FlinkJoinToMultiJoinRule incorrectly combines Left/Right outer join to MultiJoin error
ca42695dba5 is described below

commit ca42695dba5ab72c6b9b895bb6553321c30d5074
Author: zhengyunhong.zyh <33...@qq.com>
AuthorDate: Tue Dec 27 14:21:24 2022 +0800

    [FLINK-30270][table-planner] Fix FlinkJoinToMultiJoinRule incorrectly combines Left/Right outer join to MultiJoin error
    
    This closes #21487
---
 .../rules/logical/FlinkJoinToMultiJoinRule.java    | 215 ++++++++++++--
 .../plan/batch/sql/join/JoinReorderTest.xml        |  53 ++--
 .../rules/logical/FlinkJoinToMultiJoinRuleTest.xml | 316 +++++++++++++--------
 .../logical/FlinkJoinToMultiJoinRuleTest.scala     |  91 ++++--
 4 files changed, 475 insertions(+), 200 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRule.java
index 838062dc15c..0766c1d4699 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRule.java
@@ -18,11 +18,14 @@
 
 package org.apache.flink.table.planner.plan.rules.logical;
 
+import org.apache.flink.table.api.TableException;
+
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.RelRule;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.rules.CoreRules;
@@ -134,20 +137,32 @@ public class FlinkJoinToMultiJoinRule extends RelRule<FlinkJoinToMultiJoinRule.C
         final RelNode left = call.rel(1);
         final RelNode right = call.rel(2);
 
+        // inputNullGenFieldList records whether the field in originJoin is null generate field.
+        List<Boolean> inputNullGenFieldList = new ArrayList<>();
+        // Build null generate field list.
+        buildInputNullGenFieldList(left, right, origJoin.getJoinType(), inputNullGenFieldList);
+
         // Combine the children MultiJoin inputs into an array of inputs for the new MultiJoin.
         final List<ImmutableBitSet> projFieldsList = new ArrayList<>();
         final List<int[]> joinFieldRefCountsList = new ArrayList<>();
         final List<RelNode> newInputs =
-                combineInputs(origJoin, left, right, projFieldsList, joinFieldRefCountsList);
+                combineInputs(
+                        origJoin,
+                        left,
+                        right,
+                        projFieldsList,
+                        joinFieldRefCountsList,
+                        inputNullGenFieldList);
 
         // Combine the outer join information from the left and right inputs, and include the outer
         // join information from the current join, if it's a left/right outer join.
         final List<Pair<JoinRelType, RexNode>> joinSpecs = new ArrayList<>();
-        combineOuterJoins(origJoin, newInputs, left, right, joinSpecs);
+        combineOuterJoins(origJoin, newInputs, left, right, joinSpecs, inputNullGenFieldList);
 
         // Pull up the join filters from the children MultiJoinRels and combine them with the join
         // filter associated with this LogicalJoin to form the join filter for the new MultiJoin.
-        List<RexNode> newJoinFilters = combineJoinFilters(origJoin, left, right);
+        List<RexNode> newJoinFilters =
+                combineJoinFilters(origJoin, left, right, inputNullGenFieldList);
 
         // Add on the join field reference counts for the join condition associated with this
         // LogicalJoin.
@@ -178,6 +193,77 @@ public class FlinkJoinToMultiJoinRule extends RelRule<FlinkJoinToMultiJoinRule.C
         call.transformTo(multiJoin);
     }
 
+    private void buildInputNullGenFieldList(
+            RelNode left, RelNode right, JoinRelType joinType, List<Boolean> isNullGenFieldList) {
+        if (joinType == JoinRelType.INNER) {
+            buildNullGenFieldList(left, isNullGenFieldList);
+            buildNullGenFieldList(right, isNullGenFieldList);
+        } else if (joinType == JoinRelType.LEFT) {
+            // If origin joinType is left means join fields from right side must be null generated
+            // fields, so we need only judge these join fields in left side and set null generate
+            // field is true for all right fields.
+            buildNullGenFieldList(left, isNullGenFieldList);
+
+            for (int i = 0; i < right.getRowType().getFieldCount(); i++) {
+                isNullGenFieldList.add(true);
+            }
+        } else if (joinType == JoinRelType.RIGHT) {
+            // If origin joinType is right means join fields from left side must be null generated
+            // fields, so we need only judge these join fields in right side and set null generate
+            // field is true for all left fields.
+            for (int i = 0; i < left.getRowType().getFieldCount(); i++) {
+                isNullGenFieldList.add(true);
+            }
+
+            buildNullGenFieldList(right, isNullGenFieldList);
+        } else if (joinType == JoinRelType.FULL) {
+            // For full outer join, both the left side and the right side must be null generated
+            // fields, so all join fields will be set as null generated field.
+            for (int i = 0; i < left.getRowType().getFieldCount(); i++) {
+                isNullGenFieldList.add(true);
+            }
+            for (int i = 0; i < right.getRowType().getFieldCount(); i++) {
+                isNullGenFieldList.add(true);
+            }
+        } else {
+            // Now, join to multi join rule only support Full outer join, Inner join and Left/Right
+            // join.
+            throw new TableException(
+                    "This is a bug. Now, join to multi join rule only support Full outer "
+                            + "join, Inner join and Left/Right join.");
+        }
+    }
+
+    private void buildNullGenFieldList(RelNode rel, List<Boolean> isNullGenFieldList) {
+        MultiJoin multiJoin = rel instanceof MultiJoin ? (MultiJoin) rel : null;
+        if (multiJoin == null) {
+            // other operators.
+            for (int i = 0; i < rel.getRowType().getFieldCount(); i++) {
+                isNullGenFieldList.add(false);
+            }
+        } else {
+            List<RelNode> inputs = multiJoin.getInputs();
+            List<JoinRelType> joinTypes = multiJoin.getJoinTypes();
+            for (int i = 0; i < inputs.size() - 1; i++) {
+                // In list joinTypes, right join node will be added as [RIGHT, INNER], so we need to
+                // get the joinType from joinTypes in index i.
+                if (joinTypes.get(i) == JoinRelType.RIGHT) {
+                    buildInputNullGenFieldList(
+                            inputs.get(i), inputs.get(i + 1), joinTypes.get(i), isNullGenFieldList);
+                } else {
+                    // In list joinTypes, left join node and inner join node will be added as
+                    // [INNER, LEFT] and [INNER, INNER] respectively. so we need to get the joinType
+                    // from joinTypes in index i + 1.
+                    buildInputNullGenFieldList(
+                            inputs.get(i),
+                            inputs.get(i + 1),
+                            joinTypes.get(i + 1),
+                            isNullGenFieldList);
+                }
+            }
+        }
+    }
+
     /**
      * Combines the inputs into a LogicalJoin into an array of inputs.
      *
@@ -193,28 +279,47 @@ public class FlinkJoinToMultiJoinRule extends RelRule<FlinkJoinToMultiJoinRule.C
             RelNode left,
             RelNode right,
             List<ImmutableBitSet> projFieldsList,
-            List<int[]> joinFieldRefCountsList) {
+            List<int[]> joinFieldRefCountsList,
+            List<Boolean> inputNullGenFieldList) {
         final List<RelNode> newInputs = new ArrayList<>();
-
         // Leave the null generating sides of an outer join intact; don't pull up those children
         // inputs into the array we're constructing.
-        if (canCombine(left, join.getJoinType(), join.getJoinType().generatesNullsOnLeft())) {
+        JoinInfo joinInfo = join.analyzeCondition();
+        ImmutableIntList leftKeys = joinInfo.leftKeys;
+        ImmutableIntList rightKeys = joinInfo.rightKeys;
+
+        if (canCombine(
+                left,
+                leftKeys,
+                join.getJoinType(),
+                join.getJoinType().generatesNullsOnLeft(),
+                true,
+                inputNullGenFieldList,
+                0)) {
             final MultiJoin leftMultiJoin = (MultiJoin) left;
-            for (int i = 0; i < left.getInputs().size(); i++) {
+            for (int i = 0; i < leftMultiJoin.getInputs().size(); i++) {
                 newInputs.add(leftMultiJoin.getInput(i));
                 projFieldsList.add(leftMultiJoin.getProjFields().get(i));
                 joinFieldRefCountsList.add(
                         leftMultiJoin.getJoinFieldRefCountsMap().get(i).toIntArray());
             }
+
         } else {
             newInputs.add(left);
             projFieldsList.add(null);
             joinFieldRefCountsList.add(new int[left.getRowType().getFieldCount()]);
         }
 
-        if (canCombine(right, join.getJoinType(), join.getJoinType().generatesNullsOnRight())) {
+        if (canCombine(
+                right,
+                rightKeys,
+                join.getJoinType(),
+                join.getJoinType().generatesNullsOnRight(),
+                false,
+                inputNullGenFieldList,
+                left.getRowType().getFieldCount())) {
             final MultiJoin rightMultiJoin = (MultiJoin) right;
-            for (int i = 0; i < right.getInputs().size(); i++) {
+            for (int i = 0; i < rightMultiJoin.getInputs().size(); i++) {
                 newInputs.add(rightMultiJoin.getInput(i));
                 projFieldsList.add(rightMultiJoin.getProjFields().get(i));
                 joinFieldRefCountsList.add(
@@ -246,10 +351,30 @@ public class FlinkJoinToMultiJoinRule extends RelRule<FlinkJoinToMultiJoinRule.C
             List<RelNode> combinedInputs,
             RelNode left,
             RelNode right,
-            List<Pair<JoinRelType, RexNode>> joinSpecs) {
+            List<Pair<JoinRelType, RexNode>> joinSpecs,
+            List<Boolean> inputNullGenFieldList) {
         JoinRelType joinType = joinRel.getJoinType();
-        boolean leftCombined = canCombine(left, joinType, joinType.generatesNullsOnLeft());
-        boolean rightCombined = canCombine(right, joinType, joinType.generatesNullsOnRight());
+        JoinInfo joinInfo = joinRel.analyzeCondition();
+        ImmutableIntList leftKeys = joinInfo.leftKeys;
+        ImmutableIntList rightKeys = joinInfo.rightKeys;
+        boolean leftCombined =
+                canCombine(
+                        left,
+                        leftKeys,
+                        joinType,
+                        joinType.generatesNullsOnLeft(),
+                        true,
+                        inputNullGenFieldList,
+                        0);
+        boolean rightCombined =
+                canCombine(
+                        right,
+                        rightKeys,
+                        joinType,
+                        joinType.generatesNullsOnRight(),
+                        false,
+                        inputNullGenFieldList,
+                        left.getRowType().getFieldCount());
         switch (joinType) {
             case LEFT:
                 if (leftCombined) {
@@ -348,8 +473,12 @@ public class FlinkJoinToMultiJoinRule extends RelRule<FlinkJoinToMultiJoinRule.C
      * @param right Right input of the join
      * @return combined join filters AND-ed together
      */
-    private List<RexNode> combineJoinFilters(Join join, RelNode left, RelNode right) {
+    private List<RexNode> combineJoinFilters(
+            Join join, RelNode left, RelNode right, List<Boolean> inputNullGenFieldList) {
         JoinRelType joinType = join.getJoinType();
+        JoinInfo joinInfo = join.analyzeCondition();
+        ImmutableIntList leftKeys = joinInfo.leftKeys;
+        ImmutableIntList rightKeys = joinInfo.rightKeys;
 
         // AND the join condition if this isn't a left or right outer join; In those cases, the
         // outer join condition is already tracked separately.
@@ -357,12 +486,26 @@ public class FlinkJoinToMultiJoinRule extends RelRule<FlinkJoinToMultiJoinRule.C
         if ((joinType != JoinRelType.LEFT) && (joinType != JoinRelType.RIGHT)) {
             filters.add(join.getCondition());
         }
-        if (canCombine(left, joinType, joinType.generatesNullsOnLeft())) {
+        if (canCombine(
+                left,
+                leftKeys,
+                joinType,
+                joinType.generatesNullsOnLeft(),
+                true,
+                inputNullGenFieldList,
+                0)) {
             filters.add(((MultiJoin) left).getJoinFilter());
         }
         // Need to adjust the RexInputs of the right child, since those need to shift over to the
         // right.
-        if (canCombine(right, joinType, joinType.generatesNullsOnRight())) {
+        if (canCombine(
+                right,
+                rightKeys,
+                joinType,
+                joinType.generatesNullsOnRight(),
+                false,
+                inputNullGenFieldList,
+                left.getRowType().getFieldCount())) {
             MultiJoin multiJoin = (MultiJoin) right;
             filters.add(shiftRightFilter(join, left, multiJoin, multiJoin.getJoinFilter()));
         }
@@ -378,25 +521,49 @@ public class FlinkJoinToMultiJoinRule extends RelRule<FlinkJoinToMultiJoinRule.C
      * @param nullGenerating true if the input is null generating
      * @return true if the input can be combined into a parent MultiJoin
      */
-    private boolean canCombine(RelNode input, JoinRelType joinType, boolean nullGenerating) {
+    private boolean canCombine(
+            RelNode input,
+            ImmutableIntList joinKeys,
+            JoinRelType joinType,
+            boolean nullGenerating,
+            boolean isLeft,
+            List<Boolean> inputNullGenFieldList,
+            int beginIndex) {
         if (input instanceof MultiJoin) {
             MultiJoin join = (MultiJoin) input;
             if (join.isFullOuterJoin() || nullGenerating) {
                 return false;
             }
-            for (JoinRelType type : join.getJoinTypes()) {
-                if (type == JoinRelType.FULL) {
+
+            if (joinType == JoinRelType.LEFT) {
+                if (!isLeft) {
                     return false;
+                } else {
+                    for (int joinKey : joinKeys) {
+                        if (inputNullGenFieldList.get(joinKey + beginIndex)) {
+                            return false;
+                        }
+                    }
                 }
-                // For left/right outer join, if it not meets this condition, it can be converted to
-                // one multi join set.
-                if (joinType != JoinRelType.INNER
-                        && ((type == JoinRelType.LEFT || type == JoinRelType.RIGHT)
-                                && joinType != type)) {
+            } else if (joinType == JoinRelType.RIGHT) {
+                if (isLeft) {
                     return false;
+                } else {
+                    for (int joinKey : joinKeys) {
+                        if (inputNullGenFieldList.get(joinKey + beginIndex)) {
+                            return false;
+                        }
+                    }
                 }
+            } else if (joinType == JoinRelType.INNER) {
+                for (int joinKey : joinKeys) {
+                    if (inputNullGenFieldList.get(joinKey + beginIndex)) {
+                        return false;
+                    }
+                }
+            } else {
+                return false;
             }
-
             return true;
         } else {
             return false;
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/JoinReorderTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/JoinReorderTest.xml
index dc25d68b66f..5583a50e958 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/JoinReorderTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/JoinReorderTest.xml
@@ -374,19 +374,19 @@ LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
-+- HashJoin(joinType=[InnerJoin], where=[(a4 = a5)], select=[a5, b5, c5, a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])
-   :- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
-   +- Exchange(distribution=[broadcast])
-      +- MultipleInput(readOrder=[0,1,0], members=[\nHashJoin(joinType=[InnerJoin], where=[(a1 = a4)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])\n:- HashJoin(joinType=[RightOuterJoin], where=[(a1 = a2)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], build=[right])\n:  :- [#2] Exchange(distribution=[hash[a1]])\n:  +- [#3] Exchange(distribution=[hash[a2]])\n+- [#1] Exchange(distribution=[broadcast])\n])
-         :- Exchange(distribution=[broadcast])
-         :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
-         :- Exchange(distribution=[hash[a1]])
-         :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
-         +- Exchange(distribution=[hash[a2]])
-            +- HashJoin(joinType=[InnerJoin], where=[(a2 = a3)], select=[a2, b2, c2, a3, b3, c3], isBroadcast=[true], build=[right])
-               :- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
-               +- Exchange(distribution=[broadcast])
-                  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
++- MultipleInput(readOrder=[0,1,0], members=[\nHashJoin(joinType=[InnerJoin], where=[(a1 = a4)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a5, b5, c5, a4, b4, c4], isBroadcast=[true], build=[right])\n:- HashJoin(joinType=[RightOuterJoin], where=[(a1 = a2)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], build=[right])\n:  :- [#2] Exchange(distribution=[hash[a1]])\n:  +- [#3] Exchange(distribution=[hash[a2]])\n+- [#1] Exchange(distribution=[broadcast])\n])
+   :- Exchange(distribution=[broadcast])
+   :  +- HashJoin(joinType=[InnerJoin], where=[(a4 = a5)], select=[a5, b5, c5, a4, b4, c4], isBroadcast=[true], build=[right])
+   :     :- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+   :     +- Exchange(distribution=[broadcast])
+   :        +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+   :- Exchange(distribution=[hash[a1]])
+   :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+   +- Exchange(distribution=[hash[a2]])
+      +- HashJoin(joinType=[InnerJoin], where=[(a2 = a3)], select=[a2, b2, c2, a3, b3, c3], isBroadcast=[true], build=[right])
+         :- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+         +- Exchange(distribution=[broadcast])
+            +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
 ]]>
     </Resource>
   </TestCase>
@@ -462,19 +462,20 @@ LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
-+- HashJoin(joinType=[InnerJoin], where=[(a1 = a2)], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a4, b4, c4, a3, b3, c3], isBroadcast=[true], build=[right])
-   :- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
-   +- Exchange(distribution=[broadcast])
-      +- MultipleInput(readOrder=[2,1,0], members=[\nHashJoin(joinType=[RightOuterJoin], where=[(a4 = a5)], select=[a5, b5, c5, a1, b1, c1, a4, b4, c4, a3, b3, c3], build=[right])\n:- [#1] Exchange(distribution=[hash[a5]])\n+- HashJoin(joinType=[LeftOuterJoin], where=[(a1 = a3)], select=[a1, b1, c1, a4, b4, c4, a3, b3, c3], isBroadcast=[true], build=[right])\n   :- [#2] Exchange(distribution=[hash[a4]])\n   +- [#3] Exchange(distribution=[broadcast])\n])
-         :- Exchange(distribution=[hash[a5]])
-         :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
-         :- Exchange(distribution=[hash[a4]])
-         :  +- HashJoin(joinType=[InnerJoin], where=[(a1 = a4)], select=[a1, b1, c1, a4, b4, c4], isBroadcast=[true], build=[right])
-         :     :- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
-         :     +- Exchange(distribution=[broadcast])
-         :        +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
-         +- Exchange(distribution=[broadcast])
-            +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
++- HashJoin(joinType=[RightOuterJoin], where=[(a4 = a5)], select=[a5, b5, c5, a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], build=[right])
+   :- Exchange(distribution=[hash[a5]])
+   :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+   +- Exchange(distribution=[hash[a4]])
+      +- Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4])
+         +- HashJoin(joinType=[InnerJoin], where=[(a1 = a2)], select=[a2, b2, c2, a1, b1, c1, a4, b4, c4, a3, b3, c3], isBroadcast=[true], build=[right])
+            :- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+            +- Exchange(distribution=[broadcast])
+               +- MultipleInput(readOrder=[0,1,0], members=[\nHashJoin(joinType=[LeftOuterJoin], where=[(a1 = a3)], select=[a1, b1, c1, a4, b4, c4, a3, b3, c3], isBroadcast=[true], build=[right])\n:- HashJoin(joinType=[InnerJoin], where=[(a1 = a4)], select=[a1, b1, c1, a4, b4, c4], isBroadcast=[true], build=[right])\n:  :- [#2] LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])\n:  +- [#3] Exchange(distributi [...]
+                  :- Exchange(distribution=[broadcast])
+                  :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+                  :- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+                  +- Exchange(distribution=[broadcast])
+                     +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRuleTest.xml
index e270c6e7f75..b935a80648e 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRuleTest.xml
@@ -45,7 +45,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
   </TestCase>
   <TestCase name="testFullOuterJoinInnerJoin">
     <Resource name="sql">
-      <![CDATA[SELECT * FROM T1 FULL OUTER JOIN T2 ON a = c JOIN (SELECT * FROM T3) ON a = e]]>
+      <![CDATA[SELECT * FROM T1 FULL OUTER JOIN T2 ON a = c JOIN T3 ON a = e]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
@@ -54,8 +54,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
    :- LogicalJoin(condition=[=($0, $2)], joinType=[full])
    :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
    :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
-   +- LogicalProject(e=[$0], f=[$1])
-      +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
@@ -64,14 +63,13 @@ MultiJoin(joinFilter=[=($0, $4)], isFullOuterJoin=[false], joinTypes=[[INNER, IN
 :- MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[true], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[ALL, ALL]])
 :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
 :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
-+- LogicalProject(e=[$0], f=[$1])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
 ]]>
     </Resource>
   </TestCase>
   <TestCase name="testFullOuterJoinLeftOuterJoin">
     <Resource name="sql">
-      <![CDATA[SELECT * FROM T1 FULL OUTER JOIN T2 ON a = c LEFT OUTER JOIN (SELECT * FROM T3) ON a = e]]>
+      <![CDATA[SELECT * FROM T1 FULL OUTER JOIN T2 ON a = c LEFT OUTER JOIN T3 ON a = e]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
@@ -80,8 +78,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
    :- LogicalJoin(condition=[=($0, $2)], joinType=[full])
    :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
    :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
-   +- LogicalProject(e=[$0], f=[$1])
-      +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
@@ -90,14 +87,13 @@ MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[INNER, LEFT]],
 :- MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[true], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[ALL, ALL]])
 :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
 :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
-+- LogicalProject(e=[$0], f=[$1])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
 ]]>
     </Resource>
   </TestCase>
   <TestCase name="testFullOuterJoinRightOuterJoin">
     <Resource name="sql">
-      <![CDATA[SELECT * FROM T1 FULL OUTER JOIN T2 ON a = c RIGHT OUTER JOIN (SELECT * FROM T3) ON a = e]]>
+      <![CDATA[SELECT * FROM T1 FULL OUTER JOIN T2 ON a = c RIGHT OUTER JOIN T3 ON a = e]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
@@ -106,8 +102,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
    :- LogicalJoin(condition=[=($0, $2)], joinType=[full])
    :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
    :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
-   +- LogicalProject(e=[$0], f=[$1])
-      +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
@@ -116,8 +111,7 @@ MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]]
 :- MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[true], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[ALL, ALL]])
 :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
 :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
-+- LogicalProject(e=[$0], f=[$1])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
 ]]>
     </Resource>
   </TestCase>
@@ -268,9 +262,9 @@ MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[false], joinTypes=[[INNER, IN
     <Resource name="sql">
       <![CDATA[
 SELECT * FROM T1 JOIN T2 ON a = c LEFT OUTER JOIN 
-(SELECT * FROM T3) ON a = e JOIN
-(SELECT * FROM T4) ON a = g LEFT OUTER JOIN
-(SELECT * FROM T5) ON a = i
+ T3 ON a = e JOIN
+ T4 ON a = g LEFT OUTER JOIN
+ T5 ON a = i
         ]]>
     </Resource>
     <Resource name="ast">
@@ -282,12 +276,9 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], g=[$6], h=[$7], i
    :  :  :- LogicalJoin(condition=[=($0, $2)], joinType=[inner])
    :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
    :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
-   :  :  +- LogicalProject(e=[$0], f=[$1])
-   :  :     +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
-   :  +- LogicalProject(g=[$0], h=[$1])
-   :     +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(g, h)]]])
-   +- LogicalProject(i=[$0], j=[$1])
-      +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(i, j)]]])
+   :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(g, h)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(i, j)]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
@@ -295,12 +286,9 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], g=[$6], h=[$7], i
 MultiJoin(joinFilter=[AND(=($0, $6), =($0, $2))], isFullOuterJoin=[false], joinTypes=[[INNER, INNER, LEFT, INNER, LEFT]], outerJoinConditions=[[NULL, NULL, =($0, $4), NULL, =($0, $8)]], projFields=[[{0, 1}, {0, 1}, {0, 1}, {0, 1}, {0, 1}]])
 :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
 :- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
-:- LogicalProject(e=[$0], f=[$1])
-:  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
-:- LogicalProject(g=[$0], h=[$1])
-:  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(g, h)]]])
-+- LogicalProject(i=[$0], j=[$1])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(i, j)]]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(g, h)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(i, j)]]])
 ]]>
     </Resource>
   </TestCase>
@@ -332,9 +320,9 @@ MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]]
     <Resource name="sql">
       <![CDATA[
 SELECT * FROM T1 JOIN T2 ON a = c RIGHT OUTER JOIN 
-(SELECT * FROM T3) ON a = e JOIN
-(SELECT * FROM T4) ON a = g RIGHT OUTER JOIN
-(SELECT * FROM T5) ON a = i
+ T3 ON a = e JOIN
+ T4 ON a = g RIGHT OUTER JOIN
+ T5 ON a = i
         ]]>
     </Resource>
     <Resource name="ast">
@@ -346,27 +334,22 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], g=[$6], h=[$7], i
    :  :  :- LogicalJoin(condition=[=($0, $2)], joinType=[inner])
    :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
    :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
-   :  :  +- LogicalProject(e=[$0], f=[$1])
-   :  :     +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
-   :  +- LogicalProject(g=[$0], h=[$1])
-   :     +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(g, h)]]])
-   +- LogicalProject(i=[$0], j=[$1])
-      +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(i, j)]]])
+   :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(g, h)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(i, j)]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
 MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($0, $8), NULL]], projFields=[[{0, 1, 2, 3, 4, 5, 6, 7}, {0, 1}]])
-:- MultiJoin(joinFilter=[=($0, $6)], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER, INNER]], outerJoinConditions=[[=($0, $4), NULL, NULL]], projFields=[[ALL, ALL, ALL]])
-:  :- MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[false], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[ALL, ALL]])
-:  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
-:  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
-:  :- LogicalProject(e=[$0], f=[$1])
+:- MultiJoin(joinFilter=[=($0, $6)], isFullOuterJoin=[false], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[ALL, ALL]])
+:  :- MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($0, $4), NULL]], projFields=[[ALL, ALL]])
+:  :  :- MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[false], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[ALL, ALL]])
+:  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+:  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
 :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
-:  +- LogicalProject(g=[$0], h=[$1])
-:     +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(g, h)]]])
-+- LogicalProject(i=[$0], j=[$1])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(i, j)]]])
+:  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(g, h)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(i, j)]]])
 ]]>
     </Resource>
   </TestCase>
@@ -436,9 +419,9 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
     <Resource name="sql">
       <![CDATA[
 SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c JOIN 
-(SELECT * FROM T3) ON a = e LEFT OUTER JOIN
-(SELECT * FROM T4) ON a = g JOIN
-(SELECT * FROM T5) ON a = i
+ T3 ON a = e LEFT OUTER JOIN
+ T4 ON a = g JOIN
+ T5 ON a = i
         ]]>
     </Resource>
     <Resource name="ast">
@@ -450,12 +433,9 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], g=[$6], h=[$7], i
    :  :  :- LogicalJoin(condition=[=($0, $2)], joinType=[left])
    :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
    :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
-   :  :  +- LogicalProject(e=[$0], f=[$1])
-   :  :     +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
-   :  +- LogicalProject(g=[$0], h=[$1])
-   :     +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(g, h)]]])
-   +- LogicalProject(i=[$0], j=[$1])
-      +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(i, j)]]])
+   :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(g, h)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(i, j)]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
@@ -463,18 +443,15 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], g=[$6], h=[$7], i
 MultiJoin(joinFilter=[AND(=($0, $8), =($0, $4))], isFullOuterJoin=[false], joinTypes=[[INNER, LEFT, INNER, LEFT, INNER]], outerJoinConditions=[[NULL, =($0, $2), NULL, =($0, $6), NULL]], projFields=[[{0, 1}, {0, 1}, {0, 1}, {0, 1}, {0, 1}]])
 :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
 :- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
-:- LogicalProject(e=[$0], f=[$1])
-:  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
-:- LogicalProject(g=[$0], h=[$1])
-:  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(g, h)]]])
-+- LogicalProject(i=[$0], j=[$1])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(i, j)]]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(g, h)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(i, j)]]])
 ]]>
     </Resource>
   </TestCase>
   <TestCase name="testLeftOuterJoinLeftOuterJoin">
     <Resource name="sql">
-      <![CDATA[SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c LEFT OUTER JOIN (SELECT * FROM T3) ON a = e]]>
+      <![CDATA[SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c LEFT OUTER JOIN T3 ON a = e]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
@@ -483,8 +460,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
    :- LogicalJoin(condition=[=($0, $2)], joinType=[left])
    :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
    :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
-   +- LogicalProject(e=[$0], f=[$1])
-      +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
@@ -492,14 +468,13 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
 MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[INNER, LEFT, LEFT]], outerJoinConditions=[[NULL, =($0, $2), =($0, $4)]], projFields=[[{0, 1}, {0, 1}, {0, 1}]])
 :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
 :- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
-+- LogicalProject(e=[$0], f=[$1])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
 ]]>
     </Resource>
   </TestCase>
   <TestCase name="testLeftOuterJoinRightOuterJoin">
     <Resource name="sql">
-      <![CDATA[SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c RIGHT OUTER JOIN (SELECT * FROM T3) ON a = e]]>
+      <![CDATA[SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c RIGHT OUTER JOIN T3 ON a = e]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
@@ -508,8 +483,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
    :- LogicalJoin(condition=[=($0, $2)], joinType=[left])
    :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
    :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
-   +- LogicalProject(e=[$0], f=[$1])
-      +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
@@ -518,8 +492,7 @@ MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]]
 :- MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[INNER, LEFT]], outerJoinConditions=[[NULL, =($0, $2)]], projFields=[[ALL, ALL]])
 :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
 :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
-+- LogicalProject(e=[$0], f=[$1])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
 ]]>
     </Resource>
   </TestCase>
@@ -548,12 +521,47 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
    :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
    +- LogicalProject(e=[$0])
       +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiLeftOuterJoinWithAllKeyInLeft">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1 LEFT OUTER JOIN 
+T2 ON a = c LEFT OUTER JOIN 
+T3 ON a = e LEFT OUTER JOIN
+T4 ON a = g LEFT OUTER JOIN
+T5 ON a = i
+        ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], g=[$6], h=[$7], i=[$8], j=[$9])
++- LogicalJoin(condition=[=($0, $8)], joinType=[left])
+   :- LogicalJoin(condition=[=($0, $6)], joinType=[left])
+   :  :- LogicalJoin(condition=[=($0, $4)], joinType=[left])
+   :  :  :- LogicalJoin(condition=[=($0, $2)], joinType=[left])
+   :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+   :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+   :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(g, h)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(i, j)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[INNER, LEFT, LEFT, LEFT, LEFT]], outerJoinConditions=[[NULL, =($0, $2), =($0, $4), =($0, $6), =($0, $8)]], projFields=[[{0, 1}, {0, 1}, {0, 1}, {0, 1}, {0, 1}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(g, h)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(i, j)]]])
 ]]>
     </Resource>
   </TestCase>
   <TestCase name="testRightOuterJoinRightOuterJoin">
     <Resource name="sql">
-      <![CDATA[SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c RIGHT OUTER JOIN (SELECT * FROM T3) ON a = e]]>
+      <![CDATA[SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c RIGHT OUTER JOIN T3 ON a = e]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
@@ -562,8 +570,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
    :- LogicalJoin(condition=[=($0, $2)], joinType=[right])
    :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
    :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
-   +- LogicalProject(e=[$0], f=[$1])
-      +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
@@ -572,14 +579,13 @@ MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]]
 :- MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($0, $2), NULL]], projFields=[[ALL, ALL]])
 :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
 :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
-+- LogicalProject(e=[$0], f=[$1])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
 ]]>
     </Resource>
   </TestCase>
   <TestCase name="testLeftOuterJoinInnerJoin">
     <Resource name="sql">
-      <![CDATA[SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c JOIN (SELECT * FROM T3) ON a = e]]>
+      <![CDATA[SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c JOIN T3 ON a = e]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
@@ -588,8 +594,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
    :- LogicalJoin(condition=[=($0, $2)], joinType=[left])
    :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
    :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
-   +- LogicalProject(e=[$0], f=[$1])
-      +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
@@ -597,8 +602,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
 MultiJoin(joinFilter=[=($0, $4)], isFullOuterJoin=[false], joinTypes=[[INNER, LEFT, INNER]], outerJoinConditions=[[NULL, =($0, $2), NULL]], projFields=[[{0, 1}, {0, 1}, {0, 1}]])
 :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
 :- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
-+- LogicalProject(e=[$0], f=[$1])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
 ]]>
     </Resource>
   </TestCase>
@@ -606,9 +610,9 @@ MultiJoin(joinFilter=[=($0, $4)], isFullOuterJoin=[false], joinTypes=[[INNER, LE
     <Resource name="sql">
       <![CDATA[
 SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c JOIN
-(SELECT * FROM T3) ON a = e RIGHT OUTER JOIN
-(SELECT * FROM T4) ON a = g JOIN
-(SELECT * FROM T5) ON a = i
+ T3 ON a = e RIGHT OUTER JOIN
+ T4 ON a = g JOIN
+ T5 ON a = i
         ]]>
     </Resource>
     <Resource name="ast">
@@ -620,32 +624,52 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], g=[$6], h=[$7], i
    :  :  :- LogicalJoin(condition=[=($0, $2)], joinType=[right])
    :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
    :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
-   :  :  +- LogicalProject(e=[$0], f=[$1])
-   :  :     +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
-   :  +- LogicalProject(g=[$0], h=[$1])
-   :     +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(g, h)]]])
-   +- LogicalProject(i=[$0], j=[$1])
-      +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(i, j)]]])
+   :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(g, h)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(i, j)]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-MultiJoin(joinFilter=[=($0, $8)], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER, INNER]], outerJoinConditions=[[=($0, $6), NULL, NULL]], projFields=[[{0, 1, 2, 3, 4, 5}, {0, 1}, {0, 1}]])
-:- MultiJoin(joinFilter=[=($0, $4)], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER, INNER]], outerJoinConditions=[[=($0, $2), NULL, NULL]], projFields=[[ALL, ALL, ALL]])
-:  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
-:  :- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
-:  +- LogicalProject(e=[$0], f=[$1])
-:     +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
-:- LogicalProject(g=[$0], h=[$1])
+MultiJoin(joinFilter=[=($0, $8)], isFullOuterJoin=[false], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[{0, 1, 2, 3, 4, 5, 6, 7}, {0, 1}]])
+:- MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($0, $6), NULL]], projFields=[[ALL, ALL]])
+:  :- MultiJoin(joinFilter=[=($0, $4)], isFullOuterJoin=[false], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[ALL, ALL]])
+:  :  :- MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($0, $2), NULL]], projFields=[[ALL, ALL]])
+:  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+:  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+:  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
 :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(g, h)]]])
-+- LogicalProject(i=[$0], j=[$1])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(i, j)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(i, j)]]])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testRightOuterJoinLeftOuterJoin">
+  <TestCase name="testRightOuterJoinInnerJoinWithKeyInLeft">
     <Resource name="sql">
-      <![CDATA[SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c LEFT OUTER JOIN (SELECT * FROM T3) ON a = e]]>
+      <![CDATA[SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c JOIN T3 ON a = e]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[=($0, $4)], joinType=[inner])
+   :- LogicalJoin(condition=[=($0, $2)], joinType=[right])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MultiJoin(joinFilter=[=($0, $4)], isFullOuterJoin=[false], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[{0, 1, 2, 3}, {0, 1}]])
+:- MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($0, $2), NULL]], projFields=[[ALL, ALL]])
+:  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+:  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRightOuterJoinLeftOuterJoinWithKeyInLeft">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c LEFT OUTER JOIN T3 ON a = e]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
@@ -654,8 +678,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
    :- LogicalJoin(condition=[=($0, $2)], joinType=[right])
    :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
    :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
-   +- LogicalProject(e=[$0], f=[$1])
-      +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
@@ -664,8 +687,53 @@ MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[INNER, LEFT]],
 :- MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($0, $2), NULL]], projFields=[[ALL, ALL]])
 :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
 :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
-+- LogicalProject(e=[$0], f=[$1])
++- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRightOuterJoinInnerJoinWithKeyInRight">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c JOIN T3 ON c = e]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[=($2, $4)], joinType=[inner])
+   :- LogicalJoin(condition=[=($0, $2)], joinType=[right])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MultiJoin(joinFilter=[=($2, $4)], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER, INNER]], outerJoinConditions=[[=($0, $2), NULL, NULL]], projFields=[[{0, 1}, {0, 1}, {0, 1}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRightOuterJoinLeftOuterJoinWithKeyInRight">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c LEFT OUTER JOIN T3 ON c = e]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[=($2, $4)], joinType=[left])
+   :- LogicalJoin(condition=[=($0, $2)], joinType=[right])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
    +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER, LEFT]], outerJoinConditions=[[=($0, $2), NULL, =($2, $4)]], projFields=[[{0, 1}, {0, 1}, {0, 1}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
 ]]>
     </Resource>
   </TestCase>
@@ -697,39 +765,39 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testRightOuterJoinInnerJoin">
+  <TestCase name="testSubRightOuterJoinQueryWithKeyInLeft">
     <Resource name="sql">
-      <![CDATA[SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c JOIN (SELECT * FROM T3) ON a = e]]>
+      <![CDATA[SELECT * FROM T3 RIGHT OUTER JOIN (SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c) t ON t.a = T3.e]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
-+- LogicalJoin(condition=[=($0, $4)], joinType=[inner])
-   :- LogicalJoin(condition=[=($0, $2)], joinType=[right])
-   :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
-   :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
-   +- LogicalProject(e=[$0], f=[$1])
-      +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+LogicalProject(e=[$0], f=[$1], a=[$2], b=[$3], c=[$4], d=[$5])
++- LogicalJoin(condition=[=($2, $0)], joinType=[right])
+   :- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
+      +- LogicalJoin(condition=[=($0, $2)], joinType=[right])
+         :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+         +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-MultiJoin(joinFilter=[=($0, $4)], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER, INNER]], outerJoinConditions=[[=($0, $2), NULL, NULL]], projFields=[[{0, 1}, {0, 1}, {0, 1}]])
-:- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
-:- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
-+- LogicalProject(e=[$0], f=[$1])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($2, $0), NULL]], projFields=[[{0, 1}, {0, 1, 2, 3}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
++- MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($0, $2), NULL]], projFields=[[{0, 1}, {0, 1}]])
+   :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testSubRightOuterJoinQuery">
+  <TestCase name="testSubRightOuterJoinQueryWithKeyInRight">
     <Resource name="sql">
-      <![CDATA[SELECT * FROM T3 RIGHT OUTER JOIN (SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c) t ON t.a = T3.e]]>
+      <![CDATA[SELECT * FROM T3 RIGHT OUTER JOIN (SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c) t ON t.c = T3.e]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(e=[$0], f=[$1], a=[$2], b=[$3], c=[$4], d=[$5])
-+- LogicalJoin(condition=[=($2, $0)], joinType=[right])
++- LogicalJoin(condition=[=($4, $0)], joinType=[right])
    :- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
    +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
       +- LogicalJoin(condition=[=($0, $2)], joinType=[right])
@@ -739,7 +807,7 @@ LogicalProject(e=[$0], f=[$1], a=[$2], b=[$3], c=[$4], d=[$5])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, RIGHT, INNER]], outerJoinConditions=[[=($2, $0), =($2, $4), NULL]], projFields=[[{0, 1}, {0, 1}, {0, 1}]])
+MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, RIGHT, INNER]], outerJoinConditions=[[=($4, $0), =($2, $4), NULL]], projFields=[[{0, 1}, {0, 1}, {0, 1}]])
 :- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
 :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
 +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRuleTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRuleTest.scala
index 432e40ad989..fcde31f6247 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRuleTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRuleTest.scala
@@ -73,7 +73,7 @@ class FlinkJoinToMultiJoinRuleTest extends TableTestBase {
   def testLeftOuterJoinLeftOuterJoin(): Unit = {
     // Can translate join to multi join.
     val sqlQuery =
-      "SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c LEFT OUTER JOIN (SELECT * FROM T3) ON a = e"
+      "SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c LEFT OUTER JOIN T3 ON a = e"
     util.verifyRelPlan(sqlQuery)
   }
 
@@ -81,44 +81,67 @@ class FlinkJoinToMultiJoinRuleTest extends TableTestBase {
   def testLeftOuterJoinRightOuterJoin(): Unit = {
     // Cannot translate join to multi join.
     val sqlQuery =
-      "SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c RIGHT OUTER JOIN (SELECT * FROM T3) ON a = e"
+      "SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c RIGHT OUTER JOIN T3 ON a = e"
     util.verifyRelPlan(sqlQuery)
   }
 
   @Test
   def testLeftOuterJoinInnerJoin(): Unit = {
     val sqlQuery =
-      "SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c JOIN (SELECT * FROM T3) ON a = e"
+      "SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c JOIN T3 ON a = e"
     util.verifyRelPlan(sqlQuery)
   }
 
   @Test
   def testRightOuterJoinRightOuterJoin(): Unit = {
     val sqlQuery =
-      "SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c RIGHT OUTER JOIN (SELECT * FROM T3) ON a = e"
+      "SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c RIGHT OUTER JOIN T3 ON a = e"
     util.verifyRelPlan(sqlQuery)
   }
 
   @Test
-  def testSubRightOuterJoinQuery(): Unit = {
-    // This case will be set into one multi join set.
+  def testSubRightOuterJoinQueryWithKeyInLeft(): Unit = {
+    // This case will not be set into one multi join set because T1.a is a
+    // null generate column after T1 right outer join T2.
     val sqlQuery =
       "SELECT * FROM T3 RIGHT OUTER JOIN (SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c) t ON t.a = T3.e"
     util.verifyRelPlan(sqlQuery)
   }
 
   @Test
-  def testRightOuterJoinLeftOuterJoin(): Unit = {
-    // Cannot not translate join to multi join because right outer join in join left.
+  def testSubRightOuterJoinQueryWithKeyInRight(): Unit = {
+    // This case can be set into one multi join set because T2.c is not a
+    // null generate column after T1 right outer join T2.
     val sqlQuery =
-      "SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c LEFT OUTER JOIN (SELECT * FROM T3) ON a = e"
+      "SELECT * FROM T3 RIGHT OUTER JOIN (SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c) t ON t.c = T3.e"
     util.verifyRelPlan(sqlQuery)
   }
 
   @Test
-  def testRightOuterJoinInnerJoin(): Unit = {
+  def testRightOuterJoinLeftOuterJoinWithKeyInLeft(): Unit = {
     val sqlQuery =
-      "SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c JOIN (SELECT * FROM T3) ON a = e"
+      "SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c LEFT OUTER JOIN T3 ON a = e"
+    util.verifyRelPlan(sqlQuery)
+  }
+
+  @Test
+  def testRightOuterJoinLeftOuterJoinWithKeyInRight(): Unit = {
+    val sqlQuery =
+      "SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c LEFT OUTER JOIN T3 ON c = e"
+    util.verifyRelPlan(sqlQuery)
+  }
+
+  @Test
+  def testRightOuterJoinInnerJoinWithKeyInLeft(): Unit = {
+    val sqlQuery =
+      "SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c JOIN T3 ON a = e"
+    util.verifyRelPlan(sqlQuery)
+  }
+
+  @Test
+  def testRightOuterJoinInnerJoinWithKeyInRight(): Unit = {
+    val sqlQuery =
+      "SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c JOIN T3 ON c = e"
     util.verifyRelPlan(sqlQuery)
   }
 
@@ -132,21 +155,21 @@ class FlinkJoinToMultiJoinRuleTest extends TableTestBase {
   @Test
   def testFullOuterJoinInnerJoin(): Unit = {
     val sqlQuery =
-      "SELECT * FROM T1 FULL OUTER JOIN T2 ON a = c JOIN (SELECT * FROM T3) ON a = e"
+      "SELECT * FROM T1 FULL OUTER JOIN T2 ON a = c JOIN T3 ON a = e"
     util.verifyRelPlan(sqlQuery)
   }
 
   @Test
   def testFullOuterJoinLeftOuterJoin(): Unit = {
     val sqlQuery =
-      "SELECT * FROM T1 FULL OUTER JOIN T2 ON a = c LEFT OUTER JOIN (SELECT * FROM T3) ON a = e"
+      "SELECT * FROM T1 FULL OUTER JOIN T2 ON a = c LEFT OUTER JOIN T3 ON a = e"
     util.verifyRelPlan(sqlQuery)
   }
 
   @Test
   def testFullOuterJoinRightOuterJoin(): Unit = {
     val sqlQuery =
-      "SELECT * FROM T1 FULL OUTER JOIN T2 ON a = c RIGHT OUTER JOIN (SELECT * FROM T3) ON a = e"
+      "SELECT * FROM T1 FULL OUTER JOIN T2 ON a = c RIGHT OUTER JOIN T3 ON a = e"
     util.verifyRelPlan(sqlQuery)
   }
 
@@ -216,9 +239,9 @@ class FlinkJoinToMultiJoinRuleTest extends TableTestBase {
     val sqlQuery =
       """
         |SELECT * FROM T1 JOIN T2 ON a = c LEFT OUTER JOIN 
-        |(SELECT * FROM T3) ON a = e JOIN
-        |(SELECT * FROM T4) ON a = g LEFT OUTER JOIN
-        |(SELECT * FROM T5) ON a = i
+        | T3 ON a = e JOIN
+        | T4 ON a = g LEFT OUTER JOIN
+        | T5 ON a = i
         """.stripMargin
     util.verifyRelPlan(sqlQuery)
 
@@ -232,9 +255,9 @@ class FlinkJoinToMultiJoinRuleTest extends TableTestBase {
     val sqlQuery =
       """
         |SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c JOIN 
-        |(SELECT * FROM T3) ON a = e LEFT OUTER JOIN
-        |(SELECT * FROM T4) ON a = g JOIN
-        |(SELECT * FROM T5) ON a = i
+        | T3 ON a = e LEFT OUTER JOIN
+        | T4 ON a = g JOIN
+        | T5 ON a = i
         """.stripMargin
     util.verifyRelPlan(sqlQuery)
   }
@@ -247,9 +270,9 @@ class FlinkJoinToMultiJoinRuleTest extends TableTestBase {
     val sqlQuery =
       """
         |SELECT * FROM T1 JOIN T2 ON a = c RIGHT OUTER JOIN 
-        |(SELECT * FROM T3) ON a = e JOIN
-        |(SELECT * FROM T4) ON a = g RIGHT OUTER JOIN
-        |(SELECT * FROM T5) ON a = i
+        | T3 ON a = e JOIN
+        | T4 ON a = g RIGHT OUTER JOIN
+        | T5 ON a = i
         """.stripMargin
     util.verifyRelPlan(sqlQuery)
   }
@@ -262,9 +285,25 @@ class FlinkJoinToMultiJoinRuleTest extends TableTestBase {
     val sqlQuery =
       """
         |SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c JOIN
-        |(SELECT * FROM T3) ON a = e RIGHT OUTER JOIN
-        |(SELECT * FROM T4) ON a = g JOIN
-        |(SELECT * FROM T5) ON a = i
+        | T3 ON a = e RIGHT OUTER JOIN
+        | T4 ON a = g JOIN
+        | T5 ON a = i
+        """.stripMargin
+    util.verifyRelPlan(sqlQuery)
+  }
+
+  @Test
+  def testMultiLeftOuterJoinWithAllKeyInLeft: Unit = {
+    util.addTableSource[(Int, Long)]("T4", 'g, 'h)
+    util.addTableSource[(Int, Long)]("T5", 'i, 'j)
+
+    val sqlQuery =
+      """
+        |SELECT * FROM T1 LEFT OUTER JOIN 
+        |T2 ON a = c LEFT OUTER JOIN 
+        |T3 ON a = e LEFT OUTER JOIN
+        |T4 ON a = g LEFT OUTER JOIN
+        |T5 ON a = i
         """.stripMargin
     util.verifyRelPlan(sqlQuery)
   }