You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/10/13 13:36:50 UTC

[GitHub] [doris] morrySnow commented on a diff in pull request #13353: [feature](Nereids): use Multi join to rearrange join to eliminate cross join by using predicate.

morrySnow commented on code in PR #13353:
URL: https://github.com/apache/doris/pull/13353#discussion_r994551790


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/ReorderJoin.java:
##########
@@ -45,13 +64,237 @@ public class ReorderJoin extends OneRewriteRuleFactory {
     public Rule build() {
         return logicalFilter(subTree(LogicalJoin.class, LogicalFilter.class)).thenApply(ctx -> {
             LogicalFilter<Plan> filter = ctx.root;
-            if (!ctx.cascadesContext.getConnectContext().getSessionVariable()
-                    .isEnableNereidsReorderToEliminateCrossJoin()) {
-                return filter;
-            }
-            MultiJoin multiJoin = new MultiJoin();
-            filter.accept(multiJoin, null);
-            return multiJoin.reorderJoinsAccordingToConditions().orElse(filter);
+
+            MultiJoin multiJoin = (MultiJoin) joinToMultiJoin(filter);

Review Comment:
   Add an `instanceof` assertion, and throw an exception when the returned object is not a `MultiJoin`.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/MultiJoin.java:
##########
@@ -17,189 +17,170 @@
 
 package org.apache.doris.nereids.rules.rewrite.logical;
 
-import org.apache.doris.common.Pair;
-import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.Slot;
-import org.apache.doris.nereids.trees.expressions.SlotReference;
-import org.apache.doris.nereids.trees.plans.GroupPlan;
 import org.apache.doris.nereids.trees.plans.JoinType;
 import org.apache.doris.nereids.trees.plans.Plan;
-import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
-import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.logical.AbstractLogicalPlan;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
-import org.apache.doris.nereids.util.ExpressionUtils;
-import org.apache.doris.nereids.util.JoinUtils;
-import org.apache.doris.nereids.util.PlanUtils;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableList.Builder;
 
-import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
  * A MultiJoin represents a join of N inputs (NAry-Join).
  * The regular Join represent strictly binary input (Binary-Join).
+ * <p>
+ * One {@link MultiJoin} just contains one {@link JoinType} of SEMI/ANTI/OUTER Join.
+ * <p>
+ * onlyJoinType is Full OUTER JOIN, children.size() == 2.
+ * leftChild [Full OUTER JOIN] rightChild.
+ * <p>
+ * onlyJoinType is LEFT (OUTER/SEMI/ANTI) JOIN,
+ * children[0, last) [LEFT (OUTER/SEMI/ANTI) JOIN] lastChild.
+ * eg: MJ([LOJ] A, B, C, D) is {A B C} [LOJ] {D}.
+ * <p>
+ * onlyJoinType is RIGHT (OUTER/SEMI/ANTI) JOIN,
+ * firstChild [RIGHT (OUTER/SEMI/ANTI) JOIN] children[1, last].
+ * eg: MJ([ROJ] A, B, C, D) is {A} [ROJ] {B C D}.
  */
-public class MultiJoin extends PlanVisitor<Void, Void> {
+public class MultiJoin extends AbstractLogicalPlan {
     /*
      *        topJoin
      *        /     \            MultiJoin
      *   bottomJoin  C  -->     /    |    \
      *     /    \              A     B     C
      *    A      B
      */
-    public final List<Plan> joinInputs = new ArrayList<>();
-    public final List<Expression> conjunctsForAllHashJoins = new ArrayList<>();
-    private List<Expression> conjunctsKeepInFilter = new ArrayList<>();
-
-    /**
-     * reorderJoinsAccordingToConditions
-     *
-     * @return join or filter
-     */
-    public Optional<Plan> reorderJoinsAccordingToConditions() {
-        if (joinInputs.size() >= 2) {
-            Plan root = reorderJoinsAccordingToConditions(joinInputs, conjunctsForAllHashJoins);
-            return Optional.of(PlanUtils.filterOrSelf(conjunctsKeepInFilter, root));
-        }
-        return Optional.empty();
+
+    // Push predicates into it.
+    // But joinFilter shouldn't contain predicate which just contains one predicate like `T.key > 1`.
+    // Because these predicate should be pushdown.
+    private final List<Expression> joinFilter;
+    // MultiJoin just contains one OUTER/SEMI/ANTI.
+    private final Optional<JoinType> onlyJoinType;

Review Comment:
   I think `joinType` is enough.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/ReorderJoin.java:
##########
@@ -37,21 +56,296 @@
  * SELECT * FROM t1 JOIN t3 ON t1.id=t3.id JOIN t2 ON t2.id=t3.id
  * </pre>
  * </p>
- * TODO: This is tested by SSB queries currently, add more `unit` test for this rule
- * when we have a plan building and comparing framework.
+ * Using the {@link MultiJoin} to complete this task.
+ * {Join cluster}: contain multiple join with filter inside.
+ * <ul>
+ * <li> {Join cluster} to MultiJoin</li>
+ * <li> MultiJoin to {Join cluster}</li>
+ * </ul>
  */
 public class ReorderJoin extends OneRewriteRuleFactory {
     @Override
     public Rule build() {
         return logicalFilter(subTree(LogicalJoin.class, LogicalFilter.class)).thenApply(ctx -> {
             LogicalFilter<Plan> filter = ctx.root;
-            if (!ctx.cascadesContext.getConnectContext().getSessionVariable()
-                    .isEnableNereidsReorderToEliminateCrossJoin()) {
-                return filter;
-            }
-            MultiJoin multiJoin = new MultiJoin();
-            filter.accept(multiJoin, null);
-            return multiJoin.reorderJoinsAccordingToConditions().orElse(filter);
+
+            MultiJoin multiJoin = (MultiJoin) joinToMultiJoin(filter);
+            Plan plan = multiJoinToJoin(multiJoin);
+            return plan;
         }).toRule(RuleType.REORDER_JOIN);
     }
+
+    /**
+     * Recursively convert to
+     * {@link LogicalJoin} or
+     * {@link LogicalFilter}--{@link LogicalJoin}
+     * --> {@link MultiJoin}
+     */
+    public Plan joinToMultiJoin(Plan plan) {
+        // subtree can't specify the end of Pattern. so end can be GroupPlan or Filter
+        if (plan instanceof GroupPlan
+                || (plan instanceof LogicalFilter && plan.child(0) instanceof GroupPlan)) {
+            return plan;
+        }
+
+        List<Plan> inputs = Lists.newArrayList();
+        List<Expression> joinFilter = Lists.newArrayList();
+        List<Expression> notInnerJoinConditions = Lists.newArrayList();
+
+        LogicalJoin<?, ?> join;
+        if (plan instanceof LogicalFilter) {
+            LogicalFilter<?> filter = (LogicalFilter<?>) plan;
+            joinFilter.addAll(ExpressionUtils.extractConjunction(filter.getPredicates()));
+            join = (LogicalJoin<?, ?>) filter.child();

Review Comment:
   We have an implicit reliance on MergeFilter here. Please add this information to the class comment.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/MultiJoin.java:
##########
@@ -17,189 +17,170 @@
 
 package org.apache.doris.nereids.rules.rewrite.logical;
 
-import org.apache.doris.common.Pair;
-import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.Slot;
-import org.apache.doris.nereids.trees.expressions.SlotReference;
-import org.apache.doris.nereids.trees.plans.GroupPlan;
 import org.apache.doris.nereids.trees.plans.JoinType;
 import org.apache.doris.nereids.trees.plans.Plan;
-import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
-import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.logical.AbstractLogicalPlan;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
-import org.apache.doris.nereids.util.ExpressionUtils;
-import org.apache.doris.nereids.util.JoinUtils;
-import org.apache.doris.nereids.util.PlanUtils;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableList.Builder;
 
-import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
  * A MultiJoin represents a join of N inputs (NAry-Join).
  * The regular Join represent strictly binary input (Binary-Join).
+ * <p>
+ * One {@link MultiJoin} just contains one {@link JoinType} of SEMI/ANTI/OUTER Join.
+ * <p>
+ * onlyJoinType is Full OUTER JOIN, children.size() == 2.
+ * leftChild [Full OUTER JOIN] rightChild.
+ * <p>
+ * onlyJoinType is LEFT (OUTER/SEMI/ANTI) JOIN,
+ * children[0, last) [LEFT (OUTER/SEMI/ANTI) JOIN] lastChild.
+ * eg: MJ([LOJ] A, B, C, D) is {A B C} [LOJ] {D}.
+ * <p>
+ * onlyJoinType is RIGHT (OUTER/SEMI/ANTI) JOIN,
+ * firstChild [RIGHT (OUTER/SEMI/ANTI) JOIN] children[1, last].
+ * eg: MJ([ROJ] A, B, C, D) is {A} [ROJ] {B C D}.
  */
-public class MultiJoin extends PlanVisitor<Void, Void> {
+public class MultiJoin extends AbstractLogicalPlan {
     /*
      *        topJoin
      *        /     \            MultiJoin
      *   bottomJoin  C  -->     /    |    \
      *     /    \              A     B     C
      *    A      B
      */
-    public final List<Plan> joinInputs = new ArrayList<>();
-    public final List<Expression> conjunctsForAllHashJoins = new ArrayList<>();
-    private List<Expression> conjunctsKeepInFilter = new ArrayList<>();
-
-    /**
-     * reorderJoinsAccordingToConditions
-     *
-     * @return join or filter
-     */
-    public Optional<Plan> reorderJoinsAccordingToConditions() {
-        if (joinInputs.size() >= 2) {
-            Plan root = reorderJoinsAccordingToConditions(joinInputs, conjunctsForAllHashJoins);
-            return Optional.of(PlanUtils.filterOrSelf(conjunctsKeepInFilter, root));
-        }
-        return Optional.empty();
+
+    // Push predicates into it.
+    // But joinFilter shouldn't contain predicate which just contains one predicate like `T.key > 1`.
+    // Because these predicate should be pushdown.
+    private final List<Expression> joinFilter;
+    // MultiJoin just contains one OUTER/SEMI/ANTI.
+    private final Optional<JoinType> onlyJoinType;
+    // When contains one OUTER/SEMI/ANTI join, keep separately its condition.
+    private final List<Expression> notInnerJoinConditions;
+
+    // private final List<@Nullable List<NamedExpression>> projFields;

Review Comment:
   Why is this commented out?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/ReorderJoin.java:
##########
@@ -37,21 +56,296 @@
  * SELECT * FROM t1 JOIN t3 ON t1.id=t3.id JOIN t2 ON t2.id=t3.id
  * </pre>
  * </p>
- * TODO: This is tested by SSB queries currently, add more `unit` test for this rule
- * when we have a plan building and comparing framework.
+ * Using the {@link MultiJoin} to complete this task.
+ * {Join cluster}: contain multiple join with filter inside.
+ * <ul>
+ * <li> {Join cluster} to MultiJoin</li>
+ * <li> MultiJoin to {Join cluster}</li>
+ * </ul>
  */
 public class ReorderJoin extends OneRewriteRuleFactory {
     @Override
     public Rule build() {
         return logicalFilter(subTree(LogicalJoin.class, LogicalFilter.class)).thenApply(ctx -> {
             LogicalFilter<Plan> filter = ctx.root;
-            if (!ctx.cascadesContext.getConnectContext().getSessionVariable()
-                    .isEnableNereidsReorderToEliminateCrossJoin()) {
-                return filter;
-            }
-            MultiJoin multiJoin = new MultiJoin();
-            filter.accept(multiJoin, null);
-            return multiJoin.reorderJoinsAccordingToConditions().orElse(filter);
+
+            MultiJoin multiJoin = (MultiJoin) joinToMultiJoin(filter);
+            Plan plan = multiJoinToJoin(multiJoin);
+            return plan;
         }).toRule(RuleType.REORDER_JOIN);
     }
+
+    /**
+     * Recursively convert to
+     * {@link LogicalJoin} or
+     * {@link LogicalFilter}--{@link LogicalJoin}
+     * --> {@link MultiJoin}
+     */
+    public Plan joinToMultiJoin(Plan plan) {
+        // subtree can't specify the end of Pattern. so end can be GroupPlan or Filter
+        if (plan instanceof GroupPlan
+                || (plan instanceof LogicalFilter && plan.child(0) instanceof GroupPlan)) {
+            return plan;
+        }
+
+        List<Plan> inputs = Lists.newArrayList();
+        List<Expression> joinFilter = Lists.newArrayList();
+        List<Expression> notInnerJoinConditions = Lists.newArrayList();
+
+        LogicalJoin<?, ?> join;
+        if (plan instanceof LogicalFilter) {
+            LogicalFilter<?> filter = (LogicalFilter<?>) plan;
+            joinFilter.addAll(ExpressionUtils.extractConjunction(filter.getPredicates()));
+            join = (LogicalJoin<?, ?>) filter.child();
+        } else {
+            join = (LogicalJoin<?, ?>) plan;
+        }
+
+        if (join.getJoinType().isInnerOrCrossJoin()) {
+            joinFilter.addAll(join.getHashJoinConjuncts());
+            joinFilter.addAll(join.getOtherJoinConjuncts());
+        } else {
+            notInnerJoinConditions.addAll(join.getHashJoinConjuncts());
+            notInnerJoinConditions.addAll(join.getOtherJoinConjuncts());
+        }
+
+        // recursively convert children.
+        Plan left = joinToMultiJoin(join.left());
+        Plan right = joinToMultiJoin(join.right());
+
+        boolean changeLeft = join.getJoinType().isRightJoin()
+                || join.getJoinType().isFullOuterJoin();
+        if (canCombine(left, changeLeft)) {
+            MultiJoin leftMultiJoin = (MultiJoin) left;
+            inputs.addAll(leftMultiJoin.children());
+            joinFilter.addAll(leftMultiJoin.getJoinFilter());
+        } else {
+            inputs.add(left);
+        }
+
+        boolean changeRight = join.getJoinType().isLeftJoin()
+                || join.getJoinType().isFullOuterJoin();
+        if (canCombine(right, changeRight)) {
+            MultiJoin rightMultiJoin = (MultiJoin) right;
+            inputs.addAll(rightMultiJoin.children());
+            joinFilter.addAll(rightMultiJoin.getJoinFilter());
+        } else {
+            inputs.add(right);
+        }
+
+        Optional<JoinType> joinType;
+        if (join.getJoinType().isInnerOrCrossJoin()) {
+            joinType = Optional.empty();
+        } else {
+            joinType = Optional.of(join.getJoinType());
+        }
+        return new MultiJoin(
+                inputs,
+                joinFilter,
+                joinType,
+                notInnerJoinConditions);
+    }
+
+    /**
+     * Recursively convert to
+     * {@link MultiJoin}
+     * -->
+     * {@link LogicalJoin} or
+     * {@link LogicalFilter}--{@link LogicalJoin}
+     * <p>
+     * When all input is CROSS/Inner Join, all join will be flattened.
+     * Otherwise, we will split {join cluster} into multiple {@link MultiJoin}.
+     * <p>
+     * Here are examples of the {@link MultiJoin}s constructed after this rules has been applied.
+     * <p>
+     * simple example:
+     * <ul>
+     * <li>A JOIN B --> MJ(A, B)
+     * <li>A JOIN B JOIN C JOIN D --> MJ(A, B, C, D)
+     * <li>A LEFT (OUTER/SEMI/ANTI) JOIN B --> MJ([LOJ/LSJ/LAJ]A, B)
+     * <li>A LEFT (OUTER/SEMI/ANTI) JOIN B --> MJ([ROJ/RSJ/RAJ]A, B)
+     * <li>A FULL JOIN B --> MJ[FOJ](A, B)
+     * </ul>
+     * </p>
+     * <p>
+     * complex example:
+     * <ul>
+     * <li>A LEFT OUTER JOIN (B JOIN C) --> MJ([LOJ]A, MJ(B, C)))
+     * <li>(A JOIN B) LEFT JOIN C --> MJ(A, B, C)
+     * <li>(A LEFT OUTER JOIN B) JOIN C --> MJ(MJ(A, B), C)
+     * <li>A LEFT JOIN (B FULL JOIN C) --> MJ(A, MJ[full](B, C))
+     * <li>(A LEFT JOIN B) FULL JOIN (C RIGHT JOIN D) --> MJ[full](MJ(A, B), MJ(C, D))
+     * </ul>
+     * </p>
+     * more complex example:
+     * <ul>
+     * <li> A JOIN B JOIN C LEFT JOIN D --> MJ([LOJ]A, B, C, D)
+     * <li> A JOIN B JOIN C LEFT JOIN (D JOIN F) --> MJ([LOJ]A, B, C, MJ(D, F))
+     * <li> A RIGHT JOIN (B JOIN C JOIN D)--> MJ([ROJ]A, B, C, D)
+     * <li> A JOIN B RIGHT JOIN (C JOIN D) --> MJ(A, B, MJ([ROJ]C, D))
+     * </ul>
+     * </p>
+     * <p>
+     * Graphic presentation:
+     * A JOIN B JOIN C LEFT JOIN D JOIN F
+     *      left                  left│
+     * A  B  C  D  F   ──►   A  B  C  │ D  F   ──►  MJ(LOJ A,B,C,MJ(DF)
+     * <p>
+     * A JOIN B RIGHT JOIN C JOIN D JOIN F
+     *     right                  │right
+     * A  B  C  D  F   ──►   A  B │  C  D  F   ──►  MJ(A,B,MJ(ROJ C,D,F)
+     * <p>
+     * (A JOIN B JOIN C) FULL JOIN (D JOIN F)
+     *       full                    │
+     * A  B  C  D  F   ──►   A  B  C │ D  F    ──►  MJ(FOJ MJ(A,B,C) MJ(D,F))
+     * </p>
+     */
+    public Plan multiJoinToJoin(MultiJoin multiJoin) {
+        if (multiJoin.arity() == 1) {
+            return PlanUtils.filterOrSelf(multiJoin.getJoinFilter(), multiJoin.child(0));
+        }
+
+        Builder<Plan> builder = ImmutableList.builder();
+        // recursively hanlde multiJoin children.
+        for (Plan child : multiJoin.children()) {
+            if (child instanceof MultiJoin) {
+                MultiJoin childMultiJoin = (MultiJoin) child;
+                builder.add(multiJoinToJoin(childMultiJoin));
+            } else {
+                builder.add(child);
+            }
+        }
+        MultiJoin multiJoinHandleChildren = multiJoin.withChildren(builder.build());
+
+        if (multiJoinHandleChildren.getOnlyJoinType().isPresent()) {
+            List<Expression> leftFilter = Lists.newArrayList();
+            List<Expression> rightFilter = Lists.newArrayList();
+            List<Expression> remainingFilter = Lists.newArrayList();
+            Plan left = multiJoinToJoin(new MultiJoin(
+                    multiJoinHandleChildren.children().subList(0, multiJoinHandleChildren.arity() - 1),
+                    leftFilter,
+                    Optional.empty(),
+                    ExpressionUtils.EMPTY_CONDITION));
+            Plan right = multiJoinToJoin(new MultiJoin(
+                    multiJoinHandleChildren.children().subList(1, multiJoinHandleChildren.arity()),
+                    rightFilter,
+                    Optional.empty(),
+                    ExpressionUtils.EMPTY_CONDITION));
+            if (multiJoinHandleChildren.getOnlyJoinType().get().isLeftJoin()) {
+                right = multiJoinHandleChildren.child(multiJoinHandleChildren.arity() - 1);
+            } else if (multiJoinHandleChildren.getOnlyJoinType().get().isRightJoin()) {
+                left = multiJoinHandleChildren.child(0);
+            }
+
+            // split filter
+            for (Expression expr : multiJoinHandleChildren.getJoinFilter()) {
+                Set<Slot> exprInputSlots = expr.getInputSlots();
+                Preconditions.checkState(!exprInputSlots.isEmpty());
+
+                if (left.getOutputSet().containsAll(exprInputSlots)) {
+                    leftFilter.add(expr);
+                } else if (right.getOutputSet().containsAll(exprInputSlots)) {
+                    rightFilter.add(expr);
+                } else if (multiJoin.getOutputSet().containsAll(exprInputSlots)) {
+                    remainingFilter.add(expr);
+                } else {
+                    throw new RuntimeException("invalid expression");

Review Comment:
   Add more information to the exception message, and log it.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/ReorderJoin.java:
##########
@@ -37,21 +56,296 @@
  * SELECT * FROM t1 JOIN t3 ON t1.id=t3.id JOIN t2 ON t2.id=t3.id
  * </pre>
  * </p>
- * TODO: This is tested by SSB queries currently, add more `unit` test for this rule
- * when we have a plan building and comparing framework.
+ * Using the {@link MultiJoin} to complete this task.
+ * {Join cluster}: contain multiple join with filter inside.
+ * <ul>
+ * <li> {Join cluster} to MultiJoin</li>
+ * <li> MultiJoin to {Join cluster}</li>
+ * </ul>
  */
 public class ReorderJoin extends OneRewriteRuleFactory {
     @Override
     public Rule build() {
         return logicalFilter(subTree(LogicalJoin.class, LogicalFilter.class)).thenApply(ctx -> {
             LogicalFilter<Plan> filter = ctx.root;
-            if (!ctx.cascadesContext.getConnectContext().getSessionVariable()
-                    .isEnableNereidsReorderToEliminateCrossJoin()) {
-                return filter;
-            }
-            MultiJoin multiJoin = new MultiJoin();
-            filter.accept(multiJoin, null);
-            return multiJoin.reorderJoinsAccordingToConditions().orElse(filter);
+
+            MultiJoin multiJoin = (MultiJoin) joinToMultiJoin(filter);
+            Plan plan = multiJoinToJoin(multiJoin);
+            return plan;
         }).toRule(RuleType.REORDER_JOIN);
     }
+
+    /**
+     * Recursively convert to
+     * {@link LogicalJoin} or
+     * {@link LogicalFilter}--{@link LogicalJoin}
+     * --> {@link MultiJoin}
+     */
+    public Plan joinToMultiJoin(Plan plan) {
+        // subtree can't specify the end of Pattern. so end can be GroupPlan or Filter
+        if (plan instanceof GroupPlan
+                || (plan instanceof LogicalFilter && plan.child(0) instanceof GroupPlan)) {
+            return plan;
+        }
+
+        List<Plan> inputs = Lists.newArrayList();
+        List<Expression> joinFilter = Lists.newArrayList();
+        List<Expression> notInnerJoinConditions = Lists.newArrayList();
+
+        LogicalJoin<?, ?> join;
+        if (plan instanceof LogicalFilter) {
+            LogicalFilter<?> filter = (LogicalFilter<?>) plan;
+            joinFilter.addAll(ExpressionUtils.extractConjunction(filter.getPredicates()));
+            join = (LogicalJoin<?, ?>) filter.child();
+        } else {
+            join = (LogicalJoin<?, ?>) plan;
+        }
+
+        if (join.getJoinType().isInnerOrCrossJoin()) {
+            joinFilter.addAll(join.getHashJoinConjuncts());
+            joinFilter.addAll(join.getOtherJoinConjuncts());
+        } else {
+            notInnerJoinConditions.addAll(join.getHashJoinConjuncts());
+            notInnerJoinConditions.addAll(join.getOtherJoinConjuncts());
+        }
+
+        // recursively convert children.
+        Plan left = joinToMultiJoin(join.left());
+        Plan right = joinToMultiJoin(join.right());
+
+        boolean changeLeft = join.getJoinType().isRightJoin()
+                || join.getJoinType().isFullOuterJoin();
+        if (canCombine(left, changeLeft)) {
+            MultiJoin leftMultiJoin = (MultiJoin) left;
+            inputs.addAll(leftMultiJoin.children());
+            joinFilter.addAll(leftMultiJoin.getJoinFilter());
+        } else {
+            inputs.add(left);
+        }
+
+        boolean changeRight = join.getJoinType().isLeftJoin()
+                || join.getJoinType().isFullOuterJoin();
+        if (canCombine(right, changeRight)) {
+            MultiJoin rightMultiJoin = (MultiJoin) right;
+            inputs.addAll(rightMultiJoin.children());
+            joinFilter.addAll(rightMultiJoin.getJoinFilter());
+        } else {
+            inputs.add(right);
+        }
+
+        Optional<JoinType> joinType;
+        if (join.getJoinType().isInnerOrCrossJoin()) {
+            joinType = Optional.empty();
+        } else {
+            joinType = Optional.of(join.getJoinType());
+        }
+        return new MultiJoin(
+                inputs,
+                joinFilter,
+                joinType,
+                notInnerJoinConditions);
+    }
+
+    /**
+     * Recursively convert to
+     * {@link MultiJoin}
+     * -->
+     * {@link LogicalJoin} or
+     * {@link LogicalFilter}--{@link LogicalJoin}
+     * <p>
+     * When all input is CROSS/Inner Join, all join will be flattened.
+     * Otherwise, we will split {join cluster} into multiple {@link MultiJoin}.
+     * <p>
+     * Here are examples of the {@link MultiJoin}s constructed after this rules has been applied.
+     * <p>
+     * simple example:
+     * <ul>
+     * <li>A JOIN B --> MJ(A, B)
+     * <li>A JOIN B JOIN C JOIN D --> MJ(A, B, C, D)
+     * <li>A LEFT (OUTER/SEMI/ANTI) JOIN B --> MJ([LOJ/LSJ/LAJ]A, B)
+     * <li>A LEFT (OUTER/SEMI/ANTI) JOIN B --> MJ([ROJ/RSJ/RAJ]A, B)
+     * <li>A FULL JOIN B --> MJ[FOJ](A, B)
+     * </ul>
+     * </p>
+     * <p>
+     * complex example:
+     * <ul>
+     * <li>A LEFT OUTER JOIN (B JOIN C) --> MJ([LOJ]A, MJ(B, C)))
+     * <li>(A JOIN B) LEFT JOIN C --> MJ(A, B, C)
+     * <li>(A LEFT OUTER JOIN B) JOIN C --> MJ(MJ(A, B), C)
+     * <li>A LEFT JOIN (B FULL JOIN C) --> MJ(A, MJ[full](B, C))
+     * <li>(A LEFT JOIN B) FULL JOIN (C RIGHT JOIN D) --> MJ[full](MJ(A, B), MJ(C, D))
+     * </ul>
+     * </p>
+     * more complex example:
+     * <ul>
+     * <li> A JOIN B JOIN C LEFT JOIN D --> MJ([LOJ]A, B, C, D)
+     * <li> A JOIN B JOIN C LEFT JOIN (D JOIN F) --> MJ([LOJ]A, B, C, MJ(D, F))
+     * <li> A RIGHT JOIN (B JOIN C JOIN D)--> MJ([ROJ]A, B, C, D)
+     * <li> A JOIN B RIGHT JOIN (C JOIN D) --> MJ(A, B, MJ([ROJ]C, D))
+     * </ul>
+     * </p>
+     * <p>
+     * Graphic presentation:
+     * A JOIN B JOIN C LEFT JOIN D JOIN F
+     *      left                  left│
+     * A  B  C  D  F   ──►   A  B  C  │ D  F   ──►  MJ(LOJ A,B,C,MJ(DF)
+     * <p>
+     * A JOIN B RIGHT JOIN C JOIN D JOIN F
+     *     right                  │right
+     * A  B  C  D  F   ──►   A  B │  C  D  F   ──►  MJ(A,B,MJ(ROJ C,D,F)
+     * <p>
+     * (A JOIN B JOIN C) FULL JOIN (D JOIN F)
+     *       full                    │
+     * A  B  C  D  F   ──►   A  B  C │ D  F    ──►  MJ(FOJ MJ(A,B,C) MJ(D,F))
+     * </p>
+     */
+    public Plan multiJoinToJoin(MultiJoin multiJoin) {
+        if (multiJoin.arity() == 1) {
+            return PlanUtils.filterOrSelf(multiJoin.getJoinFilter(), multiJoin.child(0));
+        }
+
+        Builder<Plan> builder = ImmutableList.builder();
+        // recursively hanlde multiJoin children.
+        for (Plan child : multiJoin.children()) {
+            if (child instanceof MultiJoin) {
+                MultiJoin childMultiJoin = (MultiJoin) child;
+                builder.add(multiJoinToJoin(childMultiJoin));
+            } else {
+                builder.add(child);
+            }
+        }
+        MultiJoin multiJoinHandleChildren = multiJoin.withChildren(builder.build());
+
+        if (multiJoinHandleChildren.getOnlyJoinType().isPresent()) {
+            List<Expression> leftFilter = Lists.newArrayList();
+            List<Expression> rightFilter = Lists.newArrayList();
+            List<Expression> remainingFilter = Lists.newArrayList();
+            Plan left = multiJoinToJoin(new MultiJoin(
+                    multiJoinHandleChildren.children().subList(0, multiJoinHandleChildren.arity() - 1),
+                    leftFilter,
+                    Optional.empty(),
+                    ExpressionUtils.EMPTY_CONDITION));
+            Plan right = multiJoinToJoin(new MultiJoin(
+                    multiJoinHandleChildren.children().subList(1, multiJoinHandleChildren.arity()),
+                    rightFilter,
+                    Optional.empty(),
+                    ExpressionUtils.EMPTY_CONDITION));
+            if (multiJoinHandleChildren.getOnlyJoinType().get().isLeftJoin()) {
+                right = multiJoinHandleChildren.child(multiJoinHandleChildren.arity() - 1);
+            } else if (multiJoinHandleChildren.getOnlyJoinType().get().isRightJoin()) {
+                left = multiJoinHandleChildren.child(0);
+            }
+
+            // split filter
+            for (Expression expr : multiJoinHandleChildren.getJoinFilter()) {
+                Set<Slot> exprInputSlots = expr.getInputSlots();
+                Preconditions.checkState(!exprInputSlots.isEmpty());
+
+                if (left.getOutputSet().containsAll(exprInputSlots)) {
+                    leftFilter.add(expr);
+                } else if (right.getOutputSet().containsAll(exprInputSlots)) {
+                    rightFilter.add(expr);
+                } else if (multiJoin.getOutputSet().containsAll(exprInputSlots)) {
+                    remainingFilter.add(expr);
+                } else {
+                    throw new RuntimeException("invalid expression");
+                }
+            }
+
+            return PlanUtils.filterOrSelf(remainingFilter, new LogicalJoin<>(
+                    multiJoinHandleChildren.getOnlyJoinType().get(),
+                    ExpressionUtils.EMPTY_CONDITION,
+                    multiJoinHandleChildren.getNotInnerJoinConditions(),
+                    PlanUtils.filterOrSelf(leftFilter, left), PlanUtils.filterOrSelf(rightFilter, right)));
+        }
+
+        // following this multiJoin just contain INNER/CROSS.
+        List<Expression> joinFilter = multiJoinHandleChildren.getJoinFilter();
+
+        Plan left = multiJoinHandleChildren.child(0);
+        List<Plan> candidates = multiJoinHandleChildren.children().subList(1, multiJoinHandleChildren.arity());
+
+        LogicalJoin<? extends Plan, ? extends Plan> join = findInnerJoin(left, candidates, joinFilter);
+        List<Plan> newInputs = Lists.newArrayList();
+        newInputs.add(join);
+        newInputs.addAll(candidates.stream().filter(plan -> !join.right().equals(plan)).collect(Collectors.toList()));
+
+        joinFilter.removeAll(join.getHashJoinConjuncts());
+        joinFilter.removeAll(join.getOtherJoinConjuncts());
+        return multiJoinToJoin(new MultiJoin(
+                newInputs,
+                joinFilter,
+                Optional.empty(),
+                ExpressionUtils.EMPTY_CONDITION));

Review Comment:
   Could this recursion be eliminated?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/ReorderJoin.java:
##########
@@ -37,21 +56,296 @@
  * SELECT * FROM t1 JOIN t3 ON t1.id=t3.id JOIN t2 ON t2.id=t3.id
  * </pre>
  * </p>
- * TODO: This is tested by SSB queries currently, add more `unit` test for this rule
- * when we have a plan building and comparing framework.
+ * Using the {@link MultiJoin} to complete this task.
+ * {Join cluster}: contain multiple join with filter inside.
+ * <ul>
+ * <li> {Join cluster} to MultiJoin</li>
+ * <li> MultiJoin to {Join cluster}</li>
+ * </ul>
  */
 public class ReorderJoin extends OneRewriteRuleFactory {
     @Override
     public Rule build() {
         return logicalFilter(subTree(LogicalJoin.class, LogicalFilter.class)).thenApply(ctx -> {
             LogicalFilter<Plan> filter = ctx.root;
-            if (!ctx.cascadesContext.getConnectContext().getSessionVariable()
-                    .isEnableNereidsReorderToEliminateCrossJoin()) {
-                return filter;
-            }
-            MultiJoin multiJoin = new MultiJoin();
-            filter.accept(multiJoin, null);
-            return multiJoin.reorderJoinsAccordingToConditions().orElse(filter);
+
+            MultiJoin multiJoin = (MultiJoin) joinToMultiJoin(filter);
+            Plan plan = multiJoinToJoin(multiJoin);
+            return plan;
         }).toRule(RuleType.REORDER_JOIN);
     }
+
+    /**
+     * Recursively convert to
+     * {@link LogicalJoin} or
+     * {@link LogicalFilter}--{@link LogicalJoin}
+     * --> {@link MultiJoin}
+     */
+    public Plan joinToMultiJoin(Plan plan) {
+        // subtree can't specify the end of Pattern. so end can be GroupPlan or Filter
+        if (plan instanceof GroupPlan
+                || (plan instanceof LogicalFilter && plan.child(0) instanceof GroupPlan)) {
+            return plan;
+        }
+
+        List<Plan> inputs = Lists.newArrayList();
+        List<Expression> joinFilter = Lists.newArrayList();
+        List<Expression> notInnerJoinConditions = Lists.newArrayList();
+
+        LogicalJoin<?, ?> join;
+        if (plan instanceof LogicalFilter) {
+            LogicalFilter<?> filter = (LogicalFilter<?>) plan;
+            joinFilter.addAll(ExpressionUtils.extractConjunction(filter.getPredicates()));
+            join = (LogicalJoin<?, ?>) filter.child();
+        } else {
+            join = (LogicalJoin<?, ?>) plan;
+        }
+
+        if (join.getJoinType().isInnerOrCrossJoin()) {
+            joinFilter.addAll(join.getHashJoinConjuncts());
+            joinFilter.addAll(join.getOtherJoinConjuncts());
+        } else {
+            notInnerJoinConditions.addAll(join.getHashJoinConjuncts());
+            notInnerJoinConditions.addAll(join.getOtherJoinConjuncts());
+        }
+
+        // recursively convert children.
+        Plan left = joinToMultiJoin(join.left());
+        Plan right = joinToMultiJoin(join.right());
+
+        boolean changeLeft = join.getJoinType().isRightJoin()
+                || join.getJoinType().isFullOuterJoin();
+        if (canCombine(left, changeLeft)) {
+            MultiJoin leftMultiJoin = (MultiJoin) left;
+            inputs.addAll(leftMultiJoin.children());
+            joinFilter.addAll(leftMultiJoin.getJoinFilter());
+        } else {
+            inputs.add(left);
+        }
+
+        boolean changeRight = join.getJoinType().isLeftJoin()
+                || join.getJoinType().isFullOuterJoin();
+        if (canCombine(right, changeRight)) {
+            MultiJoin rightMultiJoin = (MultiJoin) right;
+            inputs.addAll(rightMultiJoin.children());
+            joinFilter.addAll(rightMultiJoin.getJoinFilter());
+        } else {
+            inputs.add(right);
+        }
+
+        Optional<JoinType> joinType;
+        if (join.getJoinType().isInnerOrCrossJoin()) {
+            joinType = Optional.empty();
+        } else {
+            joinType = Optional.of(join.getJoinType());
+        }
+        return new MultiJoin(
+                inputs,
+                joinFilter,
+                joinType,
+                notInnerJoinConditions);
+    }
+
+    /**
+     * Recursively convert to
+     * {@link MultiJoin}
+     * -->
+     * {@link LogicalJoin} or
+     * {@link LogicalFilter}--{@link LogicalJoin}
+     * <p>
+     * When all input is CROSS/Inner Join, all join will be flattened.
+     * Otherwise, we will split {join cluster} into multiple {@link MultiJoin}.
+     * <p>
+     * Here are examples of the {@link MultiJoin}s constructed after this rule has been applied.
+     * <p>
+     * simple example:
+     * <ul>
+     * <li>A JOIN B --> MJ(A, B)
+     * <li>A JOIN B JOIN C JOIN D --> MJ(A, B, C, D)
+     * <li>A LEFT (OUTER/SEMI/ANTI) JOIN B --> MJ([LOJ/LSJ/LAJ]A, B)
+     * <li>A RIGHT (OUTER/SEMI/ANTI) JOIN B --> MJ([ROJ/RSJ/RAJ]A, B)
+     * <li>A FULL JOIN B --> MJ[FOJ](A, B)
+     * </ul>
+     * </p>
+     * <p>
+     * complex example:
+     * <ul>
+     * <li>A LEFT OUTER JOIN (B JOIN C) --> MJ([LOJ]A, MJ(B, C))
+     * <li>(A JOIN B) LEFT JOIN C --> MJ(A, B, C)
+     * <li>(A LEFT OUTER JOIN B) JOIN C --> MJ(MJ(A, B), C)
+     * <li>A LEFT JOIN (B FULL JOIN C) --> MJ(A, MJ[full](B, C))
+     * <li>(A LEFT JOIN B) FULL JOIN (C RIGHT JOIN D) --> MJ[full](MJ(A, B), MJ(C, D))
+     * </ul>
+     * </p>
+     * more complex example:
+     * <ul>
+     * <li> A JOIN B JOIN C LEFT JOIN D --> MJ([LOJ]A, B, C, D)
+     * <li> A JOIN B JOIN C LEFT JOIN (D JOIN F) --> MJ([LOJ]A, B, C, MJ(D, F))
+     * <li> A RIGHT JOIN (B JOIN C JOIN D)--> MJ([ROJ]A, B, C, D)
+     * <li> A JOIN B RIGHT JOIN (C JOIN D) --> MJ(A, B, MJ([ROJ]C, D))
+     * </ul>
+     * </p>
+     * <p>
+     * Graphic presentation:
+     * A JOIN B JOIN C LEFT JOIN D JOIN F
+     *      left                  left│
+     * A  B  C  D  F   ──►   A  B  C  │ D  F   ──►  MJ(LOJ A,B,C,MJ(D,F))
+     * <p>
+     * A JOIN B RIGHT JOIN C JOIN D JOIN F
+     *     right                  │right
+     * A  B  C  D  F   ──►   A  B │  C  D  F   ──►  MJ(A,B,MJ(ROJ C,D,F))
+     * <p>
+     * (A JOIN B JOIN C) FULL JOIN (D JOIN F)
+     *       full                    │
+     * A  B  C  D  F   ──►   A  B  C │ D  F    ──►  MJ(FOJ MJ(A,B,C) MJ(D,F))
+     * </p>
+     */
+    public Plan multiJoinToJoin(MultiJoin multiJoin) {
+        if (multiJoin.arity() == 1) {
+            return PlanUtils.filterOrSelf(multiJoin.getJoinFilter(), multiJoin.child(0));
+        }
+
+        Builder<Plan> builder = ImmutableList.builder();
+        // recursively handle multiJoin children.
+        for (Plan child : multiJoin.children()) {
+            if (child instanceof MultiJoin) {
+                MultiJoin childMultiJoin = (MultiJoin) child;
+                builder.add(multiJoinToJoin(childMultiJoin));
+            } else {
+                builder.add(child);
+            }
+        }
+        MultiJoin multiJoinHandleChildren = multiJoin.withChildren(builder.build());
+
+        if (multiJoinHandleChildren.getOnlyJoinType().isPresent()) {
+            List<Expression> leftFilter = Lists.newArrayList();
+            List<Expression> rightFilter = Lists.newArrayList();
+            List<Expression> remainingFilter = Lists.newArrayList();
+            Plan left = multiJoinToJoin(new MultiJoin(
+                    multiJoinHandleChildren.children().subList(0, multiJoinHandleChildren.arity() - 1),
+                    leftFilter,
+                    Optional.empty(),
+                    ExpressionUtils.EMPTY_CONDITION));
+            Plan right = multiJoinToJoin(new MultiJoin(
+                    multiJoinHandleChildren.children().subList(1, multiJoinHandleChildren.arity()),
+                    rightFilter,
+                    Optional.empty(),
+                    ExpressionUtils.EMPTY_CONDITION));

Review Comment:
   These are only used for FULL OUTER JOIN; we should add an if branch.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/MultiJoin.java:
##########
@@ -17,189 +17,170 @@
 
 package org.apache.doris.nereids.rules.rewrite.logical;
 
-import org.apache.doris.common.Pair;
-import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.Slot;
-import org.apache.doris.nereids.trees.expressions.SlotReference;
-import org.apache.doris.nereids.trees.plans.GroupPlan;
 import org.apache.doris.nereids.trees.plans.JoinType;
 import org.apache.doris.nereids.trees.plans.Plan;
-import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
-import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.logical.AbstractLogicalPlan;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
-import org.apache.doris.nereids.util.ExpressionUtils;
-import org.apache.doris.nereids.util.JoinUtils;
-import org.apache.doris.nereids.util.PlanUtils;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableList.Builder;
 
-import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
  * A MultiJoin represents a join of N inputs (NAry-Join).
  * The regular Join represent strictly binary input (Binary-Join).
+ * <p>
+ * One {@link MultiJoin} just contains one {@link JoinType} of SEMI/ANTI/OUTER Join.
+ * <p>
+ * onlyJoinType is Full OUTER JOIN, children.size() == 2.
+ * leftChild [Full OUTER JOIN] rightChild.

Review Comment:
   ```suggestion
    * leftChild [FULL OUTER JOIN] rightChild.
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/ReorderJoin.java:
##########
@@ -37,21 +56,296 @@
  * SELECT * FROM t1 JOIN t3 ON t1.id=t3.id JOIN t2 ON t2.id=t3.id
  * </pre>
  * </p>
- * TODO: This is tested by SSB queries currently, add more `unit` test for this rule
- * when we have a plan building and comparing framework.
+ * Using the {@link MultiJoin} to complete this task.
+ * {Join cluster}: contain multiple join with filter inside.
+ * <ul>
+ * <li> {Join cluster} to MultiJoin</li>
+ * <li> MultiJoin to {Join cluster}</li>
+ * </ul>
  */
 public class ReorderJoin extends OneRewriteRuleFactory {
     @Override
     public Rule build() {
         return logicalFilter(subTree(LogicalJoin.class, LogicalFilter.class)).thenApply(ctx -> {
             LogicalFilter<Plan> filter = ctx.root;
-            if (!ctx.cascadesContext.getConnectContext().getSessionVariable()
-                    .isEnableNereidsReorderToEliminateCrossJoin()) {
-                return filter;
-            }
-            MultiJoin multiJoin = new MultiJoin();
-            filter.accept(multiJoin, null);
-            return multiJoin.reorderJoinsAccordingToConditions().orElse(filter);
+
+            MultiJoin multiJoin = (MultiJoin) joinToMultiJoin(filter);
+            Plan plan = multiJoinToJoin(multiJoin);
+            return plan;
         }).toRule(RuleType.REORDER_JOIN);
     }
+
+    /**
+     * Recursively convert to
+     * {@link LogicalJoin} or
+     * {@link LogicalFilter}--{@link LogicalJoin}
+     * --> {@link MultiJoin}
+     */
+    public Plan joinToMultiJoin(Plan plan) {

Review Comment:
   Should we put this function into JoinUtils?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/ReorderJoin.java:
##########
@@ -37,21 +56,296 @@
  * SELECT * FROM t1 JOIN t3 ON t1.id=t3.id JOIN t2 ON t2.id=t3.id
  * </pre>
  * </p>
- * TODO: This is tested by SSB queries currently, add more `unit` test for this rule
- * when we have a plan building and comparing framework.
+ * Using the {@link MultiJoin} to complete this task.
+ * {Join cluster}: contain multiple join with filter inside.
+ * <ul>
+ * <li> {Join cluster} to MultiJoin</li>
+ * <li> MultiJoin to {Join cluster}</li>
+ * </ul>
  */
 public class ReorderJoin extends OneRewriteRuleFactory {
     @Override
     public Rule build() {
         return logicalFilter(subTree(LogicalJoin.class, LogicalFilter.class)).thenApply(ctx -> {
             LogicalFilter<Plan> filter = ctx.root;
-            if (!ctx.cascadesContext.getConnectContext().getSessionVariable()
-                    .isEnableNereidsReorderToEliminateCrossJoin()) {
-                return filter;
-            }
-            MultiJoin multiJoin = new MultiJoin();
-            filter.accept(multiJoin, null);
-            return multiJoin.reorderJoinsAccordingToConditions().orElse(filter);
+
+            MultiJoin multiJoin = (MultiJoin) joinToMultiJoin(filter);
+            Plan plan = multiJoinToJoin(multiJoin);
+            return plan;
         }).toRule(RuleType.REORDER_JOIN);
     }
+
+    /**
+     * Recursively convert to
+     * {@link LogicalJoin} or
+     * {@link LogicalFilter}--{@link LogicalJoin}
+     * --> {@link MultiJoin}
+     */
+    public Plan joinToMultiJoin(Plan plan) {
+        // subtree can't specify the end of Pattern. so end can be GroupPlan or Filter
+        if (plan instanceof GroupPlan
+                || (plan instanceof LogicalFilter && plan.child(0) instanceof GroupPlan)) {
+            return plan;
+        }
+
+        List<Plan> inputs = Lists.newArrayList();
+        List<Expression> joinFilter = Lists.newArrayList();
+        List<Expression> notInnerJoinConditions = Lists.newArrayList();
+
+        LogicalJoin<?, ?> join;
+        if (plan instanceof LogicalFilter) {
+            LogicalFilter<?> filter = (LogicalFilter<?>) plan;
+            joinFilter.addAll(ExpressionUtils.extractConjunction(filter.getPredicates()));
+            join = (LogicalJoin<?, ?>) filter.child();
+        } else {
+            join = (LogicalJoin<?, ?>) plan;
+        }
+
+        if (join.getJoinType().isInnerOrCrossJoin()) {
+            joinFilter.addAll(join.getHashJoinConjuncts());
+            joinFilter.addAll(join.getOtherJoinConjuncts());
+        } else {
+            notInnerJoinConditions.addAll(join.getHashJoinConjuncts());
+            notInnerJoinConditions.addAll(join.getOtherJoinConjuncts());
+        }
+
+        // recursively convert children.
+        Plan left = joinToMultiJoin(join.left());
+        Plan right = joinToMultiJoin(join.right());
+
+        boolean changeLeft = join.getJoinType().isRightJoin()
+                || join.getJoinType().isFullOuterJoin();
+        if (canCombine(left, changeLeft)) {
+            MultiJoin leftMultiJoin = (MultiJoin) left;
+            inputs.addAll(leftMultiJoin.children());
+            joinFilter.addAll(leftMultiJoin.getJoinFilter());
+        } else {
+            inputs.add(left);
+        }
+
+        boolean changeRight = join.getJoinType().isLeftJoin()
+                || join.getJoinType().isFullOuterJoin();
+        if (canCombine(right, changeRight)) {
+            MultiJoin rightMultiJoin = (MultiJoin) right;
+            inputs.addAll(rightMultiJoin.children());
+            joinFilter.addAll(rightMultiJoin.getJoinFilter());
+        } else {
+            inputs.add(right);
+        }
+
+        Optional<JoinType> joinType;
+        if (join.getJoinType().isInnerOrCrossJoin()) {
+            joinType = Optional.empty();
+        } else {
+            joinType = Optional.of(join.getJoinType());
+        }
+        return new MultiJoin(
+                inputs,
+                joinFilter,
+                joinType,
+                notInnerJoinConditions);
+    }
+
+    /**
+     * Recursively convert to
+     * {@link MultiJoin}
+     * -->
+     * {@link LogicalJoin} or
+     * {@link LogicalFilter}--{@link LogicalJoin}
+     * <p>
+     * When all input is CROSS/Inner Join, all join will be flattened.
+     * Otherwise, we will split {join cluster} into multiple {@link MultiJoin}.
+     * <p>
+     * Here are examples of the {@link MultiJoin}s constructed after this rule has been applied.
+     * <p>
+     * simple example:
+     * <ul>
+     * <li>A JOIN B --> MJ(A, B)
+     * <li>A JOIN B JOIN C JOIN D --> MJ(A, B, C, D)
+     * <li>A LEFT (OUTER/SEMI/ANTI) JOIN B --> MJ([LOJ/LSJ/LAJ]A, B)
+     * <li>A RIGHT (OUTER/SEMI/ANTI) JOIN B --> MJ([ROJ/RSJ/RAJ]A, B)
+     * <li>A FULL JOIN B --> MJ[FOJ](A, B)
+     * </ul>
+     * </p>
+     * <p>
+     * complex example:
+     * <ul>
+     * <li>A LEFT OUTER JOIN (B JOIN C) --> MJ([LOJ]A, MJ(B, C))
+     * <li>(A JOIN B) LEFT JOIN C --> MJ(A, B, C)
+     * <li>(A LEFT OUTER JOIN B) JOIN C --> MJ(MJ(A, B), C)
+     * <li>A LEFT JOIN (B FULL JOIN C) --> MJ(A, MJ[full](B, C))
+     * <li>(A LEFT JOIN B) FULL JOIN (C RIGHT JOIN D) --> MJ[full](MJ(A, B), MJ(C, D))
+     * </ul>
+     * </p>
+     * more complex example:
+     * <ul>
+     * <li> A JOIN B JOIN C LEFT JOIN D --> MJ([LOJ]A, B, C, D)
+     * <li> A JOIN B JOIN C LEFT JOIN (D JOIN F) --> MJ([LOJ]A, B, C, MJ(D, F))
+     * <li> A RIGHT JOIN (B JOIN C JOIN D)--> MJ([ROJ]A, B, C, D)
+     * <li> A JOIN B RIGHT JOIN (C JOIN D) --> MJ(A, B, MJ([ROJ]C, D))
+     * </ul>
+     * </p>
+     * <p>
+     * Graphic presentation:
+     * A JOIN B JOIN C LEFT JOIN D JOIN F
+     *      left                  left│
+     * A  B  C  D  F   ──►   A  B  C  │ D  F   ──►  MJ(LOJ A,B,C,MJ(D,F))
+     * <p>
+     * A JOIN B RIGHT JOIN C JOIN D JOIN F
+     *     right                  │right
+     * A  B  C  D  F   ──►   A  B │  C  D  F   ──►  MJ(A,B,MJ(ROJ C,D,F))
+     * <p>
+     * (A JOIN B JOIN C) FULL JOIN (D JOIN F)
+     *       full                    │
+     * A  B  C  D  F   ──►   A  B  C │ D  F    ──►  MJ(FOJ MJ(A,B,C) MJ(D,F))
+     * </p>
+     */
+    public Plan multiJoinToJoin(MultiJoin multiJoin) {
+        if (multiJoin.arity() == 1) {
+            return PlanUtils.filterOrSelf(multiJoin.getJoinFilter(), multiJoin.child(0));
+        }
+
+        Builder<Plan> builder = ImmutableList.builder();
+        // recursively handle multiJoin children.
+        for (Plan child : multiJoin.children()) {
+            if (child instanceof MultiJoin) {
+                MultiJoin childMultiJoin = (MultiJoin) child;
+                builder.add(multiJoinToJoin(childMultiJoin));
+            } else {
+                builder.add(child);
+            }
+        }
+        MultiJoin multiJoinHandleChildren = multiJoin.withChildren(builder.build());
+
+        if (multiJoinHandleChildren.getOnlyJoinType().isPresent()) {
+            List<Expression> leftFilter = Lists.newArrayList();
+            List<Expression> rightFilter = Lists.newArrayList();
+            List<Expression> remainingFilter = Lists.newArrayList();
+            Plan left = multiJoinToJoin(new MultiJoin(
+                    multiJoinHandleChildren.children().subList(0, multiJoinHandleChildren.arity() - 1),
+                    leftFilter,
+                    Optional.empty(),
+                    ExpressionUtils.EMPTY_CONDITION));
+            Plan right = multiJoinToJoin(new MultiJoin(
+                    multiJoinHandleChildren.children().subList(1, multiJoinHandleChildren.arity()),
+                    rightFilter,
+                    Optional.empty(),
+                    ExpressionUtils.EMPTY_CONDITION));
+            if (multiJoinHandleChildren.getOnlyJoinType().get().isLeftJoin()) {
+                right = multiJoinHandleChildren.child(multiJoinHandleChildren.arity() - 1);
+            } else if (multiJoinHandleChildren.getOnlyJoinType().get().isRightJoin()) {
+                left = multiJoinHandleChildren.child(0);
+            }
+
+            // split filter
+            for (Expression expr : multiJoinHandleChildren.getJoinFilter()) {
+                Set<Slot> exprInputSlots = expr.getInputSlots();
+                Preconditions.checkState(!exprInputSlots.isEmpty());
+
+                if (left.getOutputSet().containsAll(exprInputSlots)) {
+                    leftFilter.add(expr);
+                } else if (right.getOutputSet().containsAll(exprInputSlots)) {
+                    rightFilter.add(expr);
+                } else if (multiJoin.getOutputSet().containsAll(exprInputSlots)) {
+                    remainingFilter.add(expr);
+                } else {
+                    throw new RuntimeException("invalid expression");
+                }
+            }
+
+            return PlanUtils.filterOrSelf(remainingFilter, new LogicalJoin<>(
+                    multiJoinHandleChildren.getOnlyJoinType().get(),
+                    ExpressionUtils.EMPTY_CONDITION,
+                    multiJoinHandleChildren.getNotInnerJoinConditions(),
+                    PlanUtils.filterOrSelf(leftFilter, left), PlanUtils.filterOrSelf(rightFilter, right)));
+        }
+
+        // following this multiJoin just contain INNER/CROSS.
+        List<Expression> joinFilter = multiJoinHandleChildren.getJoinFilter();
+
+        Plan left = multiJoinHandleChildren.child(0);
+        List<Plan> candidates = multiJoinHandleChildren.children().subList(1, multiJoinHandleChildren.arity());
+
+        LogicalJoin<? extends Plan, ? extends Plan> join = findInnerJoin(left, candidates, joinFilter);
+        List<Plan> newInputs = Lists.newArrayList();
+        newInputs.add(join);
+        newInputs.addAll(candidates.stream().filter(plan -> !join.right().equals(plan)).collect(Collectors.toList()));
+
+        joinFilter.removeAll(join.getHashJoinConjuncts());
+        joinFilter.removeAll(join.getOtherJoinConjuncts());
+        return multiJoinToJoin(new MultiJoin(
+                newInputs,
+                joinFilter,
+                Optional.empty(),
+                ExpressionUtils.EMPTY_CONDITION));
+    }
+
+    /**
+     * Returns whether an input can be merged without changing semantics.
+     *
+     * @param input input into a MultiJoin or (GroupPlan|LogicalFilter)
+     * @param changeLeft generate nullable or left not exist.
+     * @return true if the input can be combined into a parent MultiJoin
+     */
+    private static boolean canCombine(Plan input, boolean changeLeft) {

Review Comment:
   ```suggestion
       private static boolean canCombine(Plan input, boolean outputNullable) {
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/MultiJoin.java:
##########
@@ -17,189 +17,170 @@
 
 package org.apache.doris.nereids.rules.rewrite.logical;
 
-import org.apache.doris.common.Pair;
-import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.Slot;
-import org.apache.doris.nereids.trees.expressions.SlotReference;
-import org.apache.doris.nereids.trees.plans.GroupPlan;
 import org.apache.doris.nereids.trees.plans.JoinType;
 import org.apache.doris.nereids.trees.plans.Plan;
-import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
-import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.logical.AbstractLogicalPlan;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
-import org.apache.doris.nereids.util.ExpressionUtils;
-import org.apache.doris.nereids.util.JoinUtils;
-import org.apache.doris.nereids.util.PlanUtils;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableList.Builder;
 
-import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
  * A MultiJoin represents a join of N inputs (NAry-Join).
  * The regular Join represent strictly binary input (Binary-Join).
+ * <p>
+ * One {@link MultiJoin} just contains one {@link JoinType} of SEMI/ANTI/OUTER Join.
+ * <p>
+ * onlyJoinType is Full OUTER JOIN, children.size() == 2.
+ * leftChild [Full OUTER JOIN] rightChild.
+ * <p>
+ * onlyJoinType is LEFT (OUTER/SEMI/ANTI) JOIN,
+ * children[0, last) [LEFT (OUTER/SEMI/ANTI) JOIN] lastChild.
+ * eg: MJ([LOJ] A, B, C, D) is {A B C} [LOJ] {D}.
+ * <p>
+ * onlyJoinType is RIGHT (OUTER/SEMI/ANTI) JOIN,
+ * firstChild [RIGHT (OUTER/SEMI/ANTI) JOIN] children[1, last].
+ * eg: MJ([ROJ] A, B, C, D) is {A} [ROJ] {B C D}.
  */
-public class MultiJoin extends PlanVisitor<Void, Void> {
+public class MultiJoin extends AbstractLogicalPlan {
     /*
      *        topJoin
      *        /     \            MultiJoin
      *   bottomJoin  C  -->     /    |    \
      *     /    \              A     B     C
      *    A      B
      */
-    public final List<Plan> joinInputs = new ArrayList<>();
-    public final List<Expression> conjunctsForAllHashJoins = new ArrayList<>();
-    private List<Expression> conjunctsKeepInFilter = new ArrayList<>();
-
-    /**
-     * reorderJoinsAccordingToConditions
-     *
-     * @return join or filter
-     */
-    public Optional<Plan> reorderJoinsAccordingToConditions() {
-        if (joinInputs.size() >= 2) {
-            Plan root = reorderJoinsAccordingToConditions(joinInputs, conjunctsForAllHashJoins);
-            return Optional.of(PlanUtils.filterOrSelf(conjunctsKeepInFilter, root));
-        }
-        return Optional.empty();
+
+    // Push predicates into it.
+    // But joinFilter shouldn't contain predicates that reference only one table, like `T.key > 1`,
+    // because such predicates should be pushed down.
+    private final List<Expression> joinFilter;
+    // MultiJoin just contains one OUTER/SEMI/ANTI.
+    private final Optional<JoinType> onlyJoinType;
+    // When contains one OUTER/SEMI/ANTI join, keep separately its condition.
+    private final List<Expression> notInnerJoinConditions;
+
+    // private final List<@Nullable List<NamedExpression>> projFields;
+
+    public MultiJoin(List<Plan> inputs, List<Expression> joinFilter, Optional<JoinType> onlyJoinType,
+            List<Expression> notInnerJoinConditions) {
+        super(PlanType.LOGICAL_MULTI_JOIN, inputs.toArray(new Plan[0]));
+        this.joinFilter = joinFilter;
+        this.onlyJoinType = onlyJoinType;
+        this.notInnerJoinConditions = notInnerJoinConditions;

Review Comment:
   Objects.requireNonNull?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/MultiJoin.java:
##########
@@ -17,189 +17,170 @@
 
 package org.apache.doris.nereids.rules.rewrite.logical;
 
-import org.apache.doris.common.Pair;
-import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.Slot;
-import org.apache.doris.nereids.trees.expressions.SlotReference;
-import org.apache.doris.nereids.trees.plans.GroupPlan;
 import org.apache.doris.nereids.trees.plans.JoinType;
 import org.apache.doris.nereids.trees.plans.Plan;
-import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
-import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.logical.AbstractLogicalPlan;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
-import org.apache.doris.nereids.util.ExpressionUtils;
-import org.apache.doris.nereids.util.JoinUtils;
-import org.apache.doris.nereids.util.PlanUtils;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableList.Builder;
 
-import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
  * A MultiJoin represents a join of N inputs (NAry-Join).
  * The regular Join represent strictly binary input (Binary-Join).
+ * <p>
+ * One {@link MultiJoin} just contains one {@link JoinType} of SEMI/ANTI/OUTER Join.
+ * <p>
+ * onlyJoinType is Full OUTER JOIN, children.size() == 2.

Review Comment:
   ```suggestion
    * onlyJoinType is FULL OUTER JOIN, children.size() == 2.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org