You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2022/08/08 10:14:38 UTC

[flink] branch master updated: [FLINK-28599][table-planner] Adding FlinkJoinToMultiJoinRule to support translating left/right outer join to multi join

This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6886e444c2b [FLINK-28599][table-planner] Adding FlinkJoinToMultiJoinRule to support translating left/right outer join to multi join
6886e444c2b is described below

commit 6886e444c2be97984aa95606120080b8db532feb
Author: zhengyunhong.zyh <33...@qq.com>
AuthorDate: Tue Jul 19 11:07:56 2022 +0800

    [FLINK-28599][table-planner] Adding FlinkJoinToMultiJoinRule to support translating left/right outer join to multi join
    
    This closes #20303
---
 .../rules/logical/FlinkJoinToMultiJoinRule.java    | 555 +++++++++++++++++
 .../planner/plan/rules/FlinkBatchRuleSets.scala    |   2 +-
 .../plan/batch/sql/join/JoinReorderTest.xml        | 307 +++++++++-
 .../rules/logical/FlinkJoinToMultiJoinRuleTest.xml | 672 ++++++++++++++++++++-
 .../plan/rules/logical/JoinToMultiJoinRuleTest.xml |  56 ++
 .../plan/stream/sql/join/JoinReorderTest.xml       | 291 +++++++++
 .../planner/plan/common/JoinReorderTestBase.scala  |  88 ++-
 .../logical/FlinkJoinToMultiJoinRuleTest.scala     | 211 ++++++-
 .../runtime/batch/sql/join/JoinITCase.scala        | 211 +++++++
 9 files changed, 2362 insertions(+), 31 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRule.java
