You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by li...@apache.org on 2022/06/14 11:40:03 UTC

[incubator-doris] branch master updated: [feature](nereids) Plan Translator (#9993)

This is an automated email from the ASF dual-hosted git repository.

lingmiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 25b9d6eba2 [feature](nereids) Plan Translator (#9993)
25b9d6eba2 is described below

commit 25b9d6eba24480993941e7b32062279e325dc4fb
Author: Kikyou1997 <33...@users.noreply.github.com>
AuthorDate: Tue Jun 14 19:39:55 2022 +0800

    [feature](nereids) Plan Translator (#9993)
    
    Issue Number: close #9621
    
    Add following physical operator: PhysicalAgg PhysicalSort PhysicalHashJoin
    
    Add basic logic of plan translator
    
    1. add new agg phase enum for nereids
    2. remove the Analyzer from PlanContext.java
    3. implement PlanTranslator::visitPhysicalFilter
---
 .../org/apache/doris/analysis/AggregateInfo.java   |  15 +-
 .../org/apache/doris/analysis/DescriptorTable.java |  10 +
 .../java/org/apache/doris/analysis/SortInfo.java   |  13 +
 .../apache/doris/nereids/PlanOperatorVisitor.java  |  71 +++++
 .../doris/nereids/operators/AbstractOperator.java  |  27 ++
 .../apache/doris/nereids/operators/Operator.java   |   5 +
 .../doris/nereids/operators/OperatorType.java      |   4 +
 .../doris/nereids/operators/plans/AggPhase.java    |  53 ++++
 .../doris/nereids/operators/plans/JoinType.java    |   4 +-
 .../plans/physical/PhysicalAggregation.java        |  87 ++++++
 .../operators/plans/physical/PhysicalFilter.java   |   8 +
 .../operators/plans/physical/PhysicalHashJoin.java |  62 ++++
 .../operators/plans/physical/PhysicalOlapScan.java |  27 +-
 .../operators/plans/physical/PhysicalOperator.java |   2 +
 .../operators/plans/physical/PhysicalProject.java  |   8 +
 .../operators/plans/physical/PhysicalScan.java     |  14 +-
 .../operators/plans/physical/PhysicalSort.java     |  77 +++++
 ...ysicalProperties.java => DistributionSpec.java} |  22 +-
 ...alProperties.java => HashDistributionSpec.java} |  20 +-
 .../{PhysicalProperties.java => OrderKey.java}     |  34 ++-
 .../nereids/properties/PhysicalProperties.java     |   9 +
 ...Properties.java => RandomDistributionDesc.java} |   4 +-
 .../{Slot.java => ExpressionConverter.java}        |  18 +-
 .../trees/expressions/FunctionCallExpression.java  |  62 ++++
 .../doris/nereids/trees/expressions/Slot.java      |  11 +
 .../nereids/trees/expressions/SlotReference.java   |  10 +-
 .../trees/plans/PhysicalPlanTranslator.java        | 322 +++++++++++++++++++++
 .../org/apache/doris/nereids/trees/plans/Plan.java |   1 +
 .../doris/nereids/trees/plans/PlanContext.java     |  73 +++++
 .../nereids/trees/plans/physical/PhysicalPlan.java |   1 +
 .../java/org/apache/doris/nereids/util/Utils.java  |  15 +
 .../org/apache/doris/planner/AggregationNode.java  |  10 +
 .../org/apache/doris/planner/HashJoinNode.java     | 108 +++++--
 33 files changed, 1139 insertions(+), 68 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java
index d2ea7443e9..298837d11e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java
@@ -48,6 +48,7 @@ import java.util.List;
  *   SELECT COUNT(*) FROM (SELECT DISTINCT a, b, ..., x, y, ...) GROUP BY x, y, ...
  *
  * The tree structure looks as follows:
