Posted to commits@doris.apache.org by mo...@apache.org on 2021/07/14 05:08:46 UTC

[incubator-doris] branch master updated: [JoinReorder] Implement a better join reorder algorithm. (#6226)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new cbc42db  [JoinReorder] Implement a better join reorder algorithm. (#6226)
cbc42db is described below

commit cbc42db010c3e84679835dc08358c21d5e9a8b0b
Author: EmmyMiao87 <52...@qq.com>
AuthorDate: Wed Jul 14 13:08:28 2021 +0800

    [JoinReorder] Implement a better join reorder algorithm. (#6226)
    
    The current JoinReorder algorithm sorts tables according to a star schema
    and only considers the join relationships between tables.
    Its problems are the following:
    1. It only applies to data modeled as a star schema; data in other models cannot be reordered.
    2. It ignores the cost of each table, so it cannot estimate the size of join results,
       and its real query optimization ability is weak.
    3. Sorting alone cannot avoid potentially expensive joins such as cross joins.
    
    The new JoinReorder mainly introduces a new ordering algorithm for joins.
    This ordering algorithm brings a cost evaluation model to Doris.
    
    The ordering algorithm is based on the following three principles (a sketch of
    principle 1 follows the list):
    1. The order is: largest node, smallest node, ..., second-largest node
    2. An inner join is preferred over a cross join
    3. The right children of outer joins, semi joins, and anti joins do not move
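    
    For principle 1, a minimal sketch of the intended ordering over candidate nodes,
    assuming only that each candidate exposes an estimated cardinality (the names
    below are illustrative, not the actual planner classes):
    
        import java.util.ArrayList;
        import java.util.Comparator;
        import java.util.List;
        
        class JoinOrderSketch {
            // Orders candidates as: largest, smallest, ..., second largest.
            static List<Long> order(List<Long> cardinalities) {
                List<Long> sorted = new ArrayList<>(cardinalities);
                sorted.sort(Comparator.naturalOrder());       // ascending
                List<Long> result = new ArrayList<>();
                result.add(sorted.remove(sorted.size() - 1)); // largest node first
                result.addAll(sorted);                        // smallest ... second largest
                return result;
            }
        
            public static void main(String[] args) {
                // e.g. cardinalities [100, 5, 40, 7] -> [100, 5, 7, 40]
                System.out.println(order(List.of(100L, 5L, 40L, 7L)));
            }
        }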
    
    The cost model of a PlanNode relies mainly on two values: cardinality and selectivity.
    cardinality: can be understood simply as the number of rows produced by the node.
    selectivity: a value between 0 and 1; a predicate generally has a selectivity.
    The cost model computes the final cardinality of a PlanNode from the node's
    pre-computed cardinality and the selectivity of the predicates assigned to it.
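    
    A minimal sketch of that calculation, assuming selectivities in [0, 1] and -1
    for "unknown" as elsewhere in this patch (illustrative, not the planner code):
    
        class CardinalitySketch {
            // Applies predicate selectivities to a pre-computed input cardinality.
            static long finalCardinality(long inputCardinality, double[] selectivities) {
                if (inputCardinality == -1) {
                    return -1; // unknown stays unknown
                }
                double result = inputCardinality;
                for (double s : selectivities) {
                    if (s >= 0) { // skip predicates with no usable selectivity
                        result *= s;
                    }
                }
                return Math.round(result);
            }
        
            public static void main(String[] args) {
                // 1,000,000 input rows, an EQ predicate with NDV 100 (sel = 0.01)
                // and a default predicate (sel = 1/3) -> about 3,333 rows
                System.out.println(finalCardinality(1_000_000L, new double[] {0.01, 1.0 / 3.0}));
            }
        }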
    
    Currently, the session variable "enable_cost_based_join_reorder" controls which
    algorithm is used (e.g. SET enable_cost_based_join_reorder = true;).
    When it is on, the new ordering algorithm takes effect; when it is off, the old
    algorithm is used. It is off by default.
    
    The new ordering algorithm currently has no cost-based evaluation for external
    tables (odbc, es) or set operations (intersect, except). It is not recommended
    to enable cost-based join reorder for such queries.
    
    At the code architecture level:
    1. The new ordering algorithm runs in the single-node planning stage.
    2. The init and finalize phases of PlanNode were refactored to ensure that
       PlanNode planning and cost evaluation are completed before the ordering
       algorithm runs.
---
 fe/fe-core/src/main/cup/sql_parser.cup             |    4 +
 .../java/org/apache/doris/analysis/Analyzer.java   |   50 +-
 .../org/apache/doris/analysis/BinaryPredicate.java |   24 +
 .../org/apache/doris/analysis/DescriptorTable.java |   13 +-
 .../main/java/org/apache/doris/analysis/Expr.java  |   34 +-
 .../java/org/apache/doris/analysis/FromClause.java |   66 +-
 .../java/org/apache/doris/analysis/SelectStmt.java |    5 +-
 .../org/apache/doris/analysis/SlotDescriptor.java  |   12 +
 .../java/org/apache/doris/analysis/Subquery.java   |    2 +-
 .../java/org/apache/doris/analysis/TableRef.java   |    1 +
 .../org/apache/doris/analysis/TupleDescriptor.java |   71 +-
 .../java/org/apache/doris/common/CheckedMath.java  |   54 +
 .../main/java/org/apache/doris/load/ExportJob.java |    6 +-
 .../apache/doris/load/loadv2/LoadLoadingTask.java  |    2 +-
 .../doris/load/loadv2/LoadingTaskPlanner.java      |    2 +-
 .../org/apache/doris/load/loadv2/SparkLoadJob.java |    2 +-
 .../org/apache/doris/planner/AggregationNode.java  |   47 +-
 .../org/apache/doris/planner/AnalyticEvalNode.java |   23 +-
 .../org/apache/doris/planner/AnalyticPlanner.java  |    7 +-
 .../apache/doris/planner/AssertNumRowsNode.java    |   18 +
 .../org/apache/doris/planner/CrossJoinNode.java    |   48 +-
 .../org/apache/doris/planner/EmptySetNode.java     |   10 +-
 .../java/org/apache/doris/planner/EsScanNode.java  |    1 +
 .../org/apache/doris/planner/ExchangeNode.java     |   19 +-
 .../org/apache/doris/planner/HashJoinNode.java     |  317 ++++-
 .../org/apache/doris/planner/LoadScanNode.java     |    2 +-
 .../org/apache/doris/planner/MergeJoinNode.java    |  170 ---
 .../java/org/apache/doris/planner/MergeNode.java   |   31 +-
 .../org/apache/doris/planner/MysqlScanNode.java    |    7 +-
 .../org/apache/doris/planner/OdbcScanNode.java     |    7 +-
 .../org/apache/doris/planner/OlapScanNode.java     |  116 +-
 .../java/org/apache/doris/planner/PlanNode.java    |  227 +++-
 .../java/org/apache/doris/planner/Planner.java     |   20 +-
 .../java/org/apache/doris/planner/RepeatNode.java  |   17 +-
 .../java/org/apache/doris/planner/ScanNode.java    |    8 +
 .../java/org/apache/doris/planner/SelectNode.java  |   28 +-
 .../org/apache/doris/planner/SetOperationNode.java |   56 +-
 .../apache/doris/planner/SingleNodePlanner.java    |  407 +++++-
 .../java/org/apache/doris/planner/SortNode.java    |   13 +
 .../apache/doris/planner/StreamLoadPlanner.java    |    2 +-
 .../apache/doris/planner/StreamLoadScanNode.java   |    2 +-
 .../java/org/apache/doris/qe/SessionVariable.java  |    7 +
 .../java/org/apache/doris/qe/StmtExecutor.java     |    2 +-
 .../org/apache/doris/planner/QueryPlanTest.java    |    2 +
 .../doris/planner/SingleNodePlannerTest.java       | 1358 +++++++++++++++++++-
 fe/fe-core/src/test/resources/log4j2.xml           |    2 +-
 46 files changed, 2846 insertions(+), 476 deletions(-)

diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup
index c454336..c8bef94 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -2733,6 +2733,10 @@ opt_alter_type ::=
     {:
         RESULT = ShowAlterStmt.AlterType.ROLLUP;
     :}
+    | KW_MATERIALIZED KW_VIEW
+    {:
+        RESULT = ShowAlterStmt.AlterType.ROLLUP;
+    :}
     | KW_COLUMN
     {:
         RESULT = ShowAlterStmt.AlterType.COLUMN;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
index a879f08..deb8413 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
@@ -152,7 +152,7 @@ public class Analyzer {
 
     // The runtime filter that is expected to be used
     private final List<RuntimeFilter> assignedRuntimeFilters = new ArrayList<>();
-    
+
     public void setIsSubquery() {
         isSubquery = true;
         globalState.containsSubquery = true;
@@ -206,8 +206,8 @@ public class Analyzer {
         private final Map<TupleId, List<ExprId>> eqJoinConjuncts = Maps.newHashMap();
 
         // set of conjuncts that have been assigned to some PlanNode
-        private final Set<ExprId> assignedConjuncts =
-            Collections.newSetFromMap(new IdentityHashMap<ExprId, Boolean>());
+        private Set<ExprId> assignedConjuncts =
+                Collections.newSetFromMap(new IdentityHashMap<ExprId, Boolean>());
 
         // map from outer-joined tuple id, ie, one that is nullable in this select block,
         // to the last Join clause (represented by its rhs table ref) that outer-joined it
@@ -850,6 +850,15 @@ public class Analyzer {
     }
 
     /**
+     * register expr id
+     *
+     * @param expr
+     */
+    void registerExprId(Expr expr) {
+        expr.setId(globalState.conjunctIdGenerator.getNextId());
+    }
+
+    /**
      * Register individual conjunct with all tuple and slot ids it references
      * and with the global conjunct list.
      */
@@ -945,6 +954,16 @@ public class Analyzer {
         registerConjunct(p);
     }
 
+    public Set<ExprId> getAssignedConjuncts() {
+        return Sets.newHashSet(globalState.assignedConjuncts);
+    }
+
+    public void setAssignedConjuncts(Set<ExprId> assigned) {
+        if (assigned != null) {
+            globalState.assignedConjuncts = Sets.newHashSet(assigned);
+        }
+    }
+
     /**
      * Return all unassigned registered conjuncts that are fully bound by the given
      * (logical) tuple ids, can be evaluated by 'tupleIds' and are not tied to an
@@ -952,7 +971,7 @@ public class Analyzer {
      */
     public List<Expr> getUnassignedConjuncts(List<TupleId> tupleIds) {
         List<Expr> result = Lists.newArrayList();
-        for (Expr e: getUnassignedConjuncts(tupleIds, true)) {
+        for (Expr e : getUnassignedConjuncts(tupleIds, true)) {
             if (canEvalPredicate(tupleIds, e)) result.add(e);
         }
         return result;
@@ -1256,7 +1275,7 @@ public class Analyzer {
                 }
                 final Expr newConjunct = conjunct.getResultValue();
                 if (newConjunct instanceof BoolLiteral) {
-                    final BoolLiteral value = (BoolLiteral)newConjunct;
+                    final BoolLiteral value = (BoolLiteral) newConjunct;
                     if (!value.getValue()) {
                         if (fromHavingClause) {
                             hasEmptyResultSet_ = true;
@@ -1597,6 +1616,17 @@ public class Analyzer {
     public void setChangeResSmap(ExprSubstitutionMap changeResSmap) {
         this.changeResSmap = changeResSmap;
     }
+
+    // Load plans and query plans share the same planning framework.
+    // Some load paths in Doris are invoked through the HTTP protocol, in which case the session may be null.
+    // To avoid null pointer exceptions, a null check is added here.
+    public boolean safeIsEnableJoinReorderBasedCost() {
+        if (globalState.context == null) {
+            return false;
+        }
+        return globalState.context.getSessionVariable().isEnableJoinReorderBasedCost();
+    }
+
     /**
      * Returns true if predicate 'e' can be correctly evaluated by a tree materializing
      * 'tupleIds', otherwise false:
@@ -1783,7 +1813,7 @@ public class Analyzer {
      * materialization decision be cost-based?
      */
     public void markRefdSlots(Analyzer analyzer, PlanNode planRoot,
-                               List<Expr> outputExprs, AnalyticInfo analyticInfo) {
+                              List<Expr> outputExprs, AnalyticInfo analyticInfo) {
         if (planRoot == null) {
             return;
         }
@@ -1824,7 +1854,7 @@ public class Analyzer {
 
     /**
      * Column conduction, can slot a value-transfer to slot b
-     *
+     * <p>
      * TODO(zxy) Use value-transfer graph to check
      */
     public boolean hasValueTransfer(SlotId a, SlotId b) {
@@ -1834,7 +1864,7 @@ public class Analyzer {
     /**
      * Returns sorted slot IDs with value transfers from 'srcSid'.
      * Time complexity: O(V) where V = number of slots
-     *
+     * <p>
      * TODO(zxy) Use value-transfer graph to check
      */
     public List<SlotId> getValueTransferTargets(SlotId srcSid) {
@@ -1848,8 +1878,8 @@ public class Analyzer {
      * to an outer-joined tuple.
      */
     public boolean hasOuterJoinedValueTransferTarget(List<SlotId> sids) {
-        for (SlotId srcSid: sids) {
-            for (SlotId dstSid: getValueTransferTargets(srcSid)) {
+        for (SlotId srcSid : sids) {
+            for (SlotId dstSid : getValueTransferTargets(srcSid)) {
                 if (isOuterJoined(getTupleId(dstSid))) return true;
             }
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/BinaryPredicate.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/BinaryPredicate.java
index 8c3da5d..038a00e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/BinaryPredicate.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/BinaryPredicate.java
@@ -623,6 +623,30 @@ public class BinaryPredicate extends Predicate implements Writable {
     }
 
     @Override
+    public void setSelectivity() {
+        switch(op) {
+            case EQ:
+            case EQ_FOR_NULL: {
+                Reference<SlotRef> slotRefRef = new Reference<SlotRef>();
+                boolean singlePredicate = isSingleColumnPredicate(slotRefRef, null);
+                if (singlePredicate) {
+                    long distinctValues = slotRefRef.getRef().getNumDistinctValues();
+                    if (distinctValues != -1) {
+                        selectivity = 1.0 / distinctValues;
+                    }
+                }
+                break;
+            } default: {
+                // Reference hive
+                selectivity = 1.0 / 3.0;
+                break;
+            }
+        }
+
+        return;
+    }
+
+    @Override
     public int hashCode() {
         return 31 * super.hashCode() + Objects.hashCode(op);
     }
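
A worked example of the setSelectivity() change above: for an equality predicate
such as k1 = 10 on a single column whose stats report 500 distinct values, the
selectivity becomes 1.0 / 500 = 0.002 (it is left unset when the NDV is unknown);
for all other operators the default of 1/3 is used, following Hive.
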
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java
index 443dc32..0619a62 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java
@@ -83,7 +83,7 @@ public class DescriptorTable {
         for (SlotDescriptor slot: src.getSlots()) {
             copySlotDescriptor(d, slot);
         }
-        d.computeMemLayout();
+        d.computeStatAndMemLayout();
         return d;
     }
 
@@ -122,14 +122,21 @@ public class DescriptorTable {
         }
     }
 
-    // Computes physical layout parameters of all descriptors.
-    // Call this only after the last descriptor was added.
+    @Deprecated
     public void computeMemLayout() {
         for (TupleDescriptor d : tupleDescs.values()) {
             d.computeMemLayout();
         }
     }
 
+    // Computes physical layout parameters of all descriptors and calculates the statistics of the tuples.
+    // Call this only after the last descriptor was added.
+    public void computeStatAndMemLayout() {
+        for (TupleDescriptor d : tupleDescs.values()) {
+            d.computeStatAndMemLayout();
+        }
+    }
+
     public TDescriptorTable toThrift() {
         TDescriptorTable result = new TDescriptorTable();
         HashSet<Table> referencedTbls = Sets.newHashSet();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
index 1b3daba..f27a3f1 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
@@ -63,7 +63,7 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
     private static final String NEGATE_FN = "negate";
 
     // to be used where we can't come up with a better estimate
-    protected static final double DEFAULT_SELECTIVITY = 0.1;
+    public static final double DEFAULT_SELECTIVITY = 0.1;
 
     public final static float FUNCTION_CALL_COST = 10;
 
@@ -180,6 +180,10 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
                 public boolean apply(Expr arg) { return arg instanceof NullLiteral; }
             };
 
+    public void setSelectivity() {
+        selectivity = -1;
+    }
+
     /* TODO(zc)
     public final static com.google.common.base.Predicate<Expr>
             IS_NONDETERMINISTIC_BUILTIN_FN_PREDICATE =
@@ -299,6 +303,8 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
         return selectivity;
     }
 
+    public boolean hasSelectivity() { return selectivity >= 0; }
+
     public long getNumDistinctValues() {
         return numDistinctValues;
     }
@@ -374,6 +380,9 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
 
         // Do all the analysis for the expr subclass before marking the Expr analyzed.
         analyzeImpl(analyzer);
+        if (analyzer.safeIsEnableJoinReorderBasedCost()) {
+            setSelectivity();
+        }
         analysisDone();
     }
 
@@ -1420,6 +1429,29 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
         return this;
     }
 
+    /**
+     * Returns the descriptor of the scan slot that directly or indirectly produces
+     * the values of 'this' SlotRef. Traverses the source exprs of intermediate slot
+     * descriptors to resolve materialization points (e.g., aggregations).
+     * Returns null if 'e' or any source expr of 'e' is not a SlotRef or cast SlotRef.
+     */
+    public SlotDescriptor findSrcScanSlot() {
+        SlotRef slotRef = unwrapSlotRef(false);
+        if (slotRef == null) {
+            return null;
+        }
+        SlotDescriptor slotDesc = slotRef.getDesc();
+        if (slotDesc.isScanSlot()) {
+            return slotDesc;
+        }
+        if (slotDesc.getSourceExprs().size() == 1) {
+            return slotDesc.getSourceExprs().get(0).findSrcScanSlot();
+        }
+        // No known source expr, or there are several source exprs, meaning the slot
+        // has no single source table.
+        return null;
+    }
+
     public static double getConstFromExpr(Expr e) throws AnalysisException{
         Preconditions.checkState(e.isConstant());
         double value = 0;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/FromClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/FromClause.java
index b5f1b12..5f09a14 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/FromClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/FromClause.java
@@ -17,11 +17,6 @@
 
 package org.apache.doris.analysis;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
 
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Table;
@@ -31,10 +26,16 @@ import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.UserException;
 
-import com.google.common.base.Strings;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
 /**
  * Wraps a list of TableRef instances that form a FROM clause, allowing them to be
  * analyzed independently of the statement using them. To increase the flexibility of
@@ -61,23 +62,6 @@ public class FromClause implements ParseNode, Iterable<TableRef> {
         this.needToSql = needToSql;
     }
 
-    private void sortTableRefForSubquery(Analyzer analyzer) {
-        Collections.sort(this.tableRefs_, new Comparator<TableRef>() {
-            @Override
-            public int compare(TableRef tableref1, TableRef tableref2) {
-                int i1 = 0;
-                int i2 = 0;
-                if (tableref1.getOnClause() != null) {
-                    i1 = 1;
-                }
-                if (tableref2.getOnClause() != null) {
-                    i2 = 1;
-                }
-                return i1 - i2;
-            }
-        });
-    }
-
     private void checkFromHiveTable(Analyzer analyzer) throws AnalysisException {
         for (TableRef tblRef : tableRefs_) {
             if (!(tblRef instanceof BaseTableRef)) {
@@ -111,6 +95,33 @@ public class FromClause implements ParseNode, Iterable<TableRef> {
         }
     }
 
+    /**
+     * In some cases, the reorder method of the select stmt will incorrectly sort tableRefs that have an on clause.
+     * This function restores the position of those tableRefs with on clauses.
+     * For example:
+     * Origin stmt: select * from t1 inner join t2 on t1.k1=t2.k1
+     * After analyze: select * from t2 on t1.k1=t2.k1 inner join t1
+     *
+     * If such a statement merely needs to be reanalyzed (query rewrite), an error will be reported
+     * because the table t1 in the on clause cannot be recognized.
+     */
+    private void sortTableRefKeepSequenceOfOnClause() {
+        Collections.sort(this.tableRefs_, new Comparator<TableRef>() {
+            @Override
+            public int compare(TableRef tableref1, TableRef tableref2) {
+                int i1 = 0;
+                int i2 = 0;
+                if (tableref1.getOnClause() != null) {
+                    i1 = 1;
+                }
+                if (tableref2.getOnClause() != null) {
+                    i2 = 1;
+                }
+                return i1 - i2;
+            }
+        });
+    }
+
     @Override
     public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
         if (analyzed_) return;
@@ -120,7 +131,14 @@ public class FromClause implements ParseNode, Iterable<TableRef> {
             return;
         }
 
-        sortTableRefForSubquery(analyzer);
+        // The order of the tables may have changed during the previous analyze pass.
+        // For example, "a join b on xxx" is changed to "b on xxx join a".
+        // This change moves the predicate in the on clause ahead of the table it references,
+        // causing semantic analysis to fail with "Unknown column 'column1' in 'table1'".
+        // So we need to readjust the order of the tables here.
+        if (!analyzer.safeIsEnableJoinReorderBasedCost()) {
+            sortTableRefKeepSequenceOfOnClause();
+        }
 
         // Start out with table refs to establish aliases.
         TableRef leftTblRef = null;  // the one to the left of tblRef
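
In the comparator above, refs without an on clause compare as 0 and refs with one
as 1, and Collections.sort is stable, so a mis-ordered FROM list such as
[t2 on t1.k1=t2.k1, t1] is restored to [t1, t2 on t1.k1=t2.k1] before re-analysis.
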
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
index d67ad8b..da37283 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
@@ -515,7 +515,10 @@ public class SelectStmt extends QueryStmt {
         if (needToSql) {
             sqlString_ = toSql();
         }
-        reorderTable(analyzer);
+        if (!analyzer.safeIsEnableJoinReorderBasedCost()) {
+            LOG.debug("use old reorder logical in select stmt");
+            reorderTable(analyzer);
+        }
 
         resolveInlineViewRefs(analyzer);
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
index 17f1d5a..8803bd0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
@@ -19,6 +19,8 @@ package org.apache.doris.analysis;
 
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.ColumnStats;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.thrift.TSlotDescriptor;
 
@@ -195,6 +197,8 @@ public class SlotDescriptor {
                 stats = new ColumnStats();
             }
         }
+        // FIXME(dhc): mock ndv
+        stats.setNumDistinctValues(parent.getCardinality());
         return stats;
     }
 
@@ -290,4 +294,12 @@ public class SlotDescriptor {
         builder.append(prefix).append("slotIdx=").append(slotIdx).append("\n");
         return builder.toString();
     }
+
+    public boolean isScanSlot() {
+        Table table = parent.getTable();
+        if ((table != null) && (table instanceof OlapTable)) {
+            return true;
+        }
+        return false;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Subquery.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Subquery.java
index e6dffd8..2d8cb97 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Subquery.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Subquery.java
@@ -87,7 +87,7 @@ public class Subquery extends Expr {
         try {
             stmt.analyze(analyzer);
         } catch (UserException e) {
-            throw new AnalysisException(e.getMessage());
+            throw new AnalysisException(e.getMessage(), e);
         }
         // Check whether the stmt_ contains an illegal mix of un/correlated table refs.
         stmt.getCorrelatedTupleIds(analyzer);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
index 27dc380..f18d574 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
@@ -638,6 +638,7 @@ public class TableRef implements ParseNode, Writable {
         return null;
     }
 
+    public boolean isAnalyzed() { return isAnalyzed; }
     public boolean isResolved() {
         return !getClass().equals(TableRef.class);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleDescriptor.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleDescriptor.java
index 0b4553c..9a7b7b7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleDescriptor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleDescriptor.java
@@ -62,18 +62,24 @@ public class TupleDescriptor {
     private int numNullBytes;
     private int numNullableSlots;
 
+    // This cardinality is only used to mock slot ndv.
+    // Only the tuple of an olap scan node has this value.
+    private long cardinality;
+
     private float avgSerializedSize;  // in bytes; includes serialization overhead
 
     public TupleDescriptor(TupleId id) {
         this.id = id;
         this.slots = new ArrayList<SlotDescriptor>();
         this.debugName = "";
+        this.cardinality = -1;
     }
 
     public TupleDescriptor(TupleId id, String debugName) {
         this.id = id;
         this.slots = new ArrayList<SlotDescriptor>();
         this.debugName = debugName;
+        this.cardinality = -1;
     }
 
     public void addSlot(SlotDescriptor desc) {
@@ -97,6 +103,14 @@ public class TupleDescriptor {
         return slots;
     }
 
+    public void setCardinality(long cardinality) {
+        this.cardinality = cardinality;
+    }
+
+    public long getCardinality() {
+        return cardinality;
+    }
+
     public ArrayList<SlotDescriptor> getMaterializedSlots() {
         ArrayList<SlotDescriptor> result = Lists.newArrayList();
         for (SlotDescriptor slot : slots) {
@@ -162,6 +176,57 @@ public class TupleDescriptor {
         return ttupleDesc;
     }
 
+    /**
+     * This function is mainly used to calculate the statistics of the tuple and its memory layout.
+     * Generally, it is called after the plan node materializes its slots and before the plan node statistics are computed.
+     * PlanNode.init() {
+     *     materializedSlot();
+     *     tupleDesc.computeStatAndMemLayout();
+     *     computeStat();
+     * }
+     */
+    public void computeStatAndMemLayout() {
+        computeStat();
+        computeMemLayout();
+    }
+
+    /**
+     * This function is mainly used to evaluate the statistics of the tuple,
+     * such as the average size of each row.
+     * This function will be used before the computeStat() of the plan node
+     * and is the pre-work for evaluating the statistics of the plan node.
+     *
+     * This function is theoretically called only once, when the plan node is initialized.
+     * However, the current code structure is relatively convoluted,
+     * so to ensure that no error occurs even if it is mistakenly called a second time,
+     * the statistics are re-initialized at the beginning of the function.
+     *
+     * In the future this function will be changed to a private function.
+     */
+    @Deprecated
+    public void computeStat() {
+        // init stat
+        avgSerializedSize = 0;
+
+        // compute stat
+        for (SlotDescriptor d : slots) {
+            if (!d.isMaterialized()) {
+               continue;
+            }
+            ColumnStats stats = d.getStats();
+            if (stats.hasAvgSerializedSize()) {
+                avgSerializedSize += d.getStats().getAvgSerializedSize();
+            } else {
+                // TODO: for computed slots, try to come up with stats estimates
+                avgSerializedSize += d.getType().getSlotSize();
+            }
+        }
+    }
+
+    /**
+     * In the future this function will be changed to a private function.
+     */
+    @Deprecated
     public void computeMemLayout() {
         // sort slots by size
         List<List<SlotDescriptor>> slotsBySize = Lists.newArrayListWithCapacity(PrimitiveType.getMaxSlotSize());
@@ -173,12 +238,6 @@ public class TupleDescriptor {
         numNullableSlots = 0;
         for (SlotDescriptor d : slots) {
             ColumnStats stats = d.getStats();
-            if (stats.hasAvgSerializedSize()) {
-                avgSerializedSize += d.getStats().getAvgSerializedSize();
-            } else {
-                // TODO: for computed slots, try to come up with stats estimates
-                avgSerializedSize += d.getType().getSlotSize();
-            }
             if (d.isMaterialized()) {
                 slotsBySize.get(d.getType().getSlotSize()).add(d);
                 if (d.getIsNullable()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/CheckedMath.java b/fe/fe-core/src/main/java/org/apache/doris/common/CheckedMath.java
new file mode 100644
index 0000000..2e4f641
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/CheckedMath.java
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+import com.google.common.math.LongMath;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class CheckedMath {
+
+    private final static Logger LOG = LogManager.getLogger(CheckedMath.class);
+
+    /**
+     * Computes and returns the product of two longs. If an overflow occurs,
+     * the maximum Long value is returned (Long.MAX_VALUE).
+     */
+    public static long checkedMultiply(long a, long b) {
+        try {
+            return LongMath.checkedMultiply(a, b);
+        } catch (ArithmeticException e) {
+            LOG.warn("overflow when multiplying longs: " + a + ", " + b);
+            return Long.MAX_VALUE;
+        }
+    }
+
+    /**
+     * Computes and returns the sum of two longs. If an overflow occurs,
+     * the maximum Long value is returned (Long.MAX_VALUE).
+     */
+    public static long checkedAdd(long a, long b) {
+        try {
+            return LongMath.checkedAdd(a, b);
+        } catch (ArithmeticException e) {
+            LOG.warn("overflow when adding longs: " + a + ", " + b);
+            return Long.MAX_VALUE;
+        }
+    }
+}
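
A small usage sketch of the new helper (standalone, assuming the class is on the
classpath; the clamping behavior follows directly from the code above):

    import org.apache.doris.common.CheckedMath;

    class CheckedMathDemo {
        public static void main(String[] args) {
            System.out.println(CheckedMath.checkedMultiply(1_000_000L, 1_000_000L)); // 1000000000000
            System.out.println(CheckedMath.checkedMultiply(Long.MAX_VALUE, 2L));     // Long.MAX_VALUE (clamped)
            System.out.println(CheckedMath.checkedAdd(Long.MAX_VALUE, 1L));          // Long.MAX_VALUE (clamped)
        }
    }
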
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
index 1946c03..a07286b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
@@ -183,7 +183,7 @@ public class ExportJob implements Writable {
         this.finishTimeMs = -1;
         this.failMsg = new ExportFailMsg(ExportFailMsg.CancelType.UNKNOWN, "");
         this.analyzer = new Analyzer(Catalog.getCurrentCatalog(), null);
-        this.desc = new DescriptorTable();
+        this.desc = analyzer.getDescTbl();
         this.exportPath = "";
         this.columnSeparator = "\t";
         this.lineDelimiter = "\n";
@@ -286,7 +286,7 @@ public class ExportJob implements Writable {
                 }
             }
         }
-        desc.computeMemLayout();
+        desc.computeStatAndMemLayout();
     }
 
     private void plan() throws UserException {
@@ -379,7 +379,6 @@ public class ExportJob implements Writable {
                 ((OlapScanNode) scanNode).setColumnFilters(Maps.newHashMap());
                 ((OlapScanNode) scanNode).setIsPreAggregation(false, "This an export operation");
                 ((OlapScanNode) scanNode).setCanTurnOnPreAggr(false);
-                scanNode.init(analyzer);
                 ((OlapScanNode) scanNode).selectBestRollupByRollupSelector(analyzer);
                 break;
             case ODBC:
@@ -392,6 +391,7 @@ public class ExportJob implements Writable {
                 break;
         }
         if (scanNode != null) {
+            scanNode.init(analyzer);
             scanNode.finalize(analyzer);
         }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index c8bebbe..f9ad624 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -106,7 +106,7 @@ public class LoadLoadingTask extends LoadTask {
 
     @Override
     protected void executeTask() throws Exception{
-        LOG.info("begin to execute loading task. load id: {} job: {}. db: {}, tbl: {}. left retry: {}",
+        LOG.info("begin to execute loading task. load id: {} job id: {}. db: {}, tbl: {}. left retry: {}",
                 DebugUtil.printId(loadId), callback.getCallbackId(), db.getFullName(), table.getName(), retryTime);
         retryTime--;
         beginTime = System.nanoTime();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
index 61f7670..2d8d574 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
@@ -127,7 +127,7 @@ public class LoadingTaskPlanner {
         scanNode.init(analyzer);
         scanNode.finalize(analyzer);
         scanNodes.add(scanNode);
-        descTable.computeMemLayout();
+        descTable.computeStatAndMemLayout();
 
         // 2. Olap table sink
         List<Long> partitionIds = getAllPartitionIds();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index a9ab2d3..30fceb5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -950,7 +950,7 @@ public class SparkLoadJob extends BulkLoadJob {
         }
 
         private void initTDescriptorTable(DescriptorTable descTable) {
-            descTable.computeMemLayout();
+            descTable.computeStatAndMemLayout();
             tDescriptorTable = descTable.toThrift();
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
index 8fcba78..667e34e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
@@ -148,8 +148,8 @@ public class AggregationNode extends PlanNode {
         // conjuncts_ = orderConjunctsByCost(conjuncts_);
 
         // Compute the mem layout for both tuples here for simplicity.
-        aggInfo.getOutputTupleDesc().computeMemLayout();
-        aggInfo.getIntermediateTupleDesc().computeMemLayout();
+        aggInfo.getOutputTupleDesc().computeStatAndMemLayout();
+        aggInfo.getIntermediateTupleDesc().computeStatAndMemLayout();
 
         // do this at the end so it can take all conjuncts into account
         computeStats(analyzer);
@@ -167,14 +167,16 @@ public class AggregationNode extends PlanNode {
     @Override
     public void computeStats(Analyzer analyzer) {
         super.computeStats(analyzer);
+        if (!analyzer.safeIsEnableJoinReorderBasedCost()) {
+            return;
+        }
         List<Expr> groupingExprs = aggInfo.getGroupingExprs();
         cardinality = 1;
         // cardinality: product of # of distinct values produced by grouping exprs
         for (Expr groupingExpr : groupingExprs) {
             long numDistinct = groupingExpr.getNumDistinctValues();
-            // TODO: remove these before 1.0
             LOG.debug("grouping expr: " + groupingExpr.toSql() + " #distinct=" + Long.toString(
-              numDistinct));
+                    numDistinct));
             if (numDistinct == -1) {
                 cardinality = -1;
                 break;
@@ -190,11 +192,42 @@ public class AggregationNode extends PlanNode {
             // some others, the estimate doesn't overshoot dramatically)
             cardinality *= numDistinct;
         }
+        if (cardinality > 0) {
+            LOG.debug("sel=" + Double.toString(computeSelectivity()));
+            applyConjunctsSelectivity();
+        }
+        // if we ended up with an overflow, the estimate is certain to be wrong
+        if (cardinality < 0) {
+            cardinality = -1;
+        }
+
+        capCardinalityAtLimit();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("stats Agg: cardinality={}", cardinality);
+        }
+    }
+
+    @Override
+    protected void computeOldCardinality() {
+        List<Expr> groupingExprs = aggInfo.getGroupingExprs();
+        cardinality = 1;
+        // cardinality: product of # of distinct values produced by grouping exprs
+        for (Expr groupingExpr : groupingExprs) {
+            long numDistinct = groupingExpr.getNumDistinctValues();
+            // TODO: remove these before 1.0
+            LOG.debug("grouping expr: " + groupingExpr.toSql() + " #distinct=" + Long.toString(
+                    numDistinct));
+            if (numDistinct == -1) {
+                cardinality = -1;
+                break;
+            }
+            cardinality *= numDistinct;
+        }
         // take HAVING predicate into account
         LOG.debug("Agg: cardinality=" + Long.toString(cardinality));
         if (cardinality > 0) {
-            cardinality = Math.round((double) cardinality * computeSelectivity());
-            LOG.debug("sel=" + Double.toString(computeSelectivity()));
+            cardinality = Math.round((double) cardinality * computeOldSelectivity());
+            LOG.debug("sel=" + Double.toString(computeOldSelectivity()));
         }
         // if we ended up with an overflow, the estimate is certain to be wrong
         if (cardinality < 0) {
@@ -277,6 +310,8 @@ public class AggregationNode extends PlanNode {
         if (!conjuncts.isEmpty()) {
             output.append(detailPrefix + "having: ").append(getExplainString(conjuncts) + "\n");
         }
+        output.append(detailPrefix).append(String.format(
+                "cardinality=%s", cardinality)).append("\n");
         return output.toString();
     }
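
A worked example of the aggregation estimate above: GROUP BY a, b with
NDV(a) = 10 and NDV(b) = 50 gives a raw cardinality of 10 * 50 = 500; a HAVING
predicate with selectivity 0.1 then brings the estimate to 50, and
capCardinalityAtLimit() finally caps it at the LIMIT if one is present.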
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java
index d06e4dc..239a1ce 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java
@@ -30,15 +30,15 @@ import org.apache.doris.thrift.TPlanNode;
 import org.apache.doris.thrift.TPlanNodeType;
 import org.apache.doris.thrift.TQueryOptions;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.google.common.base.Joiner;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.List;
 
 /**
@@ -108,8 +108,8 @@ public class AnalyticEvalNode extends PlanNode {
 
     @Override
     public void init(Analyzer analyzer) throws UserException {
-        analyzer.getDescTbl().computeMemLayout();
-        intermediateTupleDesc.computeMemLayout();
+        analyzer.getDescTbl().computeStatAndMemLayout();
+        intermediateTupleDesc.computeStatAndMemLayout();
         // we add the analyticInfo's smap to the combined smap of our child
         outputSmap = logicalToPhysicalSmap;
         createDefaultSmap(analyzer);
@@ -138,6 +138,19 @@ public class AnalyticEvalNode extends PlanNode {
     @Override
     protected void computeStats(Analyzer analyzer) {
         super.computeStats(analyzer);
+        if (!analyzer.safeIsEnableJoinReorderBasedCost()) {
+            return;
+        }
+        cardinality = cardinality == -1 ? getChild(0).cardinality : cardinality;
+        applyConjunctsSelectivity();
+        capCardinalityAtLimit();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("stats AnalyticEval: cardinality={}", cardinality);
+        }
+    }
+
+    @Override
+    protected void computeOldCardinality() {
         cardinality = getChild(0).cardinality;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticPlanner.java
index 9dbe1a3..42420ad 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticPlanner.java
@@ -181,7 +181,7 @@ public class AnalyticPlanner {
                         long ndv = Expr.getNumDistinctValues(
                                        Expr.intersect(pg1.partitionByExprs, pg2.partitionByExprs));
 
-                        if (ndv == -1 || ndv < 0 || ndv < numNodes) {
+                        if (ndv == -1 || ndv < 1 || ndv < numNodes) {
                             // didn't get a usable value or the number of partitions is too small
                             continue;
                         }
@@ -228,7 +228,7 @@ public class AnalyticPlanner {
             // TODO: also look at l2 and take the max?
             long ndv = Expr.getNumDistinctValues(l1);
 
-            if (ndv < 0 || ndv < numNodes || ndv < maxNdv) {
+            if (ndv < 1 || ndv < numNodes || ndv < maxNdv) {
                 continue;
             }
 
@@ -673,8 +673,7 @@ public class AnalyticPlanner {
                 logicalToPhysicalSmap.put(new SlotRef(logicalOutputSlot), new SlotRef(physicalOutputSlot));
             }
 
-            physicalOutputTuple.computeMemLayout();
-            //      if (requiresIntermediateTuple) physicalIntermediateTuple.computeMemLayout();
+            physicalOutputTuple.computeStatAndMemLayout();
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java
index 86b978d..f8196e5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java
@@ -17,12 +17,17 @@
 
 package org.apache.doris.planner;
 
+import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.AssertNumRowsElement;
+import org.apache.doris.common.UserException;
 import org.apache.doris.thrift.TAssertNumRowsNode;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TPlanNode;
 import org.apache.doris.thrift.TPlanNodeType;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 /**
 * Assert num rows node is used to determine whether the number of rows is less than the desired number of rows.
  * The rows are the result of subqueryString.
@@ -30,6 +35,7 @@ import org.apache.doris.thrift.TPlanNodeType;
  * The cancelled reason will be reported by Backend and displayed back to the user.
  */
 public class AssertNumRowsNode extends PlanNode {
+    private static final Logger LOG = LogManager.getLogger(AssertNumRowsNode.class);
 
     private long desiredNumOfRows;
     private String subqueryString;
@@ -47,6 +53,18 @@ public class AssertNumRowsNode extends PlanNode {
     }
 
     @Override
+    public void init(Analyzer analyzer) throws UserException {
+        super.init(analyzer);
+        super.computeStats(analyzer);
+        if (analyzer.safeIsEnableJoinReorderBasedCost()) {
+            cardinality = 1;
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("stats AssertNumRows: cardinality={}", cardinality);
+        }
+    }
+
+    @Override
     public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
         if (detailLevel == TExplainLevel.BRIEF) {
             return "";
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/CrossJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/CrossJoinNode.java
index 9246549..31c668f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/CrossJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/CrossJoinNode.java
@@ -19,6 +19,8 @@ package org.apache.doris.planner;
 
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.TableRef;
+import org.apache.doris.common.CheckedMath;
+import org.apache.doris.common.UserException;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TPlanNode;
 import org.apache.doris.thrift.TPlanNodeType;
@@ -60,17 +62,41 @@ public class CrossJoinNode extends PlanNode {
     }
 
     @Override
+    public void init(Analyzer analyzer) throws UserException {
+        super.init(analyzer);
+        assignedConjuncts = analyzer.getAssignedConjuncts();
+        computeStats(analyzer);
+    }
+
+    @Override
     public void computeStats(Analyzer analyzer) {
-         super.computeStats(analyzer);
-         if (getChild(0).cardinality == -1 || getChild(1).cardinality == -1) {
-           cardinality = -1;
-         } else {
-           cardinality = getChild(0).cardinality * getChild(1).cardinality;
-           if (computeSelectivity() != -1) {
-             cardinality = Math.round(((double) cardinality) * computeSelectivity());
-           }
-         }
-         LOG.debug("stats CrossJoin: cardinality={}", Long.toString(cardinality));
+        super.computeStats(analyzer);
+        if (!analyzer.safeIsEnableJoinReorderBasedCost()) {
+            return;
+        }
+        if (getChild(0).cardinality == -1 || getChild(1).cardinality == -1) {
+            cardinality = -1;
+        } else {
+            cardinality = CheckedMath.checkedMultiply(getChild(0).cardinality, getChild(1).cardinality);
+            applyConjunctsSelectivity();
+            capCardinalityAtLimit();
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("stats CrossJoin: cardinality={}", Long.toString(cardinality));
+        }
+    }
+
+    @Override
+    protected void computeOldCardinality() {
+        if (getChild(0).cardinality == -1 || getChild(1).cardinality == -1) {
+            cardinality = -1;
+        } else {
+            cardinality = getChild(0).cardinality * getChild(1).cardinality;
+            if (computeOldSelectivity() != -1) {
+                cardinality = Math.round(((double) cardinality) * computeOldSelectivity());
+            }
+        }
+        LOG.debug("stats CrossJoin: cardinality={}", Long.toString(cardinality));
     }
 
     @Override
@@ -94,6 +120,8 @@ public class CrossJoinNode extends PlanNode {
         } else {
             output.append(detailPrefix + "predicates is NULL.");
         }
+        output.append(detailPrefix).append(String.format(
+                "cardinality=%s", cardinality)).append("\n");
         return output.toString();
     }
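
A worked example of the cross join estimate above: children with 1,000,000 rows
each give 10^12 rows before predicates are applied; CheckedMath.checkedMultiply
clamps the product at Long.MAX_VALUE instead of overflowing when the inputs are
larger still.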
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java
index c9e5ead..eef4399 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java
@@ -26,6 +26,9 @@ import org.apache.doris.thrift.TPlanNodeType;
 
 import com.google.common.base.Preconditions;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 /**
  * Node that returns an empty result set. Used for planning query blocks with a constant
  * predicate evaluating to false or a limit 0. The result set will have zero rows, but
@@ -33,6 +36,8 @@ import com.google.common.base.Preconditions;
  * construct a valid row empty batch.
  */
 public class EmptySetNode extends PlanNode {
+    private final static Logger LOG = LogManager.getLogger(EmptySetNode.class);
+
     public EmptySetNode(PlanNodeId id, ArrayList<TupleId> tupleIds) {
         super(id, tupleIds, "EMPTYSET");
         Preconditions.checkArgument(tupleIds.size() > 0);
@@ -43,6 +48,9 @@ public class EmptySetNode extends PlanNode {
         avgRowSize = 0;
         cardinality = 0;
         numNodes = 1;
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("stats EmptySet:" + id + ", cardinality: " + cardinality);
+        }
     }
 
     @Override
@@ -53,7 +61,7 @@ public class EmptySetNode extends PlanNode {
         // to be set as materialized (even though it isn't) to avoid failing precondition
         // checks generating the thrift for slot refs that may reference this tuple.
         for (TupleId id: tupleIds) analyzer.getTupleDesc(id).setIsMaterialized(true);
-        computeMemLayout(analyzer);
+        computeTupleStatAndMemLayout(analyzer);
         computeStats(analyzer);
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java
index 6853459..e8c2b71 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java
@@ -82,6 +82,7 @@ public class EsScanNode extends ScanNode {
         super.init(analyzer);
 
         assignBackends();
+        computeStats(analyzer);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
index a19aa43..91b23b7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
@@ -73,11 +73,6 @@ public class ExchangeNode extends PlanNode {
         if (!copyConjuncts) {
             this.conjuncts = Lists.newArrayList();
         }
-        if (hasLimit()) {
-            cardinality = Math.min(limit, inputNode.cardinality);
-        } else {
-            cardinality = inputNode.cardinality;
-        }
         // Only apply the limit at the receiver if there are multiple senders.
         if (inputNode.getFragment().isPartitioned()) limit = inputNode.limit;
         computeTupleIds();
@@ -95,6 +90,20 @@ public class ExchangeNode extends PlanNode {
     public void init(Analyzer analyzer) throws UserException {
         super.init(analyzer);
         Preconditions.checkState(conjuncts.isEmpty());
+        if (!analyzer.safeIsEnableJoinReorderBasedCost()) {
+            return;
+        }
+        computeStats(analyzer);
+    }
+
+    @Override
+    protected void computeStats(Analyzer analyzer) {
+        Preconditions.checkState(children.size() == 1);
+        cardinality = children.get(0).cardinality;
+        capCardinalityAtLimit();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("stats Exchange:" + id + ", cardinality: " + cardinality);
+        }
     }
 
     /**
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
index 6b06de2..83d2ff5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
@@ -28,6 +28,10 @@ import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.TableRef;
 import org.apache.doris.analysis.TupleId;
 import org.apache.doris.catalog.ColumnStats;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.CheckedMath;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.thrift.TEqJoinCondition;
 import org.apache.doris.thrift.TExplainLevel;
@@ -35,15 +39,17 @@ import org.apache.doris.thrift.THashJoinNode;
 import org.apache.doris.thrift.TPlanNode;
 import org.apache.doris.thrift.TPlanNodeType;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
@@ -59,7 +65,7 @@ public class HashJoinNode extends PlanNode {
     // predicates of the form 'a=b' or 'a<=>b'
     private List<BinaryPredicate> eqJoinConjuncts = Lists.newArrayList();
     // join conjuncts from the JOIN clause that aren't equi-join predicates
-    private  List<Expr> otherJoinConjuncts;
+    private List<Expr> otherJoinConjuncts;
     private DistributionMode distrMode;
     private boolean isColocate = false; //the flag for colocate join
     private String colocateReason = ""; // if can not do colocate join, set reason here
@@ -134,23 +140,19 @@ public class HashJoinNode extends PlanNode {
 
     @Override
     public void init(Analyzer analyzer) throws UserException {
-        assignConjuncts(analyzer);
-
-        // Set smap to the combined children's smaps and apply that to all conjuncts_.
-        createDefaultSmap(analyzer);
-
+        super.init(analyzer);
+        assignedConjuncts = analyzer.getAssignedConjuncts();
         // outSmap replace in outer join may cause NULL be replace by literal
         // so need replace the outsmap in nullableTupleID
         replaceOutputSmapForOuterJoin();
-
         computeStats(analyzer);
-        //assignedConjuncts = analyzr.getAssignedConjuncts();
 
         ExprSubstitutionMap combinedChildSmap = getCombinedChildWithoutTupleIsNullSmap();
         List<Expr> newEqJoinConjuncts =
                 Expr.substituteList(eqJoinConjuncts, combinedChildSmap, analyzer, false);
         eqJoinConjuncts = newEqJoinConjuncts.stream()
                 .map(entity -> (BinaryPredicate) entity).collect(Collectors.toList());
+        assignedConjuncts = analyzer.getAssignedConjuncts();
         otherJoinConjuncts =
                 Expr.substituteList(otherJoinConjuncts, combinedChildSmap, analyzer, false);
     }
@@ -179,10 +181,191 @@ public class HashJoinNode extends PlanNode {
         }
     }
 
+    /**
+     * Holds the source scan slots of a <SlotRef> = <SlotRef> join predicate.
+     * The underlying table and column on both sides have stats.
+     */
+    public static final class EqJoinConjunctScanSlots {
+        private final Expr eqJoinConjunct;
+        private final SlotDescriptor lhs;
+        private final SlotDescriptor rhs;
+
+        private EqJoinConjunctScanSlots(Expr eqJoinConjunct, SlotDescriptor lhs,
+                                        SlotDescriptor rhs) {
+            this.eqJoinConjunct = eqJoinConjunct;
+            this.lhs = lhs;
+            this.rhs = rhs;
+        }
+
+        // Convenience functions. They return double to avoid excessive casts in callers.
+        public double lhsNdv() {
+            // Cap the NDV at the table row count; stale stats may report more distinct values than rows.
+            return Math.min(lhs.getStats().getNumDistinctValues(), lhsNumRows());
+        }
+
+        public double rhsNdv() {
+            return Math.min(rhs.getStats().getNumDistinctValues(), rhsNumRows());
+        }
+
+        public double lhsNumRows() {
+            Table table = lhs.getParent().getTable();
+            Preconditions.checkState(table instanceof OlapTable);
+            return ((OlapTable) (table)).getRowCount();
+        }
+
+        public double rhsNumRows() {
+            Table table = rhs.getParent().getTable();
+            Preconditions.checkState(table instanceof OlapTable);
+            return ((OlapTable) (table)).getRowCount();
+        }
+
+        public TupleId lhsTid() {
+            return lhs.getParent().getId();
+        }
+
+        public TupleId rhsTid() {
+            return rhs.getParent().getId();
+        }
+
+        /**
+         * Returns a new EqJoinConjunctScanSlots for the given equi-join conjunct or null if
+         * the given conjunct is not of the form <SlotRef> = <SlotRef> or if the underlying
+         * table/column of at least one side is missing stats.
+         */
+        public static EqJoinConjunctScanSlots create(Expr eqJoinConjunct) {
+            if (!Expr.IS_EQ_BINARY_PREDICATE.apply(eqJoinConjunct)) return null;
+            SlotDescriptor lhsScanSlot = eqJoinConjunct.getChild(0).findSrcScanSlot();
+            if (lhsScanSlot == null || !hasNumRowsAndNdvStats(lhsScanSlot)) return null;
+            SlotDescriptor rhsScanSlot = eqJoinConjunct.getChild(1).findSrcScanSlot();
+            if (rhsScanSlot == null || !hasNumRowsAndNdvStats(rhsScanSlot)) return null;
+            return new EqJoinConjunctScanSlots(eqJoinConjunct, lhsScanSlot, rhsScanSlot);
+        }
+
+        private static boolean hasNumRowsAndNdvStats(SlotDescriptor slotDesc) {
+            if (slotDesc.getColumn() == null) return false;
+            if (!slotDesc.getStats().hasNumDistinctValues()) return false;
+            return true;
+        }
+
+        /**
+         * Groups the given EqJoinConjunctScanSlots by the lhs/rhs tuple combination
+         * and returns the result as a map.
+         */
+        public static Map<Pair<TupleId, TupleId>, List<EqJoinConjunctScanSlots>>
+        groupByJoinedTupleIds(List<EqJoinConjunctScanSlots> eqJoinConjunctSlots) {
+            Map<Pair<TupleId, TupleId>, List<EqJoinConjunctScanSlots>> scanSlotsByJoinedTids =
+                    new LinkedHashMap<>();
+            for (EqJoinConjunctScanSlots slots : eqJoinConjunctSlots) {
+                Pair<TupleId, TupleId> tids = Pair.create(slots.lhsTid(), slots.rhsTid());
+                List<EqJoinConjunctScanSlots> scanSlots = scanSlotsByJoinedTids.get(tids);
+                if (scanSlots == null) {
+                    scanSlots = new ArrayList<>();
+                    scanSlotsByJoinedTids.put(tids, scanSlots);
+                }
+                scanSlots.add(slots);
+            }
+            return scanSlotsByJoinedTids;
+        }
+
+        @Override
+        public String toString() {
+            return eqJoinConjunct.toSql();
+        }
+    }
+
+    private long getJoinCardinality() {
+        Preconditions.checkState(joinOp.isInnerJoin() || joinOp.isOuterJoin());
+
+        long lhsCard = getChild(0).cardinality;
+        long rhsCard = getChild(1).cardinality;
+        if (lhsCard == -1 || rhsCard == -1) {
+            return lhsCard;
+        }
+
+        // Collect join conjuncts that are eligible to participate in cardinality estimation.
+        List<EqJoinConjunctScanSlots> eqJoinConjunctSlots = new ArrayList<>();
+        for (Expr eqJoinConjunct : eqJoinConjuncts) {
+            EqJoinConjunctScanSlots slots = EqJoinConjunctScanSlots.create(eqJoinConjunct);
+            if (slots != null) eqJoinConjunctSlots.add(slots);
+        }
+
+        if (eqJoinConjunctSlots.isEmpty()) {
+            // There are no eligible equi-join conjuncts.
+            return lhsCard;
+        }
+
+        return getGenericJoinCardinality(eqJoinConjunctSlots, lhsCard, rhsCard);
+    }
+
+    /**
+     * Returns the estimated join cardinality of a generic N:M inner or outer join based
+     * on the given list of equi-join conjunct slots and the join input cardinalities.
+     * The returned result is >= 0.
+     * The list of join conjuncts must be non-empty and the cardinalities must be >= 0.
+     * <p>
+     * Generic estimation:
+     * cardinality = |child(0)| * |child(1)| / max(NDV(L.c), NDV(R.d))
+     * - case A: NDV(L.c) <= NDV(R.d)
+     * every row from child(0) joins with |child(1)| / NDV(R.d) rows
+     * - case B: NDV(L.c) > NDV(R.d)
+     * every row from child(1) joins with |child(0)| / NDV(L.c) rows
+     * - we adjust the NDVs on both sides to account for predicates that may
+     * have reduced the cardinality and NDVs
+     */
+    private long getGenericJoinCardinality(List<EqJoinConjunctScanSlots> eqJoinConjunctSlots, long lhsCard, long rhsCard) {
+        Preconditions.checkState(joinOp.isInnerJoin() || joinOp.isOuterJoin());
+        Preconditions.checkState(!eqJoinConjunctSlots.isEmpty());
+        Preconditions.checkState(lhsCard >= 0 && rhsCard >= 0);
+
+        long result = -1;
+        for (EqJoinConjunctScanSlots slots : eqJoinConjunctSlots) {
+            // Adjust the NDVs on both sides to account for predicates. Intuitively, the NDVs
+            // should only decrease. We ignore adjustments that would lead to an increase.
+            double lhsAdjNdv = slots.lhsNdv();
+            if (slots.lhsNumRows() > lhsCard) {
+                lhsAdjNdv *= lhsCard / slots.lhsNumRows();
+            }
+            double rhsAdjNdv = slots.rhsNdv();
+            if (slots.rhsNumRows() > rhsCard) {
+                rhsAdjNdv *= rhsCard / slots.rhsNumRows();
+            }
+            // A lower bound of 1 on the max adjusted NDV ensures we don't estimate
+            // a cardinality higher than the maximum possible, |child(0)| * |child(1)|.
+            long joinCard = CheckedMath.checkedMultiply(
+                    Math.round((lhsCard / Math.max(1, Math.max(lhsAdjNdv, rhsAdjNdv)))), rhsCard);
+            if (result == -1) {
+                result = joinCard;
+            } else {
+                result = Math.min(result, joinCard);
+            }
+        }
+        Preconditions.checkState(result >= 0);
+        return result;
+    }
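
A standalone sketch of the estimate above, with hypothetical statistics (this
snippet is illustration only, not part of the patch):

    long lhsCard = 1_000_000L, rhsCard = 10_000L;   // |child(0)|, |child(1)|
    double lhsAdjNdv = 50_000, rhsAdjNdv = 10_000;  // NDV(L.c), NDV(R.d)
    // cardinality = |child(0)| * |child(1)| / max(NDV(L.c), NDV(R.d))
    long joinCard = Math.round(lhsCard / Math.max(1, Math.max(lhsAdjNdv, rhsAdjNdv))) * rhsCard;
    // joinCard == 200_000: case B applies (NDV(L.c) > NDV(R.d)), so each of the
    // 10_000 rhs rows matches |child(0)| / NDV(L.c) = 20 lhs rows.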
+
+
     @Override
     public void computeStats(Analyzer analyzer) {
         super.computeStats(analyzer);
 
+        if (!analyzer.safeIsEnableJoinReorderBasedCost()) {
+            return;
+        }
+        if (joinOp.isSemiAntiJoin()) {
+            cardinality = getSemiJoinCardinality();
+        } else if (joinOp.isInnerJoin() || joinOp.isOuterJoin()) {
+            cardinality = getJoinCardinality();
+        } else {
+            Preconditions.checkState(false, "joinOp is not supported");
+        }
+        capCardinalityAtLimit();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("stats HashJoin:" + id + ", cardinality: " + cardinality);
+        }
+    }
+
+    @Override
+    protected void computeOldCardinality() {
         // For a join between child(0) and child(1), we look for join conditions "L.c = R.d"
         // (with L being from child(0) and R from child(1)) and use as the cardinality
         // estimate the maximum of
@@ -228,11 +411,11 @@ public class HashJoinNode extends PlanNode {
             // TODO rownum
             //Table rhsTbl = slotDesc.getParent().getTableFamilyGroup().getBaseTable();
             // if (rhsTbl != null && rhsTbl.getNumRows() != -1) {
-                // we can't have more distinct values than rows in the table, even though
-                // the metastore stats may think so
-                // LOG.info(
-                //   "#distinct=" + numDistinct + " #rows=" + Long.toString(rhsTbl.getNumRows()));
-                // numDistinct = Math.min(numDistinct, rhsTbl.getNumRows());
+            // we can't have more distinct values than rows in the table, even though
+            // the metastore stats may think so
+            // LOG.info(
+            //   "#distinct=" + numDistinct + " #rows=" + Long.toString(rhsTbl.getNumRows()));
+            // numDistinct = Math.min(numDistinct, rhsTbl.getNumRows());
             // }
             maxNumDistinct = Math.max(maxNumDistinct, numDistinct);
             LOG.debug("min slotref: {}, #distinct: {}", rhsSlotRef.toSql(), numDistinct);
@@ -245,12 +428,112 @@ public class HashJoinNode extends PlanNode {
             cardinality = getChild(0).cardinality;
         } else {
             cardinality = Math.round((double) getChild(0).cardinality * (double) getChild(
-                1).cardinality / (double) maxNumDistinct);
+                    1).cardinality / (double) maxNumDistinct);
             LOG.debug("lhs card: {}, rhs card: {}", getChild(0).cardinality, getChild(1).cardinality);
         }
         LOG.debug("stats HashJoin: cardinality {}", cardinality);
     }
 
+    /**
+     * Unwraps the SlotRef in expr and returns its NDV.
+     * Returns -1 if the NDV is unknown or if expr is not a SlotRef.
+     */
+    private long getNdv(Expr expr) {
+        SlotRef slotRef = expr.unwrapSlotRef(false);
+        if (slotRef == null) {
+            return -1;
+        }
+        SlotDescriptor slotDesc = slotRef.getDesc();
+        if (slotDesc == null) {
+            return -1;
+        }
+        ColumnStats stats = slotDesc.getStats();
+        if (!stats.hasNumDistinctValues()) {
+            return -1;
+        }
+        return stats.getNumDistinctValues();
+    }
+
+    /**
+     * Returns the estimated cardinality of a semi join node.
+     * For a left semi join between child(0) and child(1), we look for equality join
+     * conditions "L.c = R.d" (with L being from child(0) and R from child(1)) and use as
+     * the cardinality estimate the minimum of
+     * |child(0)| * Min(NDV(L.c), NDV(R.d)) / NDV(L.c)
+     * over all suitable join conditions. The reasoning is that:
+     * - each row in child(0) is returned at most once
+     * - the probability of a row in child(0) having a match in R is
+     * Min(NDV(L.c), NDV(R.d)) / NDV(L.c)
+     * <p>
+     * For a left anti join we estimate the cardinality as the minimum of:
+     * |L| * (NDV(L.c) > NDV(R.d) ? NDV(L.c) - NDV(R.d) : NDV(L.c)) / NDV(L.c)
+     * over all suitable join conditions. The reasoning is that:
+     * - each row in child(0) is returned at most once
+     * - if NDV(L.c) > NDV(R.d) then the probability of a row in L having no match
+     * in child(1) is (NDV(L.c) - NDV(R.d)) / NDV(L.c)
+     * - otherwise, we conservatively use |L| to avoid underestimation
+     * <p>
+     * We analogously estimate the cardinality for right semi/anti joins, and treat the
+     * null-aware anti join like a regular anti join.
+     */
+    private long getSemiJoinCardinality() {
+        Preconditions.checkState(joinOp.isSemiJoin());
+
+        // Return -1 if the cardinality of the returned side is unknown.
+        long cardinality;
+        if (joinOp == JoinOperator.RIGHT_SEMI_JOIN
+                || joinOp == JoinOperator.RIGHT_ANTI_JOIN) {
+            if (getChild(1).cardinality == -1) {
+                return -1;
+            }
+            cardinality = getChild(1).cardinality;
+        } else {
+            if (getChild(0).cardinality == -1) {
+                return -1;
+            }
+            cardinality = getChild(0).cardinality;
+        }
+        double minSelectivity = 1.0;
+        for (Expr eqJoinPredicate : eqJoinConjuncts) {
+            long lhsNdv = getNdv(eqJoinPredicate.getChild(0));
+            lhsNdv = Math.min(lhsNdv, getChild(0).cardinality);
+            long rhsNdv = getNdv(eqJoinPredicate.getChild(1));
+            rhsNdv = Math.min(rhsNdv, getChild(1).cardinality);
+
+            // Skip conjuncts with unknown NDV on either side.
+            if (lhsNdv == -1 || rhsNdv == -1) {
+                continue;
+            }
+
+            double selectivity = 1.0;
+            switch (joinOp) {
+                case LEFT_SEMI_JOIN: {
+                    selectivity = (double) Math.min(lhsNdv, rhsNdv) / (double) (lhsNdv);
+                    break;
+                }
+                case RIGHT_SEMI_JOIN: {
+                    selectivity = (double) Math.min(lhsNdv, rhsNdv) / (double) (rhsNdv);
+                    break;
+                }
+                case LEFT_ANTI_JOIN:
+                case NULL_AWARE_LEFT_ANTI_JOIN: {
+                    selectivity = (double) (lhsNdv > rhsNdv ? (lhsNdv - rhsNdv) : lhsNdv) / (double) lhsNdv;
+                    break;
+                }
+                case RIGHT_ANTI_JOIN: {
+                    selectivity = (double) (rhsNdv > lhsNdv ? (rhsNdv - lhsNdv) : rhsNdv) / (double) rhsNdv;
+                    break;
+                }
+                default:
+                    Preconditions.checkState(false);
+            }
+            minSelectivity = Math.min(minSelectivity, selectivity);
+        }
+
+        Preconditions.checkState(cardinality != -1);
+        return Math.round(cardinality * minSelectivity);
+    }
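
To make the selectivity cases above concrete, here is a standalone sketch with
hypothetical statistics (illustration only, not part of the patch):

    long lhsCard = 1_000_000L;
    long lhsNdv = 50_000, rhsNdv = 5_000;  // NDV(L.c), NDV(R.d)
    // LEFT SEMI: a lhs row survives iff its key also occurs on the rhs.
    double semiSel = (double) Math.min(lhsNdv, rhsNdv) / (double) lhsNdv;  // 0.1
    // LEFT ANTI: the complement; only keys absent from the rhs survive.
    double antiSel = (double) (lhsNdv > rhsNdv ? lhsNdv - rhsNdv : lhsNdv) / (double) lhsNdv;  // 0.9
    long semiCard = Math.round(lhsCard * semiSel);  // 100_000
    long antiCard = Math.round(lhsCard * antiSel);  // 900_000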
+
     @Override
     protected String debugString() {
         return MoreObjects.toStringHelper(this).add("eqJoinConjuncts",
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java
index 2029891..de896f9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java
@@ -186,7 +186,7 @@ public abstract class LoadScanNode extends ScanNode {
         // LOG.info("brokerScanRange is {}", brokerScanRange);
 
         // Need re compute memory layout after set some slot descriptor to nullable
-        srcTupleDesc.computeMemLayout();
+        srcTupleDesc.computeStatAndMemLayout();
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/MergeJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/MergeJoinNode.java
deleted file mode 100644
index a775874..0000000
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/MergeJoinNode.java
+++ /dev/null
@@ -1,170 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.planner;
-
-import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.Expr;
-import org.apache.doris.analysis.SlotId;
-import org.apache.doris.common.Pair;
-import org.apache.doris.thrift.TEqJoinCondition;
-import org.apache.doris.thrift.TExplainLevel;
-import org.apache.doris.thrift.TMergeJoinNode;
-import org.apache.doris.thrift.TPlanNode;
-import org.apache.doris.thrift.TPlanNodeType;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.List;
-
-/**
- * Merge join between left child and right child.
- * The right child must be a leaf node, ie, can only materialize
- * a single input tuple.
- */
-public class MergeJoinNode extends PlanNode {
-    private final static Logger LOG = LogManager.getLogger(MergeJoinNode.class);
-    // conjuncts of the form "<lhs> = <rhs>", recorded as Pair(<lhs>, <rhs>)
-    private final List<Pair<Expr, Expr>> cmpConjuncts;
-    // join conjuncts from the JOIN clause that aren't equi-join predicates
-    private final List<Expr> otherJoinConjuncts;
-    private DistributionMode distrMode;
-
-    public MergeJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner,
-      List<Pair<Expr, Expr>> cmpConjuncts, List<Expr> otherJoinConjuncts) {
-        super(id, "MERGE JOIN");
-        Preconditions.checkArgument(cmpConjuncts != null);
-        Preconditions.checkArgument(otherJoinConjuncts != null);
-        tupleIds.addAll(outer.getTupleIds());
-        tupleIds.addAll(inner.getTupleIds());
-        this.distrMode = DistributionMode.PARTITIONED;
-        this.cmpConjuncts = cmpConjuncts;
-        this.otherJoinConjuncts = otherJoinConjuncts;
-        children.add(outer);
-        children.add(inner);
-
-        // Inherits all the nullable tuple from the children
-        // Mark tuples that form the "nullable" side of the outer join as nullable.
-        nullableTupleIds.addAll(inner.getNullableTupleIds());
-        nullableTupleIds.addAll(outer.getNullableTupleIds());
-        nullableTupleIds.addAll(outer.getTupleIds());
-        nullableTupleIds.addAll(inner.getTupleIds());
-    }
-
-    public List<Pair<Expr, Expr>> getCmpConjuncts() {
-        return cmpConjuncts;
-    }
-
-    public DistributionMode getDistributionMode() {
-        return distrMode;
-    }
-
-    @Override
-    public void computeStats(Analyzer analyzer) {
-        super.computeStats(analyzer);
-    }
-
-    @Override
-    protected String debugString() {
-        return MoreObjects.toStringHelper(this).add("cmpConjuncts", cmpConjunctsDebugString()).addValue(
-          super.debugString()).toString();
-    }
-
-    private String cmpConjunctsDebugString() {
-        MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this);
-        for (Pair<Expr, Expr> entry : cmpConjuncts) {
-            helper.add("lhs", entry.first).add("rhs", entry.second);
-        }
-        return helper.toString();
-    }
-
-    @Override
-    public void getMaterializedIds(Analyzer analyzer, List<SlotId> ids) {
-        super.getMaterializedIds(analyzer, ids);
-        // we also need to materialize everything referenced by cmpConjuncts
-        // and otherJoinConjuncts
-        for (Pair<Expr, Expr> p : cmpConjuncts) {
-            p.first.getIds(null, ids);
-            p.second.getIds(null, ids);
-        }
-        for (Expr e : otherJoinConjuncts) {
-            e.getIds(null, ids);
-        }
-    }
-
-    @Override
-    protected void toThrift(TPlanNode msg) {
-        msg.node_type = TPlanNodeType.MERGE_JOIN_NODE;
-        msg.merge_join_node = new TMergeJoinNode();
-        for (Pair<Expr, Expr> entry : cmpConjuncts) {
-            TEqJoinCondition eqJoinCondition =
-              new TEqJoinCondition(entry.first.treeToThrift(), entry.second.treeToThrift());
-            msg.merge_join_node.addToCmpConjuncts(eqJoinCondition);
-        }
-        for (Expr e : otherJoinConjuncts) {
-            msg.hash_join_node.addToOtherJoinConjuncts(e.treeToThrift());
-        }
-    }
-
-    @Override
-    public String getNodeExplainString(String detailPrefix, TExplainLevel detailLevel) {
-        String distrModeStr =
-          (distrMode != DistributionMode.NONE) ? (" (" + distrMode.toString() + ")") : "";
-        StringBuilder output = new StringBuilder().append(
-          detailPrefix + "join op: MERGE JOIN" + distrModeStr + "\n").append(
-          detailPrefix + "hash predicates:\n");
-        for (Pair<Expr, Expr> entry : cmpConjuncts) {
-            output.append(detailPrefix + "  " +
-              entry.first.toSql() + " = " + entry.second.toSql() + "\n");
-        }
-        if (!otherJoinConjuncts.isEmpty()) {
-            output.append(detailPrefix + "other join predicates: ").append(
-              getExplainString(otherJoinConjuncts) + "\n");
-        }
-        if (!conjuncts.isEmpty()) {
-            output.append(detailPrefix + "other predicates: ").append(
-              getExplainString(conjuncts) + "\n");
-        }
-        return output.toString();
-    }
-
-    @Override
-    public int getNumInstances() {
-        return Math.max(children.get(0).getNumInstances(), children.get(1).getNumInstances());
-    }
-
-    enum DistributionMode {
-        NONE("NONE"),
-        BROADCAST("BROADCAST"),
-        PARTITIONED("PARTITIONED");
-
-        private final String description;
-
-        private DistributionMode(String descr) {
-            this.description = descr;
-        }
-
-        @Override
-        public String toString() {
-            return description;
-        }
-    }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/MergeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/MergeNode.java
index 629cfa9..bb30702 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/MergeNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/MergeNode.java
@@ -24,7 +24,6 @@ import org.apache.doris.analysis.SlotId;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.analysis.TupleId;
 import org.apache.doris.common.UserException;
-
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TExpr;
 import org.apache.doris.thrift.TMergeNode;
@@ -62,19 +61,9 @@ public class MergeNode extends PlanNode {
 
     protected final TupleId tupleId;
 
-    private final boolean isIntermediateMerge;
-
-    protected MergeNode(PlanNodeId id, TupleId tupleId) {
-        super(id, tupleId.asList(), "MERGE");
-       // this.rowTupleIds.add(tupleId);
-        this.tupleId = tupleId;
-        this.isIntermediateMerge = false;
-    }
-
     protected MergeNode(PlanNodeId id, MergeNode node) {
         super(id, node, "MERGE");
         this.tupleId = node.tupleId;
-        this.isIntermediateMerge = true;
     }
 
     public void addConstExprList(List<Expr> exprs) {
@@ -137,6 +126,26 @@ public class MergeNode extends PlanNode {
     @Override
     public void computeStats(Analyzer analyzer) {
         super.computeStats(analyzer);
+        if (!analyzer.safeIsEnableJoinReorderBasedCost()) {
+            return;
+        }
+        cardinality = constExprLists.size();
+        for (PlanNode child : children) {
+            // ignore missing child cardinality info in the hope it won't matter enough
+            // to change the planning outcome
+            if (child.cardinality > 0) {
+                cardinality += child.cardinality;
+            }
+        }
+        applyConjunctsSelectivity();
+        capCardinalityAtLimit();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("stats Merge: cardinality={}", Long.toString(cardinality));
+        }
+    }
+
+    @Override
+    protected void computeOldCardinality() {
         cardinality = constExprLists.size();
         for (PlanNode child : children) {
             // ignore missing child cardinality info in the hope it won't matter enough
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/MysqlScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/MysqlScanNode.java
index 945542b..fcc2212 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/MysqlScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/MysqlScanNode.java
@@ -61,6 +61,12 @@ public class MysqlScanNode extends ScanNode {
     }
 
     @Override
+    public void init(Analyzer analyzer) throws UserException {
+        super.init(analyzer);
+        computeStats(analyzer);
+    }
+
+    @Override
     protected String debugString() {
         MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this);
         return helper.addValue(super.debugString()).toString();
@@ -71,7 +77,6 @@ public class MysqlScanNode extends ScanNode {
         // Convert predicates to MySQL columns and filters.
         createMySQLColumns(analyzer);
         createMySQLFilters(analyzer);
-        computeStats(analyzer);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OdbcScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OdbcScanNode.java
index 918a567..90f989d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OdbcScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OdbcScanNode.java
@@ -80,6 +80,12 @@ public class OdbcScanNode extends ScanNode {
     }
 
     @Override
+    public void init(Analyzer analyzer) throws UserException {
+        super.init(analyzer);
+        computeStats(analyzer);
+    }
+
+    @Override
     protected String debugString() {
         MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this);
         return helper.addValue(super.debugString()).toString();
@@ -90,7 +96,6 @@ public class OdbcScanNode extends ScanNode {
         // Convert predicates to Odbc columns and filters.
         createOdbcColumns(analyzer);
         createOdbcFilters(analyzer);
-        computeStats(analyzer);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 99e158e..a69c08d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -132,13 +132,12 @@ public class OlapScanNode extends ScanNode {
 
     // List of tablets will be scanned by current olap_scan_node
     private ArrayList<Long> scanTabletIds = Lists.newArrayList();
-    private boolean isFinalized = false;
 
     private HashSet<Long> scanBackendIds = new HashSet<>();
 
     private Map<Long, Integer> tabletId2BucketSeq = Maps.newHashMap();
     // a bucket seq may map to many tablets, and each tablet has a TScanRangeLocations.
-    public ArrayListMultimap<Integer, TScanRangeLocations> bucketSeq2locations= ArrayListMultimap.create();
+    public ArrayListMultimap<Integer, TScanRangeLocations> bucketSeq2locations = ArrayListMultimap.create();
 
     // Constructs node to scan given data files of table 'tbl'.
     public OlapScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) {
@@ -228,8 +227,8 @@ public class OlapScanNode extends ScanNode {
             SessionVariable sessionVariable = ConnectContext.get().getSessionVariable();
             if (sessionVariable.getTestMaterializedView()) {
                 throw new AnalysisException("The old scan range info is different from the new one when "
-                                                    + "test_materialized_view is true. "
-                                                    + scanRangeInfo);
+                        + "test_materialized_view is true. "
+                        + scanRangeInfo);
             }
             situation = "The key type of table is aggregated.";
             update = false;
@@ -297,38 +296,93 @@ public class OlapScanNode extends ScanNode {
 
         filterDeletedRows(analyzer);
         computePartitionInfo();
+        computeTupleState(analyzer);
+
+        /**
+         * Compute inaccurate stats before materialized view selection and tablet pruning.
+         * - Accurate statistics rely on materialized view selection and bucket pruning.
+         * - However, both of those processes occur after the reorder algorithm has completed.
+         * - When join reorder is turned on, computeStats() must therefore be completed before the reorder algorithm runs.
+         * - So only inaccurate statistics can be calculated here.
+         */
+        if (analyzer.safeIsEnableJoinReorderBasedCost()) {
+            computeInaccurateStats(analyzer);
+        }
     }
 
     @Override
     public void finalize(Analyzer analyzer) throws UserException {
-        if (isFinalized) {
-            return;
+        LOG.debug("OlapScanNode get scan range locations. Tuple: {}", desc);
+        /**
+         * If join reorder is turned on, cardinality has already been calculated in init(),
+         * and that value is not accurate. In the logic below, cardinality will be
+         * calculated accurately again, so we need to reset its value here.
+         */
+        if (analyzer.safeIsEnableJoinReorderBasedCost()) {
+            cardinality = 0;
         }
-
-        LOG.debug("OlapScanNode finalize. Tuple: {}", desc);
         try {
             getScanRangeLocations();
         } catch (AnalysisException e) {
             throw new UserException(e.getMessage());
         }
+        if (!analyzer.safeIsEnableJoinReorderBasedCost()) {
+            computeOldRowSizeAndCardinality();
+        }
+        computeNumNodes();
+    }
 
-        computeStats(analyzer);
-        isFinalized = true;
+    public void computeTupleState(Analyzer analyzer) {
+        for (TupleId id : tupleIds) {
+            analyzer.getDescTbl().getTupleDesc(id).computeStat();
+        }
     }
 
-    @Override
-    public void computeStats(Analyzer analyzer) {
+    public void computeOldRowSizeAndCardinality() {
         if (cardinality > 0) {
             avgRowSize = totalBytes / (float) cardinality;
-            if (hasLimit()) {
-                cardinality = Math.min(cardinality, limit);
-            }
+            capCardinalityAtLimit();
+        }
+        // when the scan node has no data, cardinality should be 0 instead of an invalid value after computeStats()
+        cardinality = cardinality == -1 ? 0 : cardinality;
+    }
+
+    @Override
+    protected void computeNumNodes() {
+        if (cardinality > 0) {
             numNodes = scanBackendIds.size();
         }
         // even if the current scan node has no data, at least one backend will be assigned when the fragment actually executes
         numNodes = numNodes <= 0 ? 1 : numNodes;
-        // when node scan has no data, cardinality should be 0 instead of a invalid value after computeStats()
-        cardinality = cardinality == -1 ? 0 : cardinality;
+    }
+
+    /**
+     * Calculate inaccurate stats such as cardinality.
+     * cardinality: the sum of the row counts of the partitions in selectedPartitionIds.
+     * The cardinality computed here is inaccurate and will be greater than the actual value,
+     * for two reasons:
+     * 1. During actual execution, not all tablets belonging to the selected partitions will be scanned;
+     * some tablets may be pruned before execution.
+     * 2. The base index may eventually be replaced by a materialized view index.
+     * <p>
+     * There are three steps to calculate cardinality:
+     * 1. Calculate how many rows will be scanned.
+     * 2. Apply the selectivity of the conjuncts.
+     * 3. Apply the limit.
+     *
+     * @param analyzer
+     */
+    private void computeInaccurateStats(Analyzer analyzer) {
+        super.computeStats(analyzer);
+        // step1: Calculate how many rows were scanned
+        cardinality = 0;
+        for (long selectedPartitionId : selectedPartitionIds) {
+            final Partition partition = olapTable.getPartition(selectedPartitionId);
+            final MaterializedIndex baseIndex = partition.getBaseIndex();
+            cardinality += baseIndex.getRowCount();
+        }
+        applyConjunctsSelectivity();
+        capCardinalityAtLimit();
     }
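
A hypothetical walk-through of those three steps (all numbers invented for
illustration):

    // step 1: two selected partitions whose base indexes hold 8M and 2M rows -> cardinality = 10_000_000
    // step 2: applyConjunctsSelectivity() with a combined selectivity of 0.05 -> cardinality = 500_000
    // step 3: capCardinalityAtLimit() with LIMIT 1000                         -> cardinality = 1_000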
 
     private Collection<Long> partitionPrune(PartitionInfo partitionInfo, PartitionNames partitionNames) throws AnalysisException {
@@ -360,13 +414,13 @@ public class OlapScanNode extends ScanNode {
             MaterializedIndex table,
             DistributionInfo distributionInfo) throws AnalysisException {
         DistributionPruner distributionPruner = null;
-        switch(distributionInfo.getType()) {
+        switch (distributionInfo.getType()) {
             case HASH: {
                 HashDistributionInfo info = (HashDistributionInfo) distributionInfo;
                 distributionPruner = new HashDistributionPruner(table.getTabletIdsInOrder(),
-                                                                info.getDistributionColumns(),
-                                                                columnFilters,
-                                                                info.getBucketNum());
+                        info.getDistributionColumns(),
+                        columnFilters,
+                        info.getBucketNum());
                 return distributionPruner.prune();
             }
             case RANDOM: {
@@ -409,7 +463,7 @@ public class OlapScanNode extends ScanNode {
                     visibleVersion, visibleVersionHash, localBeId, schemaHash);
             if (allQueryableReplicas.isEmpty()) {
                 LOG.error("no queryable replica found in tablet {}. visible version {}-{}",
-                         tabletId, visibleVersion, visibleVersionHash);
+                        tabletId, visibleVersion, visibleVersionHash);
                 if (LOG.isDebugEnabled()) {
                     for (Replica replica : tablet.getReplicas()) {
                         LOG.debug("tablet {}, replica: {}", tabletId, replica.toString());
@@ -462,6 +516,12 @@ public class OlapScanNode extends ScanNode {
 
             result.add(scanRangeLocations);
         }
+        // FIXME(dhc): we use cardinality here to simulate ndv
+        if (tablets.size() == 0) {
+            desc.setCardinality(0);
+        } else {
+            desc.setCardinality(cardinality);
+        }
     }
 
     private void computePartitionInfo() throws AnalysisException {
@@ -512,6 +572,7 @@ public class OlapScanNode extends ScanNode {
 
     private void getScanRangeLocations() throws UserException {
         if (selectedPartitionIds.size() == 0) {
+            desc.setCardinality(0);
             return;
         }
         Preconditions.checkState(selectedIndexId != -1);
@@ -597,9 +658,9 @@ public class OlapScanNode extends ScanNode {
         }
 
         output.append(prefix).append(String.format(
-                    "partitions=%s/%s",
-                    selectedPartitionNum,
-                    olapTable.getPartitions().size()));
+                "partitions=%s/%s",
+                selectedPartitionNum,
+                olapTable.getPartitions().size()));
 
         String indexName = olapTable.getIndexNameById(selectedIndexId);
         output.append("\n").append(prefix).append(String.format("rollup: %s", indexName));
@@ -607,7 +668,7 @@ public class OlapScanNode extends ScanNode {
         output.append("\n");
 
         output.append(prefix).append(String.format(
-                    "tabletRatio=%s/%s", selectedTabletsNum, totalTabletsNum));
+                "tabletRatio=%s/%s", selectedTabletsNum, totalTabletsNum));
         output.append("\n");
 
         // We print up to 10 tablet, and we print "..." if the number is more than 10
@@ -673,7 +734,6 @@ public class OlapScanNode extends ScanNode {
         olapScanNode.selectedTabletsNum = 1;
         olapScanNode.totalTabletsNum = 1;
         olapScanNode.setIsPreAggregation(false, "Export job");
-        olapScanNode.isFinalized = true;
         olapScanNode.result.addAll(locationsList);
 
         return olapScanNode;
@@ -715,7 +775,7 @@ public class OlapScanNode extends ScanNode {
         }
     }
 
-    public TupleId getTupleId(){
+    public TupleId getTupleId() {
         Preconditions.checkNotNull(desc);
         return desc.getId();
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index 1233f45..dc53c44 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -20,6 +20,7 @@ package org.apache.doris.planner;
 import com.google.common.base.Joiner;
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.ExprId;
 import org.apache.doris.analysis.ExprSubstitutionMap;
 import org.apache.doris.analysis.SlotId;
 import org.apache.doris.analysis.TupleDescriptor;
@@ -35,13 +36,13 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Predicates;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import com.google.common.math.LongMath;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
@@ -65,9 +66,9 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
 
     protected String planNodeName;
 
-    protected PlanNodeId     id;  // unique w/in plan tree; assigned by planner
+    protected PlanNodeId id;  // unique w/in plan tree; assigned by planner
     protected PlanFragmentId fragmentId;  // assigned by planner after fragmentation step
-    protected long           limit; // max. # of rows to be returned; 0: no limit
+    protected long limit; // max. # of rows to be returned; 0: no limit
 
     // ids materialized by the tree rooted at this node
     protected ArrayList<TupleId> tupleIds;
@@ -115,13 +116,11 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
     protected boolean compactData;
     protected int numInstances;
 
-    public String getPlanNodeName() {
-        return planNodeName;
-    }
-
     // Runtime filters assigned to this node.
     protected List<RuntimeFilter> runtimeFilters = new ArrayList<>();
 
+    private boolean cardinalityIsDone = false;
+
     protected PlanNode(PlanNodeId id, ArrayList<TupleId> tupleIds, String planNodeName) {
         this.id = id;
         this.limit = -1;
@@ -159,6 +158,10 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
         this.numInstances = 1;
     }
 
+    public String getPlanNodeName() {
+        return planNodeName;
+    }
+
     /**
      * Sets tblRefIds_, tupleIds_, and nullableTupleIds_.
      * The default implementation is a no-op.
@@ -197,8 +200,14 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
         fragmentId = id;
     }
 
-    public void setFragment(PlanFragment fragment) { fragment_ = fragment; }
-    public PlanFragment getFragment() { return fragment_; }
+    public void setFragment(PlanFragment fragment) {
+        fragment_ = fragment;
+    }
+
+    public PlanFragment getFragment() {
+        return fragment_;
+    }
+
     public long getLimit() {
         return limit;
     }
@@ -284,11 +293,12 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
         this.conjuncts.addAll(conjuncts);
     }
 
-    public void addPreFilterConjuncts(List<Expr> conjuncts) {
-        if (conjuncts == null) {
-            return;
-        }
-        this.preFilterConjuncts.addAll(conjuncts);
+    public void setAssignedConjuncts(Set<ExprId> conjuncts) {
+        assignedConjuncts = conjuncts;
+    }
+
+    public Set<ExprId> getAssignedConjuncts() {
+        return assignedConjuncts;
     }
 
     public void transferConjuncts(PlanNode recipient) {
@@ -296,29 +306,25 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
         conjuncts.clear();
     }
 
-    /**
-     * Call computeMemLayout() for all materialized tuples.
-     */
-    protected void computeMemLayout(Analyzer analyzer) {
-        for (TupleId id: tupleIds) {
-            analyzer.getDescTbl().getTupleDesc(id).computeMemLayout();
+    public void addPreFilterConjuncts(List<Expr> conjuncts) {
+        if (conjuncts == null) {
+            return;
         }
+        this.preFilterConjuncts.addAll(conjuncts);
     }
 
-
     /**
-     * Computes and returns the sum of two cardinalities. If an overflow occurs,
-     * the maximum Long value is returned (Long.MAX_VALUE).
+     * Call computeStatAndMemLayout() for all materialized tuples.
      */
-    public static long addCardinalities(long a, long b) {
-        try {
-            return LongMath.checkedAdd(a, b);
-        } catch (ArithmeticException e) {
-            LOG.warn("overflow when adding cardinalities: " + a + ", " + b);
-            return Long.MAX_VALUE;
+    protected void computeTupleStatAndMemLayout(Analyzer analyzer) {
+        for (TupleId id : tupleIds) {
+            analyzer.getDescTbl().getTupleDesc(id).computeStatAndMemLayout();
         }
     }
 
+
     public String getExplainString() {
         return getExplainString("", "", TExplainLevel.VERBOSE);
     }
@@ -378,8 +384,8 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
             String childDetailPrefix = prefix + "|    ";
             for (int i = 1; i < children.size(); ++i) {
                 expBuilder.append(
-                  children.get(i).getExplainString(childHeadlinePrefix, childDetailPrefix,
-                    detailLevel));
+                        children.get(i).getExplainString(childHeadlinePrefix, childDetailPrefix,
+                                detailLevel));
                 expBuilder.append(childDetailPrefix + "\n");
             }
             expBuilder.append(children.get(0).getExplainString(prefix, prefix, detailLevel));
@@ -428,7 +434,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
             return;
         } else {
             msg.num_children = children.size();
-            for (PlanNode child: children) {
+            for (PlanNode child : children) {
                 child.treeToThriftHelper(container);
             }
         }
@@ -443,11 +449,20 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
         for (PlanNode child : children) {
             child.finalize(analyzer);
         }
-        computeStats(analyzer);
+        computeNumNodes();
+        if (!analyzer.safeIsEnableJoinReorderBasedCost()) {
+            computeOldCardinality();
+        }
+    }
+
+    protected void computeNumNodes() {
+        if (!children.isEmpty()) {
+            numNodes = getChild(0).numNodes;
+        }
     }
 
     /**
-     * Computes planner statistics: avgRowSize, numNodes, cardinality.
+     * Computes planner statistics: avgRowSize.
      * Subclasses need to override this.
      * Assumes that it has already been called on all children.
      * This is broken out of finalize() so that it can be called separately
@@ -460,31 +475,39 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
             TupleDescriptor desc = analyzer.getTupleDesc(tid);
             avgRowSize += desc.getAvgSerializedSize();
         }
-        if (!children.isEmpty()) {
-            numNodes = getChild(0).numNodes;
-        }
     }
 
     /**
-     * Compute the product of the selectivity of all conjuncts.
+     * This function calculates the cardinality when the old join reorder algorithm is enabled.
+     * The value is used to choose the distribution mode of a join (broadcast or shuffle) in distributed planning.
+     *
+     * If the new join reorder and the old join reorder share the same cardinality calculation,
+     *   and that calculation is already completed in init(),
+     *   there is no need to override this function.
      */
-    protected double computeSelectivity() {
-        double prod = 1.0;
-        for (Expr e : conjuncts) {
-            if (e.getSelectivity() < 0) {
-                return -1.0;
-            }
-            prod *= e.getSelectivity();
+    protected void computeOldCardinality() {
+    }
+
+    protected void capCardinalityAtLimit() {
+        if (hasLimit()) {
+            cardinality = cardinality == -1 ? limit : Math.min(cardinality, limit);
         }
-        return prod;
     }
 
     protected ExprSubstitutionMap outputSmap;
+
+    // global state of planning wrt conjunct assignment; used by planner as a shortcut
+    // to avoid having to pass assigned conjuncts back and forth
+    // (the planner uses this to save and reset the global state in between join tree
+    // alternatives)
+    protected Set<ExprId> assignedConjuncts;
+
     protected ExprSubstitutionMap withoutTupleIsNullOutputSmap;
 
     public ExprSubstitutionMap getOutputSmap() {
         return outputSmap;
     }
+
     public void setOutputSmap(ExprSubstitutionMap smap) {
         outputSmap = smap;
     }
@@ -492,25 +515,13 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
     public void setWithoutTupleIsNullOutputSmap(ExprSubstitutionMap smap) {
         withoutTupleIsNullOutputSmap = smap;
     }
+
     public ExprSubstitutionMap getWithoutTupleIsNullOutputSmap() {
         return withoutTupleIsNullOutputSmap == null ? outputSmap : withoutTupleIsNullOutputSmap;
     }
-    /**
-     * Marks all slots referenced in exprs as materialized.
-     */
-    protected void markSlotsMaterialized(Analyzer analyzer, List<Expr> exprs) {
-        List<SlotId> refdIdList = Lists.newArrayList();
-
-        for (Expr expr: exprs) {
-            expr.getIds(null, refdIdList);
-        }
-
-        analyzer.materializeSlots(exprs);
-    }
 
     public void init(Analyzer analyzer) throws UserException {
         assignConjuncts(analyzer);
-        computeStats(analyzer);
         createDefaultSmap(analyzer);
     }
 
@@ -567,12 +578,13 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
     /**
      * Sets outputSmap_ to compose(existing smap, combined child smap). Also
      * substitutes conjuncts_ using the combined child smap.
+     *
      * @throws AnalysisException
      */
     protected void createDefaultSmap(Analyzer analyzer) throws UserException {
         ExprSubstitutionMap combinedChildSmap = getCombinedChildSmap();
         outputSmap =
-            ExprSubstitutionMap.compose(outputSmap, combinedChildSmap, analyzer);
+                ExprSubstitutionMap.compose(outputSmap, combinedChildSmap, analyzer);
 
         conjuncts = Expr.substituteList(conjuncts, outputSmap, analyzer, false);
     }
@@ -646,13 +658,85 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
         }
     }
 
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("[").append(getId().asInt()).append(": ").append(getPlanNodeName()).append("]");
-        sb.append("\nFragment: ").append(getFragmentId().asInt()).append("]");
-        sb.append("\n").append(getNodeExplainString("", TExplainLevel.BRIEF));
-        return sb.toString();
+
+    /**
+     * Returns the estimated combined selectivity of all conjuncts. Uses heuristics to
+     * address the following estimation challenges:
+     * 1. The individual selectivities of conjuncts may be unknown.
+     * 2. Two selectivities, whether known or unknown, could be correlated. Assuming
+     * independence can lead to significant underestimation.
+     * <p>
+     * The first issue is addressed by using a single default selectivity that is
+     * representative of all conjuncts with unknown selectivities.
+     * The second issue is addressed by an exponential backoff when multiplying each
+     * additional selectivity into the final result.
+     */
+    static protected double computeCombinedSelectivity(List<Expr> conjuncts) {
+        // Collect all estimated selectivities.
+        List<Double> selectivities = new ArrayList<>();
+        for (Expr e : conjuncts) {
+            if (e.hasSelectivity()) selectivities.add(e.getSelectivity());
+        }
+        if (selectivities.size() != conjuncts.size()) {
+            // Some conjuncts have no estimated selectivity. Use a single default
+            // representative selectivity for all those conjuncts.
+            selectivities.add(Expr.DEFAULT_SELECTIVITY);
+        }
+        // Sort the selectivities to get a consistent estimate, regardless of the original
+        // conjunct order. Sort in ascending order such that the most selective conjunct
+        // is fully applied.
+        Collections.sort(selectivities);
+        double result = 1.0;
+        // selectivity = 1 * (s1)^(1/1) * (s2)^(1/2) * ... * (sn-1)^(1/(n-1)) * (sn)^(1/n)
+        for (int i = 0; i < selectivities.size(); ++i) {
+            // Exponential backoff for each selectivity multiplied into the final result.
+            result *= Math.pow(selectivities.get(i), 1.0 / (double) (i + 1));
+        }
+        // Bound result in [0, 1]
+        return Math.max(0.0, Math.min(1.0, result));
+    }
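
A standalone sketch of the backoff (illustration only, not part of the patch;
it assumes Expr.DEFAULT_SELECTIVITY is 0.1, and that Guava's Lists and
java.util.Collections are imported as in the surrounding file):

    List<Double> selectivities = Lists.newArrayList(0.5, 0.1, 0.1); // last entry: default for one unknown conjunct
    Collections.sort(selectivities); // ascending, so the most selective conjunct is applied fully
    double result = 1.0;
    for (int i = 0; i < selectivities.size(); ++i) {
        result *= Math.pow(selectivities.get(i), 1.0 / (double) (i + 1));
    }
    // result = 0.1 * 0.1^(1/2) * 0.5^(1/3) ~= 0.025, versus 0.005 under a naive
    // independence assumption: correlated predicates are damped, not compounded.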
+
+    protected double computeSelectivity() {
+        for (Expr expr : conjuncts) {
+            expr.setSelectivity();
+        }
+        return computeCombinedSelectivity(conjuncts);
+    }
+
+    /**
+     * Compute the product of the selectivity of all conjuncts.
+     * This function is used for old cardinality in finalize()
+     */
+    protected double computeOldSelectivity() {
+        double prod = 1.0;
+        for (Expr e : conjuncts) {
+            if (e.getSelectivity() < 0) {
+                return -1.0;
+            }
+            prod *= e.getSelectivity();
+        }
+        return prod;
+    }
+
+    // Compute the cardinality after applying conjuncts based on 'preConjunctCardinality'.
+    protected void applyConjunctsSelectivity() {
+        if (cardinality == -1) {
+            return;
+        }
+        applySelectivity();
+    }
+
+    // Compute the cardinality after applying conjuncts with 'selectivity', based on
+    // 'preConjunctCardinality'.
+    private void applySelectivity() {
+        double selectivity = computeSelectivity();
+        Preconditions.checkState(cardinality >= 0);
+        long preConjunctCardinality = cardinality;
+        cardinality = Math.round(cardinality * selectivity);
+        // don't round cardinality down to zero for safety.
+        if (cardinality == 0 && preConjunctCardinality > 0) {
+            cardinality = 1;
+        }
     }
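
For example, with cardinality = 100 and a combined selectivity of 0.001, the
rounded estimate would be 0; the guard above bumps it back to 1 so that a
non-empty input is never estimated to produce zero rows.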
 
     public ScanNode getScanNodeInOneFragmentByTupleId(TupleId tupleId) {
@@ -695,4 +779,13 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
         }
         return Joiner.on(", ").join(filtersStr) + "\n";
     }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("[").append(getId().asInt()).append(": ").append(getPlanNodeName()).append("]");
+        sb.append("\nFragment: ").append(getFragmentId().asInt()).append("]");
+        sb.append("\n").append(getNodeExplainString("", TExplainLevel.BRIEF));
+        return sb.toString();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
index 16c0a37..6fd8d6e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
@@ -183,10 +183,26 @@ public class Planner {
         if (selectFailed) {
             throw new MVSelectFailedException("Failed to select materialize view");
         }
-        // compute mem layout *before* finalize(); finalize() may reference
-        // TupleDescriptor.avgSerializedSize
+
+        /**
+         * - Under normal circumstances, computeMemLayout() would be executed
+         *     at the end of a plan node's init() function, for example:
+         * OlapScanNode {
+         *     init() {
+         *         analyzer.materializeSlots(conjuncts);
+         *         computeTupleStatAndMemLayout(analyzer);
+         *         computeStat();
+         *     }
+         * }
+         * - However, Doris currently cannot determine whether columns may still be
+         *     added to or removed from a tuple after PlanNode.init().
+         * - Therefore, for the time being, computeMemLayout() can only be placed
+         *     after the entire single-node planning has completed.
+         */
         analyzer.getDescTbl().computeMemLayout();
         singleNodePlan.finalize(analyzer);
+
         if (queryOptions.num_nodes == 1) {
             // single-node execution; we're almost done
             singleNodePlan = addUnassignedConjuncts(analyzer, singleNodePlan);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java
index b70af9e..cc8ab6f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java
@@ -33,11 +33,13 @@ import org.apache.doris.thrift.TPlanNode;
 import org.apache.doris.thrift.TPlanNodeType;
 import org.apache.doris.thrift.TRepeatNode;
 
-import org.apache.commons.collections.CollectionUtils;
-
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collections;
@@ -52,6 +54,8 @@ import java.util.stream.Collectors;
  */
 public class RepeatNode extends PlanNode {
 
+    private static final Logger LOG = LogManager.getLogger(RepeatNode.class);
+
     private List<Set<Integer>> repeatSlotIdList;
     private Set<Integer> allSlotId;
     private TupleDescriptor outputTupleDesc;
@@ -96,8 +100,11 @@ public class RepeatNode extends PlanNode {
     @Override
     public void computeStats(Analyzer analyzer) {
         avgRowSize = 0;
-        cardinality = 0;
         numNodes = 1;
+        cardinality = 0;
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("stats Sort: cardinality=" + cardinality);
+        }
     }
 
     @Override
@@ -128,7 +135,7 @@ public class RepeatNode extends PlanNode {
                 ((SlotRef) slot).getDesc().setIsNullable(true);
             }
         }
-        outputTupleDesc.computeMemLayout();
+        outputTupleDesc.computeStatAndMemLayout();
 
         List<Set<SlotId>> groupingIdList = new ArrayList<>();
         List<Expr> exprList = groupByClause.getGroupingExprs();
@@ -163,7 +170,7 @@ public class RepeatNode extends PlanNode {
         for (TupleId id : tupleIds) {
             analyzer.getTupleDesc(id).setIsMaterialized(true);
         }
-        computeMemLayout(analyzer);
+        computeTupleStatAndMemLayout(analyzer);
         computeStats(analyzer);
         createDefaultSmap(analyzer);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index e039881..649d1c5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.planner;
 
+import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.TupleDescriptor;
@@ -43,6 +44,13 @@ abstract public class ScanNode extends PlanNode {
         this.desc = desc;
     }
 
+    @Override
+    public void init(Analyzer analyzer) throws UserException {
+        super.init(analyzer);
+        // materialize conjuncts in where
+        analyzer.materializeSlots(conjuncts);
+    }
+
     /**
      * Helper function to parse a "host:port" address string into TNetworkAddress
      * This is called with ipaddress:port when doing scan range assigment.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java
index c2d30bc..d8eacd2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java
@@ -18,14 +18,15 @@
 package org.apache.doris.planner;
 
 import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.common.UserException;
 import org.apache.doris.analysis.Expr;
-
+import org.apache.doris.common.UserException;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TPlanNode;
 import org.apache.doris.thrift.TPlanNodeType;
-import org.apache.logging.log4j.Logger;
+
 import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.util.List;
 
 /**
@@ -54,16 +55,29 @@ public class SelectNode extends PlanNode {
 
     @Override
     public void init(Analyzer analyzer) throws UserException {
-      analyzer.markConjunctsAssigned(conjuncts);
-      computeStats(analyzer);
-      createDefaultSmap(analyzer);
+        super.init(analyzer);
+        analyzer.markConjunctsAssigned(conjuncts);
+        computeStats(analyzer);
     }
 
     @Override
     public void computeStats(Analyzer analyzer) {
         super.computeStats(analyzer);
+        if (!analyzer.safeIsEnableJoinReorderBasedCost()) {
+            return;
+        }
+        cardinality = getChild(0).cardinality;
+        applyConjunctsSelectivity();
+        capCardinalityAtLimit();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("stats Select: cardinality={}", this.cardinality);
+        }
+    }
+
+    @Override
+    protected void computeOldCardinality() {
         long cardinality = getChild(0).cardinality;
-        double selectivity = computeSelectivity();
+        double selectivity = computeOldSelectivity();
         if (cardinality < 0 || selectivity < 0) {
             this.cardinality = -1;
         } else {
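
A minimal, self-contained sketch (hypothetical names, not the patch's code) of the
cardinality propagation used above: start from the child's cardinality, multiply by the
combined selectivity of the node's conjuncts, then cap at the LIMIT.

    public final class CardinalitySketch {
        // Combined selectivity of predicates assumed independent: the product.
        static double combinedSelectivity(double... selectivities) {
            double s = 1.0;
            for (double v : selectivities) {
                s *= v;
            }
            return s;
        }

        // -1 encodes "unknown", as in the planner code above; a negative limit means no LIMIT.
        static long estimate(long childCardinality, double selectivity, long limit) {
            if (childCardinality < 0 || selectivity < 0) {
                return -1;
            }
            long card = Math.round(childCardinality * selectivity);
            return limit >= 0 ? Math.min(card, limit) : card;
        }

        public static void main(String[] args) {
            // 1M child rows, predicates with selectivity 0.1 and 0.5, LIMIT 1000:
            System.out.println(estimate(1_000_000L, combinedSelectivity(0.1, 0.5), 1000)); // 1000
        }
    }
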
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java
index 502d4ec..fbd3a48 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java
@@ -17,17 +17,13 @@
 
 package org.apache.doris.planner;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.analysis.TupleId;
+import org.apache.doris.common.CheckedMath;
 import org.apache.doris.thrift.TExceptNode;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TExpr;
@@ -36,18 +32,19 @@ import org.apache.doris.thrift.TPlanNode;
 import org.apache.doris.thrift.TPlanNodeType;
 import org.apache.doris.thrift.TUnionNode;
 
-import org.apache.commons.collections.CollectionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
+import org.apache.commons.collections.CollectionUtils;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Node that merges the results of its child plans. Normally, this is done by
  * materializing the corresponding result exprs into a new tuple. However, if
@@ -131,12 +128,24 @@ public abstract class SetOperationNode extends PlanNode {
     @Override
     public void computeStats(Analyzer analyzer) {
         super.computeStats(analyzer);
+        if (!analyzer.safeIsEnableJoinReorderBasedCost()) {
+            return;
+        }
+        computeCardinality();
+    }
+
+    @Override
+    protected void computeOldCardinality() {
+        computeCardinality();
+    }
+
+    private void computeCardinality() {
         cardinality = constExprLists_.size();
         for (PlanNode child : children) {
             // ignore missing child cardinality info in the hope it won't matter enough
             // to change the planning outcome
             if (child.cardinality > 0) {
-                cardinality = addCardinalities(cardinality, child.cardinality);
+                cardinality = CheckedMath.checkedAdd(cardinality, child.cardinality);
             }
         }
         // The number of nodes of a set operation node is -1 (invalid) if all the referenced tables
@@ -145,31 +154,12 @@ public abstract class SetOperationNode extends PlanNode {
         if (numNodes == -1) {
             numNodes = 1;
         }
-        cardinality = capAtLimit(cardinality);
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("stats Union: cardinality=" + Long.toString(cardinality));
+        capCardinalityAtLimit();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("stats Union: cardinality=" + cardinality);
         }
     }
 
-    protected long capAtLimit(long cardinality) {
-        if (hasLimit()) {
-            if (cardinality == -1) {
-                return limit;
-            } else {
-                return Math.min(cardinality, limit);
-            }
-        }
-        return cardinality;
-    }
-
-    /*
-    @Override
-    public void computeResourceProfile(TQueryOptions queryOptions) {
-        // TODO: add an estimate
-        resourceProfile_ = new ResourceProfile(0, 0);
-    }
-    */
-
     /**
      * Returns true if rows from the child with 'childTupleIds' and 'childResultExprs' can
      * be returned directly by the set operation node (without materialization into a new tuple).
@@ -270,7 +260,7 @@ public abstract class SetOperationNode extends PlanNode {
     @Override
     public void init(Analyzer analyzer) {
         Preconditions.checkState(conjuncts.isEmpty());
-        computeMemLayout(analyzer);
+        computeTupleStatAndMemLayout(analyzer);
         computeStats(analyzer);
         // except Node must not reorder the child
         if (!(this instanceof ExceptNode)) {
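
The CheckedMath.checkedAdd call above replaces the old addCardinalities helper so that
summing child cardinalities cannot overflow into a negative estimate. A minimal sketch of
such a guard, assuming saturate-at-Long.MAX_VALUE semantics and using Guava's LongMath
(Guava is already used throughout the FE code):

    import com.google.common.math.LongMath;

    public final class CheckedAddSketch {
        // Overflow-safe addition for row-count estimates: saturate at
        // Long.MAX_VALUE instead of wrapping around to a negative value.
        static long checkedAdd(long a, long b) {
            try {
                return LongMath.checkedAdd(a, b);
            } catch (ArithmeticException e) {
                return Long.MAX_VALUE;
            }
        }

        public static void main(String[] args) {
            System.out.println(checkedAdd(Long.MAX_VALUE, 1L)); // saturates, stays positive
        }
    }
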
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index 9dcadd7..f11fbf7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -56,6 +56,7 @@ import org.apache.doris.catalog.OdbcTable;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.Reference;
 import org.apache.doris.common.UserException;
 
@@ -64,15 +65,20 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.HashSet;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Constructs a non-executable single-node plan from an analyzed parse tree.
@@ -81,7 +87,7 @@ import java.util.Set;
  * The single-node plan needs to be wrapped in a plan fragment for it to be executable.
  */
 public class SingleNodePlanner {
-    private final static Logger LOG = LogManager.getLogger(DistributedPlanner.class);
+    private final static Logger LOG = LogManager.getLogger(SingleNodePlanner.class);
 
     private final PlannerContext ctx_;
     private final ArrayList<ScanNode> scanNodes = Lists.newArrayList();
@@ -91,6 +97,10 @@ public class SingleNodePlanner {
         ctx_ = ctx;
     }
 
+    public PlannerContext getPlannerContext() {
+        return ctx_;
+    }
+
     public ArrayList<ScanNode> getScanNodes() {
         return scanNodes;
     }
@@ -653,6 +663,270 @@ public class SingleNodePlanner {
     }
 
     /**
+     * Return the cheapest plan that materializes the joins of all TableRefs in
+     * refPlans and the subplans of all applicable TableRefs in subplanRefs.
+     * Assumes that refPlans are in the order in which they originally appeared in
+     * the query.
+     * For this plan:
+     * - the plan is executable, ie, all non-cross joins have equi-join predicates
+     * - the leftmost scan is over the largest of the inputs for which we can still
+     * construct an executable plan
+     * - from bottom to top, all rhs's are in increasing order of selectivity (percentage
+     * of surviving rows)
+     * - outer/cross/semi joins: rhs serialized size is < lhs serialized size;
+     * enforced via join inversion, if necessary
+     * - SubplanNodes are placed as low as possible in the plan tree - as soon as the
+     * required tuple ids of one or more TableRefs in subplanRefs are materialized
+     * Returns null if we can't create an executable plan.
+     */
+    private PlanNode createCheapestJoinPlan(Analyzer analyzer, List<Pair<TableRef, PlanNode>> refPlans) throws UserException {
+        if (refPlans.size() == 1) {
+            return refPlans.get(0).second;
+        }
+
+        // collect eligible candidates for the leftmost input; list contains
+        // (plan, materialized size)
+        List<Pair<TableRef, Long>> candidates = new ArrayList<>();
+        for (Pair<TableRef, PlanNode> entry : refPlans) {
+            TableRef ref = entry.first;
+            JoinOperator joinOp = ref.getJoinOp();
+
+            // Avoid reordering outer/semi joins which is generally incorrect.
+            // TODO: Allow the rhs of any cross join as the leftmost table. This needs careful
+            // consideration of the joinOps that result from such a re-ordering (IMPALA-1281).
+            if (joinOp.isOuterJoin() || joinOp.isSemiJoin() || joinOp.isCrossJoin()) {
+                continue;
+            }
+
+            PlanNode plan = entry.second;
+            if (plan.getCardinality() == -1) {
+                // use 0 for the size to avoid it becoming the leftmost input
+                // TODO: Consider raw size of scanned partitions in the absence of stats.
+                candidates.add(new Pair<>(ref, 0L));
+                LOG.debug("The candidate of " + ref.getUniqueAlias() + ": -1. "
+                        + "Using 0 instead of -1 to avoid error");
+                continue;
+            }
+            Preconditions.checkState(ref.isAnalyzed());
+            long materializedSize = plan.getCardinality();
+            candidates.add(new Pair<>(ref, materializedSize));
+            LOG.debug("The candidate of " + ref.getUniqueAlias() + ": " + materializedSize);
+        }
+        // (ML): This feels unreachable, because the first table ref is at least an inner join.
+        if (candidates.isEmpty()) return null;
+
+        // order candidates by descending materialized size; we want to minimize the memory
+        // consumption of the materialized hash tables required for the join sequence
+        Collections.sort(candidates,
+                new Comparator<Pair<TableRef, Long>>() {
+                    @Override
+                    public int compare(Pair<TableRef, Long> a, Pair<TableRef, Long> b) {
+                        long diff = b.second - a.second;
+                        return (diff < 0 ? -1 : (diff > 0 ? 1 : 0));
+                    }
+                });
+
+        for (Pair<TableRef, Long> candidate : candidates) {
+            PlanNode result = createJoinPlan(analyzer, candidate.first, refPlans);
+            if (result != null) return result;
+        }
+        return null;
+    }
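
In short, candidates for the leftmost (probe) input are sorted by descending materialized
size and tried in that order until one yields a valid plan. A toy illustration of that
ordering (hypothetical tables and sizes, not Doris code):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class LeftmostOrderSketch {
        public static void main(String[] args) {
            Map<String, Long> cardinality = new LinkedHashMap<>();
            cardinality.put("t1", 1_000L);     // small dimension table
            cardinality.put("t2", 50_000L);    // medium table
            cardinality.put("t3", 2_000_000L); // large fact table
            List<String> tryOrder = new ArrayList<>(cardinality.keySet());
            // Descending materialized size: the biggest input is tried as leftmost first.
            tryOrder.sort(Comparator.comparingLong((String t) -> cardinality.get(t)).reversed());
            System.out.println(tryOrder); // [t3, t2, t1]
        }
    }
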
+
+    boolean candidateCardinalityIsSmaller(PlanNode candidate, long candidateInnerNodeCardinality,
+                                          PlanNode newRoot, long newRootInnerNodeCardinality) {
+        if (candidate.getCardinality() < newRoot.getCardinality()) {
+            return true;
+        } else if (candidate.getCardinality() == newRoot.getCardinality()) {
+            if (((candidate instanceof HashJoinNode) && ((HashJoinNode) candidate).getJoinOp().isInnerJoin())
+                    && ((newRoot instanceof HashJoinNode) && ((HashJoinNode) newRoot).getJoinOp().isInnerJoin())) {
+                if (candidateInnerNodeCardinality < newRootInnerNodeCardinality) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
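
In other words: a candidate wins outright when its result cardinality is smaller; when two
inner hash joins tie on result cardinality, the one whose right (build) input is smaller
wins, since a smaller build side means a cheaper hash table.
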
+
+    /**
+     * Returns a plan with leftmostRef's plan as its leftmost input; the joins
+     * are in decreasing order of selectiveness (percentage of rows they eliminate).
+     * Creates and adds subplan nodes as soon as the tuple ids required by at least one
+     * subplan ref are materialized by a join node added during plan generation.
+     */
+    // (ML): change the function name
+    private PlanNode createJoinPlan(Analyzer analyzer,
+                                    TableRef leftmostRef, List<Pair<TableRef, PlanNode>> refPlans)
+            throws UserException {
+        LOG.debug("Try to create a query plan starting with " + leftmostRef.getUniqueAlias());
+
+        // the refs that have yet to be joined
+        List<Pair<TableRef, PlanNode>> remainingRefs = new ArrayList<>();
+        PlanNode root = null;  // root of accumulated join plan
+        for (Pair<TableRef, PlanNode> entry : refPlans) {
+            if (entry.first == leftmostRef) {
+                root = entry.second;
+            } else {
+                remainingRefs.add(entry);
+            }
+        }
+        Preconditions.checkNotNull(root);
+
+        // Maps from a TableRef in refPlans with an outer/semi join op to the set of
+        // TableRefs that precede it in refPlans (i.e., in FROM-clause order).
+        // The map is used to place outer/semi joins at a fixed position in the plan tree
+        // (IMPALA-860), s.t. all the tables appearing to the left/right of an outer/semi
+        // join in the original query still remain to the left/right after join ordering.
+        // This prevents join re-ordering across outer/semi joins which is generally wrong.
+
+        /**
+         * Key of precedingRefs: the right table ref of outer or semi join
+         * Value of precedingRefs: the preceding refs of this key
+         * For example:
+         * select * from t1, t2, t3 left join t4, t5, t6 right semi join t7, t8, t9
+         * Map:
+         * { t4: [t1, t2, t3],
+         *   t7: [t1, t2, t3, t4, t5, t6]
+         * }
+         */
+        Map<TableRef, Set<TableRef>> precedingRefs = new HashMap<>();
+        List<TableRef> tmpTblRefs = new ArrayList<>();
+        for (Pair<TableRef, PlanNode> entry : refPlans) {
+            TableRef tblRef = entry.first;
+            if (tblRef.getJoinOp().isOuterJoin() || tblRef.getJoinOp().isSemiJoin()) {
+                precedingRefs.put(tblRef, Sets.newHashSet(tmpTblRefs));
+            }
+            tmpTblRefs.add(tblRef);
+        }
+
+        // Refs that have been joined. The union of joinedRefs and the refs in remainingRefs
+        // are the set of all table refs.
+        Set<TableRef> joinedRefs = Sets.newHashSet(leftmostRef);
+        // two statistics for debug logging
+        long numOps = 0;
+        // the number of rounds in which a candidate was successfully selected
+        int successfulSelectionTimes = 0;
+        while (!remainingRefs.isEmpty()) {
+            // We minimize the resulting cardinality at each step in the join chain,
+            // which minimizes the total number of hash table lookups.
+            PlanNode newRoot = null;
+            Pair<TableRef, PlanNode> minEntry = null;
+            long newRootRightChildCardinality = 0;
+            for (Pair<TableRef, PlanNode> tblRefToPlanNodeOfCandidate : remainingRefs) {
+                TableRef tblRefOfCandidate = tblRefToPlanNodeOfCandidate.first;
+                long cardinalityOfCandidate = tblRefToPlanNodeOfCandidate.second.getCardinality();
+                PlanNode rootPlanNodeOfCandidate = tblRefToPlanNodeOfCandidate.second;
+                JoinOperator joinOp = tblRefOfCandidate.getJoinOp();
+
+                // Place outer/semi joins at a fixed position in the plan tree.
+                Set<TableRef> requiredRefs = precedingRefs.get(tblRefOfCandidate);
+                if (requiredRefs != null) {
+                    Preconditions.checkState(joinOp.isOuterJoin()
+                            || joinOp.isSemiJoin());
+                    /**
+                     * A semi or outer join node acts as a stop node within each round of the algorithm.
+                     * If a stop node is encountered during the current round of candidate selection,
+                     * none of the table refs after it need to be considered; this round is over.
+                     * There are two situations here.
+                     * Situation 1: the required table refs have not all been placed yet
+                     * t1, t2, t3 left join t4, t5
+                     *     Round 1: t3, t1(new root) meets t4(stop)
+                     *              stop this round and begin next round
+                     * Situation 2: skipping the remaining table refs prevents incorrect re-ordering
+                     *              of tables across outer/semi joins
+                     *     Round 1: t5, t1, t2, t3(root) meets t4(stop)
+                     *              stop this round while the new root is null
+                     *              planning fails and null is returned
+                     */
+                    if (!requiredRefs.equals(joinedRefs)) {
+                        break;
+                    }
+                }
+                // reset assigned conjuncts of analyzer in every compare
+                analyzer.setAssignedConjuncts(root.getAssignedConjuncts());
+                PlanNode candidate = createJoinNode(analyzer, root, rootPlanNodeOfCandidate, tblRefOfCandidate);
+                // (ML): Is this still needed? createJoinNode should not return null here.
+                if (candidate == null) {
+                    continue;
+                }
+                // Have the build side of a join copy data to a compact representation
+                // in the tuple buffer.
+                candidate.getChildren().get(1).setCompactData(true);
+
+                if (LOG.isDebugEnabled()) {
+                    StringBuilder stringBuilder = new StringBuilder();
+                    stringBuilder.append("The ").append(tblRefOfCandidate.getUniqueAlias())
+                            .append(" is the right child of the join node. ");
+                    stringBuilder.append("The join cardinality is ").append(candidate.getCardinality()).append(". ");
+                    stringBuilder.append("In round ").append(successfulSelectionTimes);
+                    LOG.debug(stringBuilder.toString());
+                }
+
+                // Use 'candidate' as the new root; don't consider any other table refs at this
+                // position in the plan.
+                if (joinOp.isOuterJoin() || joinOp.isSemiJoin()) {
+                    newRoot = candidate;
+                    minEntry = tblRefToPlanNodeOfCandidate;
+                    break;
+                }
+
+                // Always prefer Hash Join over Nested-Loop Join due to limited costing
+                // infrastructure.
+                /**
+                 * The candidate becomes the new root when any of the following conditions holds:
+                 * 1. It is the first candidate.
+                 * 2. The candidate is better than the current new root: [t3, t2] vs [t3, t1] => [t3, t1]
+                 * 3. A hash join is better than a cross join: [t3 cross t1] vs [t3 hash t2] => t3 hash t2
+                 */
+                if (newRoot == null
+                        || (candidate.getClass().equals(newRoot.getClass())
+                                && candidateCardinalityIsSmaller(candidate, cardinalityOfCandidate,
+                                        newRoot, newRootRightChildCardinality))
+                        || (candidate instanceof HashJoinNode && newRoot instanceof CrossJoinNode)) {
+                    newRoot = candidate;
+                    minEntry = tblRefToPlanNodeOfCandidate;
+                    newRootRightChildCardinality = cardinalityOfCandidate;
+                }
+            }
+
+            /**
+             * A table that must stay after an outer or semi join was wrongly planned to the front,
+             * so the first candidate of this round (an outer or semi join table ref) failed and the
+             * loop was exited. This means the current leftmost node must be wrong, and no correct
+             * plan can be produced from it.
+             *
+             * For example:
+             * Query: t1 left join t2 inner join t3
+             * Input params: t3 (leftmost tbl ref), [t1, t2] (remaining refs)
+             *     Round 1: t3, t1 (joined refs) t2 (remaining refs)
+             *     Round 2: requiredRefs.equals(joinedRefs) is false and break, the newRoot is null
+             * Result: null
+             * t3 must not appear before t2, so planning fails.
+             */
+            if (newRoot == null) {
+                // Could not generate a valid plan.
+                // for example: the biggest table is the last table
+                return null;
+            }
+
+            // we need to insert every rhs row into the hash table and then look up
+            // every lhs row
+            long lhsCardinality = root.getCardinality();
+            long rhsCardinality = minEntry.second.getCardinality();
+            numOps += lhsCardinality + rhsCardinality;
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Round " + successfulSelectionTimes + " chose " + minEntry.first.getUniqueAlias()
+                        + " #lhs=" + lhsCardinality + " #rhs=" + rhsCardinality + " #ops=" + numOps);
+            }
+            remainingRefs.remove(minEntry);
+            joinedRefs.add(minEntry.first);
+            root = newRoot;
+            analyzer.setAssignedConjuncts(root.getAssignedConjuncts());
+            ++successfulSelectionTimes;
+        }
+
+        LOG.debug("The final join sequence is "
+                + joinedRefs.stream().map(TableRef::getUniqueAlias).collect(Collectors.joining(",")));
+        return root;
+    }
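
Stripped of the Doris types and of the outer/semi-join pinning and hash-vs-cross
tie-breaks, the loop above is a plain greedy ordering: each round joins in whichever
remaining input minimizes the estimated result cardinality against the current root.
A hedged skeleton, where the joinCardinality callback stands in for createJoinNode
plus getCardinality:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.BiFunction;

    public class GreedyJoinOrderSketch {
        static <T> List<T> greedyOrder(T leftmost, List<T> remaining,
                                       BiFunction<List<T>, T, Long> joinCardinality) {
            List<T> order = new ArrayList<>();
            order.add(leftmost);
            List<T> rest = new ArrayList<>(remaining);
            while (!rest.isEmpty()) {
                T best = null;
                long bestCard = Long.MAX_VALUE;
                for (T cand : rest) {
                    // estimated cardinality of joining the current prefix with cand
                    long card = joinCardinality.apply(order, cand);
                    if (best == null || card < bestCard) {
                        best = cand;
                        bestCard = card;
                    }
                }
                rest.remove(best);
                order.add(best);
            }
            return order;
        }
    }
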
+
+    /**
      * Create tree of PlanNodes that implements the Select/Project/Join/Group by/Having
      * of the selectStmt query block.
      */
@@ -691,35 +965,66 @@ public class SingleNodePlanner {
             return createAggregationPlan(selectStmt, analyzer, emptySetNode);
         }
 
-        // create left-deep sequence of binary hash joins; assign node ids as we go along
-        TableRef tblRef = selectStmt.getTableRefs().get(0);
-        materializeTableResultForCrossJoinOrCountStar(tblRef, analyzer);
-        PlanNode root = createTableRefNode(analyzer, tblRef, selectStmt);
-        // to change the inner contains analytic function
-        // selectStmt.seondSubstituteInlineViewExprs(analyzer.getChangeResSmap());
-
-        // add aggregate node here
+        PlanNode root = null;
         AggregateInfo aggInfo = selectStmt.getAggInfo();
 
-        turnOffPreAgg(aggInfo, selectStmt, analyzer, root);
+        if (analyzer.safeIsEnableJoinReorderBasedCost()) {
+            LOG.debug("Using new join reorder strategy when enable_join_reorder_based_cost is true");
+            // create plans for our table refs; use a list here instead of a map to
+            // maintain a deterministic order of traversing the TableRefs during join
+            // plan generation (helps with tests)
+            List<Pair<TableRef, PlanNode>> refPlans = Lists.newArrayList();
+            for (TableRef ref : selectStmt.getTableRefs()) {
+                materializeTableResultForCrossJoinOrCountStar(ref, analyzer);
+                PlanNode plan = createTableRefNode(analyzer, ref, selectStmt);
+                turnOffPreAgg(aggInfo, selectStmt, analyzer, plan);
+
+                if (plan instanceof OlapScanNode) {
+                    OlapScanNode olapNode = (OlapScanNode) plan;
+                    // this olap scan node has been turn off pre-aggregation, should not be turned on again.
+                    // e.g. select sum(v1) from (select v1 from test_table);
+                    if (!olapNode.isPreAggregation()) {
+                        olapNode.setCanTurnOnPreAggr(false);
+                    }
+                }
 
-        if (root instanceof OlapScanNode) {
-            OlapScanNode olapNode = (OlapScanNode) root;
-            // this olap scan node has been turn off pre-aggregation, should not be turned on again.
-            // e.g. select sum(v1) from (select v1 from test_table);
-            if (!olapNode.isPreAggregation()) {
-                olapNode.setCanTurnOnPreAggr(false);
+                Preconditions.checkState(plan != null);
+                refPlans.add(new Pair<>(ref, plan));
+            }
+            // save state of conjunct assignment; needed for join plan generation
+            for (Pair<TableRef, PlanNode> entry : refPlans) {
+                entry.second.setAssignedConjuncts(analyzer.getAssignedConjuncts());
+            }
+            root = createCheapestJoinPlan(analyzer, refPlans);
+            Preconditions.checkState(root != null);
+        } else {
+            // create left-deep sequence of binary hash joins; assign node ids as we go along
+            TableRef tblRef = selectStmt.getTableRefs().get(0);
+            materializeTableResultForCrossJoinOrCountStar(tblRef, analyzer);
+            root = createTableRefNode(analyzer, tblRef, selectStmt);
+            // to change the inner contains analytic function
+            // selectStmt.seondSubstituteInlineViewExprs(analyzer.getChangeResSmap());
+
+            turnOffPreAgg(aggInfo, selectStmt, analyzer, root);
+
+            if (root instanceof OlapScanNode) {
+                OlapScanNode olapNode = (OlapScanNode) root;
+                // this olap scan node has been turn off pre-aggregation, should not be turned on again.
+                // e.g. select sum(v1) from (select v1 from test_table);
+                if (!olapNode.isPreAggregation()) {
+                    olapNode.setCanTurnOnPreAggr(false);
+                }
             }
-        }
 
-        for (int i = 1; i < selectStmt.getTableRefs().size(); ++i) {
-            TableRef outerRef = selectStmt.getTableRefs().get(i - 1);
-            TableRef innerRef = selectStmt.getTableRefs().get(i);
-            root = createJoinNode(analyzer, root, outerRef, innerRef, selectStmt);
-            // Have the build side of a join copy data to a compact representation
-            // in the tuple buffer.
-            root.getChildren().get(1).setCompactData(true);
-            root.assignConjuncts(analyzer);
+            for (int i = 1; i < selectStmt.getTableRefs().size(); ++i) {
+                TableRef outerRef = selectStmt.getTableRefs().get(i - 1);
+                TableRef innerRef = selectStmt.getTableRefs().get(i);
+                root = createJoinNode(analyzer, root, innerRef, selectStmt);
+                // Have the build side of a join copy data to a compact representation
+                // in the tuple buffer.
+                root.getChildren().get(1).setCompactData(true);
+                root.assignConjuncts(analyzer);
+            }
         }
 
         if (selectStmt.getSortInfo() != null && selectStmt.getLimit() == -1
@@ -902,7 +1207,7 @@ public class SingleNodePlanner {
             // slotDesc.setStats(ColumnStats.fromExpr(resultExpr));
             slotDesc.setIsMaterialized(true);
         }
-        tupleDesc.computeMemLayout();
+        tupleDesc.computeStatAndMemLayout();
         return tupleDesc;
     }
 
@@ -1451,15 +1756,14 @@ public class SingleNodePlanner {
             scanNode.setSortColumn(tblRef.getSortColumn());
             scanNode.addConjuncts(pushDownConjuncts);
         }
-        // assignConjuncts(scanNode, analyzer);
-        scanNode.init(analyzer);
-        // TODO chenhao add
-        // materialize conjuncts in where
-        analyzer.materializeSlots(scanNode.getConjuncts());
 
         scanNodes.add(scanNode);
+        // register the scan node in selectStmtToScanNodes before calling scanNode.init
         List<ScanNode> scanNodeList = selectStmtToScanNodes.computeIfAbsent(selectStmt.getAnalyzer(), k -> Lists.newArrayList());
         scanNodeList.add(scanNode);
+
+        scanNode.init(analyzer);
+
         return scanNode;
     }
 
@@ -1549,18 +1853,9 @@ public class SingleNodePlanner {
         }
     }
 
-    /**
-     * Creates a new node to join outer with inner. Collects and assigns join conjunct
-     * as well as regular conjuncts. Calls init() on the new join node.
-     * Throws if the JoinNode.init() fails.
-     */
-    private PlanNode createJoinNode(Analyzer analyzer, PlanNode outer, TableRef outerRef, TableRef innerRef,
-                                    SelectStmt selectStmt)
-            throws UserException, AnalysisException {
+    private PlanNode createJoinNodeBase(Analyzer analyzer, PlanNode outer, PlanNode inner, TableRef innerRef)
+            throws UserException {
         materializeTableResultForCrossJoinOrCountStar(innerRef, analyzer);
-        // the rows coming from the build node only need to have space for the tuple
-        // materialized by that node
-        PlanNode inner = createTableRefNode(analyzer, innerRef, selectStmt);
 
         List<Expr> eqJoinConjuncts = Lists.newArrayList();
         Reference<String> errMsg = new Reference<String>();
@@ -1577,8 +1872,8 @@ public class SingleNodePlanner {
             }
 
             // construct cross join node
-            LOG.debug("Join between {} and {} requires at least one conjunctive equality predicate between the two tables",
-                    outerRef.getAliasAsName(), innerRef.getAliasAsName());
+            // LOG.debug("Join between {} and {} requires at least one conjunctive equality predicate between the two tables",
+            //        outerRef.getAliasAsName(), innerRef.getAliasAsName());
             // TODO If there are eq join predicates then we should construct a hash join
             CrossJoinNode result =
                     new CrossJoinNode(ctx_.getNextNodeId(), outer, inner, innerRef);
@@ -1606,6 +1901,28 @@ public class SingleNodePlanner {
         return result;
     }
 
+    /**
+     * Creates a join node for the cost-based join reorder path, where the inner
+     * plan has already been created by the caller.
+     */
+    public PlanNode createJoinNode(Analyzer analyzer, PlanNode outer, PlanNode inner, TableRef innerRef)
+            throws UserException {
+        return createJoinNodeBase(analyzer, outer, inner, innerRef);
+    }
+
+    /**
+     * Creates a new node to join outer with inner. Collects and assigns join conjunct
+     * as well as regular conjuncts. Calls init() on the new join node.
+     * Throws if the JoinNode.init() fails.
+     */
+    private PlanNode createJoinNode(Analyzer analyzer, PlanNode outer, TableRef innerRef,
+                                    SelectStmt selectStmt) throws UserException {
+        // the rows coming from the build node only need to have space for the tuple
+        // materialized by that node
+        PlanNode inner = createTableRefNode(analyzer, innerRef, selectStmt);
+
+        return createJoinNodeBase(analyzer, outer, inner, innerRef);
+    }
+
     /**
      * Create a tree of PlanNodes for the given tblRef, which can be a BaseTableRef,
      * CollectionTableRef or an InlineViewRef.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
index 93c139e..c0450b9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
@@ -124,6 +124,19 @@ public class SortNode extends PlanNode {
     @Override
     protected void computeStats(Analyzer analyzer) {
         super.computeStats(analyzer);
+        if (!analyzer.safeIsEnableJoinReorderBasedCost()) {
+            return;
+        }
+        cardinality = getChild(0).cardinality;
+        applyConjunctsSelectivity();
+        capCardinalityAtLimit();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("stats Sort: cardinality=" + cardinality);
+        }
+    }
+
+    @Override
+    protected void computeOldCardinality() {
         cardinality = getChild(0).cardinality;
         if (hasLimit()) {
             if (cardinality == -1) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 39c0f5d..dcd8afb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -137,7 +137,7 @@ public class StreamLoadPlanner {
         // create scan node
         scanNode = new StreamLoadScanNode(loadId, new PlanNodeId(0), tupleDesc, destTable, taskInfo);
         scanNode.init(analyzer);
-        descTable.computeMemLayout();
+        descTable.computeStatAndMemLayout();
         scanNode.finalize(analyzer);
 
         // create dest sink
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
index 76058ff..200e232 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
@@ -143,7 +143,6 @@ public class StreamLoadScanNode extends LoadScanNode {
         initAndSetPrecedingFilter(taskInfo.getPrecedingFilter(), this.srcTupleDesc, analyzer);
         initAndSetWhereExpr(taskInfo.getWhereExpr(), this.desc, analyzer);
 
-        computeStats(analyzer);
         createDefaultSmap(analyzer);
 
         if (taskInfo.getColumnSeparator() != null) {
@@ -169,6 +168,7 @@ public class StreamLoadScanNode extends LoadScanNode {
         brokerScanRange.setParams(params);
 
         brokerScanRange.setBrokerAddresses(Lists.newArrayList());
+        computeStats(analyzer);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 0dcae49..19bedaa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -84,6 +84,8 @@ public class SessionVariable implements Serializable, Writable {
     public static final String ENABLE_SQL_CACHE = "enable_sql_cache";
     public static final String ENABLE_PARTITION_CACHE = "enable_partition_cache";
 
+    public static final String ENABLE_COST_BASED_JOIN_REORDER = "enable_cost_based_join_reorder";
+
     public static final int MIN_EXEC_INSTANCE_NUM = 1;
     public static final int MAX_EXEC_INSTANCE_NUM = 32;
     // if set to true, some of stmt will be forwarded to master FE to get result
@@ -288,6 +290,9 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = ENABLE_PARTITION_CACHE)
     public boolean enablePartitionCache = false;
 
+    @VariableMgr.VarAttr(name = ENABLE_COST_BASED_JOIN_REORDER)
+    private boolean enableJoinReorderBasedCost = false;
+
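
Assuming the usual MySQL-protocol session-variable syntax, the new strategy can then be
toggled per connection with "set enable_cost_based_join_reorder = true;"; it is
initialized to off, as shown above.
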
     @VariableMgr.VarAttr(name = FORWARD_TO_MASTER)
     public boolean forwardToMaster = false;
 
@@ -374,6 +379,8 @@ public class SessionVariable implements Serializable, Writable {
         this.sqlMode = sqlMode;
     }
 
+    public boolean isEnableJoinReorderBasedCost() {
+        return enableJoinReorderBasedCost;
+    }
+
     public boolean isAutoCommit() {
         return autoCommit;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 14e5c31..7ec6222 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -356,7 +356,7 @@ public class StmtExecutor implements ProfileWriter {
             throw e;
         } catch (UserException e) {
             // analysis exception only print message, not print the stack
-            LOG.warn("execute Exception. {}", e.getMessage());
+            LOG.warn("execute Exception. {}", e);
             context.getState().setError(e.getMessage());
             context.getState().setErrType(QueryState.ErrType.ANALYSIS_ERR);
         } catch (Exception e) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
index 5aac2db..d6535a3 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
@@ -1456,6 +1456,8 @@ public class QueryPlanTest {
         Assert.assertTrue(explainString.contains("AGGREGATE (update serialize)"));
     }
 
+    @Test
     public void testLeadAndLagFunction() throws Exception {
         connectContext.setDatabase("default_cluster:test");
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/SingleNodePlannerTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/SingleNodePlannerTest.java
index 528d91f..8b77b48 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/SingleNodePlannerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/SingleNodePlannerTest.java
@@ -20,22 +20,36 @@
 
 package org.apache.doris.planner;
 
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Tested;
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.BaseTableRef;
+import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.ExprSubstitutionMap;
+import org.apache.doris.analysis.JoinOperator;
 import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.SlotId;
+import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.TableName;
 import org.apache.doris.analysis.TableRef;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.analysis.TupleId;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Table;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
 import org.apache.doris.common.jmockit.Deencapsulation;
 
 import com.google.common.collect.Lists;
 
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import mockit.Expectations;
@@ -69,6 +83,1348 @@ public class SingleNodePlannerTest {
         };
         SingleNodePlanner singleNodePlanner = new SingleNodePlanner(plannerContext);
         Deencapsulation.invoke(singleNodePlanner, "materializeSlotForEmptyMaterializedTableRef",
-                               baseTableRef, analyzer);
+                baseTableRef, analyzer);
+    }
+
+    /*
+    Assumptions:
+    1. The order of materialized size from smallest to largest is t1, t2 ... tn
+    2. The predicates are orthogonal to each other and don't affect each other.
+     */
+
+    /*
+    Query: select * from t1 inner join t2 on t1.k1=t2.k1
+    Original Query: select * from test1 inner join test2 on test1.k1=test2.k2
+    Expect: reordered so that the larger t2 becomes the leftmost (probe) input
+     */
+    @Test
+    public void testJoinReorderWithTwoTuple1(@Injectable PlannerContext context,
+                                             @Injectable Analyzer analyzer,
+                                             @Injectable BaseTableRef tableRef1,
+                                             @Injectable OlapScanNode scanNode1,
+                                             @Injectable BaseTableRef tableRef2,
+                                             @Injectable OlapScanNode scanNode2,
+                                             @Injectable TupleDescriptor tupleDescriptor1,
+                                             @Injectable SlotDescriptor slotDescriptor1,
+                                             @Injectable SlotDescriptor slotDescriptor2,
+                                             @Injectable BinaryPredicate eqBinaryPredicate,
+                                             @Injectable SlotRef eqSlot1,
+                                             @Injectable SlotRef eqSlot2,
+                                             @Tested ExprSubstitutionMap exprSubstitutionMap) {
+        List<SlotDescriptor> slotDescriptors1 = Lists.newArrayList();
+        slotDescriptors1.add(slotDescriptor1);
+        List<SlotDescriptor> slotDescriptors2 = Lists.newArrayList();
+        slotDescriptors2.add(slotDescriptor2);
+        tableRef1.setJoinOp(JoinOperator.INNER_JOIN);
+        tableRef2.setJoinOp(JoinOperator.INNER_JOIN);
+        List<Expr> eqConjuncts = Lists.newArrayList();
+        eqConjuncts.add(eqBinaryPredicate);
+        new Expectations() {
+            {
+                tableRef1.isAnalyzed();
+                result = true;
+                tableRef2.isAnalyzed();
+                result = true;
+                scanNode1.getCardinality();
+                result = 1;
+                scanNode2.getCardinality();
+                result = 2;
+                tableRef1.getDesc();
+                result = tupleDescriptor1;
+                tupleDescriptor1.getMaterializedSlots();
+                result = slotDescriptors1;
+                analyzer.getEqJoinConjuncts(new ArrayList<>(), new ArrayList<>());
+                result = eqConjuncts;
+                scanNode1.getTblRefIds();
+                result = Lists.newArrayList();
+                scanNode2.getTblRefIds();
+                result = Lists.newArrayList();
+                eqBinaryPredicate.getChild(0);
+                result = eqSlot1;
+                eqSlot1.isBoundByTupleIds(new ArrayList<>());
+                result = true;
+                eqBinaryPredicate.getChild(1);
+                result = eqSlot2;
+                eqSlot2.isBoundByTupleIds(new ArrayList<>());
+                result = true;
+                scanNode1.getTupleIds();
+                result = Lists.newArrayList();
+                scanNode2.getTupleIds();
+                result = Lists.newArrayList();
+                scanNode1.getOutputSmap();
+                result = null;
+                scanNode2.getOutputSmap();
+                result = null;
+                tableRef1.getUniqueAlias();
+                result = "t1";
+                tableRef2.getUniqueAlias();
+                result = "t2";
+            }
+        };
+        new MockUp<ExprSubstitutionMap>() {
+            @Mock
+            public ExprSubstitutionMap compose(ExprSubstitutionMap f, ExprSubstitutionMap g,
+                                               Analyzer analyzer) {
+                return exprSubstitutionMap;
+            }
+
+            @Mock
+            public ExprSubstitutionMap combine(ExprSubstitutionMap f, ExprSubstitutionMap g) {
+                return exprSubstitutionMap;
+            }
+        };
+        SingleNodePlanner singleNodePlanner = new SingleNodePlanner(context);
+        Pair<TableRef, PlanNode> pair1 = new Pair<>(tableRef1, scanNode1);
+        Pair<TableRef, PlanNode> pair2 = new Pair<>(tableRef2, scanNode2);
+        List<Pair<TableRef, PlanNode>> refPlans = Lists.newArrayList();
+        refPlans.add(pair1);
+        refPlans.add(pair2);
+
+        PlanNode cheapestJoinNode =
+                Deencapsulation.invoke(singleNodePlanner, "createCheapestJoinPlan", analyzer, refPlans);
+        Assert.assertEquals(2, cheapestJoinNode.getChildren().size());
+        Assert.assertEquals(scanNode2, cheapestJoinNode.getChild(0));
+        Assert.assertEquals(scanNode1, cheapestJoinNode.getChild(1));
+    }
+
+    /*
+    Query: select * from t1 left join t2 on t1.k1=t2.k1
+    Original Query: select * from test1 left join test2 on test1.k1=test2.k2
+    Expect: unchanged
+     */
+    @Test
+    public void testJoinReorderWithTwoTuple2(@Injectable PlannerContext context,
+                                             @Injectable Analyzer analyzer,
+                                             @Injectable BaseTableRef tableRef1,
+                                             @Injectable OlapScanNode scanNode1,
+                                             @Injectable BaseTableRef tableRef2,
+                                             @Injectable OlapScanNode scanNode2,
+                                             @Injectable TupleDescriptor tupleDescriptor2,
+                                             @Injectable SlotDescriptor slotDescriptor1,
+                                             @Injectable SlotDescriptor slotDescriptor2,
+                                             @Injectable BinaryPredicate eqBinaryPredicate,
+                                             @Injectable SlotRef eqSlot1,
+                                             @Injectable SlotRef eqSlot2,
+                                             @Tested ExprSubstitutionMap exprSubstitutionMap) {
+        Pair<TableRef, PlanNode> pair1 = new Pair<>(tableRef1, scanNode1);
+        Pair<TableRef, PlanNode> pair2 = new Pair<>(tableRef2, scanNode2);
+        List<Pair<TableRef, PlanNode>> refPlans = Lists.newArrayList();
+        refPlans.add(pair1);
+        refPlans.add(pair2);
+        tableRef1.setJoinOp(JoinOperator.INNER_JOIN);
+        tableRef2.setJoinOp(JoinOperator.LEFT_OUTER_JOIN);
+
+        List<SlotDescriptor> slotDescriptors1 = Lists.newArrayList();
+        slotDescriptors1.add(slotDescriptor1);
+        List<SlotDescriptor> slotDescriptors2 = Lists.newArrayList();
+        slotDescriptors2.add(slotDescriptor2);
+        List<Expr> eqConjuncts = Lists.newArrayList();
+        eqConjuncts.add(eqBinaryPredicate);
+
+        new Expectations() {
+            {
+                tableRef1.isAnalyzed();
+                result = true;
+                scanNode1.getCardinality();
+                result = 1;
+                scanNode2.getCardinality();
+                result = 2;
+                tableRef2.getDesc();
+                result = tupleDescriptor2;
+                tupleDescriptor2.getMaterializedSlots();
+                result = slotDescriptors2;
+                analyzer.getEqJoinConjuncts(new ArrayList<>(), new ArrayList<>());
+                result = eqConjuncts;
+                scanNode1.getTblRefIds();
+                result = Lists.newArrayList();
+                scanNode2.getTblRefIds();
+                result = Lists.newArrayList();
+                eqBinaryPredicate.getChild(0);
+                result = eqSlot1;
+                eqSlot1.isBoundByTupleIds(new ArrayList<>());
+                result = true;
+                eqBinaryPredicate.getChild(1);
+                result = eqSlot2;
+                eqSlot2.isBoundByTupleIds(new ArrayList<>());
+                result = true;
+                scanNode1.getTupleIds();
+                result = Lists.newArrayList();
+                scanNode2.getTupleIds();
+                result = Lists.newArrayList();
+                scanNode1.getOutputSmap();
+                result = null;
+                scanNode2.getOutputSmap();
+                result = null;
+                tableRef1.getUniqueAlias();
+                result = "t1";
+                tableRef2.getUniqueAlias();
+                result = "t2";
+                tableRef1.getJoinOp();
+                result = JoinOperator.INNER_JOIN;
+                tableRef2.getJoinOp();
+                result = JoinOperator.LEFT_OUTER_JOIN;
+            }
+        };
+        new MockUp<ExprSubstitutionMap>() {
+            @Mock
+            public ExprSubstitutionMap compose(ExprSubstitutionMap f, ExprSubstitutionMap g,
+                                               Analyzer analyzer) {
+                return exprSubstitutionMap;
+            }
+
+            @Mock
+            public ExprSubstitutionMap combine(ExprSubstitutionMap f, ExprSubstitutionMap g) {
+                return exprSubstitutionMap;
+            }
+        };
+
+        SingleNodePlanner singleNodePlanner = new SingleNodePlanner(context);
+        PlanNode cheapestJoinNode = Deencapsulation.invoke(singleNodePlanner, "createCheapestJoinPlan", analyzer, refPlans);
+        Assert.assertEquals(2, cheapestJoinNode.getChildren().size());
+        Assert.assertTrue(cheapestJoinNode instanceof HashJoinNode);
+        Assert.assertEquals(JoinOperator.LEFT_OUTER_JOIN, ((HashJoinNode) cheapestJoinNode).getJoinOp());
+        Assert.assertEquals(scanNode1, cheapestJoinNode.getChild(0));
+        Assert.assertEquals(scanNode2, cheapestJoinNode.getChild(1));
+    }
+
+    /*
+    Query: select * from t1 right join t2 on t1.k1=t2.k1
+    Original Query: select * from test1 right join test2 on test1.k1=test2.k2
+    Expect: unchanged
+     */
+    @Test
+    public void testJoinReorderWithTwoTuple3(@Injectable PlannerContext context,
+                                             @Injectable Analyzer analyzer,
+                                             @Injectable BaseTableRef tableRef1,
+                                             @Injectable OlapScanNode scanNode1,
+                                             @Injectable BaseTableRef tableRef2,
+                                             @Injectable OlapScanNode scanNode2,
+                                             @Injectable TupleDescriptor tupleDescriptor2,
+                                             @Injectable SlotDescriptor slotDescriptor1,
+                                             @Injectable SlotDescriptor slotDescriptor2,
+                                             @Injectable BinaryPredicate eqBinaryPredicate,
+                                             @Injectable SlotRef eqSlot1,
+                                             @Injectable SlotRef eqSlot2,
+                                             @Tested ExprSubstitutionMap exprSubstitutionMap) {
+        Pair<TableRef, PlanNode> pair1 = new Pair<>(tableRef1, scanNode1);
+        Pair<TableRef, PlanNode> pair2 = new Pair<>(tableRef2, scanNode2);
+        List<Pair<TableRef, PlanNode>> refPlans = Lists.newArrayList();
+        refPlans.add(pair1);
+        refPlans.add(pair2);
+
+        List<SlotDescriptor> slotDescriptors1 = Lists.newArrayList();
+        slotDescriptors1.add(slotDescriptor1);
+        List<SlotDescriptor> slotDescriptors2 = Lists.newArrayList();
+        slotDescriptors2.add(slotDescriptor2);
+        List<Expr> eqConjuncts = Lists.newArrayList();
+        eqConjuncts.add(eqBinaryPredicate);
+
+        new Expectations() {
+            {
+                tableRef1.isAnalyzed();
+                result = true;
+                scanNode1.getCardinality();
+                result = 1;
+                scanNode2.getCardinality();
+                result = 2;
+                tableRef2.getDesc();
+                result = tupleDescriptor2;
+                tupleDescriptor2.getMaterializedSlots();
+                result = slotDescriptors2;
+                analyzer.getEqJoinConjuncts(new ArrayList<>(), new ArrayList<>());
+                result = eqConjuncts;
+                scanNode1.getTblRefIds();
+                result = Lists.newArrayList();
+                scanNode2.getTblRefIds();
+                result = Lists.newArrayList();
+                eqBinaryPredicate.getChild(0);
+                result = eqSlot1;
+                eqSlot1.isBoundByTupleIds(new ArrayList<>());
+                result = true;
+                eqBinaryPredicate.getChild(1);
+                result = eqSlot2;
+                eqSlot2.isBoundByTupleIds(new ArrayList<>());
+                result = true;
+                scanNode1.getTupleIds();
+                result = Lists.newArrayList();
+                scanNode2.getTupleIds();
+                result = Lists.newArrayList();
+                scanNode1.getOutputSmap();
+                result = null;
+                scanNode2.getOutputSmap();
+                result = null;
+                tableRef1.getUniqueAlias();
+                result = "t1";
+                tableRef2.getUniqueAlias();
+                result = "t2";
+                tableRef1.getJoinOp();
+                result = JoinOperator.INNER_JOIN;
+                tableRef2.getJoinOp();
+                result = JoinOperator.RIGHT_OUTER_JOIN;
+            }
+        };
+        new MockUp<ExprSubstitutionMap>() {
+            @Mock
+            public ExprSubstitutionMap compose(ExprSubstitutionMap f, ExprSubstitutionMap g,
+                                               Analyzer analyzer) {
+                return exprSubstitutionMap;
+            }
+
+            @Mock
+            public ExprSubstitutionMap combine(ExprSubstitutionMap f, ExprSubstitutionMap g) {
+                return exprSubstitutionMap;
+            }
+        };
+
+        SingleNodePlanner singleNodePlanner = new SingleNodePlanner(context);
+        PlanNode cheapestJoinNode = Deencapsulation.invoke(singleNodePlanner, "createCheapestJoinPlan", analyzer, refPlans);
+        Assert.assertEquals(2, cheapestJoinNode.getChildren().size());
+        Assert.assertTrue(cheapestJoinNode instanceof HashJoinNode);
+        Assert.assertEquals(JoinOperator.RIGHT_OUTER_JOIN, ((HashJoinNode) cheapestJoinNode).getJoinOp());
+        Assert.assertEquals(scanNode1, cheapestJoinNode.getChild(0));
+        Assert.assertEquals(scanNode2, cheapestJoinNode.getChild(1));
+    }
+
+    /*
+    Query: select * from t1 left join t2 on t1.k1=t2.k1 inner join t3 on xxx
+    Original Query: select * from test1 left join test2 on test1.k1=test2.k1 inner join test3 where test2.k1=test3.k1;
+    Expect: unchanged
+     */
+    @Test
+    public void testKeepRightTableRefOnLeftJoin(@Injectable PlannerContext context,
+                                                @Injectable Analyzer analyzer,
+                                                @Injectable BaseTableRef tableRef1,
+                                                @Injectable OlapScanNode scanNode1,
+                                                @Injectable BaseTableRef tableRef2,
+                                                @Injectable OlapScanNode scanNode2,
+                                                @Injectable BaseTableRef tableRef3,
+                                                @Injectable OlapScanNode scanNode3,
+                                                @Injectable TupleDescriptor tupleDescriptor1,
+                                                @Injectable TupleDescriptor tupleDescriptor2,
+                                                @Injectable TupleDescriptor tupleDescriptor3,
+                                                @Injectable SlotDescriptor slotDescriptor1,
+                                                @Injectable SlotDescriptor slotDescriptor2,
+                                                @Injectable SlotDescriptor slotDescriptor3,
+                                                @Injectable BinaryPredicate eqBinaryPredicate1,
+                                                @Injectable BinaryPredicate eqBinaryPredicate2,
+                                                @Injectable BinaryPredicate eqBinaryPredicate3,
+                                                @Injectable SlotRef eqT1Slot1,
+                                                @Injectable SlotRef eqT2Slot2,
+                                                @Injectable SlotRef eqT3Slot3,
+                                                @Tested ExprSubstitutionMap exprSubstitutionMap) {
+        Pair<TableRef, PlanNode> pair1 = new Pair<>(tableRef1, scanNode1);
+        Pair<TableRef, PlanNode> pair2 = new Pair<>(tableRef2, scanNode2);
+        Pair<TableRef, PlanNode> pair3 = new Pair<>(tableRef3, scanNode3);
+        List<Pair<TableRef, PlanNode>> refPlans = Lists.newArrayList();
+        refPlans.add(pair1);
+        refPlans.add(pair2);
+        refPlans.add(pair3);
+
+        TupleId tupleId1 = new TupleId(1);
+        TupleId tupleId2 = new TupleId(2);
+        TupleId tupleId3 = new TupleId(3);
+        List<TupleId> tupleIds1 = Lists.newArrayList(tupleId1);
+        List<TupleId> tupleIds2 = Lists.newArrayList(tupleId2);
+        List<TupleId> tupleIds3 = Lists.newArrayList(tupleId3);
+
+        List<SlotDescriptor> slotDescriptors1 = Lists.newArrayList();
+        slotDescriptors1.add(slotDescriptor1);
+        List<SlotDescriptor> slotDescriptors2 = Lists.newArrayList();
+        slotDescriptors2.add(slotDescriptor2);
+        List<Expr> eqConjuncts1 = Lists.newArrayList();
+        eqConjuncts1.add(eqBinaryPredicate1);
+        List<Expr> eqConjuncts2 = Lists.newArrayList();
+        eqConjuncts2.add(eqBinaryPredicate2);
+        List<Expr> eqConjuncts3 = Lists.newArrayList();
+        eqConjuncts3.add(eqBinaryPredicate3);
+
+        new Expectations() {
+            {
+                tableRef1.isAnalyzed();
+                result = true;
+                tableRef3.isAnalyzed();
+                result = true;
+                scanNode1.getCardinality();
+                result = 1;
+                scanNode2.getCardinality();
+                result = 2;
+                scanNode3.getCardinality();
+                result = 3;
+                tableRef1.getDesc();
+                result = tupleDescriptor1;
+                tupleDescriptor1.getMaterializedSlots();
+                result = slotDescriptors1;
+                tableRef2.getDesc();
+                result = tupleDescriptor2;
+                tupleDescriptor2.getMaterializedSlots();
+                result = slotDescriptors2;
+                tableRef3.getDesc();
+                result = tupleDescriptor3;
+                tupleDescriptor3.getMaterializedSlots();
+                result = Lists.newArrayList(slotDescriptor3);
+                analyzer.getEqJoinConjuncts(tupleIds1, tupleIds2);
+                result = eqConjuncts1;
+                analyzer.getEqJoinConjuncts(Lists.newArrayList(tupleId1, tupleId2), tupleIds3);
+                result = eqConjuncts2;
+                analyzer.getEqJoinConjuncts(tupleIds3, tupleIds1);
+                result = eqConjuncts3;
+                scanNode1.getTblRefIds();
+                result = Lists.newArrayList(tupleIds1);
+                scanNode2.getTblRefIds();
+                result = Lists.newArrayList(tupleIds2);
+                scanNode3.getTblRefIds();
+                result = Lists.newArrayList(tupleIds3);
+                eqBinaryPredicate1.getChild(0);
+                result = eqT1Slot1;
+                eqT1Slot1.isBoundByTupleIds(tupleIds1);
+                result = true;
+                eqBinaryPredicate1.getChild(1);
+                result = eqT2Slot2;
+                eqT2Slot2.isBoundByTupleIds(tupleIds2);
+                result = true;
+                eqT2Slot2.isBoundByTupleIds(Lists.newArrayList(tupleId1, tupleId2));
+                result = true;
+                eqBinaryPredicate2.getChild(0);
+                result = eqT2Slot2;
+                eqBinaryPredicate2.getChild(1);
+                result = eqT3Slot3;
+                eqT3Slot3.isBoundByTupleIds(tupleIds3);
+                result = true;
+                eqBinaryPredicate3.getChild(0);
+                result = eqT1Slot1;
+                eqBinaryPredicate3.getChild(1);
+                result = eqT3Slot3;
+                scanNode1.getTupleIds();
+                result = tupleIds1;
+                scanNode2.getTupleIds();
+                result = tupleIds2;
+                scanNode3.getTupleIds();
+                result = tupleIds3;
+                scanNode1.getOutputSmap();
+                result = null;
+                scanNode2.getOutputSmap();
+                result = null;
+                tableRef1.getUniqueAlias();
+                result = "t1";
+                tableRef2.getUniqueAlias();
+                result = "t2";
+                tableRef3.getUniqueAlias();
+                result = "t3";
+                tableRef1.getJoinOp();
+                result = JoinOperator.INNER_JOIN;
+                tableRef2.getJoinOp();
+                result = JoinOperator.LEFT_OUTER_JOIN;
+                tableRef3.getJoinOp();
+                result = JoinOperator.INNER_JOIN;
+            }
+        };
+        new MockUp<ExprSubstitutionMap>() {
+            @Mock
+            public ExprSubstitutionMap compose(ExprSubstitutionMap f, ExprSubstitutionMap g,
+                                               Analyzer analyzer) {
+                return exprSubstitutionMap;
+            }
+
+            @Mock
+            public ExprSubstitutionMap combine(ExprSubstitutionMap f, ExprSubstitutionMap g) {
+                return exprSubstitutionMap;
+            }
+        };
+
+        SingleNodePlanner singleNodePlanner = new SingleNodePlanner(context);
+        PlanNode cheapestJoinNode = Deencapsulation.invoke(singleNodePlanner, "createCheapestJoinPlan", analyzer, refPlans);
+        Assert.assertEquals(2, cheapestJoinNode.getChildren().size());
+        Assert.assertTrue(cheapestJoinNode instanceof HashJoinNode);
+        Assert.assertTrue(((HashJoinNode) cheapestJoinNode).getJoinOp().isInnerJoin());
+        Assert.assertTrue(cheapestJoinNode.getChild(0) instanceof HashJoinNode);
+        HashJoinNode child0 = (HashJoinNode) cheapestJoinNode.getChild(0);
+        Assert.assertTrue(child0.getJoinOp().isOuterJoin());
+        Assert.assertEquals(2, child0.getChildren().size());
+        Assert.assertEquals(scanNode1, child0.getChild(0));
+        Assert.assertEquals(scanNode2, child0.getChild(1));
+        Assert.assertEquals(scanNode3, cheapestJoinNode.getChild(1));
+    }
+
+    /*
+    Query: select * from t1 right join t2 on t1.k1=t2.k1 inner join t3 on xxx
+    Original Query: select * from test1 right join test2 on test1.k1=test2.k1 inner join test3 where test2.k1=test3.k1
+    Expect: join order unchanged (the right child of the right outer join stays in place)
+     */
+    @Test
+    public void testKeepRightTableRefOnRightJoin(@Injectable PlannerContext context,
+                                                 @Injectable Analyzer analyzer,
+                                                 @Injectable BaseTableRef tableRef1,
+                                                 @Injectable OlapScanNode scanNode1,
+                                                 @Injectable BaseTableRef tableRef2,
+                                                 @Injectable OlapScanNode scanNode2,
+                                                 @Injectable BaseTableRef tableRef3,
+                                                 @Injectable OlapScanNode scanNode3,
+                                                 @Injectable TupleDescriptor tupleDescriptor1,
+                                                 @Injectable TupleDescriptor tupleDescriptor2,
+                                                 @Injectable TupleDescriptor tupleDescriptor3,
+                                                 @Injectable SlotDescriptor slotDescriptor1,
+                                                 @Injectable SlotDescriptor slotDescriptor2,
+                                                 @Injectable SlotDescriptor slotDescriptor3,
+                                                 @Injectable BinaryPredicate eqBinaryPredicate1,
+                                                 @Injectable BinaryPredicate eqBinaryPredicate2,
+                                                 @Injectable BinaryPredicate eqBinaryPredicate3,
+                                                 @Injectable SlotRef eqT1Slot1,
+                                                 @Injectable SlotRef eqT2Slot2,
+                                                 @Injectable SlotRef eqT3Slot3,
+                                                 @Tested ExprSubstitutionMap exprSubstitutionMap) {
+        Pair<TableRef, PlanNode> pair1 = new Pair<>(tableRef1, scanNode1);
+        Pair<TableRef, PlanNode> pair2 = new Pair<>(tableRef2, scanNode2);
+        Pair<TableRef, PlanNode> pair3 = new Pair<>(tableRef3, scanNode3);
+        List<Pair<TableRef, PlanNode>> refPlans = Lists.newArrayList();
+        refPlans.add(pair1);
+        refPlans.add(pair2);
+        refPlans.add(pair3);
+
+        TupleId tupleId1 = new TupleId(1);
+        TupleId tupleId2 = new TupleId(2);
+        TupleId tupleId3 = new TupleId(3);
+        List<TupleId> tupleIds1 = Lists.newArrayList(tupleId1);
+        List<TupleId> tupleIds2 = Lists.newArrayList(tupleId2);
+        List<TupleId> tupleIds3 = Lists.newArrayList(tupleId3);
+
+        List<SlotDescriptor> slotDescriptors1 = Lists.newArrayList();
+        slotDescriptors1.add(slotDescriptor1);
+        List<SlotDescriptor> slotDescriptors2 = Lists.newArrayList();
+        slotDescriptors2.add(slotDescriptor2);
+        List<Expr> eqConjuncts1 = Lists.newArrayList();
+        eqConjuncts1.add(eqBinaryPredicate1);
+        List<Expr> eqConjuncts2 = Lists.newArrayList();
+        eqConjuncts2.add(eqBinaryPredicate2);
+        List<Expr> eqConjuncts3 = Lists.newArrayList();
+        eqConjuncts3.add(eqBinaryPredicate3);
+
+        new Expectations() {
+            {
+                tableRef1.isAnalyzed();
+                result = true;
+                tableRef3.isAnalyzed();
+                result = true;
+                scanNode1.getCardinality();
+                result = 1;
+                scanNode2.getCardinality();
+                result = 2;
+                scanNode3.getCardinality();
+                result = 3;
+                tableRef1.getDesc();
+                result = tupleDescriptor1;
+                tupleDescriptor1.getMaterializedSlots();
+                result = slotDescriptors1;
+                tableRef2.getDesc();
+                result = tupleDescriptor2;
+                tupleDescriptor2.getMaterializedSlots();
+                result = slotDescriptors2;
+                tableRef3.getDesc();
+                result = tupleDescriptor3;
+                tupleDescriptor3.getMaterializedSlots();
+                result = Lists.newArrayList(slotDescriptor3);
+                analyzer.getEqJoinConjuncts(tupleIds1, tupleIds2);
+                result = eqConjuncts1;
+                analyzer.getEqJoinConjuncts(Lists.newArrayList(tupleId1, tupleId2), tupleIds3);
+                result = eqConjuncts2;
+                analyzer.getEqJoinConjuncts(tupleIds3, tupleIds1);
+                result = eqConjuncts3;
+                scanNode1.getTblRefIds();
+                result = Lists.newArrayList(tupleIds1);
+                scanNode2.getTblRefIds();
+                result = Lists.newArrayList(tupleIds2);
+                scanNode3.getTblRefIds();
+                result = Lists.newArrayList(tupleIds3);
+                eqBinaryPredicate1.getChild(0);
+                result = eqT1Slot1;
+                eqT1Slot1.isBoundByTupleIds(tupleIds1);
+                result = true;
+                eqBinaryPredicate1.getChild(1);
+                result = eqT2Slot2;
+                eqT2Slot2.isBoundByTupleIds(tupleIds2);
+                result = true;
+                eqT2Slot2.isBoundByTupleIds(Lists.newArrayList(tupleId1, tupleId2));
+                result = true;
+                eqBinaryPredicate2.getChild(0);
+                result = eqT2Slot2;
+                eqBinaryPredicate2.getChild(1);
+                result = eqT3Slot3;
+                eqT3Slot3.isBoundByTupleIds(tupleIds3);
+                result = true;
+                eqBinaryPredicate3.getChild(0);
+                result = eqT1Slot1;
+                eqBinaryPredicate3.getChild(1);
+                result = eqT3Slot3;
+                scanNode1.getTupleIds();
+                result = tupleIds1;
+                scanNode2.getTupleIds();
+                result = tupleIds2;
+                scanNode3.getTupleIds();
+                result = tupleIds3;
+                scanNode1.getOutputSmap();
+                result = null;
+                scanNode2.getOutputSmap();
+                result = null;
+                tableRef1.getUniqueAlias();
+                result = "t1";
+                tableRef2.getUniqueAlias();
+                result = "t2";
+                tableRef3.getUniqueAlias();
+                result = "t3";
+                tableRef1.getJoinOp();
+                result = JoinOperator.INNER_JOIN;
+                tableRef2.getJoinOp();
+                result = JoinOperator.RIGHT_OUTER_JOIN;
+                tableRef3.getJoinOp();
+                result = JoinOperator.INNER_JOIN;
+            }
+        };
+        new MockUp<ExprSubstitutionMap>() {
+            @Mock
+            public ExprSubstitutionMap compose(ExprSubstitutionMap f, ExprSubstitutionMap g,
+                                               Analyzer analyzer) {
+                return exprSubstitutionMap;
+            }
+
+            @Mock
+            public ExprSubstitutionMap combine(ExprSubstitutionMap f, ExprSubstitutionMap g) {
+                return exprSubstitutionMap;
+            }
+        };
+
+        SingleNodePlanner singleNodePlanner = new SingleNodePlanner(context);
+        PlanNode cheapestJoinNode = Deencapsulation.invoke(singleNodePlanner, "createCheapestJoinPlan", analyzer, refPlans);
+        Assert.assertEquals(2, cheapestJoinNode.getChildren().size());
+        Assert.assertTrue(cheapestJoinNode instanceof HashJoinNode);
+        Assert.assertTrue(((HashJoinNode) cheapestJoinNode).getJoinOp().isInnerJoin());
+        Assert.assertTrue(cheapestJoinNode.getChild(0) instanceof HashJoinNode);
+        HashJoinNode child0 = (HashJoinNode) cheapestJoinNode.getChild(0);
+        Assert.assertTrue(child0.getJoinOp().isOuterJoin());
+        Assert.assertEquals(2, child0.getChildren().size());
+        Assert.assertEquals(scanNode1, child0.getChild(0));
+        Assert.assertEquals(scanNode2, child0.getChild(1));
+        Assert.assertEquals(scanNode3, cheapestJoinNode.getChild(1));
+    }
+
+    /*
+    Query: select * from t1,t2 right join t3,t5,t4 left join t6,t7
+    Expect: t3 and t6 keep their positions:
+            t2, t1 right join t3, t4, t5 left join t6, t7
+     */
+    @Test
+    public void testKeepMultiOuterJoin(@Injectable PlannerContext context,
+                                       @Injectable Analyzer analyzer,
+                                       @Injectable BaseTableRef tableRef1, @Injectable OlapScanNode scanNode1,
+                                       @Injectable BaseTableRef tableRef2, @Injectable OlapScanNode scanNode2,
+                                       @Injectable BaseTableRef tableRef3, @Injectable OlapScanNode scanNode3,
+                                       @Injectable BaseTableRef tableRef4, @Injectable OlapScanNode scanNode4,
+                                       @Injectable BaseTableRef tableRef5, @Injectable OlapScanNode scanNode5,
+                                       @Injectable BaseTableRef tableRef6, @Injectable OlapScanNode scanNode6,
+                                       @Injectable BaseTableRef tableRef7, @Injectable OlapScanNode scanNode7,
+                                       @Injectable TupleDescriptor tupleDescriptor1,
+                                       @Injectable TupleDescriptor tupleDescriptor2,
+                                       @Injectable TupleDescriptor tupleDescriptor3,
+                                       @Injectable TupleDescriptor tupleDescriptor4,
+                                       @Injectable TupleDescriptor tupleDescriptor5,
+                                       @Injectable TupleDescriptor tupleDescriptor6,
+                                       @Injectable TupleDescriptor tupleDescriptor7,
+                                       @Injectable SlotDescriptor slotDescriptor1,
+                                       @Injectable SlotDescriptor slotDescriptor2,
+                                       @Injectable SlotDescriptor slotDescriptor3,
+                                       @Injectable SlotDescriptor slotDescriptor4,
+                                       @Injectable SlotDescriptor slotDescriptor5,
+                                       @Injectable SlotDescriptor slotDescriptor6,
+                                       @Injectable SlotDescriptor slotDescriptor7,
+                                       @Injectable BinaryPredicate eqBinaryPredicate1,
+                                       @Injectable BinaryPredicate eqBinaryPredicate2,
+                                       @Injectable BinaryPredicate eqBinaryPredicate3,
+                                       @Injectable BinaryPredicate eqBinaryPredicate4,
+                                       @Injectable BinaryPredicate eqBinaryPredicate5,
+                                       @Injectable BinaryPredicate eqBinaryPredicate6,
+                                       @Injectable BinaryPredicate eqBinaryPredicate7,
+                                       @Injectable SlotRef eqT1Slot1,
+                                       @Injectable SlotRef eqT2Slot2,
+                                       @Injectable SlotRef eqT3Slot3,
+                                       @Injectable SlotRef eqT4Slot4,
+                                       @Injectable SlotRef eqT5Slot5,
+                                       @Injectable SlotRef eqT6Slot6,
+                                       @Injectable SlotRef eqT7Slot7,
+                                       @Tested ExprSubstitutionMap exprSubstitutionMap) {
+        Pair<TableRef, PlanNode> pair1 = new Pair<>(tableRef1, scanNode1);
+        Pair<TableRef, PlanNode> pair2 = new Pair<>(tableRef2, scanNode2);
+        Pair<TableRef, PlanNode> pair3 = new Pair<>(tableRef3, scanNode3);
+        Pair<TableRef, PlanNode> pair4 = new Pair<>(tableRef4, scanNode4);
+        Pair<TableRef, PlanNode> pair5 = new Pair<>(tableRef5, scanNode5);
+        Pair<TableRef, PlanNode> pair6 = new Pair<>(tableRef6, scanNode6);
+        Pair<TableRef, PlanNode> pair7 = new Pair<>(tableRef7, scanNode7);
+        List<Pair<TableRef, PlanNode>> refPlans = Lists.newArrayList();
+        refPlans.add(pair1);
+        refPlans.add(pair2);
+        refPlans.add(pair3);
+        refPlans.add(pair5);
+        refPlans.add(pair4);
+        refPlans.add(pair6);
+        refPlans.add(pair7);
+
+        TupleId tupleId1 = new TupleId(1);
+        TupleId tupleId2 = new TupleId(2);
+        TupleId tupleId3 = new TupleId(3);
+        TupleId tupleId4 = new TupleId(4);
+        TupleId tupleId5 = new TupleId(5);
+        TupleId tupleId6 = new TupleId(6);
+        TupleId tupleId7 = new TupleId(7);
+        List<TupleId> tupleIds1 = Lists.newArrayList(tupleId1);
+        List<TupleId> tupleIds2 = Lists.newArrayList(tupleId2);
+        List<TupleId> tupleIds3 = Lists.newArrayList(tupleId3);
+        List<TupleId> tupleIds4 = Lists.newArrayList(tupleId4);
+        List<TupleId> tupleIds5 = Lists.newArrayList(tupleId5);
+        List<TupleId> tupleIds6 = Lists.newArrayList(tupleId6);
+        List<TupleId> tupleIds7 = Lists.newArrayList(tupleId7);
+        List<TupleId> tupleIds213 = Lists.newArrayList(tupleId2, tupleId1, tupleId3);
+        List<TupleId> tupleIds21345 = new ArrayList<>();
+        tupleIds21345.addAll(tupleIds213);
+        tupleIds21345.add(tupleId4);
+        tupleIds21345.add(tupleId5);
+        List<TupleId> tupleIds213456 = Lists.newArrayList(tupleId2, tupleId1, tupleId3, tupleId4, tupleId5, tupleId6);
+
+        List<SlotDescriptor> slotDescriptors1 = Lists.newArrayList();
+        slotDescriptors1.add(slotDescriptor1);
+        List<SlotDescriptor> slotDescriptors2 = Lists.newArrayList();
+        slotDescriptors2.add(slotDescriptor2);
+        List<Expr> eqConjuncts1 = Lists.newArrayList();
+        eqConjuncts1.add(eqBinaryPredicate1);
+        List<Expr> eqConjuncts2 = Lists.newArrayList();
+        eqConjuncts2.add(eqBinaryPredicate2);
+        List<Expr> eqConjuncts3 = Lists.newArrayList();
+        eqConjuncts3.add(eqBinaryPredicate3);
+
+        new Expectations() {
+            {
+                tableRef1.isAnalyzed();
+                result = true;
+                tableRef2.isAnalyzed();
+                result = true;
+                tableRef4.isAnalyzed();
+                result = true;
+                tableRef5.isAnalyzed();
+                result = true;
+                tableRef7.isAnalyzed();
+                result = true;
+                scanNode1.getCardinality();
+                result = 1;
+                scanNode2.getCardinality();
+                result = 2;
+                scanNode3.getCardinality();
+                result = 3;
+                scanNode4.getCardinality();
+                result = 4;
+                scanNode5.getCardinality();
+                result = 5;
+                scanNode6.getCardinality();
+                result = 6;
+                scanNode7.getCardinality();
+                result = 7;
+                tableRef1.getDesc();
+                result = tupleDescriptor1;
+                tupleDescriptor1.getMaterializedSlots();
+                result = slotDescriptors1;
+                tableRef2.getDesc();
+                result = tupleDescriptor2;
+                tupleDescriptor2.getMaterializedSlots();
+                result = slotDescriptors2;
+                tableRef3.getDesc();
+                result = tupleDescriptor3;
+                tupleDescriptor3.getMaterializedSlots();
+                result = Lists.newArrayList(slotDescriptor3);
+                tableRef4.getDesc();
+                result = tupleDescriptor4;
+                tupleDescriptor4.getMaterializedSlots();
+                result = Lists.newArrayList(slotDescriptor4);
+                tableRef5.getDesc();
+                result = tupleDescriptor5;
+                tupleDescriptor5.getMaterializedSlots();
+                result = Lists.newArrayList(slotDescriptor5);
+                tableRef6.getDesc();
+                result = tupleDescriptor6;
+                tupleDescriptor6.getMaterializedSlots();
+                result = Lists.newArrayList(slotDescriptor6);
+                tableRef7.getDesc();
+                result = tupleDescriptor7;
+                tupleDescriptor7.getMaterializedSlots();
+                result = Lists.newArrayList(slotDescriptor7);
+                analyzer.getEqJoinConjuncts(tupleIds7, tupleIds1);
+                result = eqConjuncts1;
+                eqBinaryPredicate1.getChild(0);
+                result = eqT7Slot7;
+                eqT7Slot7.isBoundByTupleIds(tupleIds7);
+                result = true;
+                eqBinaryPredicate1.getChild(1);
+                result = eqT1Slot1;
+                eqT1Slot1.isBoundByTupleIds(tupleIds1);
+                result = true;
+                analyzer.getEqJoinConjuncts(Lists.newArrayList(tupleId2, tupleId1), tupleIds3);
+                result = eqConjuncts2;
+                analyzer.getEqJoinConjuncts(tupleIds213, tupleIds5);
+                result = Lists.newArrayList(eqBinaryPredicate4);
+                analyzer.getEqJoinConjuncts(tupleIds213, tupleIds4);
+                result = Lists.newArrayList(eqBinaryPredicate5);
+                analyzer.getEqJoinConjuncts(tupleIds21345, tupleIds6);
+                result = Lists.newArrayList(eqBinaryPredicate6);
+                eqBinaryPredicate6.getChild(0);
+                result = eqT5Slot5;
+                eqBinaryPredicate6.getChild(1);
+                result = eqT6Slot6;
+                eqT5Slot5.isBoundByTupleIds(tupleIds21345);
+                result = true;
+                eqT6Slot6.isBoundByTupleIds(tupleIds6);
+                result = true;
+                analyzer.getEqJoinConjuncts(tupleIds213456, tupleIds7);
+                result = Lists.newArrayList(eqBinaryPredicate7);
+                eqBinaryPredicate7.getChild(0);
+                result = eqT6Slot6;
+                eqBinaryPredicate7.getChild(1);
+                result = eqT7Slot7;
+                eqT6Slot6.isBoundByTupleIds(tupleIds213456);
+                result = true;
+                eqT7Slot7.isBoundByTupleIds(tupleIds7);
+                result = true;
+                scanNode1.getTblRefIds();
+                result = Lists.newArrayList(tupleIds1);
+                scanNode2.getTblRefIds();
+                result = Lists.newArrayList(tupleIds2);
+                scanNode3.getTblRefIds();
+                result = Lists.newArrayList(tupleIds3);
+                scanNode4.getTblRefIds();
+                result = Lists.newArrayList(tupleId4);
+                scanNode5.getTblRefIds();
+                result = Lists.newArrayList(tupleId5);
+                scanNode6.getTblRefIds();
+                result = Lists.newArrayList(tupleId6);
+                scanNode7.getTblRefIds();
+                result = Lists.newArrayList(tupleId7);
+
+                eqT2Slot2.isBoundByTupleIds(Lists.newArrayList(tupleId2, tupleId1));
+                result = true;
+                eqBinaryPredicate2.getChild(0);
+                result = eqT2Slot2;
+                eqBinaryPredicate2.getChild(1);
+                result = eqT3Slot3;
+                eqT3Slot3.isBoundByTupleIds(tupleIds3);
+                result = true;
+                eqBinaryPredicate4.getChild(0);
+                result = eqT3Slot3;
+                eqBinaryPredicate4.getChild(1);
+                result = eqT5Slot5;
+                eqT3Slot3.isBoundByTupleIds(tupleIds213);
+                result = true;
+                eqT5Slot5.isBoundByTupleIds(Lists.newArrayList(tupleId5));
+                result = true;
+                eqBinaryPredicate5.getChild(0);
+                result = eqT3Slot3;
+                eqBinaryPredicate5.getChild(1);
+                result = eqT4Slot4;
+                eqT4Slot4.isBoundByTupleIds(Lists.newArrayList(tupleId4));
+                result = true;
+                scanNode1.getTupleIds();
+                result = tupleIds1;
+                scanNode2.getTupleIds();
+                result = tupleIds2;
+                scanNode3.getTupleIds();
+                result = tupleIds3;
+                scanNode4.getTupleIds();
+                result = tupleIds4;
+                scanNode5.getTupleIds();
+                result = tupleIds5;
+                scanNode6.getTupleIds();
+                result = tupleIds6;
+                scanNode7.getTupleIds();
+                result = tupleIds7;
+                scanNode1.getOutputSmap();
+                result = null;
+                scanNode2.getOutputSmap();
+                result = null;
+                scanNode3.getOutputSmap();
+                result = null;
+                scanNode4.getOutputSmap();
+                result = null;
+                scanNode5.getOutputSmap();
+                result = null;
+                scanNode6.getOutputSmap();
+                result = null;
+                scanNode7.getOutputSmap();
+                result = null;
+                tableRef1.getUniqueAlias();
+                result = "t1";
+                tableRef2.getUniqueAlias();
+                result = "t2";
+                tableRef3.getUniqueAlias();
+                result = "t3";
+                tableRef4.getUniqueAlias();
+                result = "t4";
+                tableRef5.getUniqueAlias();
+                result = "t5";
+                tableRef6.getUniqueAlias();
+                result = "t6";
+                tableRef7.getUniqueAlias();
+                result = "t7";
+                tableRef1.getJoinOp();
+                result = JoinOperator.INNER_JOIN;
+                tableRef2.getJoinOp();
+                result = JoinOperator.INNER_JOIN;
+                tableRef3.getJoinOp();
+                result = JoinOperator.RIGHT_OUTER_JOIN;
+                tableRef4.getJoinOp();
+                result = JoinOperator.INNER_JOIN;
+                tableRef5.getJoinOp();
+                result = JoinOperator.INNER_JOIN;
+                tableRef6.getJoinOp();
+                result = JoinOperator.LEFT_OUTER_JOIN;
+                tableRef7.getJoinOp();
+                result = JoinOperator.INNER_JOIN;
+            }
+        };
+        new MockUp<ExprSubstitutionMap>() {
+            @Mock
+            public ExprSubstitutionMap compose(ExprSubstitutionMap f, ExprSubstitutionMap g,
+                                               Analyzer analyzer) {
+                return exprSubstitutionMap;
+            }
+
+            @Mock
+            public ExprSubstitutionMap combine(ExprSubstitutionMap f, ExprSubstitutionMap g) {
+                return exprSubstitutionMap;
+            }
+        };
+
+        SingleNodePlanner singleNodePlanner = new SingleNodePlanner(context);
+        PlanNode cheapestJoinNode = Deencapsulation.invoke(singleNodePlanner, "createCheapestJoinPlan", analyzer, refPlans);
+        Assert.assertEquals(2, cheapestJoinNode.getChildren().size());
+        Assert.assertEquals(Lists.newArrayList(tupleId2, tupleId1, tupleId3, tupleId4, tupleId5, tupleId6, tupleId7),
+                cheapestJoinNode.getTupleIds());
+    }
+
+    /*
+    Query: select * from t3, t2, t1, t4 where (all inner join conditions)
+    Original Query: select * from test1, test3, test2, test4
+                where test1.k1=test3.k1 and test3.k2=test2.k2 and test2.k3=test4.k3;
+    Expect: t4 (the largest), t1, t2, t3 (no cross join is chosen)
+    Round1: (t4 cross t3) vs (t4 cross t2) vs (t4 inner t1) => t4, t1
+    Round2: ([t4,t1] cross t3) vs ([t4,t1] inner t2) => t4, t1, t2
+    Round3: only t3 remains => t4, t1, t2, t3
+    */
+    @Test
+    public void testMultiInnerJoinReorderAvoidCrossJoin(@Injectable PlannerContext context,
+                                          @Injectable Analyzer analyzer,
+                                          @Injectable BaseTableRef tableRef1, @Injectable OlapScanNode scanNode1,
+                                          @Injectable BaseTableRef tableRef2, @Injectable OlapScanNode scanNode2,
+                                          @Injectable BaseTableRef tableRef3, @Injectable OlapScanNode scanNode3,
+                                          @Injectable BaseTableRef tableRef4, @Injectable OlapScanNode scanNode4,
+                                          @Injectable TupleDescriptor tupleDescriptor1,
+                                          @Injectable TupleDescriptor tupleDescriptor2,
+                                          @Injectable TupleDescriptor tupleDescriptor3,
+                                          @Injectable SlotDescriptor slotDescriptor1,
+                                          @Injectable SlotDescriptor slotDescriptor2,
+                                          @Injectable SlotDescriptor slotDescriptor3,
+                                          @Injectable BinaryPredicate eqBinaryPredicate3,
+                                          @Injectable BinaryPredicate eqBinaryPredicate5,
+                                          @Injectable BinaryPredicate eqBinaryPredicate6,
+                                          @Injectable SlotRef eqT1Slot1,
+                                          @Injectable SlotRef eqT2Slot2,
+                                          @Injectable SlotRef eqT3Slot3,
+                                          @Injectable SlotRef eqT4Slot4,
+                                          @Tested ExprSubstitutionMap exprSubstitutionMap) {
+        Pair<TableRef, PlanNode> pair1 = new Pair<>(tableRef1, scanNode1);
+        Pair<TableRef, PlanNode> pair2 = new Pair<>(tableRef2, scanNode2);
+        Pair<TableRef, PlanNode> pair3 = new Pair<>(tableRef3, scanNode3);
+        Pair<TableRef, PlanNode> pair4 = new Pair<>(tableRef4, scanNode4);
+        List<Pair<TableRef, PlanNode>> refPlans = Lists.newArrayList();
+        refPlans.add(pair3);
+        refPlans.add(pair2);
+        refPlans.add(pair1);
+        refPlans.add(pair4);
+
+        TupleId tupleId1 = new TupleId(1);
+        TupleId tupleId2 = new TupleId(2);
+        TupleId tupleId3 = new TupleId(3);
+        TupleId tupleId4 = new TupleId(4);
+        List<TupleId> tupleIds1 = Lists.newArrayList(tupleId1);
+        List<TupleId> tupleIds2 = Lists.newArrayList(tupleId2);
+        List<TupleId> tupleIds3 = Lists.newArrayList(tupleId3);
+        List<TupleId> tupleIds4 = Lists.newArrayList(tupleId4);
+        List<TupleId> tupleIds41 = Lists.newArrayList(tupleId4, tupleId1);
+        List<TupleId> tupleIds412 = Lists.newArrayList(tupleId4, tupleId1, tupleId2);
+
+        new Expectations() {
+            {
+                tableRef1.isAnalyzed();
+                result = true;
+                tableRef2.isAnalyzed();
+                result = true;
+                tableRef3.isAnalyzed();
+                result = true;
+                tableRef4.isAnalyzed();
+                result = true;
+                scanNode1.getCardinality();
+                result = 1;
+                scanNode2.getCardinality();
+                result = 2;
+                scanNode3.getCardinality();
+                result = 3;
+                scanNode4.getCardinality();
+                result = 4;
+                tableRef1.getDesc();
+                result = tupleDescriptor1;
+                tupleDescriptor1.getMaterializedSlots();
+                result = Lists.newArrayList(slotDescriptor1);
+                tableRef2.getDesc();
+                result = tupleDescriptor2;
+                tupleDescriptor2.getMaterializedSlots();
+                result = Lists.newArrayList(slotDescriptor2);
+                tableRef3.getDesc();
+                result = tupleDescriptor3;
+                tupleDescriptor3.getMaterializedSlots();
+                result = Lists.newArrayList(slotDescriptor3);
+
+                // where t4.k1=t1.k1
+                analyzer.getEqJoinConjuncts(tupleIds4, tupleIds1);
+                result = Lists.newArrayList(eqBinaryPredicate3);
+                eqBinaryPredicate3.getChild(0);
+                result = eqT4Slot4;
+                eqT4Slot4.isBoundByTupleIds(tupleIds4);
+                result = true;
+                eqBinaryPredicate3.getChild(1);
+                result = eqT1Slot1;
+                eqT1Slot1.isBoundByTupleIds(tupleIds1);
+                result = true;
+                // where t1.k1=t2.k1
+                analyzer.getEqJoinConjuncts(tupleIds41, tupleIds2);
+                result = Lists.newArrayList(eqBinaryPredicate5);
+                eqBinaryPredicate5.getChild(0);
+                result = eqT1Slot1;
+                eqT1Slot1.isBoundByTupleIds(tupleIds41);
+                result = true;
+                eqBinaryPredicate5.getChild(1);
+                result = eqT2Slot2;
+                eqT2Slot2.isBoundByTupleIds(tupleIds2);
+                result = true;
+                // where t2.k1 = t3.k1
+                analyzer.getEqJoinConjuncts(tupleIds412, tupleIds3);
+                result = Lists.newArrayList(eqBinaryPredicate6);
+                eqBinaryPredicate6.getChild(0);
+                result = eqT2Slot2;
+                eqT2Slot2.isBoundByTupleIds(tupleIds412);
+                result = true;
+                eqBinaryPredicate6.getChild(1);
+                result = eqT3Slot3;
+                eqT3Slot3.isBoundByTupleIds(tupleIds3);
+                result = true;
+
+                scanNode1.getTblRefIds();
+                result = tupleIds1;
+                scanNode2.getTblRefIds();
+                result = tupleIds2;
+                scanNode3.getTblRefIds();
+                result = tupleIds3;
+                scanNode4.getTblRefIds();
+                result = tupleIds4;
+
+                scanNode1.getTupleIds();
+                result = tupleIds1;
+                scanNode2.getTupleIds();
+                result = tupleIds2;
+                scanNode3.getTupleIds();
+                result = tupleIds3;
+                scanNode4.getTupleIds();
+                result = tupleIds4;
+                scanNode1.getOutputSmap();
+                result = null;
+                scanNode2.getOutputSmap();
+                result = null;
+                scanNode3.getOutputSmap();
+                result = null;
+                scanNode4.getOutputSmap();
+                result = null;
+                tableRef1.getUniqueAlias();
+                result = "t1";
+                tableRef2.getUniqueAlias();
+                result = "t2";
+                tableRef3.getUniqueAlias();
+                result = "t3";
+                tableRef4.getUniqueAlias();
+                result = "t4";
+                tableRef1.getJoinOp();
+                result = JoinOperator.INNER_JOIN;
+                tableRef2.getJoinOp();
+                result = JoinOperator.INNER_JOIN;
+                tableRef3.getJoinOp();
+                result = JoinOperator.INNER_JOIN;
+                tableRef4.getJoinOp();
+                result = JoinOperator.INNER_JOIN;
+            }
+        };
+        new MockUp<ExprSubstitutionMap>() {
+            @Mock
+            public ExprSubstitutionMap compose(ExprSubstitutionMap f, ExprSubstitutionMap g,
+                                               Analyzer analyzer) {
+                return exprSubstitutionMap;
+            }
+
+            @Mock
+            public ExprSubstitutionMap combine(ExprSubstitutionMap f, ExprSubstitutionMap g) {
+                return exprSubstitutionMap;
+            }
+        };
+
+        SingleNodePlanner singleNodePlanner = new SingleNodePlanner(context);
+        PlanNode cheapestJoinNode = Deencapsulation.invoke(singleNodePlanner, "createCheapestJoinPlan", analyzer, refPlans);
+        Assert.assertEquals(2, cheapestJoinNode.getChildren().size());
+        Assert.assertEquals(Lists.newArrayList(tupleId4, tupleId1, tupleId2, tupleId3),
+                cheapestJoinNode.getTupleIds());
+    }
+
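+    /*
+    The "Round1 ... vs ..." comparisons in the comments above follow a greedy loop.
+    Below is a minimal, self-contained sketch of that selection rule as these tests
+    exercise it: the largest node is placed first, an eq-join candidate beats a
+    cross-join candidate, and ties fall back to the smaller cardinality. The names
+    (greedyJoinOrderSketch, cardinality, eqJoinedPairs) are hypothetical and not
+    part of SingleNodePlanner's API; rule 3 (pinned right children of outer, semi,
+    and anti joins) is deliberately not modeled here.
+     */
+    private static java.util.List<String> greedyJoinOrderSketch(
+            java.util.Map<String, Long> cardinality,
+            java.util.Set<java.util.Set<String>> eqJoinedPairs) {
+        java.util.List<String> remaining = new ArrayList<>(cardinality.keySet());
+        // Rule 1: the node with the largest cardinality is placed first.
+        remaining.sort((a, b) -> Long.compare(cardinality.get(b), cardinality.get(a)));
+        java.util.List<String> ordered = new ArrayList<>();
+        ordered.add(remaining.remove(0));
+        while (!remaining.isEmpty()) {
+            String best = null;
+            boolean bestHasEqJoin = false;
+            for (String cand : remaining) {
+                // Rule 2: a candidate joined to an already placed node through an
+                // eq-join conjunct is preferred over one needing a cross join.
+                boolean hasEqJoin = false;
+                for (String placed : ordered) {
+                    if (eqJoinedPairs.contains(new java.util.HashSet<>(
+                            java.util.Arrays.asList(placed, cand)))) {
+                        hasEqJoin = true;
+                        break;
+                    }
+                }
+                if (best == null || (hasEqJoin && !bestHasEqJoin)
+                        || (hasEqJoin == bestHasEqJoin
+                                && cardinality.get(cand) < cardinality.get(best))) {
+                    best = cand;
+                    bestHasEqJoin = hasEqJoin;
+                }
+            }
+            ordered.add(best);
+            remaining.remove(best);
+        }
+        // E.g. cardinalities {t1=1, t2=2, t3=3, t4=4} with eq-join pairs
+        // {t4,t1}, {t1,t2}, {t2,t3} yield [t4, t1, t2, t3], matching the
+        // assertion in testMultiInnerJoinReorderAvoidCrossJoin above.
+        return ordered;
+    }
+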
+    /*
+    Query: select * from t3, t2, t1, t4 where (multi inner join condition)
+    Original Query: select * from test3, test2, test1, test4
+                    where test3.k1=test2.k1 and test2.k1=test1.k1 and test1.k1=test4.k1 and test4.k2=test2.k2
+                          and test4.k2=test3.k2 and test3.k3=test1.k3;
+    Expect: same as above
+     */
+    @Test
+    public void testMultiInnerJoinMultiJoinPredicateReorder(@Injectable PlannerContext context,
+                                                            @Injectable Analyzer analyzer,
+                                                            @Injectable BaseTableRef tableRef1, @Injectable OlapScanNode scanNode1,
+                                                            @Injectable BaseTableRef tableRef2, @Injectable OlapScanNode scanNode2,
+                                                            @Injectable BaseTableRef tableRef3, @Injectable OlapScanNode scanNode3,
+                                                            @Injectable BaseTableRef tableRef4, @Injectable OlapScanNode scanNode4,
+                                                            @Injectable TupleDescriptor tupleDescriptor1,
+                                                            @Injectable TupleDescriptor tupleDescriptor2,
+                                                            @Injectable TupleDescriptor tupleDescriptor3,
+                                                            @Injectable TupleDescriptor tupleDescriptor4,
+                                                            @Injectable SlotDescriptor slotDescriptor1,
+                                                            @Injectable SlotDescriptor slotDescriptor2,
+                                                            @Injectable SlotDescriptor slotDescriptor3,
+                                                            @Injectable SlotDescriptor slotDescriptor4,
+                                                            @Injectable BinaryPredicate eqBinaryPredicate1,
+                                                            @Injectable BinaryPredicate eqBinaryPredicate2,
+                                                            @Injectable BinaryPredicate eqBinaryPredicate3,
+                                                            @Injectable BinaryPredicate eqBinaryPredicate4,
+                                                            @Injectable BinaryPredicate eqBinaryPredicate5,
+                                                            @Injectable BinaryPredicate eqBinaryPredicate6,
+                                                            @Injectable SlotRef eqT1Slot1,
+                                                            @Injectable SlotRef eqT2Slot2,
+                                                            @Injectable SlotRef eqT3Slot3,
+                                                            @Injectable SlotRef eqT4Slot4,
+                                                            @Tested ExprSubstitutionMap exprSubstitutionMap) {
+        Pair<TableRef, PlanNode> pair1 = new Pair<>(tableRef1, scanNode1);
+        Pair<TableRef, PlanNode> pair2 = new Pair<>(tableRef2, scanNode2);
+        Pair<TableRef, PlanNode> pair3 = new Pair<>(tableRef3, scanNode3);
+        Pair<TableRef, PlanNode> pair4 = new Pair<>(tableRef4, scanNode4);
+        List<Pair<TableRef, PlanNode>> refPlans = Lists.newArrayList();
+        refPlans.add(pair3);
+        refPlans.add(pair2);
+        refPlans.add(pair1);
+        refPlans.add(pair4);
+
+        TupleId tupleId1 = new TupleId(1);
+        TupleId tupleId2 = new TupleId(2);
+        TupleId tupleId3 = new TupleId(3);
+        TupleId tupleId4 = new TupleId(4);
+        List<TupleId> tupleIds1 = Lists.newArrayList(tupleId1);
+        List<TupleId> tupleIds2 = Lists.newArrayList(tupleId2);
+        List<TupleId> tupleIds3 = Lists.newArrayList(tupleId3);
+        List<TupleId> tupleIds4 = Lists.newArrayList(tupleId4);
+        List<TupleId> tupleIds41 = Lists.newArrayList(tupleId4, tupleId1);
+        List<TupleId> tupleIds412 = Lists.newArrayList(tupleId4, tupleId1, tupleId2);
+
+        new Expectations() {
+            {
+                tableRef1.isAnalyzed();
+                result = true;
+                tableRef2.isAnalyzed();
+                result = true;
+                tableRef3.isAnalyzed();
+                result = true;
+                tableRef4.isAnalyzed();
+                result = true;
+                scanNode1.getCardinality();
+                result = 1;
+                scanNode2.getCardinality();
+                result = 2;
+                scanNode3.getCardinality();
+                result = 3;
+                scanNode4.getCardinality();
+                result = 4;
+                tableRef1.getDesc();
+                result = tupleDescriptor1;
+                tupleDescriptor1.getMaterializedSlots();
+                result = Lists.newArrayList(slotDescriptor1);
+                tableRef2.getDesc();
+                result = tupleDescriptor2;
+                tupleDescriptor2.getMaterializedSlots();
+                result = Lists.newArrayList(slotDescriptor2);
+                tableRef3.getDesc();
+                result = tupleDescriptor3;
+                tupleDescriptor3.getMaterializedSlots();
+                result = Lists.newArrayList(slotDescriptor3);
+
+                // where t4.k1=t3.k1
+                analyzer.getEqJoinConjuncts(tupleIds4, tupleIds3);
+                result = Lists.newArrayList(eqBinaryPredicate1);
+                eqBinaryPredicate1.getChild(0);
+                result = eqT4Slot4;
+                eqT4Slot4.isBoundByTupleIds(tupleIds4);
+                result = true;
+                eqBinaryPredicate1.getChild(1);
+                result = eqT3Slot3;
+                eqT3Slot3.isBoundByTupleIds(tupleIds3);
+                result = true;
+                // where t4.k1 = t2.k1
+                analyzer.getEqJoinConjuncts(tupleIds4, tupleIds2);
+                result = Lists.newArrayList(eqBinaryPredicate2);
+                eqBinaryPredicate2.getChild(0);
+                result = eqT4Slot4;
+                eqBinaryPredicate2.getChild(1);
+                result = eqT2Slot2;
+                eqT2Slot2.isBoundByTupleIds(tupleIds2);
+                result = true;
+                // where t4.k1=t1.k1
+                analyzer.getEqJoinConjuncts(tupleIds4, tupleIds1);
+                result = Lists.newArrayList(eqBinaryPredicate3);
+                eqBinaryPredicate3.getChild(0);
+                result = eqT4Slot4;
+                eqBinaryPredicate3.getChild(1);
+                result = eqT1Slot1;
+                eqT1Slot1.isBoundByTupleIds(tupleIds1);
+                result = true;
+                // where t1.k1=t3.k1
+                analyzer.getEqJoinConjuncts(tupleIds41, tupleIds3);
+                result = Lists.newArrayList(eqBinaryPredicate4);
+                eqBinaryPredicate4.getChild(0);
+                result = eqT1Slot1;
+                eqT1Slot1.isBoundByTupleIds(tupleIds41);
+                result = true;
+                eqBinaryPredicate4.getChild(1);
+                result = eqT3Slot3;
+                // where t1.k1=t2.k1
+                analyzer.getEqJoinConjuncts(tupleIds41, tupleIds2);
+                result = Lists.newArrayList(eqBinaryPredicate5);
+                eqBinaryPredicate5.getChild(0);
+                result = eqT1Slot1;
+                eqT1Slot1.isBoundByTupleIds(tupleIds41);
+                result = true;
+                eqBinaryPredicate5.getChild(1);
+                result = eqT2Slot2;
+                eqT2Slot2.isBoundByTupleIds(tupleIds2);
+                result = true;
+                // where t2.k1 = t3.k1
+                analyzer.getEqJoinConjuncts(tupleIds412, tupleIds3);
+                result = Lists.newArrayList(eqBinaryPredicate6);
+                eqBinaryPredicate6.getChild(0);
+                result = eqT2Slot2;
+                eqT2Slot2.isBoundByTupleIds(tupleIds412);
+                result = true;
+                eqBinaryPredicate6.getChild(1);
+                result = eqT3Slot3;
+                eqT3Slot3.isBoundByTupleIds(tupleIds3);
+                result = true;
+
+                scanNode1.getTblRefIds();
+                result = tupleIds1;
+                scanNode2.getTblRefIds();
+                result = tupleIds2;
+                scanNode3.getTblRefIds();
+                result = tupleIds3;
+                scanNode4.getTblRefIds();
+                result = tupleIds4;
+
+                scanNode1.getTupleIds();
+                result = tupleIds1;
+                scanNode2.getTupleIds();
+                result = tupleIds2;
+                scanNode3.getTupleIds();
+                result = tupleIds3;
+                scanNode4.getTupleIds();
+                result = tupleIds4;
+                scanNode1.getOutputSmap();
+                result = null;
+                scanNode2.getOutputSmap();
+                result = null;
+                scanNode3.getOutputSmap();
+                result = null;
+                scanNode4.getOutputSmap();
+                result = null;
+                tableRef1.getUniqueAlias();
+                result = "t1";
+                tableRef2.getUniqueAlias();
+                result = "t2";
+                tableRef3.getUniqueAlias();
+                result = "t3";
+                tableRef4.getUniqueAlias();
+                result = "t4";
+                tableRef1.getJoinOp();
+                result = JoinOperator.INNER_JOIN;
+                tableRef2.getJoinOp();
+                result = JoinOperator.INNER_JOIN;
+                tableRef3.getJoinOp();
+                result = JoinOperator.INNER_JOIN;
+                tableRef4.getJoinOp();
+                result = JoinOperator.INNER_JOIN;
+            }
+        };
+        new MockUp<ExprSubstitutionMap>() {
+            @Mock
+            public ExprSubstitutionMap compose(ExprSubstitutionMap f, ExprSubstitutionMap g,
+                                               Analyzer analyzer) {
+                return exprSubstitutionMap;
+            }
+
+            @Mock
+            public ExprSubstitutionMap combine(ExprSubstitutionMap f, ExprSubstitutionMap g) {
+                return exprSubstitutionMap;
+            }
+        };
+
+        SingleNodePlanner singleNodePlanner = new SingleNodePlanner(context);
+        PlanNode cheapestJoinNode = Deencapsulation.invoke(singleNodePlanner, "createCheapestJoinPlan", analyzer, refPlans);
+        Assert.assertEquals(2, cheapestJoinNode.getChildren().size());
+        Assert.assertEquals(Lists.newArrayList(tupleId4, tupleId1, tupleId2, tupleId3),
+                cheapestJoinNode.getTupleIds());
+    }
+
+    /*
+    Query: select * from t3, t2, t1, t4 where (no predicate references t2)
+    Expect: t4 (the largest), t1, t3, t2
+    Round1: (t4, t3) vs (t4, t2) vs (t4, t1) => t4, t1
+    Round2: ([t4,t1], t3) vs ([t4,t1], t2) => t4, t1, t3
+    Round3: only t2 remains => t4, t1, t3, t2
+     */
+    @Test
+    public void testInnerPriorToCrossJoinReorder() {
+
+    }
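+
+    /*
+    For instance, greedyJoinOrderSketch (defined above) with cardinalities
+    {t1=1, t2=2, t3=3, t4=4} and assumed eq-join pairs {t4,t1} and {t1,t3}
+    (nothing references t2) yields [t4, t1, t3, t2], matching the rounds above.
+     */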
+
+    /*
+    Query: select * from t3, t2, t1, t4
+    Original Query: select * from test3, test1, test2, test4;
+    Expect: t4 (the largest), then t1, t2, t3 ascending from the smallest to the second largest
+     */
+    @Test
+    public void testMultiCrossJoinReorder() {
+
+    }
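+
+    /*
+    With no eq-join conjuncts at all, the ordering degenerates to the cardinality
+    rule alone: largest first, then the rest ascending from the smallest to the
+    second largest. A minimal sketch of that rule, reusing the hypothetical
+    cardinality map from greedyJoinOrderSketch above:
+     */
+    private static java.util.List<String> crossJoinOrderSketch(
+            java.util.Map<String, Long> cardinality) {
+        java.util.List<String> ordered = new ArrayList<>(cardinality.keySet());
+        // Sort ascending by cardinality, then move the largest node to the front.
+        ordered.sort((a, b) -> Long.compare(cardinality.get(a), cardinality.get(b)));
+        ordered.add(0, ordered.remove(ordered.size() - 1));
+        // E.g. {t1=1, t2=2, t3=3, t4=4} -> [t4, t1, t2, t3].
+        return ordered;
+    }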
+
+    /*
+    Test explicit cross join.
+    Query: select * from t3, t2, t1, t4 where (only non-equality predicates such as '>' and '<')
+    Original Query: select * from test3,test2,test1,test4
+                    where test3.k1>test2.k1 and test2.k2<test1.k2 and test4.k3>=test1.k3;
+    Expect: same order as the implicit cross join case above
+     */
+    @Test
+    public void testExplicitCrossJoinReorder() {
+
     }
 }
diff --git a/fe/fe-core/src/test/resources/log4j2.xml b/fe/fe-core/src/test/resources/log4j2.xml
index 7e0957b..d0e6cba 100644
--- a/fe/fe-core/src/test/resources/log4j2.xml
+++ b/fe/fe-core/src/test/resources/log4j2.xml
@@ -16,7 +16,7 @@
 
     </Appenders>
     <Loggers>
-        <Root level="info">
+        <Root level="debug">
             <AppenderRef ref="Console" />
             <AppenderRef ref="RollingFile" />
         </Root>