new file mode 100644
index 00000000000..838062dc15c
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRule.java
@@ -0,0 +1,555 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.rules.CoreRules;
+import org.apache.calcite.rel.rules.FilterMultiJoinMergeRule;
+import org.apache.calcite.rel.rules.MultiJoin;
+import org.apache.calcite.rel.rules.ProjectMultiJoinMergeRule;
+import org.apache.calcite.rel.rules.TransformationRule;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Pair;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Flink Planner rule to flatten a tree of {@link LogicalJoin}s into a single {@link MultiJoin} with
+ * N inputs.
+ *
+ * <p>This rule is copied from {@link org.apache.calcite.rel.rules.JoinToMultiJoinRule}. It
+ * supports merging more join types, such as left outer joins and right outer joins, into a
+ * single multi join set by rewriting the {@code canCombine()} method.
+ *
+ * <p>An input is not flattened if the input is a null generating input in an outer join, i.e.,
+ * either input in a full outer join, semi join, anti join, the right side of a left outer join, or
+ * the left side of a right outer join.
+ *
+ * <p>Join conditions are also pulled up from the inputs into the topmost {@link MultiJoin}.
+ *
+ * <p>Outer join information is also stored in the {@link MultiJoin}. A boolean flag indicates if
+ * the join is a full outer join, and in the case of left and right outer joins, the join type and
+ * outer join conditions are stored in arrays in the {@link MultiJoin}. This outer join information
+ * is associated with the null generating input in the outer join. So, in the case of a left outer
+ * join between A and B, the information is associated with B, not A.
+ *
+ * <p>Here are examples of the {@link MultiJoin}s constructed after this rule has been applied on
+ * following join trees.
+ *
+ * <ul>
+ *   <li>A JOIN B &rarr; MJ(A, B)
+ *   <li>A JOIN B JOIN C &rarr; MJ(A, B, C)
+ *   <li>A LEFT JOIN B &rarr; MJ(A, B)
+ *   <li>A RIGHT JOIN B &rarr; MJ(A, B)
+ *   <li>A FULL JOIN B &rarr; MJ[full](A, B)
+ *   <li>A LEFT JOIN (B JOIN C) &rarr; MJ(A, MJ(B, C))
+ *   <li>(A JOIN B) LEFT JOIN C &rarr; MJ(A, B, C)
+ *   <li>(A LEFT JOIN B) JOIN C &rarr; MJ(A, B, C)
+ *   <li>(A LEFT JOIN B) LEFT JOIN C &rarr; MJ(A, B, C)
+ *   <li>(A RIGHT JOIN B) RIGHT JOIN C &rarr; MJ(MJ(A, B), C)
+ *   <li>(A LEFT JOIN B) RIGHT JOIN C &rarr; MJ(MJ(A, B), C)
+ *   <li>(A RIGHT JOIN B) LEFT JOIN C &rarr; MJ(MJ(A, B), C)
+ *   <li>A LEFT JOIN (B FULL JOIN C) &rarr; MJ(A, MJ[full](B, C))
+ *   <li>(A LEFT JOIN B) FULL JOIN (C RIGHT JOIN D) &rarr; MJ[full](MJ(A, B), MJ(C, D))
+ *   <li>SEMI JOIN and ANTI JOIN are not supported yet.
+ * </ul>
+ *
+ * <p>The constructor is parameterized to allow any sub-class of {@link Join}, not just {@link
+ * LogicalJoin}.
+ *
+ * @see FilterMultiJoinMergeRule
+ * @see ProjectMultiJoinMergeRule
+ * @see CoreRules#JOIN_TO_MULTI_JOIN
+ */
+public class FlinkJoinToMultiJoinRule extends RelRule<FlinkJoinToMultiJoinRule.Config>
+        implements TransformationRule {
+
+    /** Shared instance of this rule, built from {@link Config#DEFAULT}. */
+    public static final FlinkJoinToMultiJoinRule INSTANCE =
+            FlinkJoinToMultiJoinRule.Config.DEFAULT.toRule();
+
+    /**
+     * Creates a FlinkJoinToMultiJoinRule.
+     *
+     * @param config rule configuration, handed to the {@link RelRule} super constructor
+     */
+    public FlinkJoinToMultiJoinRule(Config config) {
+        super(config);
+    }
+
+    /**
+     * Creates a rule from {@link Config#DEFAULT} with the operand tree rebuilt for the given join
+     * class.
+     *
+     * @param clazz join class used as the root operand of the rule
+     */
+    @Deprecated // to be removed before 2.0
+    public FlinkJoinToMultiJoinRule(Class<? extends Join> clazz) {
+        this(Config.DEFAULT.withOperandFor(clazz));
+    }
+
+    /**
+     * Creates a rule from {@link Config#DEFAULT} with a custom builder factory and the operand
+     * tree rebuilt for the given join class.
+     *
+     * @param joinClass join class used as the root operand of the rule
+     * @param relBuilderFactory builder factory set on the configuration
+     */
+    @Deprecated // to be removed before 2.0
+    public FlinkJoinToMultiJoinRule(
+            Class<? extends Join> joinClass, RelBuilderFactory relBuilderFactory) {
+        this(
+                Config.DEFAULT
+                        .withRelBuilderFactory(relBuilderFactory)
+                        .as(Config.class)
+                        .withOperandFor(joinClass));
+    }
+
+    // ~ Methods ----------------------------------------------------------------
+
+    /**
+     * Accepts only joins whose join type projects the fields of the right input.
+     *
+     * @param call rule call whose first relational expression is the candidate join
+     * @return the result of {@code projectsRight()} for the join type of the matched join
+     */
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        final Join join = call.rel(0);
+        final JoinRelType joinType = join.getJoinType();
+        return joinType.projectsRight();
+    }
+
+    /**
+     * Replaces the matched join by a single {@link MultiJoin} that absorbs the inputs, join
+     * filters, outer join information and post-join filters of those child MultiJoins that can be
+     * combined.
+     *
+     * @param call rule call holding the join (index 0), its left input (index 1) and its right
+     *     input (index 2)
+     */
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        final Join join = call.rel(0);
+        final RelNode leftInput = call.rel(1);
+        final RelNode rightInput = call.rel(2);
+
+        // Inputs of the new MultiJoin, together with the per-input projection fields and the
+        // per-input join field reference counts collected from the children.
+        final List<ImmutableBitSet> projFields = new ArrayList<>();
+        final List<int[]> childRefCounts = new ArrayList<>();
+        final List<RelNode> inputs =
+                combineInputs(join, leftInput, rightInput, projFields, childRefCounts);
+
+        // Join type and outer join condition per input, including the entry contributed by this
+        // join when it is a left/right outer join.
+        final List<Pair<JoinRelType, RexNode>> outerJoinSpecs = new ArrayList<>();
+        combineOuterJoins(join, inputs, leftInput, rightInput, outerJoinSpecs);
+
+        // Join filters pulled up from the children plus the condition of this join.
+        final List<RexNode> joinFilters = combineJoinFilters(join, leftInput, rightInput);
+
+        // Reference counts of the children, increased by the references made in the condition of
+        // this join.
+        final com.google.common.collect.ImmutableMap<Integer, ImmutableIntList> refCountsMap =
+                addOnJoinFieldRefCounts(
+                        inputs,
+                        join.getRowType().getFieldCount(),
+                        join.getCondition(),
+                        childRefCounts);
+
+        final List<RexNode> postJoinFilters = combinePostJoinFilters(join, leftInput, rightInput);
+
+        final RexBuilder rexBuilder = join.getCluster().getRexBuilder();
+        final RexNode joinFilter = RexUtil.composeConjunction(rexBuilder, joinFilters);
+        final RexNode postJoinFilter =
+                RexUtil.composeConjunction(rexBuilder, postJoinFilters, true);
+        final boolean isFullOuterJoin = join.getJoinType() == JoinRelType.FULL;
+
+        call.transformTo(
+                new MultiJoin(
+                        join.getCluster(),
+                        inputs,
+                        joinFilter,
+                        join.getRowType(),
+                        isFullOuterJoin,
+                        Pair.right(outerJoinSpecs),
+                        Pair.left(outerJoinSpecs),
+                        projFields,
+                        refCountsMap,
+                        postJoinFilter));
+    }
+
+    /**
+     * Builds the input list of the new MultiJoin from the two inputs of a join. An input that can
+     * be combined contributes its own children; any other input is added as it is.
+     *
+     * @param join original join
+     * @param left left input into join
+     * @param right right input into join
+     * @param projFieldsList receives one projection-field entry per resulting input
+     * @param joinFieldRefCountsList receives one join field reference count array per resulting
+     *     input
+     * @return the inputs contributed by the left side followed by those of the right side
+     */
+    private List<RelNode> combineInputs(
+            Join join,
+            RelNode left,
+            RelNode right,
+            List<ImmutableBitSet> projFieldsList,
+            List<int[]> joinFieldRefCountsList) {
+        final JoinRelType joinType = join.getJoinType();
+        final List<RelNode> combined = new ArrayList<>();
+
+        // Null generating sides of an outer join are rejected by canCombine and therefore stay
+        // intact instead of having their children pulled up.
+        appendInput(
+                left,
+                canCombine(left, joinType, joinType.generatesNullsOnLeft()),
+                combined,
+                projFieldsList,
+                joinFieldRefCountsList);
+        appendInput(
+                right,
+                canCombine(right, joinType, joinType.generatesNullsOnRight()),
+                combined,
+                projFieldsList,
+                joinFieldRefCountsList);
+
+        return combined;
+    }
+
+    /**
+     * Appends one side of a join to the lists being built: either the children of a combinable
+     * MultiJoin, or the input itself with no projection and all-zero reference counts.
+     */
+    private void appendInput(
+            RelNode input,
+            boolean flatten,
+            List<RelNode> inputs,
+            List<ImmutableBitSet> projFieldsList,
+            List<int[]> joinFieldRefCountsList) {
+        if (!flatten) {
+            inputs.add(input);
+            projFieldsList.add(null);
+            joinFieldRefCountsList.add(new int[input.getRowType().getFieldCount()]);
+            return;
+        }
+
+        final MultiJoin multiJoin = (MultiJoin) input;
+        final int childCount = input.getInputs().size();
+        for (int idx = 0; idx < childCount; idx++) {
+            inputs.add(multiJoin.getInput(idx));
+            projFieldsList.add(multiJoin.getProjFields().get(idx));
+            joinFieldRefCountsList.add(
+                    multiJoin.getJoinFieldRefCountsMap().get(idx).toIntArray());
+        }
+    }
+
+    /**
+     * Combines the outer join conditions and join types from the left and right join inputs. If the
+     * join itself is either a left or right outer join, then the join condition corresponding to
+     * the join is also set in the position corresponding to the null-generating input into the
+     * join. The join type is also set.
+     *
+     * @param joinRel join rel
+     * @param combinedInputs the combined inputs to the join
+     * @param left left child of the joinrel
+     * @param right right child of the joinrel
+     * @param joinSpecs the list where the join types and conditions will be copied
+     */
+    private void combineOuterJoins(
+            Join joinRel,
+            List<RelNode> combinedInputs,
+            RelNode left,
+            RelNode right,
+            List<Pair<JoinRelType, RexNode>> joinSpecs) {
+        JoinRelType joinType = joinRel.getJoinType();
+        boolean leftCombined = canCombine(left, joinType, joinType.generatesNullsOnLeft());
+        boolean rightCombined = canCombine(right, joinType, joinType.generatesNullsOnRight());
+        switch (joinType) {
+            case LEFT:
+                if (leftCombined) {
+                    copyOuterJoinInfo((MultiJoin) left, joinSpecs, 0, null, null);
+                } else {
+                    joinSpecs.add(Pair.of(JoinRelType.INNER, null));
+                }
+                joinSpecs.add(Pair.of(joinType, joinRel.getCondition()));
+                break;
+            case RIGHT:
+                joinSpecs.add(Pair.of(joinType, joinRel.getCondition()));
+                if (rightCombined) {
+                    copyOuterJoinInfo(
+                            (MultiJoin) right,
+                            joinSpecs,
+                            left.getRowType().getFieldCount(),
+                            right.getRowType().getFieldList(),
+                            joinRel.getRowType().getFieldList());
+                } else {
+                    joinSpecs.add(Pair.of(JoinRelType.INNER, null));
+                }
+                break;
+            default:
+                if (leftCombined) {
+                    copyOuterJoinInfo((MultiJoin) left, joinSpecs, 0, null, null);
+                } else {
+                    joinSpecs.add(Pair.of(JoinRelType.INNER, null));
+                }
+                if (rightCombined) {
+                    copyOuterJoinInfo(
+                            (MultiJoin) right,
+                            joinSpecs,
+                            left.getRowType().getFieldCount(),
+                            right.getRowType().getFieldList(),
+                            joinRel.getRowType().getFieldList());
+                } else {
+                    joinSpecs.add(Pair.of(JoinRelType.INNER, null));
+                }
+        }
+    }
+
+    /**
+     * Appends the join types and outer join conditions of a source MultiJoin to a destination
+     * list. When a non-zero adjustment is given, the input references of every non-null condition
+     * are shifted by that amount so they match the input's new position further to the right.
+     *
+     * @param multiJoin the source MultiJoin
+     * @param destJoinSpecs the list the join types and conditions are appended to
+     * @param adjustmentAmount if not 0, the amount the RexInputRefs in the join conditions need to
+     *     be adjusted by
+     * @param srcFields the source fields that the original join conditions are referencing; only
+     *     used when {@code adjustmentAmount} is not 0
+     * @param destFields the destination fields that the adjusted join conditions are referencing;
+     *     only used when {@code adjustmentAmount} is not 0
+     */
+    private void copyOuterJoinInfo(
+            MultiJoin multiJoin,
+            List<Pair<JoinRelType, RexNode>> destJoinSpecs,
+            int adjustmentAmount,
+            List<RelDataTypeField> srcFields,
+            List<RelDataTypeField> destFields) {
+        final List<Pair<JoinRelType, RexNode>> sourceSpecs =
+                Pair.zip(multiJoin.getJoinTypes(), multiJoin.getOuterJoinConditions());
+
+        // Nothing to shift: the specs can be taken over unchanged.
+        if (adjustmentAmount == 0) {
+            destJoinSpecs.addAll(sourceSpecs);
+            return;
+        }
+
+        assert srcFields != null;
+        assert destFields != null;
+
+        // Every source field moves by the same offset.
+        final int[] shifts = new int[srcFields.size()];
+        java.util.Arrays.fill(shifts, adjustmentAmount);
+
+        final RexBuilder rexBuilder = multiJoin.getCluster().getRexBuilder();
+        for (Pair<JoinRelType, RexNode> spec : sourceSpecs) {
+            RexNode shiftedCondition = null;
+            if (spec.right != null) {
+                shiftedCondition =
+                        spec.right.accept(
+                                new RelOptUtil.RexInputConverter(
+                                        rexBuilder, srcFields, destFields, shifts));
+            }
+            destJoinSpecs.add(Pair.of(spec.left, shiftedCondition));
+        }
+    }
+
+    /**
+     * Gathers the filters that together form the join filter of the new MultiJoin: the condition
+     * of the join itself (unless it is a left or right outer join) and the join filters of those
+     * inputs that are MultiJoins and can be combined.
+     *
+     * @param join Join
+     * @param left Left input of the join
+     * @param right Right input of the join
+     * @return the filters to be AND-ed together; entries may be null
+     */
+    private List<RexNode> combineJoinFilters(Join join, RelNode left, RelNode right) {
+        final JoinRelType type = join.getJoinType();
+        final List<RexNode> conjuncts = new ArrayList<>();
+
+        // The condition of a left or right outer join is tracked separately as outer join
+        // condition, so only other join types contribute their condition here.
+        final boolean oneSidedOuterJoin = type == JoinRelType.LEFT || type == JoinRelType.RIGHT;
+        if (!oneSidedOuterJoin) {
+            conjuncts.add(join.getCondition());
+        }
+
+        if (canCombine(left, type, type.generatesNullsOnLeft())) {
+            final MultiJoin leftMultiJoin = (MultiJoin) left;
+            conjuncts.add(leftMultiJoin.getJoinFilter());
+        }
+
+        // The filter of the right child references the right child's own fields; shift it so it
+        // refers to the same fields in the output of the join.
+        if (canCombine(right, type, type.generatesNullsOnRight())) {
+            final MultiJoin rightMultiJoin = (MultiJoin) right;
+            final RexNode shifted =
+                    shiftRightFilter(join, left, rightMultiJoin, rightMultiJoin.getJoinFilter());
+            conjuncts.add(shifted);
+        }
+
+        return conjuncts;
+    }
+
+    /**
+     * Returns whether an input can be merged into a given relational expression without changing
+     * semantics.
+     *
+     * @param input input into a join
+     * @param nullGenerating true if the input is null generating
+     * @return true if the input can be combined into a parent MultiJoin
+     */
+    private boolean canCombine(RelNode input, JoinRelType joinType, boolean nullGenerating) {
+        if (input instanceof MultiJoin) {
+            MultiJoin join = (MultiJoin) input;
+            if (join.isFullOuterJoin() || nullGenerating) {
+                return false;
+            }
+            for (JoinRelType type : join.getJoinTypes()) {
+                if (type == JoinRelType.FULL) {
+                    return false;
+                }
+                // For left/right outer join, if it not meets this condition, it can be converted to
+                // one multi join set.
+                if (joinType != JoinRelType.INNER
+                        && ((type == JoinRelType.LEFT || type == JoinRelType.RIGHT)
+                                && joinType != type)) {
+                    return false;
+                }
+            }
+
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    /**
+     * Shifts a filter originating from the right child of the LogicalJoin to the right, to reflect
+     * the filter now being applied on the resulting MultiJoin.
+     *
+     * @param joinRel the original LogicalJoin
+     * @param left the left child of the LogicalJoin
+     * @param right the right child of the LogicalJoin
+     * @param rightFilter the filter originating from the right child
+     * @return the adjusted right filter
+     */
+    private RexNode shiftRightFilter(
+            Join joinRel, RelNode left, MultiJoin right, RexNode rightFilter) {
+        if (rightFilter == null) {
+            return null;
+        }
+
+        int nFieldsOnLeft = left.getRowType().getFieldList().size();
+        int nFieldsOnRight = right.getRowType().getFieldList().size();
+        int[] adjustments = new int[nFieldsOnRight];
+        for (int i = 0; i < nFieldsOnRight; i++) {
+            adjustments[i] = nFieldsOnLeft;
+        }
+        rightFilter =
+                rightFilter.accept(
+                        new RelOptUtil.RexInputConverter(
+                                joinRel.getCluster().getRexBuilder(),
+                                right.getRowType().getFieldList(),
+                                joinRel.getRowType().getFieldList(),
+                                adjustments));
+        return rightFilter;
+    }
+
+    /**
+     * Adds on to the existing join condition reference counts the references from the new join
+     * condition.
+     *
+     * <p>The given count arrays are cloned before being modified, so the arrays in {@code
+     * origJoinFieldRefCounts} are left untouched.
+     *
+     * @param multiJoinInputs inputs into the new MultiJoin
+     * @param nTotalFields total number of fields in the MultiJoin
+     * @param joinCondition the new join condition
+     * @param origJoinFieldRefCounts existing join condition reference counts, one array per input
+     *     in input order
+     * @return map from input index to the updated per-field reference counts of that input
+     */
+    private com.google.common.collect.ImmutableMap<Integer, ImmutableIntList>
+            addOnJoinFieldRefCounts(
+                    List<RelNode> multiJoinInputs,
+                    int nTotalFields,
+                    RexNode joinCondition,
+                    List<int[]> origJoinFieldRefCounts) {
+        // count the input references in the join condition; the array is indexed by the field
+        // position in the output of the join
+        int[] joinCondRefCounts = new int[nTotalFields];
+        joinCondition.accept(new InputReferenceCounter(joinCondRefCounts));
+
+        // first, make a copy of the ref counters, keyed by the position of the input
+        final Map<Integer, int[]> refCountsMap = new HashMap<>();
+        int nInputs = multiJoinInputs.size();
+        int currInput = 0;
+        for (int[] origRefCounts : origJoinFieldRefCounts) {
+            refCountsMap.put(currInput, origRefCounts.clone());
+            currInput++;
+        }
+
+        // add on to the counts for each input into the MultiJoin the
+        // reference counts computed for the current join condition
+        // currInput/startField/nFields form a cursor over the inputs: startField is the global
+        // index of the first field of input currInput and nFields is that input's field count.
+        currInput = -1;
+        int startField = 0;
+        int nFields = 0;
+        for (int i = 0; i < nTotalFields; i++) {
+            // fields that are not referenced by the condition need no update
+            if (joinCondRefCounts[i] == 0) {
+                continue;
+            }
+            // advance the cursor until global field i falls inside the current input
+            while (i >= (startField + nFields)) {
+                startField += nFields;
+                currInput++;
+                assert currInput < nInputs;
+                nFields = multiJoinInputs.get(currInput).getRowType().getFieldCount();
+            }
+            // i - startField is the position of the field within its own input
+            int[] refCounts = refCountsMap.get(currInput);
+            refCounts[i - startField] += joinCondRefCounts[i];
+        }
+
+        // convert the mutable working copies into the immutable result map
+        final com.google.common.collect.ImmutableMap.Builder<Integer, ImmutableIntList> builder =
+                com.google.common.collect.ImmutableMap.builder();
+        for (Map.Entry<Integer, int[]> entry : refCountsMap.entrySet()) {
+            builder.put(entry.getKey(), ImmutableIntList.of(entry.getValue()));
+        }
+        return builder.build();
+    }
+
+    /**
+     * Gathers the post-join filters of the left and right inputs, for each input that is a
+     * MultiJoin. The filter of the right input is shifted to the field positions of the join
+     * output and is added before the filter of the left input.
+     *
+     * <p>NOTE(review): unlike the join filters, the post-join filters are pulled up for every
+     * MultiJoin input, without consulting {@code canCombine} - confirm that this is intended for
+     * inputs that are not flattened.
+     *
+     * @param joinRel the original LogicalJoin
+     * @param left left child of the LogicalJoin
+     * @param right right child of the LogicalJoin
+     * @return the post-join filters to be AND-ed together; entries may be null
+     */
+    private List<RexNode> combinePostJoinFilters(Join joinRel, RelNode left, RelNode right) {
+        final List<RexNode> postFilters = new ArrayList<>();
+
+        if (right instanceof MultiJoin) {
+            final MultiJoin rightMultiJoin = (MultiJoin) right;
+            final RexNode shifted =
+                    shiftRightFilter(
+                            joinRel, left, rightMultiJoin, rightMultiJoin.getPostJoinFilter());
+            postFilters.add(shifted);
+        }
+
+        if (left instanceof MultiJoin) {
+            final MultiJoin leftMultiJoin = (MultiJoin) left;
+            postFilters.add(leftMultiJoin.getPostJoinFilter());
+        }
+
+        return postFilters;
+    }
+
+    // ~ Inner Classes ----------------------------------------------------------
+
+    /**
+     * Visitor that keeps a reference count of the inputs used by an expression.
+     *
+     * <p>The counts are written into the array handed to the constructor, so the caller reads the
+     * result from its own array after the visit.
+     */
+    private static class InputReferenceCounter extends RexVisitorImpl<Void> {
+        /** Counter per input field, indexed by the index of the input reference. */
+        private final int[] refCounts;
+
+        /**
+         * Creates a counter that increments the given array.
+         *
+         * @param refCounts array to increment; it must be large enough for every input index
+         *     referenced by the visited expression
+         */
+        InputReferenceCounter(int[] refCounts) {
+            // NOTE(review): "true" presumably requests a deep traversal of the expression -
+            // confirm against RexVisitorImpl.
+            super(true);
+            this.refCounts = refCounts;
+        }
+
+        /** Increments the counter of the referenced input field. */
+        @Override
+        public Void visitInputRef(RexInputRef inputRef) {
+            refCounts[inputRef.getIndex()]++;
+            return null;
+        }
+    }
+
+    /** Rule configuration. */
+    public interface Config extends RelRule.Config {
+        Config DEFAULT = EMPTY.as(Config.class).withOperandFor(LogicalJoin.class);
+
+        @Override
+        default FlinkJoinToMultiJoinRule toRule() {
+            return new FlinkJoinToMultiJoinRule(this);
+        }
+
+        /** Defines an operand tree for the given classes. */
+        default Config withOperandFor(Class<? extends Join> joinClass) {
+            return withOperandSupplier(
+                            b0 ->
+                                    b0.operand(joinClass)
+                                            .inputs(
+                                                    b1 -> b1.operand(RelNode.class).anyInputs(),
+                                                    b2 -> b2.operand(RelNode.class).anyInputs()))
+                    .as(Config.class);
+        }
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index e8e0064741c..6513355e61b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -214,7 +214,7 @@ object FlinkBatchRuleSets {
 
   val JOIN_REORDER_PREPARE_RULES: RuleSet = RuleSets.ofList(
     // merge join to MultiJoin
-    CoreRules.JOIN_TO_MULTI_JOIN,
+    FlinkJoinToMultiJoinRule.INSTANCE,
     // merge project to MultiJoin
     CoreRules.PROJECT_MULTI_JOIN_MERGE,
     // merge filter to MultiJoin
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/JoinReorderTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/JoinReorderTest.xml
index 464efd2bd13..dc25d68b66f 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/JoinReorderTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/JoinReorderTest.xml
@@ -87,18 +87,19 @@ LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-HashJoin(joinType=[LeftOuterJoin], where=[(a4 = a5)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5], build=[right])
-:- Exchange(distribution=[hash[a4]])
-:  +- MultipleInput(readOrder=[0,0,1,0], members=[\nHashJoin(joinType=[LeftOuterJoin], where=[(a1 = a4)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])\n:- HashJoin(joinType=[LeftOuterJoin], where=[(a2 = a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], isBroadcast=[true], build=[right])\n:  :- HashJoin(joinType=[LeftOuterJoin], where=[(a1 = a2)], select=[a1, b1, c1, a2, b2, c2], isBroadcast=[true], build=[right])\n:  :  :- [#3] LegacyTableS [...]
-:     :- Exchange(distribution=[broadcast])
-:     :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
-:     :- Exchange(distribution=[broadcast])
-:     :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
-:     :- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
-:     +- Exchange(distribution=[broadcast])
-:        +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
-+- Exchange(distribution=[hash[a5]])
-   +- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- MultipleInput(readOrder=[0,1,0], members=[\nHashJoin(joinType=[LeftOuterJoin], where=[(a4 = a5)], select=[a1, b1, c1, a2, b2, c2, a4, b4, c4, a3, b3, c3, a5, b5, c5], build=[right])\n:- HashJoin(joinType=[LeftOuterJoin], where=[(a2 = a3)], select=[a1, b1, c1, a2, b2, c2, a4, b4, c4, a3, b3, c3], isBroadcast=[true], build=[right])\n:  :- [#2] Exchange(distribution=[hash[a4]])\n:  +- [#3] Exchange(distribution=[broadcast])\n+- [#1] Exchange(distribution=[hash[a5]])\n])
+   :- Exchange(distribution=[hash[a5]])
+   :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+   :- Exchange(distribution=[hash[a4]])
+   :  +- MultipleInput(readOrder=[0,1,0], members=[\nHashJoin(joinType=[LeftOuterJoin], where=[(a1 = a4)], select=[a1, b1, c1, a2, b2, c2, a4, b4, c4], isBroadcast=[true], build=[right])\n:- HashJoin(joinType=[LeftOuterJoin], where=[(a1 = a2)], select=[a1, b1, c1, a2, b2, c2], isBroadcast=[true], build=[right])\n:  :- [#2] LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])\n:  +- [#3] Exchange(distribution=[b [...]
+   :     :- Exchange(distribution=[broadcast])
+   :     :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+   :     :- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+   :     +- Exchange(distribution=[broadcast])
+   :        +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+   +- Exchange(distribution=[broadcast])
+      +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
 ]]>
     </Resource>
   </TestCase>
@@ -373,18 +374,196 @@ LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
-+- MultipleInput(readOrder=[0,0,1,0], members=[\nHashJoin(joinType=[InnerJoin], where=[(a2 = a3)], select=[a1, b1, c1, a2, b2, c2, a5, b5, c5, a4, b4, c4, a3, b3, c3], isBroadcast=[true], build=[right])\n:- HashJoin(joinType=[InnerJoin], where=[(a1 = a4)], select=[a1, b1, c1, a2, b2, c2, a5, b5, c5, a4, b4, c4], isBroadcast=[true], build=[right])\n:  :- HashJoin(joinType=[RightOuterJoin], where=[(a1 = a2)], select=[a1, b1, c1, a2, b2, c2], build=[right])\n:  :  :- [#3] Exchange(distribut [...]
++- HashJoin(joinType=[InnerJoin], where=[(a4 = a5)], select=[a5, b5, c5, a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])
+   :- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+   +- Exchange(distribution=[broadcast])
+      +- MultipleInput(readOrder=[0,1,0], members=[\nHashJoin(joinType=[InnerJoin], where=[(a1 = a4)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])\n:- HashJoin(joinType=[RightOuterJoin], where=[(a1 = a2)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], build=[right])\n:  :- [#2] Exchange(distribution=[hash[a1]])\n:  +- [#3] Exchange(distribution=[hash[a2]])\n+- [#1] Exchange(distribution=[broadcast])\n])
+         :- Exchange(distribution=[broadcast])
+         :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+         :- Exchange(distribution=[hash[a1]])
+         :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+         +- Exchange(distribution=[hash[a2]])
+            +- HashJoin(joinType=[InnerJoin], where=[(a2 = a3)], select=[a2, b2, c2, a3, b3, c3], isBroadcast=[true], build=[right])
+               :- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+               +- Exchange(distribution=[broadcast])
+                  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerJoinAntiJoin">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1
+   JOIN T2 ON a1 = a2
+   JOIN T3 ON a2 = a3
+   JOIN T4 ON a1 = a4
+   WHERE NOT EXISTS (SELECT a5 FROM T5 WHERE a1 = a5)
+         ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11])
++- LogicalFilter(condition=[NOT(EXISTS({
+LogicalFilter(condition=[=($cor0.a1, $0)])
+  LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+}))], variablesSet=[[$cor0]])
+   +- LogicalJoin(condition=[=($0, $9)], joinType=[inner])
+      :- LogicalJoin(condition=[=($3, $6)], joinType=[inner])
+      :  :- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+      :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+      :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+      :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4])
++- MultipleInput(readOrder=[0,1,0], members=[\nHashJoin(joinType=[InnerJoin], where=[(a2 = a4)], select=[a2, b2, c2, a1, b1, c1, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])\n:- NestedLoopJoin(joinType=[InnerJoin], where=[(a2 = a3)], select=[a2, b2, c2, a1, b1, c1, a3, b3, c3], build=[right])\n:  :- [#2] LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])\n:  +- [#3] Exchange(distribution=[broad [...]
    :- Exchange(distribution=[broadcast])
-   :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+   :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+   :- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+   +- Exchange(distribution=[broadcast])
+      +- MultipleInput(readOrder=[0,1,0], members=[\nNestedLoopJoin(joinType=[LeftAntiJoin], where=[(a1 = a5)], select=[a1, b1, c1, a3, b3, c3], build=[right])\n:- HashJoin(joinType=[InnerJoin], where=[(a3 = a1)], select=[a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[right])\n:  :- [#2] LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])\n:  +- [#3] Exchange(distribution=[broadcast])\n+- [#1] Exchange(di [...]
+         :- Exchange(distribution=[broadcast])
+         :  +- LocalHashAggregate(groupBy=[a5], select=[a5])
+         :     +- Calc(select=[a5])
+         :        +- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+         :- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+         +- Exchange(distribution=[broadcast])
+            +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerJoinLeftOuterJoinInnerJoinLeftOuterJoin">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1
+   JOIN T2 ON a1 = a2
+   LEFT OUTER JOIN T3 ON a1 = a3
+   JOIN T4 ON a1 = a4
+   LEFT OUTER JOIN T5 ON a4 = a5
+         ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[left])
+   :- LogicalJoin(condition=[=($0, $9)], joinType=[inner])
+   :  :- LogicalJoin(condition=[=($0, $6)], joinType=[left])
+   :  :  :- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+   :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+   :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+   :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- HashJoin(joinType=[InnerJoin], where=[(a1 = a2)], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a4, b4, c4, a3, b3, c3], isBroadcast=[true], build=[right])
+   :- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+   +- Exchange(distribution=[broadcast])
+      +- MultipleInput(readOrder=[2,1,0], members=[\nHashJoin(joinType=[RightOuterJoin], where=[(a4 = a5)], select=[a5, b5, c5, a1, b1, c1, a4, b4, c4, a3, b3, c3], build=[right])\n:- [#1] Exchange(distribution=[hash[a5]])\n+- HashJoin(joinType=[LeftOuterJoin], where=[(a1 = a3)], select=[a1, b1, c1, a4, b4, c4, a3, b3, c3], isBroadcast=[true], build=[right])\n   :- [#2] Exchange(distribution=[hash[a4]])\n   +- [#3] Exchange(distribution=[broadcast])\n])
+         :- Exchange(distribution=[hash[a5]])
+         :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+         :- Exchange(distribution=[hash[a4]])
+         :  +- HashJoin(joinType=[InnerJoin], where=[(a1 = a4)], select=[a1, b1, c1, a4, b4, c4], isBroadcast=[true], build=[right])
+         :     :- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+         :     +- Exchange(distribution=[broadcast])
+         :        +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+         +- Exchange(distribution=[broadcast])
+            +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerJoinRightOuterJoinInnerJoinRightOuterJoin">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1
+   JOIN T2 ON a1 = a2
+   RIGHT OUTER JOIN T3 ON a1 = a3
+   JOIN T4 ON a1 = a4
+   RIGHT OUTER JOIN T5 ON a4 = a5
+         ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[right])
+   :- LogicalJoin(condition=[=($0, $9)], joinType=[inner])
+   :  :- LogicalJoin(condition=[=($0, $6)], joinType=[right])
+   :  :  :- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+   :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+   :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+   :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- HashJoin(joinType=[LeftOuterJoin], where=[(a4 = a5)], select=[a5, b5, c5, a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])
+   :- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+   +- Exchange(distribution=[broadcast])
+      +- Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4])
+         +- HashJoin(joinType=[InnerJoin], where=[(a1 = a4)], select=[a4, b4, c4, a1, b1, c1, a2, b2, c2, a3, b3, c3], isBroadcast=[true], build=[right])
+            :- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+            +- Exchange(distribution=[broadcast])
+               +- HashJoin(joinType=[RightOuterJoin], where=[(a1 = a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], isBroadcast=[true], build=[left])
+                  :- Exchange(distribution=[broadcast])
+                  :  +- HashJoin(joinType=[InnerJoin], where=[(a1 = a2)], select=[a1, b1, c1, a2, b2, c2], isBroadcast=[true], build=[right])
+                  :     :- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+                  :     +- Exchange(distribution=[broadcast])
+                  :        +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+                  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerJoinSemiJoin">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1
+   JOIN T2 ON a1 = a2
+   JOIN T3 ON a2 = a3
+   JOIN T4 ON a1 = a4
+   WHERE a1 IN (SELECT a5 FROM T5)
+         ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(a5=[$0])
+  LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+})])
+   +- LogicalJoin(condition=[=($0, $9)], joinType=[inner])
+      :- LogicalJoin(condition=[=($3, $6)], joinType=[inner])
+      :  :- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+      :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+      :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+      :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4])
++- MultipleInput(readOrder=[0,1,0], members=[\nHashJoin(joinType=[InnerJoin], where=[(a2 = a4)], select=[a2, b2, c2, a1, b1, c1, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])\n:- HashJoin(joinType=[InnerJoin], where=[(a2 = a3)], select=[a2, b2, c2, a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[right])\n:  :- [#2] LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])\n:  +- [#3] Exchange(distr [...]
    :- Exchange(distribution=[broadcast])
-   :  +- HashJoin(joinType=[InnerJoin], where=[(a4 = a5)], select=[a5, b5, c5, a4, b4, c4], isBroadcast=[true], build=[right])
-   :     :- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
-   :     +- Exchange(distribution=[broadcast])
-   :        +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
-   :- Exchange(distribution=[hash[a1]])
-   :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
-   +- Exchange(distribution=[hash[a2]])
-      +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+   :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+   :- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+   +- Exchange(distribution=[broadcast])
+      +- MultipleInput(readOrder=[0,1,0], members=[\nNestedLoopJoin(joinType=[LeftSemiJoin], where=[(a1 = a5)], select=[a1, b1, c1, a3, b3, c3], build=[right])\n:- HashJoin(joinType=[InnerJoin], where=[(a3 = a1)], select=[a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[right])\n:  :- [#2] LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])\n:  +- [#3] Exchange(distribution=[broadcast])\n+- [#1] Exchange(di [...]
+         :- Exchange(distribution=[broadcast])
+         :  +- LocalHashAggregate(groupBy=[a5], select=[a5])
+         :     +- Calc(select=[a5])
+         :        +- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+         :- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+         +- Exchange(distribution=[broadcast])
+            +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
 ]]>
     </Resource>
   </TestCase>
@@ -484,6 +663,90 @@ Calc(select=[a3, b1, a1, c2, c3, a4, b4, a5, b5, c5])
          +- Exchange(distribution=[broadcast])
             +- Calc(select=[a2, c2])
                +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLeftOuterJoinInnerJoinLeftOuterJoinInnerJoin">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1
+   LEFT OUTER JOIN T2 ON a1 = a2
+   JOIN T3 ON a1 = a3
+   LEFT OUTER JOIN T4 ON a1 = a4
+   JOIN T5 ON a4 = a5
+         ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[inner])
+   :- LogicalJoin(condition=[=($0, $9)], joinType=[left])
+   :  :- LogicalJoin(condition=[=($0, $6)], joinType=[inner])
+   :  :  :- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+   :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+   :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+   :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- MultipleInput(readOrder=[0,1,0], members=[\nHashJoin(joinType=[RightOuterJoin], where=[(a1 = a2)], select=[a2, b2, c2, a5, b5, c5, a4, b4, c4, a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[left])\n:- [#1] Exchange(distribution=[broadcast])\n+- HashJoin(joinType=[InnerJoin], where=[(a4 = a5)], select=[a5, b5, c5, a4, b4, c4, a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[right])\n   :- [#2] LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTab [...]
+   :- Exchange(distribution=[broadcast])
+   :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+   :- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+   +- Exchange(distribution=[broadcast])
+      +- MultipleInput(readOrder=[0,1,0], members=[\nHashJoin(joinType=[RightOuterJoin], where=[(a1 = a4)], select=[a4, b4, c4, a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[left])\n:- [#1] Exchange(distribution=[broadcast])\n+- HashJoin(joinType=[InnerJoin], where=[(a1 = a3)], select=[a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[right])\n   :- [#2] LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c [...]
+         :- Exchange(distribution=[broadcast])
+         :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+         :- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+         +- Exchange(distribution=[broadcast])
+            +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRightOuterJoinInnerJoinRightOuterJoinInnerJoin">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1
+   RIGHT OUTER JOIN T2 ON a1 = a2
+   JOIN T3 ON a1 = a3
+   RIGHT OUTER JOIN T4 ON a1 = a4
+   JOIN T5 ON a4 = a5
+         ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[inner])
+   :- LogicalJoin(condition=[=($0, $9)], joinType=[right])
+   :  :- LogicalJoin(condition=[=($0, $6)], joinType=[inner])
+   :  :  :- LogicalJoin(condition=[=($0, $3)], joinType=[right])
+   :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+   :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+   :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- HashJoin(joinType=[InnerJoin], where=[(a4 = a5)], select=[a5, b5, c5, a4, b4, c4, a1, b1, c1, a2, b2, c2, a3, b3, c3], isBroadcast=[true], build=[right])
+   :- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+   +- Exchange(distribution=[broadcast])
+      +- HashJoin(joinType=[LeftOuterJoin], where=[(a1 = a4)], select=[a4, b4, c4, a1, b1, c1, a2, b2, c2, a3, b3, c3], isBroadcast=[true], build=[right])
+         :- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+         +- Exchange(distribution=[broadcast])
+            +- MultipleInput(readOrder=[0,1,0], members=[\nHashJoin(joinType=[InnerJoin], where=[(a1 = a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], isBroadcast=[true], build=[right])\n:- HashJoin(joinType=[RightOuterJoin], where=[(a1 = a2)], select=[a1, b1, c1, a2, b2, c2], build=[right])\n:  :- [#2] Exchange(distribution=[hash[a1]])\n:  +- [#3] Exchange(distribution=[hash[a2]])\n+- [#1] Exchange(distribution=[broadcast])\n])
+               :- Exchange(distribution=[broadcast])
+               :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+               :- Exchange(distribution=[hash[a1]])
+               :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+               +- Exchange(distribution=[hash[a2]])
+                  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRuleTest.xml
index d0aa091f00b..e270c6e7f75 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRuleTest.xml
@@ -16,7 +16,140 @@ See the License for the specific language governing permissions and
 limitations under the License.
 -->
 <Root>
-  <TestCase name="testDoesNotMatchAntiJoin">
+  <TestCase name="testFullOuterJoin">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM T1 FULL OUTER JOIN T2 ON a = c, T3 WHERE a = e]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[=($0, $4)])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalJoin(condition=[=($0, $2)], joinType=[full])
+      :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+      :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[=($0, $4)])
+   +- MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[ALL, ALL]])
+      :- MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[true], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[ALL, ALL]])
+      :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+      :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testFullOuterJoinInnerJoin">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM T1 FULL OUTER JOIN T2 ON a = c JOIN (SELECT * FROM T3) ON a = e]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[=($0, $4)], joinType=[inner])
+   :- LogicalJoin(condition=[=($0, $2)], joinType=[full])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+   +- LogicalProject(e=[$0], f=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MultiJoin(joinFilter=[=($0, $4)], isFullOuterJoin=[false], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[{0, 1, 2, 3}, {0, 1}]])
+:- MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[true], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[ALL, ALL]])
+:  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+:  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
++- LogicalProject(e=[$0], f=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testFullOuterJoinLeftOuterJoin">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM T1 FULL OUTER JOIN T2 ON a = c LEFT OUTER JOIN (SELECT * FROM T3) ON a = e]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[=($0, $4)], joinType=[left])
+   :- LogicalJoin(condition=[=($0, $2)], joinType=[full])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+   +- LogicalProject(e=[$0], f=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[INNER, LEFT]], outerJoinConditions=[[NULL, =($0, $4)]], projFields=[[{0, 1, 2, 3}, {0, 1}]])
+:- MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[true], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[ALL, ALL]])
+:  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+:  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
++- LogicalProject(e=[$0], f=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testFullOuterJoinRightOuterJoin">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM T1 FULL OUTER JOIN T2 ON a = c RIGHT OUTER JOIN (SELECT * FROM T3) ON a = e]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[=($0, $4)], joinType=[right])
+   :- LogicalJoin(condition=[=($0, $2)], joinType=[full])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+   +- LogicalProject(e=[$0], f=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($0, $4), NULL]], projFields=[[{0, 1, 2, 3}, {0, 1}]])
+:- MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[true], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[ALL, ALL]])
+:  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+:  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
++- LogicalProject(e=[$0], f=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testFullOuterJoinSemiJoin">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM T1 FULL OUTER JOIN T2 ON a = c WHERE a IN (SELECT e FROM T3)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(e=[$0])
+  LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+})])
+   +- LogicalJoin(condition=[=($0, $2)], joinType=[full])
+      :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
++- LogicalJoin(condition=[=($0, $4)], joinType=[semi])
+   :- MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[true], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[ALL, ALL]])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+   +- LogicalProject(e=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerJoinAntiJoin">
     <Resource name="sql">
       <![CDATA[
 SELECT * FROM (SELECT * FROM T1 JOIN T2 ON a = c) t
@@ -49,7 +182,195 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testDoesNotMatchSemiJoin">
+  <TestCase name="testInnerJoinInnerJoin">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM T1, T2, T3 WHERE a = c AND a = e]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[AND(=($0, $2), =($0, $4))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+      :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[AND(=($0, $2), =($0, $4))])
+   +- MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[INNER, INNER, INNER]], outerJoinConditions=[[NULL, NULL, NULL]], projFields=[[ALL, ALL, ALL]])
+      :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+      :- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRightOuterJoinAntiJoin">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM (SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c) t
+WHERE NOT EXISTS (SELECT e FROM T3  WHERE a = e)
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
++- LogicalFilter(condition=[NOT(EXISTS({
+LogicalFilter(condition=[=($cor0.a, $0)])
+  LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+}))], variablesSet=[[$cor0]])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
+      +- LogicalJoin(condition=[=($0, $2)], joinType=[right])
+         :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+         +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
++- LogicalJoin(condition=[=($0, $4)], joinType=[anti])
+   :- MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($0, $2), NULL]], projFields=[[{0, 1}, {0, 1}]])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+   +- LogicalProject(e=[$0])
+      +- LogicalFilter(condition=[true])
+         +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerJoinLeftOuterJoin">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM T1 JOIN T2 ON a =c LEFT OUTER JOIN T3 ON a = e]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[=($0, $4)], joinType=[left])
+   :- LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[false], joinTypes=[[INNER, INNER, LEFT]], outerJoinConditions=[[NULL, NULL, =($0, $4)]], projFields=[[{0, 1}, {0, 1}, {0, 1}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerJoinLeftOuterJoinInnerJoinLeftOuterJoin">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1 JOIN T2 ON a = c LEFT OUTER JOIN 
+(SELECT * FROM T3) ON a = e JOIN
+(SELECT * FROM T4) ON a = g LEFT OUTER JOIN
+(SELECT * FROM T5) ON a = i
+        ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], g=[$6], h=[$7], i=[$8], j=[$9])
++- LogicalJoin(condition=[=($0, $8)], joinType=[left])
+   :- LogicalJoin(condition=[=($0, $6)], joinType=[inner])
+   :  :- LogicalJoin(condition=[=($0, $4)], joinType=[left])
+   :  :  :- LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+   :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+   :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+   :  :  +- LogicalProject(e=[$0], f=[$1])
+   :  :     +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+   :  +- LogicalProject(g=[$0], h=[$1])
+   :     +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(g, h)]]])
+   +- LogicalProject(i=[$0], j=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(i, j)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MultiJoin(joinFilter=[AND(=($0, $6), =($0, $2))], isFullOuterJoin=[false], joinTypes=[[INNER, INNER, LEFT, INNER, LEFT]], outerJoinConditions=[[NULL, NULL, =($0, $4), NULL, =($0, $8)]], projFields=[[{0, 1}, {0, 1}, {0, 1}, {0, 1}, {0, 1}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+:- LogicalProject(e=[$0], f=[$1])
+:  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+:- LogicalProject(g=[$0], h=[$1])
+:  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(g, h)]]])
++- LogicalProject(i=[$0], j=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(i, j)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerJoinRightOuterJoin">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM T1 JOIN T2 ON a =c RIGHT OUTER JOIN T3 ON a = e]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[=($0, $4)], joinType=[right])
+   :- LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($0, $4), NULL]], projFields=[[{0, 1, 2, 3}, {0, 1}]])
+:- MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[false], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[ALL, ALL]])
+:  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+:  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerJoinRightOuterJoinInnerJoinRightOuterJoin">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1 JOIN T2 ON a = c RIGHT OUTER JOIN 
+(SELECT * FROM T3) ON a = e JOIN
+(SELECT * FROM T4) ON a = g RIGHT OUTER JOIN
+(SELECT * FROM T5) ON a = i
+        ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], g=[$6], h=[$7], i=[$8], j=[$9])
++- LogicalJoin(condition=[=($0, $8)], joinType=[right])
+   :- LogicalJoin(condition=[=($0, $6)], joinType=[inner])
+   :  :- LogicalJoin(condition=[=($0, $4)], joinType=[right])
+   :  :  :- LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+   :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+   :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+   :  :  +- LogicalProject(e=[$0], f=[$1])
+   :  :     +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+   :  +- LogicalProject(g=[$0], h=[$1])
+   :     +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(g, h)]]])
+   +- LogicalProject(i=[$0], j=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(i, j)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($0, $8), NULL]], projFields=[[{0, 1, 2, 3, 4, 5, 6, 7}, {0, 1}]])
+:- MultiJoin(joinFilter=[=($0, $6)], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER, INNER]], outerJoinConditions=[[=($0, $4), NULL, NULL]], projFields=[[ALL, ALL, ALL]])
+:  :- MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[false], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[ALL, ALL]])
+:  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+:  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+:  :- LogicalProject(e=[$0], f=[$1])
+:  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+:  +- LogicalProject(g=[$0], h=[$1])
+:     +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(g, h)]]])
++- LogicalProject(i=[$0], j=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(i, j)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerJoinSemiJoin">
     <Resource name="sql">
       <![CDATA[SELECT * FROM (SELECT * FROM T1 JOIN T2 ON a = c) t WHERE a IN (SELECT e FROM T3)]]>
     </Resource>
@@ -75,6 +396,353 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
    :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
    +- LogicalProject(e=[$0])
       +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLeftOuterJoinAntiJoin">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM (SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c) t
+WHERE NOT EXISTS (SELECT e FROM T3  WHERE a = e)
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
++- LogicalFilter(condition=[NOT(EXISTS({
+LogicalFilter(condition=[=($cor0.a, $0)])
+  LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+}))], variablesSet=[[$cor0]])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
+      +- LogicalJoin(condition=[=($0, $2)], joinType=[left])
+         :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+         +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
++- LogicalJoin(condition=[=($0, $4)], joinType=[anti])
+   :- MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[INNER, LEFT]], outerJoinConditions=[[NULL, =($0, $2)]], projFields=[[{0, 1}, {0, 1}]])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+   +- LogicalProject(e=[$0])
+      +- LogicalFilter(condition=[true])
+         +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLeftOuterJoinInnerJoinLeftOuterJoinInnerJoin">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c JOIN 
+(SELECT * FROM T3) ON a = e LEFT OUTER JOIN
+(SELECT * FROM T4) ON a = g JOIN
+(SELECT * FROM T5) ON a = i
+        ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], g=[$6], h=[$7], i=[$8], j=[$9])
++- LogicalJoin(condition=[=($0, $8)], joinType=[inner])
+   :- LogicalJoin(condition=[=($0, $6)], joinType=[left])
+   :  :- LogicalJoin(condition=[=($0, $4)], joinType=[inner])
+   :  :  :- LogicalJoin(condition=[=($0, $2)], joinType=[left])
+   :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+   :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+   :  :  +- LogicalProject(e=[$0], f=[$1])
+   :  :     +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+   :  +- LogicalProject(g=[$0], h=[$1])
+   :     +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(g, h)]]])
+   +- LogicalProject(i=[$0], j=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(i, j)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MultiJoin(joinFilter=[AND(=($0, $8), =($0, $4))], isFullOuterJoin=[false], joinTypes=[[INNER, LEFT, INNER, LEFT, INNER]], outerJoinConditions=[[NULL, =($0, $2), NULL, =($0, $6), NULL]], projFields=[[{0, 1}, {0, 1}, {0, 1}, {0, 1}, {0, 1}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+:- LogicalProject(e=[$0], f=[$1])
+:  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+:- LogicalProject(g=[$0], h=[$1])
+:  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(g, h)]]])
++- LogicalProject(i=[$0], j=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(i, j)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLeftOuterJoinLeftOuterJoin">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c LEFT OUTER JOIN (SELECT * FROM T3) ON a = e]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[=($0, $4)], joinType=[left])
+   :- LogicalJoin(condition=[=($0, $2)], joinType=[left])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+   +- LogicalProject(e=[$0], f=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[INNER, LEFT, LEFT]], outerJoinConditions=[[NULL, =($0, $2), =($0, $4)]], projFields=[[{0, 1}, {0, 1}, {0, 1}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
++- LogicalProject(e=[$0], f=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLeftOuterJoinRightOuterJoin">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c RIGHT OUTER JOIN (SELECT * FROM T3) ON a = e]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[=($0, $4)], joinType=[right])
+   :- LogicalJoin(condition=[=($0, $2)], joinType=[left])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+   +- LogicalProject(e=[$0], f=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($0, $4), NULL]], projFields=[[{0, 1, 2, 3}, {0, 1}]])
+:- MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[INNER, LEFT]], outerJoinConditions=[[NULL, =($0, $2)]], projFields=[[ALL, ALL]])
+:  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+:  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
++- LogicalProject(e=[$0], f=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLeftOuterJoinSemiJoin">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c WHERE a IN (SELECT e FROM T3)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(e=[$0])
+  LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+})])
+   +- LogicalJoin(condition=[=($0, $2)], joinType=[left])
+      :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
++- LogicalJoin(condition=[=($0, $4)], joinType=[semi])
+   :- MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[INNER, LEFT]], outerJoinConditions=[[NULL, =($0, $2)]], projFields=[[ALL, ALL]])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+   +- LogicalProject(e=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRightOuterJoinRightOuterJoin">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c RIGHT OUTER JOIN (SELECT * FROM T3) ON a = e]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[=($0, $4)], joinType=[right])
+   :- LogicalJoin(condition=[=($0, $2)], joinType=[right])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+   +- LogicalProject(e=[$0], f=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($0, $4), NULL]], projFields=[[{0, 1, 2, 3}, {0, 1}]])
+:- MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($0, $2), NULL]], projFields=[[ALL, ALL]])
+:  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+:  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
++- LogicalProject(e=[$0], f=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLeftOuterJoinInnerJoin">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c JOIN (SELECT * FROM T3) ON a = e]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[=($0, $4)], joinType=[inner])
+   :- LogicalJoin(condition=[=($0, $2)], joinType=[left])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+   +- LogicalProject(e=[$0], f=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MultiJoin(joinFilter=[=($0, $4)], isFullOuterJoin=[false], joinTypes=[[INNER, LEFT, INNER]], outerJoinConditions=[[NULL, =($0, $2), NULL]], projFields=[[{0, 1}, {0, 1}, {0, 1}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
++- LogicalProject(e=[$0], f=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRightOuterJoinInnerJoinRightOuterJoinInnerJoin">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c JOIN
+(SELECT * FROM T3) ON a = e RIGHT OUTER JOIN
+(SELECT * FROM T4) ON a = g JOIN
+(SELECT * FROM T5) ON a = i
+        ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], g=[$6], h=[$7], i=[$8], j=[$9])
++- LogicalJoin(condition=[=($0, $8)], joinType=[inner])
+   :- LogicalJoin(condition=[=($0, $6)], joinType=[right])
+   :  :- LogicalJoin(condition=[=($0, $4)], joinType=[inner])
+   :  :  :- LogicalJoin(condition=[=($0, $2)], joinType=[right])
+   :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+   :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+   :  :  +- LogicalProject(e=[$0], f=[$1])
+   :  :     +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+   :  +- LogicalProject(g=[$0], h=[$1])
+   :     +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(g, h)]]])
+   +- LogicalProject(i=[$0], j=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(i, j)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MultiJoin(joinFilter=[=($0, $8)], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER, INNER]], outerJoinConditions=[[=($0, $6), NULL, NULL]], projFields=[[{0, 1, 2, 3, 4, 5}, {0, 1}, {0, 1}]])
+:- MultiJoin(joinFilter=[=($0, $4)], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER, INNER]], outerJoinConditions=[[=($0, $2), NULL, NULL]], projFields=[[ALL, ALL, ALL]])
+:  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+:  :- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+:  +- LogicalProject(e=[$0], f=[$1])
+:     +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+:- LogicalProject(g=[$0], h=[$1])
+:  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(g, h)]]])
++- LogicalProject(i=[$0], j=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(i, j)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRightOuterJoinLeftOuterJoin">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c LEFT OUTER JOIN (SELECT * FROM T3) ON a = e]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[=($0, $4)], joinType=[left])
+   :- LogicalJoin(condition=[=($0, $2)], joinType=[right])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+   +- LogicalProject(e=[$0], f=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[INNER, LEFT]], outerJoinConditions=[[NULL, =($0, $4)]], projFields=[[{0, 1, 2, 3}, {0, 1}]])
+:- MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($0, $2), NULL]], projFields=[[ALL, ALL]])
+:  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+:  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
++- LogicalProject(e=[$0], f=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRightOuterJoinSemiJoin">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c WHERE a IN (SELECT e FROM T3)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(e=[$0])
+  LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+})])
+   +- LogicalJoin(condition=[=($0, $2)], joinType=[right])
+      :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
++- LogicalJoin(condition=[=($0, $4)], joinType=[semi])
+   :- MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($0, $2), NULL]], projFields=[[ALL, ALL]])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+   +- LogicalProject(e=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRightOuterJoinInnerJoin">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c JOIN (SELECT * FROM T3) ON a = e]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[=($0, $4)], joinType=[inner])
+   :- LogicalJoin(condition=[=($0, $2)], joinType=[right])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+   +- LogicalProject(e=[$0], f=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MultiJoin(joinFilter=[=($0, $4)], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER, INNER]], outerJoinConditions=[[=($0, $2), NULL, NULL]], projFields=[[{0, 1}, {0, 1}, {0, 1}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
++- LogicalProject(e=[$0], f=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSubRightOuterJoinQuery">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM T3 RIGHT OUTER JOIN (SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c) t ON t.a = T3.e]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(e=[$0], f=[$1], a=[$2], b=[$3], c=[$4], d=[$5])
++- LogicalJoin(condition=[=($2, $0)], joinType=[right])
+   :- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
+      +- LogicalJoin(condition=[=($0, $2)], joinType=[right])
+         :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+         +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, RIGHT, INNER]], outerJoinConditions=[[=($2, $0), =($2, $4), NULL]], projFields=[[{0, 1}, {0, 1}, {0, 1}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/JoinToMultiJoinRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/JoinToMultiJoinRuleTest.xml
new file mode 100644
index 00000000000..bd6e95faf3d
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/JoinToMultiJoinRuleTest.xml
@@ -0,0 +1,56 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testLeftOuterJoinRightOuterJoinToMultiJoin">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8])
++- LogicalJoin(condition=[=($0, $6)], joinType=[left])
+   :- LogicalJoin(condition=[=($0, $3)], joinType=[right])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, t1]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, t2]])
+   +- LogicalProject(a3=[$0], b3=[$1], c3=[$2])
+      +- LogicalTableScan(table=[[default_catalog, default_database, t3]])
+
+== Optimized Physical Plan ==
+HashJoin(joinType=[LeftOuterJoin], where=[=(a1, a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], build=[right])
+:- Exchange(distribution=[hash[a1]])
+:  +- Calc(select=[a1, b1, c1, a2, b2, c2])
+:     +- HashJoin(joinType=[LeftOuterJoin], where=[=(a1, a2)], select=[a2, b2, c2, a1, b1, c1], build=[right])
+:        :- Exchange(distribution=[hash[a2]])
+:        :  +- TableSourceScan(table=[[default_catalog, default_database, t2]], fields=[a2, b2, c2])
+:        +- Exchange(distribution=[hash[a1]])
+:           +- TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[a1, b1, c1])
++- Exchange(distribution=[hash[a3]])
+   +- TableSourceScan(table=[[default_catalog, default_database, t3]], fields=[a3, b3, c3])
+
+== Optimized Execution Plan ==
+HashJoin(joinType=[LeftOuterJoin], where=[(a1 = a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], build=[right])
+:- Exchange(distribution=[hash[a1]])
+:  +- Calc(select=[a1, b1, c1, a2, b2, c2])
+:     +- HashJoin(joinType=[LeftOuterJoin], where=[(a1 = a2)], select=[a2, b2, c2, a1, b1, c1], build=[right])
+:        :- Exchange(distribution=[hash[a2]])
+:        :  +- TableSourceScan(table=[[default_catalog, default_database, t2]], fields=[a2, b2, c2])
+:        +- Exchange(distribution=[hash[a1]])
+:           +- TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[a1, b1, c1])
++- Exchange(distribution=[hash[a3]])
+   +- TableSourceScan(table=[[default_catalog, default_database, t3]], fields=[a3, b3, c3])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinReorderTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinReorderTest.xml
index 01774ba333c..e59941b5a7c 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinReorderTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinReorderTest.xml
@@ -416,6 +416,201 @@ Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
    :              +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
    +- Exchange(distribution=[hash[a3]])
       +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerJoinAntiJoin">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1