+ * <pre>
  * - for non-distinct aggregation:
  *   - aggInfo: contains the original aggregation functions and grouping exprs
  *   - aggInfo.mergeAggInfo: contains the merging aggregation functions (grouping
@@ -61,7 +62,7 @@ import java.util.List;
  *     computation (grouping exprs are identical)
  *   - aggInfo.2ndPhaseDistinctAggInfo.mergeAggInfo: contains the merging aggregate
  *     functions for the phase 2 computation (grouping exprs are identical)
- *
+ * </pre>
  * In general, merging aggregate computations are idempotent; in other words,
  * aggInfo.mergeAggInfo == aggInfo.mergeAggInfo.mergeAggInfo.
  *
@@ -224,6 +225,17 @@ public final class AggregateInfo extends AggregateInfoBase {
         return result;
     }
 
+    /**
+     * Used by new optimizer.
+     */
+    public static AggregateInfo create(
+            ArrayList<Expr> groupingExprs, ArrayList<FunctionCallExpr> aggExprs,
+            TupleDescriptor tupleDesc, TupleDescriptor intermediateTupleDesc, AggPhase phase) {
+        AggregateInfo result = new AggregateInfo(groupingExprs, aggExprs, phase);
+        result.outputTupleDesc = tupleDesc;
+        result.intermediateTupleDesc = intermediateTupleDesc;
+        return result;
+    }
 
     /**
      * estimate if functions contains multi distinct
@@ -856,4 +868,5 @@ public final class AggregateInfo extends AggregateInfoBase {
     public List<Expr> getInputPartitionExprs() {
         return partitionExprs != null ? partitionExprs : groupingExprs;
     }
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java
index 526c6e4806..f733e3a9a6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java
@@ -73,6 +73,16 @@ public class DescriptorTable {
         return result;
     }
 
+    /**
+     * Used by new optimizer.
+     */
+    public SlotDescriptor addSlotDescriptor(TupleDescriptor d, int id) {
+        SlotDescriptor result = new SlotDescriptor(new SlotId(id), d);
+        d.addSlot(result);
+        slotDescs.put(result.getId(), result);
+        return result;
+    }
+
     /**
      * Create copy of src with new id. The returned descriptor has its mem layout
      * computed.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SortInfo.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SortInfo.java
index d57856b08b..fe287e3900 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SortInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SortInfo.java
@@ -73,6 +73,19 @@ public class SortInfo {
         materializedOrderingExprs = Lists.newArrayList();
     }
 
+    /**
+     * Used by new optimizer.
+     */
+    public SortInfo(List<Expr> orderingExprs,
+                    List<Boolean> isAscOrder,
+                    List<Boolean> nullsFirstParams,
+                    TupleDescriptor sortTupleDesc) {
+        this.orderingExprs = orderingExprs;
+        this.isAscOrder = isAscOrder;
+        this.nullsFirstParams = nullsFirstParams;
+        this.sortTupleDesc = sortTupleDesc;
+    }
+
     /**
      * C'tor for cloning.
      */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/PlanOperatorVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/PlanOperatorVisitor.java
new file mode 100644
index 0000000000..063f92aae7
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/PlanOperatorVisitor.java
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids;
+
+import org.apache.doris.nereids.operators.Operator;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalAggregation;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalFilter;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalSort;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+
+/**
+ * Base class for the processing of logical and physical plan.
+ *
+ * @param <R> Return type of each visit method.
+ * @param <C> Context type.
+ */
+@SuppressWarnings("rawtypes")
+public abstract class PlanOperatorVisitor<R, C> {
+
+    public abstract R visit(Plan<? extends Plan, ? extends Operator> plan, C context);
+
+    public R visitPhysicalAggregationPlan(PhysicalPlan<? extends PhysicalPlan, PhysicalAggregation> aggPlan,
+            C context) {
+        return null;
+    }
+
+    public R visitPhysicalOlapScanPlan(PhysicalPlan<? extends PhysicalPlan, PhysicalOlapScan> olapScanPlan,
+            C context) {
+        return null;
+    }
+
+    public R visitPhysicalSortPlan(PhysicalPlan<? extends PhysicalPlan, PhysicalSort> sortPlan,
+            C context) {
+        return null;
+    }
+
+    public R visitPhysicalHashJoinPlan(PhysicalPlan<? extends PhysicalPlan, PhysicalHashJoin> hashJoinPlan,
+            C context) {
+        return null;
+    }
+
+    public R visitPhysicalProject(PhysicalPlan<? extends PhysicalPlan, PhysicalProject> projectPlan,
+            C context) {
+        return null;
+    }
+
+    public R visitPhysicalFilter(PhysicalPlan<? extends PhysicalPlan, PhysicalFilter> filterPlan,
+            C context) {
+        return null;
+    }
+
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/AbstractOperator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/AbstractOperator.java
index 54064a6f2f..137317cbeb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/AbstractOperator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/AbstractOperator.java
@@ -17,6 +17,9 @@
 
 package org.apache.doris.nereids.operators;
 
+import org.apache.doris.nereids.PlanOperatorVisitor;
+import org.apache.doris.nereids.trees.plans.Plan;
+
 import java.util.Objects;
 
 /**
@@ -24,13 +27,37 @@ import java.util.Objects;
  */
 public abstract class AbstractOperator<TYPE extends AbstractOperator<TYPE>> implements Operator<TYPE> {
     protected final OperatorType type;
+    protected final long limited;
 
     public AbstractOperator(OperatorType type) {
         this.type = Objects.requireNonNull(type, "type can not be null");
+        this.limited = -1;
+    }
+
+    public AbstractOperator(OperatorType type, long limited) {
+        this.type = type;
+        this.limited = limited;
     }
 
     @Override
     public OperatorType getType() {
         return type;
     }
+
+    /**
+     * Child operator should overwrite this method.
+     * for example:
+     * <code>
+     *     visitor.visitPhysicalOlapScanPlan(
+     *                 (PhysicalPlan<? extends PhysicalPlan, PhysicalOlapScan>) plan, context);
+     * </code>
+     */
+    public <R, C> R accept(PlanOperatorVisitor<R, C> visitor, Plan<?, ?> plan, C context) {
+        return null;
+    }
+
+    public long getLimited() {
+        return limited;
+    }
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/Operator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/Operator.java
index bb2450ce35..efecb3a201 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/Operator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/Operator.java
@@ -17,8 +17,10 @@
 
 package org.apache.doris.nereids.operators;
 
+import org.apache.doris.nereids.PlanOperatorVisitor;
 import org.apache.doris.nereids.memo.GroupExpression;
 import org.apache.doris.nereids.trees.TreeNode;
+import org.apache.doris.nereids.trees.plans.Plan;
 
 /**
  * interface for all concrete operator.
@@ -27,4 +29,7 @@ public interface Operator<TYPE extends Operator<TYPE>> {
     OperatorType getType();
 
     <NODE_TYPE extends TreeNode> NODE_TYPE toTreeNode(GroupExpression groupExpression);
+
+    public <R, C> R accept(PlanOperatorVisitor<R, C> visitor, Plan<?, ?> plan, C context);
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/OperatorType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/OperatorType.java
index b506839854..f1a183521f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/OperatorType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/OperatorType.java
@@ -40,6 +40,10 @@ public enum OperatorType {
     PHYSICAL_PROJECT,
     PHYSICAL_FILTER,
     PHYSICAL_BROADCAST_HASH_JOIN,
+    PHYSICAL_AGGREGATION,
+    PHYSICAL_SORT,
+    PHYSICAL_HASH_JOIN,
+    PHYSICAL_EXCHANGE,
 
     // pattern
     ANY,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/AggPhase.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/AggPhase.java
new file mode 100644
index 0000000000..c365b06d89
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/AggPhase.java
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.operators.plans;
+
+import org.apache.doris.analysis.AggregateInfo;
+
+/**
+ * Represents different phase of agg and map it to the
+ * enum of agg phase definition of stale optimizer.
+ */
+public enum AggPhase {
+    FIRST("FIRST", AggregateInfo.AggPhase.FIRST),
+    FIRST_MERGE("FIRST_MERGE", AggregateInfo.AggPhase.FIRST_MERGE),
+    SECOND("SECOND", AggregateInfo.AggPhase.SECOND),
+    SECOND_MERGE("SECOND_MERGE", AggregateInfo.AggPhase.SECOND_MERGE);
+
+    private final String name;
+
+    private final AggregateInfo.AggPhase execAggPhase;
+
+    AggPhase(String name, AggregateInfo.AggPhase execAggPhase) {
+        this.name = name;
+        this.execAggPhase = execAggPhase;
+    }
+
+    public boolean isMerge() {
+        return this == FIRST_MERGE || this == SECOND_MERGE;
+    }
+
+    public AggregateInfo.AggPhase toExec() {
+        return this.execAggPhase;
+    }
+
+    @Override
+    public String toString() {
+        return name;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/JoinType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/JoinType.java
index f22634d649..9e2b8d41e3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/JoinType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/JoinType.java
@@ -59,7 +59,7 @@ public enum JoinType {
      * @return legacy join type in Doris
      * @throws AnalysisException throw this exception when input join type cannot convert to legacy join type in Doris
      */
-    public static JoinOperator toJoinOperator(JoinType joinType) throws AnalysisException {
+    public static JoinOperator toJoinOperator(JoinType joinType) {
         switch (joinType) {
             case INNER_JOIN:
                 return JoinOperator.INNER_JOIN;
@@ -80,7 +80,7 @@ public enum JoinType {
             case CROSS_JOIN:
                 return JoinOperator.CROSS_JOIN;
             default:
-                throw new AnalysisException("Not support join operator: " + joinType.name());
+                throw new RuntimeException("Unexpected join operator: " + joinType.name());
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalAggregation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalAggregation.java
new file mode 100644
index 0000000000..feb72bd837
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalAggregation.java
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.operators.plans.physical;
+
+import org.apache.doris.nereids.PlanOperatorVisitor;
+import org.apache.doris.nereids.operators.OperatorType;
+import org.apache.doris.nereids.operators.plans.AggPhase;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+
+import java.util.List;
+
+/**
+ * Physical aggregation plan operator.
+ */
+public class PhysicalAggregation extends PhysicalUnaryOperator<PhysicalAggregation, PhysicalPlan> {
+
+    private final List<Expression> groupByExprList;
+
+    private final List<Expression> aggExprList;
+
+    private final List<Expression> partitionExprList;
+
+    private final AggPhase aggPhase;
+
+    private final boolean usingStream;
+
+    /**
+     * Constructor of PhysicalAggNode.
+     *
+     * @param groupByExprList group by expr list.
+     * @param aggExprList agg expr list.
+     * @param partitionExprList  partition expr list, used for analytic agg.
+     * @param usingStream whether it's stream agg.
+     */
+    public PhysicalAggregation(List<Expression> groupByExprList, List<Expression> aggExprList,
+            List<Expression> partitionExprList, AggPhase aggPhase, boolean usingStream) {
+        super(OperatorType.PHYSICAL_AGGREGATION);
+        this.groupByExprList = groupByExprList;
+        this.aggExprList = aggExprList;
+        this.partitionExprList = partitionExprList;
+        this.aggPhase = aggPhase;
+        this.usingStream = usingStream;
+    }
+
+    public List<Expression> getGroupByExprList() {
+        return groupByExprList;
+    }
+
+    public List<Expression> getAggExprList() {
+        return aggExprList;
+    }
+
+    public AggPhase getAggPhase() {
+        return aggPhase;
+    }
+
+    public boolean isUsingStream() {
+        return usingStream;
+    }
+
+    public List<Expression> getPartitionExprList() {
+        return partitionExprList;
+    }
+
+    @Override
+    public <R, C> R accept(PlanOperatorVisitor<R, C> visitor, Plan<?, ?> plan, C context) {
+        return visitor.visitPhysicalAggregationPlan(
+                (PhysicalPlan<? extends PhysicalPlan, PhysicalAggregation>) plan, context);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalFilter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalFilter.java
index e3a8229b90..f62b69e3ad 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalFilter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalFilter.java
@@ -17,9 +17,11 @@
 
 package org.apache.doris.nereids.operators.plans.physical;
 
+import org.apache.doris.nereids.PlanOperatorVisitor;
 import org.apache.doris.nereids.operators.OperatorType;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
 
 import java.util.Objects;
 
@@ -50,4 +52,10 @@ public class PhysicalFilter<INPUT_TYPE extends Plan>
         }
         return "Filter (" + cond + ")";
     }
+
+    @Override
+    public <R, C> R accept(PlanOperatorVisitor<R, C> visitor, Plan<?, ?> plan, C context) {
+        return visitor.visitPhysicalFilter((PhysicalPlan<? extends PhysicalPlan, PhysicalFilter>) plan,
+                context);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalHashJoin.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalHashJoin.java
new file mode 100644
index 0000000000..41785dbab8
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalHashJoin.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.operators.plans.physical;
+
+import org.apache.doris.nereids.PlanOperatorVisitor;
+import org.apache.doris.nereids.operators.OperatorType;
+import org.apache.doris.nereids.operators.plans.JoinType;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+
+/**
+ * Physical hash join plan operator.
+ */
+public class PhysicalHashJoin extends PhysicalBinaryOperator<PhysicalHashJoin, PhysicalPlan, PhysicalPlan> {
+
+    private final JoinType joinType;
+
+    private final Expression predicate;
+
+    /**
+     * Constructor of PhysicalHashJoinNode.
+     *
+     * @param joinType Which join type, left semi join, inner join...
+     * @param predicate join condition.
+     */
+    public PhysicalHashJoin(JoinType joinType, Expression predicate) {
+        super(OperatorType.PHYSICAL_HASH_JOIN);
+        this.joinType = joinType;
+        this.predicate = predicate;
+    }
+
+    public JoinType getJoinType() {
+        return joinType;
+    }
+
+    public Expression getPredicate() {
+        return predicate;
+    }
+
+    @Override
+    public <R, C> R accept(PlanOperatorVisitor<R, C> visitor, Plan<?, ?> plan, C context) {
+        return visitor.visitPhysicalHashJoinPlan(
+                (PhysicalPlan<? extends PhysicalPlan, PhysicalHashJoin>) plan, context);
+    }
+
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalOlapScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalOlapScan.java
index 62d9b3babc..c56ae50c81 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalOlapScan.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalOlapScan.java
@@ -19,7 +19,10 @@ package org.apache.doris.nereids.operators.plans.physical;
 
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
+import org.apache.doris.nereids.PlanOperatorVisitor;
 import org.apache.doris.nereids.operators.OperatorType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
 
 import com.clearspring.analytics.util.Lists;
 import org.apache.commons.lang3.StringUtils;
@@ -34,6 +37,8 @@ public class PhysicalOlapScan extends PhysicalScan<PhysicalOlapScan> {
     private final List<Long> selectedTabletId;
     private final List<Long> selectedPartitionId;
 
+    private final OlapTable olapTable;
+
     /**
      * Constructor for PhysicalOlapScan.
      *
@@ -41,7 +46,8 @@ public class PhysicalOlapScan extends PhysicalScan<PhysicalOlapScan> {
      * @param qualifier table's name
      */
     public PhysicalOlapScan(OlapTable olapTable, List<String> qualifier) {
-        super(OperatorType.PHYSICAL_OLAP_SCAN, olapTable, qualifier);
+        super(OperatorType.PHYSICAL_OLAP_SCAN, qualifier);
+        this.olapTable = olapTable;
         this.selectedIndexId = olapTable.getBaseIndexId();
         this.selectedTabletId = Lists.newArrayList();
         this.selectedPartitionId = olapTable.getPartitionIds();
@@ -62,12 +68,21 @@ public class PhysicalOlapScan extends PhysicalScan<PhysicalOlapScan> {
         return selectedPartitionId;
     }
 
+    public OlapTable getTable() {
+        return olapTable;
+    }
+
     @Override
     public String toString() {
-        return "Scan Olap Table " + StringUtils.join(qualifier, ".") + "." + table.getName()
-            + " (selected index id: " + selectedTabletId
-            + ", selected partition ids: " + selectedPartitionId
-            + ", selected tablet ids: " + selectedTabletId
-            + ")";
+        return "Scan Olap Table " + StringUtils.join(qualifier, ".") + "." + olapTable.getName()
+                + " (selected index id: " + selectedTabletId + ", selected partition ids: " + selectedPartitionId
+                + ", selected tablet ids: " + selectedTabletId + ")";
     }
+
+    @Override
+    public <R, C> R accept(PlanOperatorVisitor<R, C> visitor, Plan<?, ?> plan, C context) {
+        return visitor.visitPhysicalOlapScanPlan(
+                (PhysicalPlan<? extends PhysicalPlan, PhysicalOlapScan>) plan, context);
+    }
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalOperator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalOperator.java
index 4982ec342d..dd5aa79a9f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalOperator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalOperator.java
@@ -28,5 +28,7 @@ import java.util.List;
  * interface for all concrete physical operator.
  */
 public interface PhysicalOperator<TYPE extends PhysicalOperator<TYPE>> extends PlanOperator<TYPE> {
+
     List<Slot> computeOutputs(LogicalProperties logicalProperties, Plan... inputs);
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalProject.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalProject.java
index bd74fc813b..d5046083bf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalProject.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalProject.java
@@ -17,9 +17,11 @@
 
 package org.apache.doris.nereids.operators.plans.physical;
 
+import org.apache.doris.nereids.PlanOperatorVisitor;
 import org.apache.doris.nereids.operators.OperatorType;
 import org.apache.doris.nereids.trees.expressions.NamedExpression;
 import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -47,4 +49,10 @@ public class PhysicalProject<INPUT_TYPE extends Plan>
     public String toString() {
         return "Project (" + StringUtils.join(projects, ", ") + ")";
     }
+
+    @Override
+    public <R, C> R accept(PlanOperatorVisitor<R, C> visitor, Plan<?, ?> plan, C context) {
+        return visitor.visitPhysicalProject(
+                (PhysicalPlan<? extends PhysicalPlan, PhysicalProject>) plan, context);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalScan.java
index 77ce5570b2..10d434c7e2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalScan.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalScan.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.nereids.operators.plans.physical;
 
-import org.apache.doris.catalog.Table;
 import org.apache.doris.nereids.operators.OperatorType;
 
 import java.util.List;
@@ -26,26 +25,19 @@ import java.util.Objects;
 /**
  * Abstract class for all physical scan operator.
  */
-public abstract class PhysicalScan<TYPE extends PhysicalScan<TYPE>>
-        extends PhysicalLeafOperator<TYPE> {
+public abstract class PhysicalScan<TYPE extends PhysicalScan<TYPE>> extends PhysicalLeafOperator<TYPE> {
+
 
-    protected final Table table;
     protected final List<String> qualifier;
 
     /**
      * Constructor for PhysicalScan.
      *
      * @param type node type
-     * @param table scan table
      * @param qualifier table's name
      */
-    public PhysicalScan(OperatorType type, Table table, List<String> qualifier) {
+    public PhysicalScan(OperatorType type, List<String> qualifier) {
         super(type);
-        this.table = Objects.requireNonNull(table, "table can not be null");
         this.qualifier = Objects.requireNonNull(qualifier, "qualifier can not be null");
     }
-
-    public Table getTable() {
-        return table;
-    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalSort.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalSort.java
new file mode 100644
index 0000000000..1dcb3d6540
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalSort.java
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.operators.plans.physical;
+
+import org.apache.doris.nereids.PlanOperatorVisitor;
+import org.apache.doris.nereids.operators.OperatorType;
+import org.apache.doris.nereids.properties.OrderKey;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+
+import java.util.List;
+
+/**
+ * Physical sort plan operator.
+ */
+public class PhysicalSort extends PhysicalUnaryOperator<PhysicalSort, PhysicalPlan> {
+
+    private final int offset;
+
+    private final int limit;
+
+    private final List<OrderKey> orderList;
+
+    private final boolean useTopN;
+
+    /**
+     * Constructor of PhysicalHashJoinNode.
+     */
+    public PhysicalSort(int offset, int limit, List<OrderKey> orderList, boolean useTopN) {
+        super(OperatorType.PHYSICAL_SORT);
+        this.offset = offset;
+        this.limit = limit;
+        this.orderList = orderList;
+        this.useTopN = useTopN;
+    }
+
+    public int getOffset() {
+        return offset;
+    }
+
+    public int getLimit() {
+        return limit;
+    }
+
+    public List<OrderKey> getOrderList() {
+        return orderList;
+    }
+
+    public boolean isUseTopN() {
+        return useTopN;
+    }
+
+    public boolean hasLimit() {
+        return limit > -1;
+    }
+
+    @Override
+    public <R, C> R accept(PlanOperatorVisitor<R, C> visitor, Plan<?, ?> plan, C context) {
+        return visitor.visitPhysicalSortPlan((PhysicalPlan<? extends PhysicalPlan, PhysicalSort>) plan,
+                context);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java
similarity index 62%
copy from fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
copy to fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java
index fe0db693a5..4122de6901 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java
@@ -17,9 +17,27 @@
 
 package org.apache.doris.nereids.properties;
 
+import org.apache.doris.planner.DataPartition;
+
 /**
- * Physical properties used in cascades.
+ * Base class for data distribution.
  */
-public class PhysicalProperties {
+public class DistributionSpec {
+
+    private DataPartition dataPartition;
+
+    public DistributionSpec() {
+    }
+
+    public DistributionSpec(DataPartition dataPartition) {
+        this.dataPartition = dataPartition;
+    }
+
+    public DataPartition getDataPartition() {
+        return dataPartition;
+    }
 
+    public void setDataPartition(DataPartition dataPartition) {
+        this.dataPartition = dataPartition;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/HashDistributionSpec.java
similarity index 68%
copy from fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
copy to fe/fe-core/src/main/java/org/apache/doris/nereids/properties/HashDistributionSpec.java
index fe0db693a5..ade20758ff 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/HashDistributionSpec.java
@@ -17,9 +17,25 @@
 
 package org.apache.doris.nereids.properties;
 
+import org.apache.doris.analysis.HashDistributionDesc;
+
 /**
- * Physical properties used in cascades.
+ * Describe hash distribution.
  */
-public class PhysicalProperties {
+public class HashDistributionSpec extends DistributionSpec {
+
+    /**
+     * Enums for concrete shuffle type.
+     */
+    public enum ShuffleType {
+        COLOCATE,
+        BUCKET,
+        AGG,
+        NORMAL
+    }
+
+    private ShuffleType shuffleType;
+
+    private HashDistributionDesc hashDistributionDesc;
 
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderKey.java
similarity index 54%
copy from fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
copy to fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderKey.java
index fe0db693a5..b01862dff6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderKey.java
@@ -17,9 +17,39 @@
 
 package org.apache.doris.nereids.properties;
 
+import org.apache.doris.nereids.trees.expressions.Expression;
+
 /**
- * Physical properties used in cascades.
+ * Represents the order key of a statement.
  */
-public class PhysicalProperties {
+public class OrderKey {
+
+    private Expression expr;
+
+    private boolean isAsc;
+
+    private boolean nullFirst;
+
+    /**
+     * Constructor of OrderKey.
+     *
+     * @param nullFirst True if "NULLS FIRST", false if "NULLS LAST", null if not specified.
+     */
+    public OrderKey(Expression expr, boolean isAsc, boolean nullFirst) {
+        this.expr = expr;
+        this.isAsc = isAsc;
+        this.nullFirst = nullFirst;
+    }
+
+    public Expression getExpr() {
+        return expr;
+    }
+
+    public boolean isAsc() {
+        return isAsc;
+    }
 
+    public boolean isNullFirst() {
+        return nullFirst;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
index fe0db693a5..abe767c40e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
@@ -22,4 +22,13 @@ package org.apache.doris.nereids.properties;
  */
 public class PhysicalProperties {
 
+    private DistributionSpec distributionDesc;
+
+    public DistributionSpec getDistributionDesc() {
+        return distributionDesc;
+    }
+
+    public void setDistributionDesc(DistributionSpec distributionDesc) {
+        this.distributionDesc = distributionDesc;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RandomDistributionDesc.java
similarity index 89%
copy from fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
copy to fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RandomDistributionDesc.java
index fe0db693a5..f54ed12a19 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RandomDistributionDesc.java
@@ -18,8 +18,8 @@
 package org.apache.doris.nereids.properties;
 
 /**
- * Physical properties used in cascades.
+ * Describe random distribution.
  */
-public class PhysicalProperties {
+public class RandomDistributionDesc extends DistributionSpec {
 
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Slot.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/ExpressionConverter.java
similarity index 67%
copy from fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Slot.java
copy to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/ExpressionConverter.java
index 16d23ffd98..40016180f8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Slot.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/ExpressionConverter.java
@@ -17,20 +17,18 @@
 
 package org.apache.doris.nereids.trees.expressions;
 
-import org.apache.doris.nereids.trees.NodeType;
+import org.apache.doris.analysis.Expr;
 
 /**
- * Abstract class for all slot in expression.
+ * Used to convert expression of new optimizer to stale expr.
  */
-public abstract class Slot<EXPR_TYPE extends Slot<EXPR_TYPE>> extends NamedExpression<EXPR_TYPE>
-        implements LeafExpression<EXPR_TYPE> {
+public class ExpressionConverter {
 
-    public Slot(NodeType type) {
-        super(type);
-    }
+    public static ExpressionConverter converter = new ExpressionConverter();
 
-    @Override
-    public Slot toSlot() {
-        return this;
+    // TODO: implement this, besides if expression is a slot, should set the slotId to
+    //       converted the org.apache.doris.analysis.Expr
+    public Expr convert(Expression expression) {
+        return null;
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/FunctionCallExpression.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/FunctionCallExpression.java
new file mode 100644
index 0000000000..88e4bc6ba5
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/FunctionCallExpression.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.expressions;
+
+import org.apache.doris.analysis.FunctionName;
+import org.apache.doris.catalog.Function;
+import org.apache.doris.nereids.trees.NodeType;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Temp definition of FunctionCallExpression.
+ */
+public class FunctionCallExpression extends Expression<FunctionCallExpression> {
+
+    private FunctionName functionName;
+
+    private List<Expression> params;
+
+    private Function fn;
+
+    /**
+     * Constructor of FunctionCallExpression.
+     */
+    public FunctionCallExpression(FunctionName functionName,
+                                  Function fn, Expression... children) {
+        super(NodeType.EXPRESSION, children);
+        this.functionName = functionName;
+        this.params = Arrays.stream(children).collect(Collectors.toList());
+        this.fn = fn;
+    }
+
+    public FunctionName getFunctionName() {
+        return functionName;
+    }
+
+    public List<Expression> getParams() {
+        return params;
+    }
+
+    public Function getFn() {
+        return fn;
+    }
+
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Slot.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Slot.java
index 16d23ffd98..3143b807f0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Slot.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Slot.java
@@ -25,6 +25,13 @@ import org.apache.doris.nereids.trees.NodeType;
 public abstract class Slot<EXPR_TYPE extends Slot<EXPR_TYPE>> extends NamedExpression<EXPR_TYPE>
         implements LeafExpression<EXPR_TYPE> {
 
+    private int id;
+
+    public Slot(NodeType type, int id, Expression... children) {
+        super(type, children);
+        this.id = id;
+    }
+
     public Slot(NodeType type) {
         super(type);
     }
@@ -33,4 +40,8 @@ public abstract class Slot<EXPR_TYPE extends Slot<EXPR_TYPE>> extends NamedExpre
     public Slot toSlot() {
         return this;
     }
+
+    public int getId() {
+        return id;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/SlotReference.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/SlotReference.java
index f779b9eaba..acecad51ad 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/SlotReference.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/SlotReference.java
@@ -18,7 +18,6 @@
 package org.apache.doris.nereids.trees.expressions;
 
 import org.apache.doris.catalog.Column;
-import org.apache.doris.nereids.exceptions.UnboundException;
 import org.apache.doris.nereids.trees.NodeType;
 import org.apache.doris.nereids.types.DataType;
 
@@ -80,12 +79,12 @@ public class SlotReference extends Slot<SlotReference> {
     }
 
     @Override
-    public DataType getDataType() throws UnboundException {
+    public DataType getDataType() {
         return dataType;
     }
 
     @Override
-    public boolean nullable() throws UnboundException {
+    public boolean nullable() {
         return nullable;
     }
 
@@ -123,4 +122,9 @@ public class SlotReference extends Slot<SlotReference> {
     public int hashCode() {
         return Objects.hash(exprId, name, qualifier, nullable);
     }
+
+    // TODO: return real org.apache.doris.catalog.Column
+    public Column getColumn() {
+        return null;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PhysicalPlanTranslator.java
new file mode 100644
index 0000000000..3c4384b671
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PhysicalPlanTranslator.java
@@ -0,0 +1,322 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans;
+
+import org.apache.doris.analysis.AggregateInfo;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.FunctionCallExpr;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SortInfo;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.nereids.PlanOperatorVisitor;
+import org.apache.doris.nereids.operators.AbstractOperator;
+import org.apache.doris.nereids.operators.Operator;
+import org.apache.doris.nereids.operators.plans.JoinType;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalAggregation;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalFilter;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalOperator;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalSort;
+import org.apache.doris.nereids.properties.OrderKey;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.ExpressionConverter;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.planner.AggregationNode;
+import org.apache.doris.planner.CrossJoinNode;
+import org.apache.doris.planner.DataPartition;
+import org.apache.doris.planner.ExchangeNode;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.PlanNode;
+import org.apache.doris.planner.SortNode;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Used to translate to physical plan generated by new optimizer to the plan fragments.
+ */
+@SuppressWarnings("rawtypes")
+public class PhysicalPlanTranslator extends PlanOperatorVisitor<PlanFragment, PlanContext> {
+
+    public void translatePlan(PhysicalPlan<? extends PhysicalPlan, ? extends AbstractOperator> physicalPlan,
+            PlanContext context) {
+        visit(physicalPlan, context);
+    }
+
+    @Override
+    public PlanFragment visit(Plan<? extends Plan, ? extends Operator> plan, PlanContext context) {
+        PhysicalOperator<?> operator = (PhysicalOperator<?>) plan.getOperator();
+        return operator.accept(this, plan, context);
+    }
+
+    /**
+     * Translate in following steps:
+     *  1.
+     *
+     */
+    @Override
+    public PlanFragment visitPhysicalAggregationPlan(
+            PhysicalPlan<? extends PhysicalPlan, PhysicalAggregation> aggPlan, PlanContext context) {
+
+        PlanFragment inputPlanFragment = visit(
+                (PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) aggPlan.child(0), context);
+
+        AggregationNode aggregationNode = null;
+        List<Slot> slotList = aggPlan.getOutput();
+        TupleDescriptor outputTupleDesc = generateTupleDesc(slotList, context, null);
+        PhysicalAggregation physicalAggregation = (PhysicalAggregation) aggPlan.getOperator();
+        AggregateInfo.AggPhase phase = physicalAggregation.getAggPhase().toExec();
+
+        List<Expression> groupByExpressionList = physicalAggregation.getGroupByExprList();
+        ArrayList<Expr> execGroupingExpressions = groupByExpressionList.stream()
+                .map(e -> ExpressionConverter.converter.convert(e)).collect(Collectors.toCollection(ArrayList::new));
+
+        List<Expression> aggExpressionList = physicalAggregation.getAggExprList();
+        // TODO: agg function could be other expr type either
+        ArrayList<FunctionCallExpr> execAggExpressions = aggExpressionList.stream()
+                .map(e -> (FunctionCallExpr) ExpressionConverter.converter.convert(e))
+                .collect(Collectors.toCollection(ArrayList::new));
+
+        List<Expression> partitionExpressionList = physicalAggregation.getPartitionExprList();
+        List<Expr> execPartitionExpressions = partitionExpressionList.stream()
+                .map(e -> (FunctionCallExpr) ExpressionConverter.converter.convert(e)).collect(Collectors.toList());
+        // todo: support DISTINCT
+        AggregateInfo aggInfo = null;
+        switch (phase) {
+            case FIRST:
+                aggInfo = AggregateInfo.create(execGroupingExpressions, execAggExpressions, outputTupleDesc,
+                        outputTupleDesc, AggregateInfo.AggPhase.FIRST);
+                aggregationNode = new AggregationNode(context.nextNodeId(), inputPlanFragment.getPlanRoot(), aggInfo);
+                aggregationNode.unsetNeedsFinalize();
+                aggregationNode.setUseStreamingPreagg(physicalAggregation.isUsingStream());
+                aggregationNode.setIntermediateTuple();
+                if (!partitionExpressionList.isEmpty()) {
+                    inputPlanFragment.setOutputPartition(DataPartition.hashPartitioned(execPartitionExpressions));
+                }
+                break;
+            case FIRST_MERGE:
+                aggInfo = AggregateInfo.create(execGroupingExpressions, execAggExpressions, outputTupleDesc,
+                        outputTupleDesc, AggregateInfo.AggPhase.FIRST_MERGE);
+                aggregationNode = new AggregationNode(context.nextNodeId(), inputPlanFragment.getPlanRoot(), aggInfo);
+                break;
+            default:
+                throw new RuntimeException("Unsupported yet");
+        }
+        inputPlanFragment.setPlanRoot(aggregationNode);
+        return inputPlanFragment;
+    }
+
+    @Override
+    public PlanFragment visitPhysicalOlapScanPlan(
+            PhysicalPlan<? extends PhysicalPlan, PhysicalOlapScan> olapScanPlan, PlanContext context) {
+        // Create OlapScanNode
+        List<Slot> slotList = olapScanPlan.getOutput();
+        PhysicalOlapScan physicalOlapScan = olapScanPlan.getOperator();
+        OlapTable olapTable = physicalOlapScan.getTable();
+        TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, context, olapTable);
+        OlapScanNode olapScanNode = new OlapScanNode(context.nextNodeId(), tupleDescriptor, olapTable.getName());
+        // Create PlanFragment
+        PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), olapScanNode, DataPartition.RANDOM);
+        context.addPlanFragment(planFragment);
+        return planFragment;
+    }
+
+    @Override
+    public PlanFragment visitPhysicalSortPlan(PhysicalPlan<? extends PhysicalPlan, PhysicalSort> sortPlan,
+            PlanContext context) {
+        PlanFragment childFragment = visit(
+                (PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) sortPlan.child(0), context);
+        PhysicalSort physicalSort = sortPlan.getOperator();
+        if (!childFragment.isPartitioned()) {
+            return childFragment;
+        }
+        long limit = physicalSort.getLimit();
+
+        List<Expr> execOrderingExprList = Lists.newArrayList();
+        List<Boolean> ascOrderList = Lists.newArrayList();
+        List<Boolean> nullsFirstParamList = Lists.newArrayList();
+
+        List<OrderKey> orderKeyList = physicalSort.getOrderList();
+        orderKeyList.forEach(k -> {
+            execOrderingExprList.add(ExpressionConverter.converter.convert(k.getExpr()));
+            ascOrderList.add(k.isAsc());
+            nullsFirstParamList.add(k.isNullFirst());
+        });
+
+        List<Slot> outputList = sortPlan.getOutput();
+        TupleDescriptor tupleDesc = generateTupleDesc(outputList, context, null);
+        SortInfo sortInfo = new SortInfo(execOrderingExprList, ascOrderList, nullsFirstParamList, tupleDesc);
+
+        PlanNode childNode = childFragment.getPlanRoot();
+        SortNode sortNode = new SortNode(context.nextNodeId(), childNode, sortInfo, physicalSort.isUseTopN(),
+                physicalSort.hasLimit(), physicalSort.getOffset());
+
+        PlanFragment mergeFragment = createParentFragment(childFragment, DataPartition.UNPARTITIONED, context);
+        ExchangeNode exchNode = (ExchangeNode) mergeFragment.getPlanRoot();
+        exchNode.unsetLimit();
+        if (physicalSort.hasLimit()) {
+            exchNode.setLimit(limit);
+        }
+        long offset = physicalSort.getOffset();
+        exchNode.setMergeInfo(sortNode.getSortInfo(), offset);
+
+        // Child nodes should not process the offset. If there is a limit,
+        // the child nodes need only return (offset + limit) rows.
+        SortNode childSortNode = (SortNode) childFragment.getPlanRoot();
+        Preconditions.checkState(sortNode == childSortNode);
+        if (sortNode.hasLimit()) {
+            childSortNode.unsetLimit();
+            childSortNode.setLimit(limit + offset);
+        }
+        childSortNode.setOffset(0);
+        return mergeFragment;
+    }
+
+    // TODO: support broadcast join / co-locate / bucket shuffle join later
+    @Override
+    public PlanFragment visitPhysicalHashJoinPlan(
+            PhysicalPlan<? extends PhysicalPlan, PhysicalHashJoin> hashJoinPlan, PlanContext context) {
+        PlanFragment leftFragment = visit(
+                (PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) hashJoinPlan.child(0), context);
+        PlanFragment rightFragment = visit(
+                (PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) hashJoinPlan.child(0), context);
+        PhysicalHashJoin physicalHashJoin = hashJoinPlan.getOperator();
+        Expression predicateExpr = physicalHashJoin.getPredicate();
+        List<Expression> eqExprList = Utils.getEqConjuncts(hashJoinPlan.child(0).getOutput(),
+                hashJoinPlan.child(1).getOutput(), predicateExpr);
+        JoinType joinType = physicalHashJoin.getJoinType();
+
+        PlanNode leftFragmentPlanRoot = leftFragment.getPlanRoot();
+        PlanNode rightFragmentPlanRoot = rightFragment.getPlanRoot();
+
+        if (joinType.equals(JoinType.CROSS_JOIN)
+                || physicalHashJoin.getJoinType().equals(JoinType.INNER_JOIN) && eqExprList.isEmpty()) {
+            CrossJoinNode crossJoinNode = new CrossJoinNode(context.nextNodeId(), leftFragment.getPlanRoot(),
+                    rightFragment.getPlanRoot(), null);
+            crossJoinNode.setLimit(physicalHashJoin.getLimited());
+            List<Expr> conjuncts = Utils.extractConjuncts(predicateExpr).stream()
+                    .map(e -> ExpressionConverter.converter.convert(e))
+                    .collect(Collectors.toCollection(ArrayList::new));
+            crossJoinNode.addConjuncts(conjuncts);
+            ExchangeNode exchangeNode = new ExchangeNode(context.nextNodeId(), rightFragment.getPlanRoot(), false);
+            exchangeNode.setNumInstances(rightFragmentPlanRoot.getNumInstances());
+            exchangeNode.setFragment(leftFragment);
+            leftFragmentPlanRoot.setChild(1, exchangeNode);
+            rightFragment.setDestination(exchangeNode);
+            crossJoinNode.setChild(0, leftFragment.getPlanRoot());
+            leftFragment.setPlanRoot(crossJoinNode);
+            return leftFragment;
+        }
+
+        List<Expression> expressionList = Utils.extractConjuncts(predicateExpr);
+        expressionList.removeAll(eqExprList);
+        List<Expr> execOtherConjunctList = expressionList.stream().map(e -> ExpressionConverter.converter.convert(e))
+                .collect(Collectors.toCollection(ArrayList::new));
+        List<Expr> execEqConjunctList = eqExprList.stream().map(e -> ExpressionConverter.converter.convert(e))
+                .collect(Collectors.toCollection(ArrayList::new));
+
+        HashJoinNode hashJoinNode = new HashJoinNode(context.nextNodeId(), leftFragmentPlanRoot, rightFragmentPlanRoot,
+                JoinType.toJoinOperator(physicalHashJoin.getJoinType()), execEqConjunctList, execOtherConjunctList);
+
+        ExchangeNode leftExch = new ExchangeNode(context.nextNodeId(), leftFragmentPlanRoot, false);
+        leftExch.setNumInstances(leftFragmentPlanRoot.getNumInstances());
+        ExchangeNode rightExch = new ExchangeNode(context.nextNodeId(), leftFragmentPlanRoot, false);
+        rightExch.setNumInstances(rightFragmentPlanRoot.getNumInstances());
+        hashJoinNode.setChild(0, leftFragmentPlanRoot);
+        hashJoinNode.setChild(1, leftFragmentPlanRoot);
+        hashJoinNode.setDistributionMode(HashJoinNode.DistributionMode.PARTITIONED);
+        hashJoinNode.setLimit(physicalHashJoin.getLimited());
+        leftFragment.setDestination((ExchangeNode) rightFragment.getPlanRoot());
+        rightFragment.setDestination((ExchangeNode) leftFragmentPlanRoot);
+        return new PlanFragment(context.nextFragmentId(), hashJoinNode, leftFragment.getDataPartition());
+    }
+
+    @Override
+    public PlanFragment visitPhysicalProject(
+            PhysicalPlan<? extends PhysicalPlan, PhysicalProject> projectPlan, PlanContext context) {
+        return visit((PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) projectPlan.child(0), context);
+    }
+
+    @Override
+    public PlanFragment visitPhysicalFilter(PhysicalPlan<? extends PhysicalPlan, PhysicalFilter> filterPlan,
+            PlanContext context) {
+        PlanFragment inputFragment = visit(
+                (PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) filterPlan.child(0), context);
+        PlanNode planNode = inputFragment.getPlanRoot();
+        PhysicalFilter filter = filterPlan.getOperator();
+        Expression expression = filter.getPredicates();
+        List<Expression> expressionList = Utils.extractConjuncts(expression);
+        expressionList.stream().map(ExpressionConverter.converter::convert).forEach(planNode::addConjunct);
+        return inputFragment;
+    }
+
+    private TupleDescriptor generateTupleDesc(List<Slot> slotList, PlanContext context, Table table) {
+        TupleDescriptor tupleDescriptor = context.generateTupleDesc();
+        tupleDescriptor.setTable(table);
+        for (Slot slot : slotList) {
+            SlotReference slotReference = (SlotReference) slot;
+            SlotDescriptor slotDescriptor = context.addSlotDesc(tupleDescriptor, slot.getId());
+            slotDescriptor.setColumn(slotReference.getColumn());
+            slotDescriptor.setType(slotReference.getDataType().toCatalogDataType());
+            slotDescriptor.setIsMaterialized(true);
+        }
+        return tupleDescriptor;
+    }
+
+    private PlanFragment createParentFragment(PlanFragment childFragment, DataPartition parentPartition,
+            PlanContext ctx) {
+        ExchangeNode exchangeNode = new ExchangeNode(ctx.nextNodeId(), childFragment.getPlanRoot(), false);
+        exchangeNode.setNumInstances(childFragment.getPlanRoot().getNumInstances());
+        PlanFragment parentFragment = new PlanFragment(ctx.nextFragmentId(), exchangeNode, parentPartition);
+        childFragment.setDestination(exchangeNode);
+        childFragment.setOutputPartition(parentPartition);
+        return parentFragment;
+    }
+
+    /**
+     * Helper function to eliminate unnecessary checked exception caught requirement from the main logic of translator.
+     *
+     * @param f function which would invoke the logic of
+     *        stale code from old optimizer that could throw
+     *        a checked exception
+     */
+    public void exec(FuncWrapper f) {
+        try {
+            f.exec();
+        } catch (Exception e) {
+            throw new RuntimeException("Unexpected Exception: ", e);
+        }
+    }
+
+    private static interface FuncWrapper {
+        void exec() throws Exception;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/Plan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/Plan.java
index c359a93ed7..77bf7ac625 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/Plan.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/Plan.java
@@ -44,4 +44,5 @@ public interface Plan<
 
     @Override
     Plan child(int index);
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanContext.java
new file mode 100644
index 0000000000..fc43e5fa30
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanContext.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans;
+
+import org.apache.doris.analysis.DescriptorTable;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.common.IdGenerator;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.PlanFragmentId;
+import org.apache.doris.planner.PlanNodeId;
+
+import com.clearspring.analytics.util.Lists;
+
+import java.util.List;
+
+/**
+ * Context of physical plan.
+ */
+public class PlanContext {
+    private List<PlanFragment> planFragmentList = Lists.newArrayList();
+
+    private DescriptorTable descTable = new DescriptorTable();
+
+
+    private final IdGenerator<PlanFragmentId> fragmentIdGenerator = PlanFragmentId.createGenerator();
+
+    private final IdGenerator<PlanNodeId> nodeIdGenerator = PlanNodeId.createGenerator();
+
+    public List<PlanFragment> getPlanFragmentList() {
+        return planFragmentList;
+    }
+
+    public TupleDescriptor generateTupleDesc() {
+        return descTable.createTupleDescriptor();
+    }
+
+    public PlanNodeId nextNodeId() {
+        return nodeIdGenerator.getNextId();
+    }
+
+    public SlotDescriptor addSlotDesc(TupleDescriptor t) {
+        return descTable.addSlotDescriptor(t);
+    }
+
+    public SlotDescriptor addSlotDesc(TupleDescriptor t, int id) {
+        return descTable.addSlotDescriptor(t, id);
+    }
+
+    public PlanFragmentId nextFragmentId() {
+        return fragmentIdGenerator.getNextId();
+    }
+
+    public void addPlanFragment(PlanFragment planFragment) {
+        this.planFragmentList.add(planFragment);
+    }
+
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalPlan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalPlan.java
index ed03a95033..9301c294e6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalPlan.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalPlan.java
@@ -35,4 +35,5 @@ public interface PhysicalPlan<
 
     @Override
     Plan child(int index);
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java
index a01a867593..ff66e62191 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java
@@ -17,6 +17,11 @@
 
 package org.apache.doris.nereids.util;
 
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+
+import java.util.List;
+
 /**
  * Utils for Nereids.
  */
@@ -34,4 +39,14 @@ public class Utils {
             return part.replace("`", "``");
         }
     }
+
+    // TODO: implement later
+    public static List<Expression> getEqConjuncts(List<Slot> left, List<Slot> right, Expression eqExpr) {
+        return null;
+    }
+
+    // TODO: implement later
+    public static List<Expression> extractConjuncts(Expression expr) {
+        return null;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
index fb16bdebe6..ac7c7fee96 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
@@ -103,6 +103,16 @@ public class AggregationNode extends PlanNode {
                 && aggInfo.getGroupingExprs().size() > 0;
     }
 
+    // Used by new optimizer
+    public void setNeedsFinalize(boolean needsFinalize) {
+        this.needsFinalize = needsFinalize;
+    }
+
+    // Used by new optimizer
+    public void setUseStreamingPreagg(boolean useStreamingPreagg) {
+        this.useStreamingPreagg = useStreamingPreagg;
+    }
+
     @Override
     public void setCompactData(boolean on) {
         this.compactData = on;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
index 706a127c0a..0bc86e724a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
@@ -67,7 +67,7 @@ import java.util.stream.Collectors;
 public class HashJoinNode extends PlanNode {
     private final static Logger LOG = LogManager.getLogger(HashJoinNode.class);
 
-    private final TableRef     innerRef;
+    private TableRef innerRef;
     private final JoinOperator joinOp;
     // predicates of the form 'a=b' or 'a<=>b'
     private List<BinaryPredicate> eqJoinConjuncts = Lists.newArrayList();
@@ -84,8 +84,11 @@ public class HashJoinNode extends PlanNode {
 
     private List<SlotId> hashOutputSlotIds;
 
-    public HashJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, TableRef innerRef,
-                        List<Expr> eqJoinConjuncts, List<Expr> otherJoinConjuncts) {
+    /**
+     * Constructor of HashJoinNode.
+     */
+    public HashJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, TableRef innerRef, List<Expr> eqJoinConjuncts,
+            List<Expr> otherJoinConjuncts) {
         super(id, "HASH JOIN", NodeType.HASH_JOIN_NODE);
         Preconditions.checkArgument(eqJoinConjuncts != null && !eqJoinConjuncts.isEmpty());
         Preconditions.checkArgument(otherJoinConjuncts != null);
@@ -140,6 +143,63 @@ public class HashJoinNode extends PlanNode {
         }
     }
 
+    /**
+     * This constructor is used by new optimizer.
+     */
+    public HashJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, JoinOperator joinOp, List<Expr> eqJoinConjuncts,
+            List<Expr> otherJoinConjuncts) {
+        super(id, "HASH JOIN", NodeType.HASH_JOIN_NODE);
+        Preconditions.checkArgument(eqJoinConjuncts != null && !eqJoinConjuncts.isEmpty());
+        Preconditions.checkArgument(otherJoinConjuncts != null);
+        tblRefIds.addAll(outer.getTblRefIds());
+        tblRefIds.addAll(inner.getTblRefIds());
+        this.joinOp = joinOp;
+        // TODO: Support not vec exec engine cut unless tupleid in semi/anti join
+        if (VectorizedUtil.isVectorized()) {
+            if (joinOp.equals(JoinOperator.LEFT_ANTI_JOIN) || joinOp.equals(JoinOperator.LEFT_SEMI_JOIN)
+                    || joinOp.equals(JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN)) {
+                tupleIds.addAll(outer.getTupleIds());
+            } else if (joinOp.equals(JoinOperator.RIGHT_ANTI_JOIN) || joinOp.equals(JoinOperator.RIGHT_SEMI_JOIN)) {
+                tupleIds.addAll(inner.getTupleIds());
+            } else {
+                tupleIds.addAll(outer.getTupleIds());
+                tupleIds.addAll(inner.getTupleIds());
+            }
+        } else {
+            tupleIds.addAll(outer.getTupleIds());
+            tupleIds.addAll(inner.getTupleIds());
+        }
+
+        for (Expr eqJoinPredicate : eqJoinConjuncts) {
+            Preconditions.checkArgument(eqJoinPredicate instanceof BinaryPredicate);
+            BinaryPredicate eqJoin = (BinaryPredicate) eqJoinPredicate;
+            if (eqJoin.getOp().equals(BinaryPredicate.Operator.EQ_FOR_NULL)) {
+                Preconditions.checkArgument(eqJoin.getChildren().size() == 2);
+                if (!eqJoin.getChild(0).isNullable() || !eqJoin.getChild(1).isNullable()) {
+                    eqJoin.setOp(BinaryPredicate.Operator.EQ);
+                }
+            }
+            this.eqJoinConjuncts.add(eqJoin);
+        }
+        this.distrMode = DistributionMode.NONE;
+        this.otherJoinConjuncts = otherJoinConjuncts;
+        children.add(outer);
+        children.add(inner);
+
+        // Inherits all the nullable tuple from the children
+        // Mark tuples that form the "nullable" side of the outer join as nullable.
+        nullableTupleIds.addAll(inner.getNullableTupleIds());
+        nullableTupleIds.addAll(outer.getNullableTupleIds());
+        if (joinOp.equals(JoinOperator.FULL_OUTER_JOIN)) {
+            nullableTupleIds.addAll(outer.getTupleIds());
+            nullableTupleIds.addAll(inner.getTupleIds());
+        } else if (joinOp.equals(JoinOperator.LEFT_OUTER_JOIN)) {
+            nullableTupleIds.addAll(inner.getTupleIds());
+        } else if (joinOp.equals(JoinOperator.RIGHT_OUTER_JOIN)) {
+            nullableTupleIds.addAll(outer.getTupleIds());
+        }
+    }
+
     public List<BinaryPredicate> getEqJoinConjuncts() {
         return eqJoinConjuncts;
     }
@@ -176,7 +236,7 @@ public class HashJoinNode extends PlanNode {
     /**
      * Calculate the slots output after going through the hash table in the hash join node.
      * The most essential difference between 'hashOutputSlots' and 'outputSlots' is that
-     *   it's output needs to contain other conjunct and conjunct columns.
+     * it's output needs to contain other conjunct and conjunct columns.
      * hash output slots = output slots + conjunct slots + other conjunct slots
      * For example:
      * select b.k1 from test.t1 a right join test.t1 b on a.k1=b.k1 and b.k2>1 where a.k2>1;
@@ -185,6 +245,7 @@ public class HashJoinNode extends PlanNode {
      * conjuncts: b.k2>1
      * hash output slots: a.k2, b.k2, b.k1
      * eq conjuncts: a.k1=b.k1
+     *
      * @param slotIdList
      */
     private void initHashOutputSlotIds(List<SlotId> slotIdList) {
@@ -204,8 +265,8 @@ public class HashJoinNode extends PlanNode {
         outputSlotIds = Lists.newArrayList();
         for (TupleId tupleId : tupleIds) {
             for (SlotDescriptor slotDescriptor : analyzer.getTupleDesc(tupleId).getSlots()) {
-                if (slotDescriptor.isMaterialized()
-                        && (requiredSlotIdSet == null || requiredSlotIdSet.contains(slotDescriptor.getId()))) {
+                if (slotDescriptor.isMaterialized() && (requiredSlotIdSet == null || requiredSlotIdSet.contains(
+                        slotDescriptor.getId()))) {
                     outputSlotIds.add(slotDescriptor.getId());
                 }
             }
@@ -244,13 +305,11 @@ public class HashJoinNode extends PlanNode {
         computeStats(analyzer);
 
         ExprSubstitutionMap combinedChildSmap = getCombinedChildWithoutTupleIsNullSmap();
-        List<Expr> newEqJoinConjuncts =
-                Expr.substituteList(eqJoinConjuncts, combinedChildSmap, analyzer, false);
-        eqJoinConjuncts = newEqJoinConjuncts.stream()
-                .map(entity -> (BinaryPredicate) entity).collect(Collectors.toList());
+        List<Expr> newEqJoinConjuncts = Expr.substituteList(eqJoinConjuncts, combinedChildSmap, analyzer, false);
+        eqJoinConjuncts = newEqJoinConjuncts.stream().map(entity -> (BinaryPredicate) entity)
+                .collect(Collectors.toList());
         assignedConjuncts = analyzer.getAssignedConjuncts();
-        otherJoinConjuncts =
-                Expr.substituteList(otherJoinConjuncts, combinedChildSmap, analyzer, false);
+        otherJoinConjuncts = Expr.substituteList(otherJoinConjuncts, combinedChildSmap, analyzer, false);
     }
 
     private void replaceOutputSmapForOuterJoin() {
@@ -286,8 +345,7 @@ public class HashJoinNode extends PlanNode {
         private final SlotDescriptor lhs;
         private final SlotDescriptor rhs;
 
-        private EqJoinConjunctScanSlots(Expr eqJoinConjunct, SlotDescriptor lhs,
-                                        SlotDescriptor rhs) {
+        private EqJoinConjunctScanSlots(Expr eqJoinConjunct, SlotDescriptor lhs, SlotDescriptor rhs) {
             this.eqJoinConjunct = eqJoinConjunct;
             this.lhs = lhs;
             this.rhs = rhs;
@@ -359,8 +417,7 @@ public class HashJoinNode extends PlanNode {
          */
         public static Map<Pair<TupleId, TupleId>, List<EqJoinConjunctScanSlots>> groupByJoinedTupleIds(
                 List<EqJoinConjunctScanSlots> eqJoinConjunctSlots) {
-            Map<Pair<TupleId, TupleId>, List<EqJoinConjunctScanSlots>> scanSlotsByJoinedTids =
-                    new LinkedHashMap<>();
+            Map<Pair<TupleId, TupleId>, List<EqJoinConjunctScanSlots>> scanSlotsByJoinedTids = new LinkedHashMap<>();
             for (EqJoinConjunctScanSlots slots : eqJoinConjunctSlots) {
                 Pair<TupleId, TupleId> tids = Pair.create(slots.lhsTid(), slots.rhsTid());
                 List<EqJoinConjunctScanSlots> scanSlots = scanSlotsByJoinedTids.get(tids);
@@ -420,7 +477,8 @@ public class HashJoinNode extends PlanNode {
      * - we adjust the NDVs from both sides to account for predicates that may
      * might have reduce the cardinality and NDVs
      */
-    private long getGenericJoinCardinality(List<EqJoinConjunctScanSlots> eqJoinConjunctSlots, long lhsCard, long rhsCard) {
+    private long getGenericJoinCardinality(List<EqJoinConjunctScanSlots> eqJoinConjunctSlots, long lhsCard,
+            long rhsCard) {
         Preconditions.checkState(joinOp.isInnerJoin() || joinOp.isOuterJoin());
         Preconditions.checkState(!eqJoinConjunctSlots.isEmpty());
         Preconditions.checkState(lhsCard >= 0 && rhsCard >= 0);
@@ -531,8 +589,8 @@ public class HashJoinNode extends PlanNode {
             // FK/PK join (which doesn't alter the cardinality of the left-hand side)
             cardinality = getChild(0).cardinality;
         } else {
-            cardinality = Math.round((double) getChild(0).cardinality * (double) getChild(
-                    1).cardinality / (double) maxNumDistinct);
+            cardinality = Math.round(
+                    (double) getChild(0).cardinality * (double) getChild(1).cardinality / (double) maxNumDistinct);
             LOG.debug("lhs card: {}, rhs card: {}", getChild(0).cardinality, getChild(1).cardinality);
         }
         LOG.debug("stats HashJoin: cardinality {}", cardinality);
@@ -585,8 +643,7 @@ public class HashJoinNode extends PlanNode {
 
         // Return -1 if the cardinality of the returned side is unknown.
         long cardinality;
-        if (joinOp == JoinOperator.RIGHT_SEMI_JOIN
-                || joinOp == JoinOperator.RIGHT_ANTI_JOIN) {
+        if (joinOp == JoinOperator.RIGHT_SEMI_JOIN || joinOp == JoinOperator.RIGHT_ANTI_JOIN) {
             if (getChild(1).cardinality == -1) {
                 return -1;
             }
@@ -640,8 +697,8 @@ public class HashJoinNode extends PlanNode {
 
     @Override
     protected String debugString() {
-        return MoreObjects.toStringHelper(this).add("eqJoinConjuncts",
-          eqJoinConjunctsDebugString()).addValue(super.debugString()).toString();
+        return MoreObjects.toStringHelper(this).add("eqJoinConjuncts", eqJoinConjunctsDebugString())
+                .addValue(super.debugString()).toString();
     }
 
     private String eqJoinConjunctsDebugString() {
@@ -750,10 +807,7 @@ public class HashJoinNode extends PlanNode {
     }
 
     public enum DistributionMode {
-        NONE("NONE"),
-        BROADCAST("BROADCAST"),
-        PARTITIONED("PARTITIONED"),
-        BUCKET_SHUFFLE("BUCKET_SHUFFLE");
+        NONE("NONE"), BROADCAST("BROADCAST"), PARTITIONED("PARTITIONED"), BUCKET_SHUFFLE("BUCKET_SHUFFLE");
 
         private final String description;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org