+   JOIN T2 ON a1 = a2
+   JOIN T3 ON a2 = a3
+   JOIN T4 ON a1 = a4
+   WHERE NOT EXISTS (SELECT a5 FROM T5 WHERE a1 = a5)
+         ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11])
++- LogicalFilter(condition=[NOT(EXISTS({
+LogicalFilter(condition=[=($cor0.a1, $0)])
+  LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+}))], variablesSet=[[$cor0]])
+   +- LogicalJoin(condition=[=($0, $9)], joinType=[inner])
+      :- LogicalJoin(condition=[=($3, $6)], joinType=[inner])
+      :  :- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+      :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+      :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+      :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4])
++- Join(joinType=[InnerJoin], where=[((a1 = a4) AND (a4 = a2) AND (a3 = a4))], select=[a2, b2, c2, a1, b1, c1, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[hash[a1, a2, a3]])
+   :  +- Join(joinType=[InnerJoin], where=[((a2 = a3) AND (a1 = a2))], select=[a2, b2, c2, a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :     :- Exchange(distribution=[hash[a2, a2]])
+   :     :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+   :     +- Exchange(distribution=[hash[a3, a1]])
+   :        +- Join(joinType=[LeftAntiJoin], where=[(a1 = a5)], select=[a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :           :- Exchange(distribution=[hash[a1]])
+   :           :  +- Join(joinType=[InnerJoin], where=[(a3 = a1)], select=[a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :           :     :- Exchange(distribution=[hash[a1]])
+   :           :     :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+   :           :     +- Exchange(distribution=[hash[a3]])
+   :           :        +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+   :           +- Exchange(distribution=[hash[a5]])
+   :              +- Calc(select=[a5])
+   :                 +- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+   +- Exchange(distribution=[hash[a4, a4, a4]])
+      +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerJoinLeftOuterJoinInnerJoinLeftOuterJoin">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1
+   JOIN T2 ON a1 = a2
+   LEFT OUTER JOIN T3 ON a1 = a3
+   JOIN T4 ON a1 = a4
+   LEFT OUTER JOIN T5 ON a4 = a5
+         ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[left])
+   :- LogicalJoin(condition=[=($0, $9)], joinType=[inner])
+   :  :- LogicalJoin(condition=[=($0, $6)], joinType=[left])
+   :  :  :- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+   :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+   :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+   :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- Join(joinType=[RightOuterJoin], where=[(a4 = a5)], select=[a5, b5, c5, a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[hash[a5]])
+   :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+   +- Exchange(distribution=[hash[a4]])
+      +- Join(joinType=[InnerJoin], where=[(a1 = a4)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+         :- Exchange(distribution=[hash[a1]])
+         :  +- Join(joinType=[LeftOuterJoin], where=[(a1 = a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+         :     :- Exchange(distribution=[hash[a1]])
+         :     :  +- Join(joinType=[InnerJoin], where=[(a1 = a2)], select=[a1, b1, c1, a2, b2, c2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+         :     :     :- Exchange(distribution=[hash[a1]])
+         :     :     :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+         :     :     +- Exchange(distribution=[hash[a2]])
+         :     :        +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+         :     +- Exchange(distribution=[hash[a3]])
+         :        +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+         +- Exchange(distribution=[hash[a4]])
+            +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerJoinRightOuterJoinInnerJoinRightOuterJoin">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1
+   JOIN T2 ON a1 = a2
+   RIGHT OUTER JOIN T3 ON a1 = a3
+   JOIN T4 ON a1 = a4
+   RIGHT OUTER JOIN T5 ON a4 = a5
+         ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[right])
+   :- LogicalJoin(condition=[=($0, $9)], joinType=[inner])
+   :  :- LogicalJoin(condition=[=($0, $6)], joinType=[right])
+   :  :  :- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+   :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+   :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+   :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- Join(joinType=[LeftOuterJoin], where=[(a4 = a5)], select=[a5, b5, c5, a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[hash[a5]])
+   :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+   +- Exchange(distribution=[hash[a4]])
+      +- Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4])
+         +- Join(joinType=[InnerJoin], where=[(a1 = a4)], select=[a4, b4, c4, a1, b1, c1, a2, b2, c2, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+            :- Exchange(distribution=[hash[a4]])
+            :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+            +- Exchange(distribution=[hash[a1]])
+               +- Join(joinType=[RightOuterJoin], where=[(a1 = a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+                  :- Exchange(distribution=[hash[a1]])
+                  :  +- Join(joinType=[InnerJoin], where=[(a1 = a2)], select=[a1, b1, c1, a2, b2, c2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+                  :     :- Exchange(distribution=[hash[a1]])
+                  :     :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+                  :     +- Exchange(distribution=[hash[a2]])
+                  :        +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+                  +- Exchange(distribution=[hash[a3]])
+                     +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerJoinSemiJoin">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1
+   JOIN T2 ON a1 = a2
+   JOIN T3 ON a2 = a3
+   JOIN T4 ON a1 = a4
+   WHERE a1 IN (SELECT a5 FROM T5)
+         ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(a5=[$0])
+  LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+})])
+   +- LogicalJoin(condition=[=($0, $9)], joinType=[inner])
+      :- LogicalJoin(condition=[=($3, $6)], joinType=[inner])
+      :  :- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+      :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+      :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+      :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4])
++- Join(joinType=[InnerJoin], where=[((a1 = a4) AND (a4 = a2) AND (a3 = a4))], select=[a2, b2, c2, a1, b1, c1, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[hash[a1, a2, a3]])
+   :  +- Join(joinType=[InnerJoin], where=[((a2 = a3) AND (a1 = a2))], select=[a2, b2, c2, a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :     :- Exchange(distribution=[hash[a2, a2]])
+   :     :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+   :     +- Exchange(distribution=[hash[a3, a1]])
+   :        +- Join(joinType=[LeftSemiJoin], where=[(a1 = a5)], select=[a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :           :- Exchange(distribution=[hash[a1]])
+   :           :  +- Join(joinType=[InnerJoin], where=[(a3 = a1)], select=[a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :           :     :- Exchange(distribution=[hash[a1]])
+   :           :     :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+   :           :     +- Exchange(distribution=[hash[a3]])
+   :           :        +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+   :           +- Exchange(distribution=[hash[a5]])
+   :              +- Calc(select=[a5])
+   :                 +- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+   +- Exchange(distribution=[hash[a4, a4, a4]])
+      +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
 ]]>
     </Resource>
   </TestCase>
@@ -528,6 +723,102 @@ Calc(select=[a3, b1, a1, c2, c3, a4, b4, a5, b5, c5])
          +- Exchange(distribution=[hash[a4]])
             +- Calc(select=[a4, b4])
                +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLeftOuterJoinInnerJoinLeftOuterJoinInnerJoin">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1
+   LEFT OUTER JOIN T2 ON a1 = a2
+   JOIN T3 ON a1 = a3
+   LEFT OUTER JOIN T4 ON a1 = a4
+   JOIN T5 ON a4 = a5
+         ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[inner])
+   :- LogicalJoin(condition=[=($0, $9)], joinType=[left])
+   :  :- LogicalJoin(condition=[=($0, $6)], joinType=[inner])
+   :  :  :- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+   :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+   :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+   :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- Join(joinType=[InnerJoin], where=[(a4 = a5)], select=[a5, b5, c5, a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[hash[a5]])
+   :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+   +- Exchange(distribution=[hash[a4]])
+      +- Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4])
+         +- Join(joinType=[RightOuterJoin], where=[(a1 = a4)], select=[a4, b4, c4, a1, b1, c1, a2, b2, c2, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+            :- Exchange(distribution=[hash[a4]])
+            :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+            +- Exchange(distribution=[hash[a1]])
+               +- Join(joinType=[InnerJoin], where=[(a1 = a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+                  :- Exchange(distribution=[hash[a1]])
+                  :  +- Join(joinType=[LeftOuterJoin], where=[(a1 = a2)], select=[a1, b1, c1, a2, b2, c2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+                  :     :- Exchange(distribution=[hash[a1]])
+                  :     :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+                  :     +- Exchange(distribution=[hash[a2]])
+                  :        +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+                  +- Exchange(distribution=[hash[a3]])
+                     +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRightOuterJoinInnerJoinRightOuterJoinInnerJoin">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1
+   RIGHT OUTER JOIN T2 ON a1 = a2
+   JOIN T3 ON a1 = a3
+   RIGHT OUTER JOIN T4 ON a1 = a4
+   JOIN T5 ON a4 = a5
+         ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[inner])
+   :- LogicalJoin(condition=[=($0, $9)], joinType=[right])
+   :  :- LogicalJoin(condition=[=($0, $6)], joinType=[inner])
+   :  :  :- LogicalJoin(condition=[=($0, $3)], joinType=[right])
+   :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+   :  :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+   :  :  +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- Join(joinType=[InnerJoin], where=[(a4 = a5)], select=[a5, b5, c5, a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[hash[a5]])
+   :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+   +- Exchange(distribution=[hash[a4]])
+      +- Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4])
+         +- Join(joinType=[LeftOuterJoin], where=[(a1 = a4)], select=[a4, b4, c4, a1, b1, c1, a2, b2, c2, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+            :- Exchange(distribution=[hash[a4]])
+            :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+            +- Exchange(distribution=[hash[a1]])
+               +- Join(joinType=[InnerJoin], where=[(a1 = a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+                  :- Exchange(distribution=[hash[a1]])
+                  :  +- Join(joinType=[RightOuterJoin], where=[(a1 = a2)], select=[a1, b1, c1, a2, b2, c2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+                  :     :- Exchange(distribution=[hash[a1]])
+                  :     :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+                  :     +- Exchange(distribution=[hash[a2]])
+                  :        +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+                  +- Exchange(distribution=[hash[a3]])
+                     +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/JoinReorderTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/JoinReorderTestBase.scala
index 692369198e8..ac87bc265ac 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/JoinReorderTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/JoinReorderTestBase.scala
@@ -210,7 +210,7 @@ abstract class JoinReorderTestBase extends TableTestBase {
          |   LEFT OUTER JOIN T4 ON a1 = a4
          |   JOIN T5 ON a4 = a5
          """.stripMargin
-    // T1, T2, T3 can reorder
+    // T1, T2, T3, T4, T5 can reorder.
     util.verifyExecPlan(sql)
   }
 
@@ -251,7 +251,7 @@ abstract class JoinReorderTestBase extends TableTestBase {
          |   LEFT OUTER JOIN T4 ON a1 = a4
          |   LEFT OUTER JOIN T5 ON a4 = a5
          """.stripMargin
-    // can not reorder
+    // Can reorder: FlinkJoinToMultiJoinRule merges the left outer joins into one multi join set.
     util.verifyExecPlan(sql)
   }
 
@@ -283,6 +283,90 @@ abstract class JoinReorderTestBase extends TableTestBase {
     util.verifyExecPlan(sql)
   }
 
+  @Test
+  def testInnerJoinLeftOuterJoinInnerJoinLeftOuterJoin(): Unit = {
+    val sql =
+      s"""
+         |SELECT * FROM T1
+         |   JOIN T2 ON a1 = a2
+         |   LEFT OUTER JOIN T3 ON a1 = a3
+         |   JOIN T4 ON a1 = a4
+         |   LEFT OUTER JOIN T5 ON a4 = a5
+         """.stripMargin
+    // T1, T2, T3, T4, T5 can reorder.
+    util.verifyExecPlan(sql)
+  }
+
+  @Test
+  def testLeftOuterJoinInnerJoinLeftOuterJoinInnerJoin(): Unit = {
+    val sql =
+      s"""
+         |SELECT * FROM T1
+         |   LEFT OUTER JOIN T2 ON a1 = a2
+         |   JOIN T3 ON a1 = a3
+         |   LEFT OUTER JOIN T4 ON a1 = a4
+         |   JOIN T5 ON a4 = a5
+         """.stripMargin
+    // T1, T2, T3, T4, T5 can reorder.
+    util.verifyExecPlan(sql)
+  }
+
+  @Test
+  def testInnerJoinRightOuterJoinInnerJoinRightOuterJoin(): Unit = {
+    val sql =
+      s"""
+         |SELECT * FROM T1
+         |   JOIN T2 ON a1 = a2
+         |   RIGHT OUTER JOIN T3 ON a1 = a3
+         |   JOIN T4 ON a1 = a4
+         |   RIGHT OUTER JOIN T5 ON a4 = a5
+         """.stripMargin
+    // T1 and T2 can not reorder, but MJ(T1, T2), T3, T4 can reorder.
+    util.verifyExecPlan(sql)
+  }
+
+  @Test
+  def testRightOuterJoinInnerJoinRightOuterJoinInnerJoin(): Unit = {
+    val sql =
+      s"""
+         |SELECT * FROM T1
+         |   RIGHT OUTER JOIN T2 ON a1 = a2
+         |   JOIN T3 ON a1 = a3
+         |   RIGHT OUTER JOIN T4 ON a1 = a4
+         |   JOIN T5 ON a4 = a5
+         """.stripMargin
+    // T1, T2, T3 can reorder, and MJ(T1, T2, T3), T4, T5 can reorder.
+    util.verifyExecPlan(sql)
+  }
+
+  @Test
+  def testInnerJoinSemiJoin(): Unit = {
+    val sql =
+      s"""
+         |SELECT * FROM T1
+         |   JOIN T2 ON a1 = a2
+         |   JOIN T3 ON a2 = a3
+         |   JOIN T4 ON a1 = a4
+         |   WHERE a1 IN (SELECT a5 FROM T5)
+         """.stripMargin
+    // can not reorder. Semi join will support join reorder in the future.
+    util.verifyExecPlan(sql)
+  }
+
+  @Test
+  def testInnerJoinAntiJoin(): Unit = {
+    val sql =
+      s"""
+         |SELECT * FROM T1
+         |   JOIN T2 ON a1 = a2
+         |   JOIN T3 ON a2 = a3
+         |   JOIN T4 ON a1 = a4
+         |   WHERE NOT EXISTS (SELECT a5 FROM T5 WHERE a1 = a5)
+         """.stripMargin
+    // can not reorder
+    util.verifyExecPlan(sql)
+  }
+
   @Test
   def testDeriveNullFilterAfterJoinReorder(): Unit = {
     val types = Array[TypeInformation[_]](Types.INT, Types.LONG)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRuleTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRuleTest.scala
index e36fe7a5683..432e40ad989 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRuleTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRuleTest.scala
@@ -27,7 +27,7 @@ import org.apache.calcite.rel.rules.CoreRules
 import org.apache.calcite.tools.RuleSets
 import org.junit.{Before, Test}
 
-/** Tests for [[org.apache.calcite.rel.rules.JoinToMultiJoinRule]]. */
+/** Tests for [[org.apache.flink.table.planner.plan.rules.logical.FlinkJoinToMultiJoinRule]]. */
 class FlinkJoinToMultiJoinRuleTest extends TableTestBase {
   private val util = batchTestUtil()
 
@@ -40,7 +40,7 @@ class FlinkJoinToMultiJoinRuleTest extends TableTestBase {
       FlinkHepRuleSetProgramBuilder.newBuilder
         .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
         .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
-        .add(RuleSets.ofList(CoreRules.JOIN_TO_MULTI_JOIN, CoreRules.PROJECT_MULTI_JOIN_MERGE))
+        .add(RuleSets.ofList(FlinkJoinToMultiJoinRule.INSTANCE, CoreRules.PROJECT_MULTI_JOIN_MERGE))
         .build()
     )
 
@@ -50,14 +50,136 @@ class FlinkJoinToMultiJoinRuleTest extends TableTestBase {
   }
 
   @Test
-  def testDoesNotMatchSemiJoin(): Unit = {
+  def testInnerJoinInnerJoin(): Unit = {
+    // Can translate join to multi join.
+    val sqlQuery = "SELECT * FROM T1, T2, T3 WHERE a = c AND a = e"
+    util.verifyRelPlan(sqlQuery)
+  }
+
+  @Test
+  def testInnerJoinLeftOuterJoin(): Unit = {
+    val sqlQuery = "SELECT * FROM T1 JOIN T2 ON a =c LEFT OUTER JOIN T3 ON a = e"
+    util.verifyRelPlan(sqlQuery)
+  }
+
+  @Test
+  def testInnerJoinRightOuterJoin(): Unit = {
+    // Cannot translate to one multi join set: the right outer join's left input generates nulls.
+    val sqlQuery = "SELECT * FROM T1 JOIN T2 ON a =c RIGHT OUTER JOIN T3 ON a = e"
+    util.verifyRelPlan(sqlQuery)
+  }
+
+  @Test
+  def testLeftOuterJoinLeftOuterJoin(): Unit = {
+    // Can translate join to multi join.
+    val sqlQuery =
+      "SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c LEFT OUTER JOIN (SELECT * FROM T3) ON a = e"
+    util.verifyRelPlan(sqlQuery)
+  }
+
+  @Test
+  def testLeftOuterJoinRightOuterJoin(): Unit = {
+    // Cannot translate join to multi join.
+    val sqlQuery =
+      "SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c RIGHT OUTER JOIN (SELECT * FROM T3) ON a = e"
+    util.verifyRelPlan(sqlQuery)
+  }
+
+  @Test
+  def testLeftOuterJoinInnerJoin(): Unit = {
+    val sqlQuery =
+      "SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c JOIN (SELECT * FROM T3) ON a = e"
+    util.verifyRelPlan(sqlQuery)
+  }
+
+  @Test
+  def testRightOuterJoinRightOuterJoin(): Unit = {
+    val sqlQuery =
+      "SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c RIGHT OUTER JOIN (SELECT * FROM T3) ON a = e"
+    util.verifyRelPlan(sqlQuery)
+  }
+
+  @Test
+  def testSubRightOuterJoinQuery(): Unit = {
+    // This case will be set into one multi join set.
+    val sqlQuery =
+      "SELECT * FROM T3 RIGHT OUTER JOIN (SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c) t ON t.a = T3.e"
+    util.verifyRelPlan(sqlQuery)
+  }
+
+  @Test
+  def testRightOuterJoinLeftOuterJoin(): Unit = {
+    // Cannot translate join to multi join because the right outer join is in the left input.
+    val sqlQuery =
+      "SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c LEFT OUTER JOIN (SELECT * FROM T3) ON a = e"
+    util.verifyRelPlan(sqlQuery)
+  }
+
+  @Test
+  def testRightOuterJoinInnerJoin(): Unit = {
+    val sqlQuery =
+      "SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c JOIN (SELECT * FROM T3) ON a = e"
+    util.verifyRelPlan(sqlQuery)
+  }
+
+  @Test
+  def testFullOuterJoin(): Unit = {
+    // Cannot translate join to multi join.
+    val sqlQuery = "SELECT * FROM T1 FULL OUTER JOIN T2 ON a = c, T3 WHERE a = e"
+    util.verifyRelPlan(sqlQuery)
+  }
+
+  @Test
+  def testFullOuterJoinInnerJoin(): Unit = {
+    val sqlQuery =
+      "SELECT * FROM T1 FULL OUTER JOIN T2 ON a = c JOIN (SELECT * FROM T3) ON a = e"
+    util.verifyRelPlan(sqlQuery)
+  }
+
+  @Test
+  def testFullOuterJoinLeftOuterJoin(): Unit = {
+    val sqlQuery =
+      "SELECT * FROM T1 FULL OUTER JOIN T2 ON a = c LEFT OUTER JOIN (SELECT * FROM T3) ON a = e"
+    util.verifyRelPlan(sqlQuery)
+  }
+
+  @Test
+  def testFullOuterJoinRightOuterJoin(): Unit = {
+    val sqlQuery =
+      "SELECT * FROM T1 FULL OUTER JOIN T2 ON a = c RIGHT OUTER JOIN (SELECT * FROM T3) ON a = e"
+    util.verifyRelPlan(sqlQuery)
+  }
+
+  @Test
+  def testFullOuterJoinSemiJoin(): Unit = {
+    val sqlQuery =
+      "SELECT * FROM T1 FULL OUTER JOIN T2 ON a = c WHERE a IN (SELECT e FROM T3)"
+    util.verifyRelPlan(sqlQuery)
+  }
+
+  @Test
+  def testInnerJoinSemiJoin(): Unit = {
     val sqlQuery =
       "SELECT * FROM (SELECT * FROM T1 JOIN T2 ON a = c) t WHERE a IN (SELECT e FROM T3)"
     util.verifyRelPlan(sqlQuery)
   }
 
   @Test
-  def testDoesNotMatchAntiJoin(): Unit = {
+  def testLeftOuterJoinSemiJoin(): Unit = {
+    val sqlQuery =
+      "SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c WHERE a IN (SELECT e FROM T3)"
+    util.verifyRelPlan(sqlQuery)
+  }
+
+  @Test
+  def testRightOuterJoinSemiJoin(): Unit = {
+    val sqlQuery =
+      "SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c WHERE a IN (SELECT e FROM T3)"
+    util.verifyRelPlan(sqlQuery)
+  }
+
+  @Test
+  def testInnerJoinAntiJoin(): Unit = {
     val sqlQuery =
       """
         |SELECT * FROM (SELECT * FROM T1 JOIN T2 ON a = c) t
@@ -65,4 +187,85 @@ class FlinkJoinToMultiJoinRuleTest extends TableTestBase {
       """.stripMargin
     util.verifyRelPlan(sqlQuery)
   }
+
+  @Test
+  def testLeftOuterJoinAntiJoin(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT * FROM (SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c) t
+        |WHERE NOT EXISTS (SELECT e FROM T3  WHERE a = e)
+      """.stripMargin
+    util.verifyRelPlan(sqlQuery)
+  }
+
+  @Test
+  def testRightOuterJoinAntiJoin(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT * FROM (SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c) t
+        |WHERE NOT EXISTS (SELECT e FROM T3  WHERE a = e)
+      """.stripMargin
+    util.verifyRelPlan(sqlQuery)
+  }
+
+  @Test
+  def testInnerJoinLeftOuterJoinInnerJoinLeftOuterJoin(): Unit = {
+    util.addTableSource[(Int, Long)]("T4", 'g, 'h)
+    util.addTableSource[(Int, Long)]("T5", 'i, 'j)
+
+    val sqlQuery =
+      """
+        |SELECT * FROM T1 JOIN T2 ON a = c LEFT OUTER JOIN 
+        |(SELECT * FROM T3) ON a = e JOIN
+        |(SELECT * FROM T4) ON a = g LEFT OUTER JOIN
+        |(SELECT * FROM T5) ON a = i
+        """.stripMargin
+    util.verifyRelPlan(sqlQuery)
+
+  }
+
+  @Test
+  def testLeftOuterJoinInnerJoinLeftOuterJoinInnerJoin(): Unit = {
+    util.addTableSource[(Int, Long)]("T4", 'g, 'h)
+    util.addTableSource[(Int, Long)]("T5", 'i, 'j)
+
+    val sqlQuery =
+      """
+        |SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c JOIN 
+        |(SELECT * FROM T3) ON a = e LEFT OUTER JOIN
+        |(SELECT * FROM T4) ON a = g JOIN
+        |(SELECT * FROM T5) ON a = i
+        """.stripMargin
+    util.verifyRelPlan(sqlQuery)
+  }
+
+  @Test
+  def testInnerJoinRightOuterJoinInnerJoinRightOuterJoin(): Unit = {
+    util.addTableSource[(Int, Long)]("T4", 'g, 'h)
+    util.addTableSource[(Int, Long)]("T5", 'i, 'j)
+
+    val sqlQuery =
+      """
+        |SELECT * FROM T1 JOIN T2 ON a = c RIGHT OUTER JOIN 
+        |(SELECT * FROM T3) ON a = e JOIN
+        |(SELECT * FROM T4) ON a = g RIGHT OUTER JOIN
+        |(SELECT * FROM T5) ON a = i
+        """.stripMargin
+    util.verifyRelPlan(sqlQuery)
+  }
+
+  @Test
+  def testRightOuterJoinInnerJoinRightOuterJoinInnerJoin(): Unit = {
+    util.addTableSource[(Int, Long)]("T4", 'g, 'h)
+    util.addTableSource[(Int, Long)]("T5", 'i, 'j)
+
+    val sqlQuery =
+      """
+        |SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c JOIN
+        |(SELECT * FROM T3) ON a = e RIGHT OUTER JOIN
+        |(SELECT * FROM T4) ON a = g JOIN
+        |(SELECT * FROM T5) ON a = i
+        """.stripMargin
+    util.verifyRelPlan(sqlQuery)
+  }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala
index 62a06e99d45..0c954918a29 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala
@@ -24,9 +24,12 @@ import org.apache.flink.api.common.typeutils.TypeComparator
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo}
 import org.apache.flink.streaming.api.transformations.{LegacySinkTransformation, OneInputTransformation, TwoInputTransformation}
+import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.api.internal.{StatementSetImpl, TableEnvironmentInternal}
+import org.apache.flink.table.plan.stats.TableStats
 import org.apache.flink.table.planner.delegation.PlannerBase
 import org.apache.flink.table.planner.expressions.utils.FuncWithOpen
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic
 import org.apache.flink.table.planner.runtime.batch.sql.join.JoinType.{BroadcastHashJoin, HashJoin, JoinType, NestedLoopJoin, SortMergeJoin}
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
@@ -770,6 +773,214 @@ class JoinITCase(expectedJoinType: JoinType) extends BatchTestBase {
     )
   }
 
+  @Test
+  def testLeftOuterJoinReorder(): Unit = {
+    // This test is used to test the result after join to multi join and join reorder.
+    tEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED,
+      Boolean.box(true))
+    // Register table with stats to support join reorder,
+    // join order LJ(LJ(LJ(T5, T3), T2), T1) will reorder to RJ(T1, LJ(T2, LJ(T5, T3)))
+    registerCollection(
+      "T5",
+      data5,
+      type5,
+      "d, e, f, g, h",
+      nullablesOfData5,
+      new FlinkStatistic(new TableStats(1000L)))
+    registerCollection(
+      "T3",
+      smallData3,
+      type3,
+      "a, b, c",
+      nullablesOfSmallData3,
+      new FlinkStatistic(new TableStats(100L)))
+    registerCollection(
+      "T2",
+      data2,
+      type2,
+      "d, e, f, g, h",
+      nullablesOfData2,
+      new FlinkStatistic(new TableStats(10L)))
+    registerCollection(
+      "T1",
+      data2,
+      type2,
+      "d, e, f, g, h",
+      nullablesOfData2,
+      new FlinkStatistic(new TableStats(100000L)))
+
+    checkResult(
+      """
+        |SELECT T5.g, T3b.c, T2b.g FROM T5 LEFT OUTER JOIN 
+        |(SELECT * FROM T3 WHERE a > 0 ) T3b ON T3b.b = T5.e LEFT OUTER JOIN 
+        |(SELECT * FROM T2 WHERE d > 0) T2b ON T3b.b = T2b.e LEFT OUTER JOIN
+        |(SELECT * FROM T1) T1b ON T3b.b = T1b.e
+        |""".stripMargin,
+      Seq(
+        row("Hallo", "Hi", "Hallo"),
+        row("Hallo Welt", "Hello world", "Hallo Welt"),
+        row("Hallo Welt", "Hello", "Hallo Welt"),
+        row("Hallo Welt wie gehts?", null, null),
+        row("Hallo Welt wie", null, null),
+        row("ABC", null, null),
+        row("BCD", null, null),
+        row("CDE", null, null),
+        row("DEF", null, null),
+        row("EFG", null, null),
+        row("FGH", null, null),
+        row("GHI", null, null),
+        row("HIJ", null, null),
+        row("IJK", null, null),
+        row("JKL", null, null),
+        row("KLM", null, null)
+      )
+    )
+  }
+
+  @Test
+  def testRightOuterJoinReorder(): Unit = {
+    // This test is used to test the result after join to multi join and join reorder.
+    tEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED,
+      Boolean.box(true))
+    // Register table with stats to support join reorder,
+    // join order RJ(J(RJ(T5, T3), T2), T1) will reorder to LJ(RJ(T5, J(T3, T2)), T1)
+    registerCollection(
+      "T5",
+      data5,
+      type5,
+      "d, e, f, g, h",
+      nullablesOfData5,
+      new FlinkStatistic(new TableStats(1000L)))
+    registerCollection(
+      "T3",
+      smallData3,
+      type3,
+      "a, b, c",
+      nullablesOfSmallData3,
+      new FlinkStatistic(new TableStats(100L)))
+    registerCollection(
+      "T2",
+      data2,
+      type2,
+      "d, e, f, g, h",
+      nullablesOfData2,
+      new FlinkStatistic(new TableStats(10L)))
+    registerCollection(
+      "T1",
+      data2,
+      type2,
+      "d, e, f, g, h",
+      nullablesOfData2,
+      new FlinkStatistic(new TableStats(100000L)))
+
+    checkResult(
+      """
+        |SELECT T5.g, T3b.c, T2b.g FROM T5 RIGHT OUTER JOIN 
+        |(SELECT * FROM T3 WHERE T3.a > 0) T3b ON T3b.b = T5.e JOIN
+        |(SELECT * FROM T2 WHERE T2.d > 0) T2b ON T3b.b = T2b.e RIGHT OUTER JOIN
+        |(SELECT * FROM T1) T1b ON T3b.b = T1b.e
+        |""".stripMargin,
+      Seq(
+        row("Hallo Welt", "Hello world", "Hallo Welt"),
+        row("Hallo Welt", "Hello", "Hallo Welt"),
+        row("Hallo", "Hi", "Hallo"),
+        row(null, null, null),
+        row(null, null, null),
+        row(null, null, null),
+        row(null, null, null),
+        row(null, null, null),
+        row(null, null, null),
+        row(null, null, null),
+        row(null, null, null),
+        row(null, null, null),
+        row(null, null, null),
+        row(null, null, null),
+        row(null, null, null),
+        row(null, null, null)
+      )
+    )
+  }
+
+  @Test
+  def testRightOuterJoinRightOuterJoinCannotReorder(): Unit = {
+    // This test is used to test the result after join to multi join and join reorder.
+    tEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED,
+      Boolean.box(true))
+    registerCollection("Table2", data2, type2, "d, e, f, g, h", nullablesOfData2)
+    // This query is put into one multi join set by FlinkJoinToMultiJoinRule, but it cannot be
+    // reordered because the inner right outer join's condition uses the null-generating side.
+    checkResult(
+      """
+        |SELECT Table5.g, c, t.g FROM Table5 RIGHT OUTER JOIN
+        |(SELECT * FROM SmallTable3 RIGHT OUTER JOIN Table2 ON b = Table2.e) t ON t.e = Table5.e
+        |""".stripMargin,
+      Seq(
+        row("ABC", null, "ABC"),
+        row("BCD", null, "BCD"),
+        row("CDE", null, "CDE"),
+        row("DEF", null, "DEF"),
+        row("EFG", null, "EFG"),
+        row("FGH", null, "FGH"),
+        row("GHI", null, "GHI"),
+        row("HIJ", null, "HIJ"),
+        row("Hallo Welt wie gehts?", null, "Hallo Welt wie gehts?"),
+        row("Hallo Welt wie", null, "Hallo Welt wie"),
+        row("Hallo Welt", "Hello", "Hallo Welt"),
+        row("Hallo Welt", "Hello world", "Hallo Welt"),
+        row("Hallo", "Hi", "Hallo"),
+        row("IJK", null, "IJK"),
+        row("JKL", null, "JKL"),
+        row("KLM", null, "KLM")
+      )
+    )
+  }
+
+  @Test
+  def testInnerJoinReorder(): Unit = {
+    // This test is used to test the result after join to multi join and join reorder.
+    tEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED,
+      Boolean.box(true))
+    // Register table with stats to support join reorder,
+    // join order J(J(T5, T3), T2) will reorder to J(T5, J(T3, T2))
+    registerCollection(
+      "T5",
+      data5,
+      type5,
+      "d, e, f, g, h",
+      nullablesOfData5,
+      new FlinkStatistic(new TableStats(1000L)))
+    registerCollection(
+      "T3",
+      smallData3,
+      type3,
+      "a, b, c",
+      nullablesOfSmallData3,
+      new FlinkStatistic(new TableStats(100L)))
+    registerCollection(
+      "T2",
+      data2,
+      type2,
+      "d, e, f, g, h",
+      nullablesOfData2,
+      new FlinkStatistic(new TableStats(10L)))
+
+    checkResult(
+      """
+        |SELECT T5.g, c, T2.g FROM T5 JOIN T3 ON b = T5.e
+        |JOIN T2 ON b = T2.e WHERE T2.d > 0 AND T5.d > 0
+        |""".stripMargin,
+      Seq(
+        row("Hallo", "Hi", "Hallo"),
+        row("Hallo Welt", "Hello", "Hallo Welt"),
+        row("Hallo Welt", "Hello world", "Hallo Welt")
+      )
+    )
+  }
+
   @Test
   def testJoinWithAggregation(): Unit = {
     checkResult("SELECT COUNT(g), COUNT(b) FROM SmallTable3, Table5 WHERE a = d", Seq(row(6L, 6L)))