You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/06/07 10:40:30 UTC

[GitHub] [incubator-doris] Kikyou1997 opened a new pull request, #9993: [feature](nereids) Plan Translator

Kikyou1997 opened a new pull request, #9993:
URL: https://github.com/apache/incubator-doris/pull/9993

   # Proposed changes
   
   Issue Number: close #9621
   
   ## Problem Summary:
   
   Add following physical operator: PhysicalAgg PhysicalSort PhysicalHashJoin
   
   Add basic logic of plan translator
   
   ## Checklist(Required)
   
   1. Does it affect the original behavior: (No)
   2. Has unit tests been added: (No)
   3. Has document been added or modified: (No)
   4. Does it need to update dependencies: (No)
   5. Are there any changes that cannot be rolled back: (No)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] Kikyou1997 commented on a diff in pull request #9993: [feature](nereids) Plan Translator

Posted by GitBox <gi...@apache.org>.
Kikyou1997 commented on code in PR #9993:
URL: https://github.com/apache/incubator-doris/pull/9993#discussion_r892602734


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java:
##########
@@ -17,9 +17,20 @@
 
 package org.apache.doris.nereids.properties;
 
+import org.apache.doris.analysis.DistributionDesc;

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] morrySnow commented on a diff in pull request #9993: [feature](nereids) Plan Translator

Posted by GitBox <gi...@apache.org>.
morrySnow commented on code in PR #9993:
URL: https://github.com/apache/incubator-doris/pull/9993#discussion_r891119758


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PhysicalPlanTranslator.java:
##########
@@ -0,0 +1,309 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans;
+
+import org.apache.doris.analysis.AggregateInfo;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.FunctionCallExpr;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SortInfo;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.nereids.PlanOperatorVisitor;
+import org.apache.doris.nereids.operators.AbstractOperator;
+import org.apache.doris.nereids.operators.plans.JoinType;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalAggregation;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalOperator;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalSort;
+import org.apache.doris.nereids.properties.OrderKey;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.ExpressionConverter;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.planner.AggregationNode;
+import org.apache.doris.planner.CrossJoinNode;
+import org.apache.doris.planner.DataPartition;
+import org.apache.doris.planner.ExchangeNode;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.PlanNode;
+import org.apache.doris.planner.SortNode;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@SuppressWarnings("rawtypes")
+public class PhysicalPlanTranslator extends PlanOperatorVisitor<PlanFragment, PlanContext> {
+
+    public void translatePlan(PhysicalPlan<? extends PhysicalPlan, ? extends AbstractOperator> physicalPlan,
+            PlanContext context) {
+        visit(physicalPlan, context);
+    }
+
+    @Override
+    public PlanFragment visit(PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan,
+            PlanContext context) {
+        PhysicalOperator<?> operator = physicalPlan.getOperator();
+        return operator.accept(this, physicalPlan, context);
+    }
+
+    @Override
+    public PlanFragment visitPhysicalAggregationPlan(
+            PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan, PlanContext context) {
+
+        PlanFragment inputPlanFragment = visit(
+                (PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) physicalPlan.child(0), context);
+
+        AggregationNode aggregationNode = null;
+        List<Slot> slotList = physicalPlan.getOutput();
+        TupleDescriptor outputTupleDesc = generateTupleDesc(slotList, context, null);
+        PhysicalAggregation physicalAggregation = (PhysicalAggregation) physicalPlan.getOperator();
+        AggregateInfo.AggPhase phase = physicalAggregation.getAggPhase();
+
+        List<Expression> groupByExpressionList = physicalAggregation.getGroupByExprList();
+        ArrayList<Expr> execGroupingExpressions = groupByExpressionList.stream()
+                .map(e -> ExpressionConverter.converter.convert(e)).collect(Collectors.toCollection(ArrayList::new));
+
+        List<Expression> aggExpressionList = physicalAggregation.getAggExprList();
+        // TODO: agg function could be other expr type either
+        ArrayList<FunctionCallExpr> execAggExpressions = aggExpressionList.stream()
+                .map(e -> (FunctionCallExpr) ExpressionConverter.converter.convert(e))
+                .collect(Collectors.toCollection(ArrayList::new));
+
+        List<Expression> partitionExpressionList = physicalAggregation.getPartitionExprList();
+        List<Expr> execPartitionExpressions = partitionExpressionList.stream()
+                .map(e -> (FunctionCallExpr) ExpressionConverter.converter.convert(e)).collect(Collectors.toList());
+        // todo: support DISTINCT
+        AggregateInfo aggInfo = null;
+        switch (phase) {
+            case FIRST:
+                aggInfo = AggregateInfo.create(execGroupingExpressions, execAggExpressions, outputTupleDesc,
+                        outputTupleDesc, AggregateInfo.AggPhase.FIRST, context.getAnalyzer());
+                aggregationNode = new AggregationNode(context.nextNodeId(), inputPlanFragment.getPlanRoot(), aggInfo);
+                aggregationNode.unsetNeedsFinalize();
+                aggregationNode.setUseStreamingPreagg(physicalAggregation.isUsingStream());
+                aggregationNode.setIntermediateTuple();
+                if (!partitionExpressionList.isEmpty()) {
+                    inputPlanFragment.setOutputPartition(DataPartition.hashPartitioned(execPartitionExpressions));
+                }
+                break;
+            case FIRST_MERGE:
+                aggInfo = AggregateInfo.create(execGroupingExpressions, execAggExpressions, outputTupleDesc,

Review Comment:
   do we need exchange node under agg merge node?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] wangshuo128 commented on a diff in pull request #9993: [feature](nereids) Plan Translator

Posted by GitBox <gi...@apache.org>.
wangshuo128 commented on code in PR #9993:
URL: https://github.com/apache/incubator-doris/pull/9993#discussion_r894130900


##########
fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java:
##########
@@ -38,49 +38,48 @@
  * Encapsulates all the information needed to compute the aggregate functions of a single
  * Select block, including a possible 2nd phase aggregation step for DISTINCT aggregate
  * functions and merge aggregation steps needed for distributed execution.
- *
+ * <p>
  * The latter requires a tree structure of AggregateInfo objects which express the
  * original aggregate computations as well as the necessary merging aggregate
  * computations.
  * TODO: get rid of this by transforming
  *   SELECT COUNT(DISTINCT a, b, ..) GROUP BY x, y, ...
  * into an equivalent query with a inline view:
  *   SELECT COUNT(*) FROM (SELECT DISTINCT a, b, ..., x, y, ...) GROUP BY x, y, ...
- *
+ * <p>
  * The tree structure looks as follows:
  * - for non-distinct aggregation:
- *   - aggInfo: contains the original aggregation functions and grouping exprs
- *   - aggInfo.mergeAggInfo: contains the merging aggregation functions (grouping
- *     exprs are identical)
+ * - aggInfo: contains the original aggregation functions and grouping exprs
+ * - aggInfo.mergeAggInfo: contains the merging aggregation functions (grouping
+ * exprs are identical)
  * - for distinct aggregation (for an explanation of the phases, see
- *   SelectStmt.createDistinctAggInfo()):
- *   - aggInfo: contains the phase 1 aggregate functions and grouping exprs
- *   - aggInfo.2ndPhaseDistinctAggInfo: contains the phase 2 aggregate functions and
- *     grouping exprs
- *   - aggInfo.mergeAggInfo: contains the merging aggregate functions for the phase 1
- *     computation (grouping exprs are identical)
- *   - aggInfo.2ndPhaseDistinctAggInfo.mergeAggInfo: contains the merging aggregate
- *     functions for the phase 2 computation (grouping exprs are identical)
- *
+ * SelectStmt.createDistinctAggInfo()):
+ * - aggInfo: contains the phase 1 aggregate functions and grouping exprs
+ * - aggInfo.2ndPhaseDistinctAggInfo: contains the phase 2 aggregate functions and
+ * grouping exprs
+ * - aggInfo.mergeAggInfo: contains the merging aggregate functions for the phase 1
+ * computation (grouping exprs are identical)
+ * - aggInfo.2ndPhaseDistinctAggInfo.mergeAggInfo: contains the merging aggregate
+ * functions for the phase 2 computation (grouping exprs are identical)
+ * <p>
  * In general, merging aggregate computations are idempotent; in other words,
  * aggInfo.mergeAggInfo == aggInfo.mergeAggInfo.mergeAggInfo.
- *
+ * <p>
  * TODO: move the merge construction logic from SelectStmt into AggregateInfo
  * TODO: Add query tests for aggregation with intermediate tuples with num_nodes=1.
  */
 public final class AggregateInfo extends AggregateInfoBase {
     private final static Logger LOG = LogManager.getLogger(AggregateInfo.class);
 
     public enum AggPhase {
-        FIRST,
-        FIRST_MERGE,
-        SECOND,
-        SECOND_MERGE;
+        FIRST, FIRST_MERGE, SECOND, SECOND_MERGE;

Review Comment:
   using the original code style?



##########
fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java:
##########
@@ -414,20 +405,20 @@ public void getRefdSlots(List<SlotId> ids) {
      * Substitute all the expressions (grouping expr, aggregate expr) and update our
      * substitution map according to the given substitution map:
      * - smap typically maps from tuple t1 to tuple t2 (example: the smap of an
-     *   inline view maps the virtual table ref t1 into a base table ref t2)

Review Comment:
   ditto.



##########
fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java:
##########
@@ -539,15 +526,14 @@ private Expr createCountDistinctAggExprParam(int firstIdx, int lastIdx,
      * - 'this' is the phase 1 aggregation
      * - grouping exprs are those of the original query (param origGroupingExprs)
      * - aggregate exprs for the DISTINCT agg fns: these are aggregating the grouping
-     *   slots that were added to the original grouping slots in phase 1;

Review Comment:
   ditto.



##########
fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java:
##########
@@ -733,8 +707,8 @@ public void createSmaps(Analyzer analyzer) {
      * - The parameters of the sum function may involve the columns of a materialized view.
      * - The type of this column may happen to be inconsistent with the column type of the base table.
      * - In order to ensure the correctness of the result,
-     *   the parameter type needs to be changed to the type of the materialized view column

Review Comment:
   ditto.



##########
fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java:
##########
@@ -277,29 +271,27 @@ public static boolean estimateIfContainsMultiDistinct(List<FunctionCallExpr> dis
      * - aggTupleDesc
      * - a complete secondPhaseDistinctAggInfo
      * - mergeAggInfo
-     *
+     * <p>
      * At the moment, we require that all distinct aggregate
      * functions be applied to the same set of exprs (ie, we can't do something
      * like SELECT COUNT(DISTINCT id), COUNT(DISTINCT address)).
      * Aggregation happens in two successive phases:
      * - the first phase aggregates by all grouping exprs plus all parameter exprs
-     *   of DISTINCT aggregate functions
-     *
+     * of DISTINCT aggregate functions
+     * <p>
      * Example:

Review Comment:
   using the original indents?



##########
fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java:
##########
@@ -761,10 +735,10 @@ public void updateTypeOfAggregateExprs() {
      * Mark slots required for this aggregation as materialized:
      * - all grouping output slots as well as grouping exprs
      * - for non-distinct aggregation: the aggregate exprs of materialized aggregate slots;
-     *   this assumes that the output slots corresponding to aggregate exprs have already

Review Comment:
   ditto.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] github-actions[bot] commented on pull request #9993: [feature](nereids) Plan Translator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #9993:
URL: https://github.com/apache/incubator-doris/pull/9993#issuecomment-1155066600

   PR approved by at least one committer and no changes requested.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] 924060929 commented on a diff in pull request #9993: [feature](nereids) Plan Translator

Posted by GitBox <gi...@apache.org>.
924060929 commented on code in PR #9993:
URL: https://github.com/apache/incubator-doris/pull/9993#discussion_r893045453


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/PlanOperatorVisitor.java:
##########
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids;
+
+import org.apache.doris.nereids.operators.plans.physical.PhysicalOperator;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+
+@SuppressWarnings("rawtypes")
+public abstract class PlanOperatorVisitor<R, C> {
+    public abstract R visit(PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan, C context);

Review Comment:
   PhysicalPlan should be Plan.
   ```
   public abstract R visit(Plan<? extends Plan, ? extends PlanOperator> plan, C context);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] Kikyou1997 commented on a diff in pull request #9993: [feature](nereids) Plan Translator

Posted by GitBox <gi...@apache.org>.
Kikyou1997 commented on code in PR #9993:
URL: https://github.com/apache/incubator-doris/pull/9993#discussion_r894134956


##########
fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java:
##########
@@ -277,29 +271,27 @@ public static boolean estimateIfContainsMultiDistinct(List<FunctionCallExpr> dis
      * - aggTupleDesc
      * - a complete secondPhaseDistinctAggInfo
      * - mergeAggInfo
-     *
+     * <p>
      * At the moment, we require that all distinct aggregate
      * functions be applied to the same set of exprs (ie, we can't do something
      * like SELECT COUNT(DISTINCT id), COUNT(DISTINCT address)).
      * Aggregation happens in two successive phases:
      * - the first phase aggregates by all grouping exprs plus all parameter exprs
-     *   of DISTINCT aggregate functions
-     *
+     * of DISTINCT aggregate functions
+     * <p>
      * Example:

Review Comment:
   done



##########
fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java:
##########
@@ -38,49 +38,48 @@
  * Encapsulates all the information needed to compute the aggregate functions of a single
  * Select block, including a possible 2nd phase aggregation step for DISTINCT aggregate
  * functions and merge aggregation steps needed for distributed execution.
- *
+ * <p>
  * The latter requires a tree structure of AggregateInfo objects which express the
  * original aggregate computations as well as the necessary merging aggregate
  * computations.
  * TODO: get rid of this by transforming
  *   SELECT COUNT(DISTINCT a, b, ..) GROUP BY x, y, ...
  * into an equivalent query with a inline view:
  *   SELECT COUNT(*) FROM (SELECT DISTINCT a, b, ..., x, y, ...) GROUP BY x, y, ...
- *
+ * <p>
  * The tree structure looks as follows:
  * - for non-distinct aggregation:
- *   - aggInfo: contains the original aggregation functions and grouping exprs
- *   - aggInfo.mergeAggInfo: contains the merging aggregation functions (grouping
- *     exprs are identical)
+ * - aggInfo: contains the original aggregation functions and grouping exprs
+ * - aggInfo.mergeAggInfo: contains the merging aggregation functions (grouping
+ * exprs are identical)
  * - for distinct aggregation (for an explanation of the phases, see
- *   SelectStmt.createDistinctAggInfo()):
- *   - aggInfo: contains the phase 1 aggregate functions and grouping exprs
- *   - aggInfo.2ndPhaseDistinctAggInfo: contains the phase 2 aggregate functions and
- *     grouping exprs
- *   - aggInfo.mergeAggInfo: contains the merging aggregate functions for the phase 1
- *     computation (grouping exprs are identical)
- *   - aggInfo.2ndPhaseDistinctAggInfo.mergeAggInfo: contains the merging aggregate
- *     functions for the phase 2 computation (grouping exprs are identical)
- *
+ * SelectStmt.createDistinctAggInfo()):
+ * - aggInfo: contains the phase 1 aggregate functions and grouping exprs
+ * - aggInfo.2ndPhaseDistinctAggInfo: contains the phase 2 aggregate functions and
+ * grouping exprs
+ * - aggInfo.mergeAggInfo: contains the merging aggregate functions for the phase 1
+ * computation (grouping exprs are identical)
+ * - aggInfo.2ndPhaseDistinctAggInfo.mergeAggInfo: contains the merging aggregate
+ * functions for the phase 2 computation (grouping exprs are identical)
+ * <p>
  * In general, merging aggregate computations are idempotent; in other words,
  * aggInfo.mergeAggInfo == aggInfo.mergeAggInfo.mergeAggInfo.
- *
+ * <p>
  * TODO: move the merge construction logic from SelectStmt into AggregateInfo
  * TODO: Add query tests for aggregation with intermediate tuples with num_nodes=1.
  */
 public final class AggregateInfo extends AggregateInfoBase {
     private final static Logger LOG = LogManager.getLogger(AggregateInfo.class);
 
     public enum AggPhase {
-        FIRST,
-        FIRST_MERGE,
-        SECOND,
-        SECOND_MERGE;
+        FIRST, FIRST_MERGE, SECOND, SECOND_MERGE;

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] github-actions[bot] commented on pull request #9993: [feature](nereids) Plan Translator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #9993:
URL: https://github.com/apache/incubator-doris/pull/9993#issuecomment-1155066634

   PR approved by anyone and no changes requested.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] Kikyou1997 commented on a diff in pull request #9993: [feature](nereids) Plan Translator

Posted by GitBox <gi...@apache.org>.
Kikyou1997 commented on code in PR #9993:
URL: https://github.com/apache/incubator-doris/pull/9993#discussion_r891855661


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/Plan.java:
##########
@@ -44,4 +44,6 @@
 
     @Override
     Plan child(int index);
+

Review Comment:
   removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] Kikyou1997 commented on a diff in pull request #9993: [feature](nereids) Plan Translator

Posted by GitBox <gi...@apache.org>.
Kikyou1997 commented on code in PR #9993:
URL: https://github.com/apache/incubator-doris/pull/9993#discussion_r893072238


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/PlanOperatorVisitor.java:
##########
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids;
+
+import org.apache.doris.nereids.operators.plans.physical.PhysicalOperator;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+
+@SuppressWarnings("rawtypes")
+public abstract class PlanOperatorVisitor<R, C> {

Review Comment:
   It's a generic visitor



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalSort.java:
##########
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.operators.plans.physical;
+
+import org.apache.doris.nereids.operators.OperatorType;
+import org.apache.doris.nereids.properties.OrderKey;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+
+import java.util.List;
+
+public class PhysicalSort extends PhysicalUnaryOperator<PhysicalSort, PhysicalPlan> {
+
+    private int offset;

Review Comment:
   done



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalHashJoin.java:
##########
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.operators.plans.physical;
+
+import org.apache.doris.nereids.operators.OperatorType;
+import org.apache.doris.nereids.operators.plans.JoinType;
+import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+
+public class PhysicalHashJoin extends PhysicalBinaryOperator<PhysicalHashJoin, PhysicalPlan, PhysicalPlan> {
+
+    private JoinType joinType;

Review Comment:
   done



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalAggregation.java:
##########
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.operators.plans.physical;
+
+import org.apache.doris.nereids.operators.OperatorType;
+import org.apache.doris.nereids.operators.plans.AggPhase;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+
+import java.util.List;
+
+public class PhysicalAggregation extends PhysicalUnaryOperator<PhysicalAggregation, PhysicalPlan> {
+
+    private List<Expression> groupByExprList;

Review Comment:
   done



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/operators/AbstractOperator.java:
##########
@@ -33,4 +38,17 @@ public AbstractOperator(OperatorType type) {
     public OperatorType getType() {
         return type;
     }
+
+    public <R, C> R accept(PlanOperatorVisitor<R, C> visitor, PhysicalPlan<?, ?> physicalPlan, C context) {
+        return null;
+    }
+
+    public long getLimited() {
+        return limited;
+    }
+
+    public void setLimited(long limited) {

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] Kikyou1997 commented on a diff in pull request #9993: [feature](nereids) Plan Translator

Posted by GitBox <gi...@apache.org>.
Kikyou1997 commented on code in PR #9993:
URL: https://github.com/apache/incubator-doris/pull/9993#discussion_r892077031


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanContext.java:
##########
@@ -0,0 +1,49 @@
+package org.apache.doris.nereids.trees.plans;
+
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.common.IdGenerator;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.PlanFragmentId;
+import org.apache.doris.planner.PlanNodeId;
+
+import java.util.List;
+
+public class PlanContext {
+    private List<PlanFragment> planFragmentList;
+
+    private Analyzer analyzer;
+
+    private final IdGenerator<PlanFragmentId> fragmentIdGenerator = PlanFragmentId.createGenerator();
+
+    private final IdGenerator<PlanNodeId> nodeIdGenerator = PlanNodeId.createGenerator();
+
+    public List<PlanFragment> getPlanFragmentList() {
+        return planFragmentList;
+    }
+
+    public TupleDescriptor generateTupleDesc() {
+        return analyzer.getDescTbl().createTupleDescriptor();

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] Kikyou1997 commented on a diff in pull request #9993: [feature](nereids) Plan Translator

Posted by GitBox <gi...@apache.org>.
Kikyou1997 commented on code in PR #9993:
URL: https://github.com/apache/incubator-doris/pull/9993#discussion_r894134637


##########
fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java:
##########
@@ -38,49 +38,48 @@
  * Encapsulates all the information needed to compute the aggregate functions of a single
  * Select block, including a possible 2nd phase aggregation step for DISTINCT aggregate
  * functions and merge aggregation steps needed for distributed execution.
- *
+ * <p>
  * The latter requires a tree structure of AggregateInfo objects which express the
  * original aggregate computations as well as the necessary merging aggregate
  * computations.
  * TODO: get rid of this by transforming
  *   SELECT COUNT(DISTINCT a, b, ..) GROUP BY x, y, ...
  * into an equivalent query with a inline view:
  *   SELECT COUNT(*) FROM (SELECT DISTINCT a, b, ..., x, y, ...) GROUP BY x, y, ...
- *
+ * <p>
  * The tree structure looks as follows:
  * - for non-distinct aggregation:
- *   - aggInfo: contains the original aggregation functions and grouping exprs

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] Kikyou1997 commented on a diff in pull request #9993: [feature](nereids) Plan Translator

Posted by GitBox <gi...@apache.org>.
Kikyou1997 commented on code in PR #9993:
URL: https://github.com/apache/incubator-doris/pull/9993#discussion_r891852779


##########
fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java:
##########
@@ -64,6 +64,8 @@ public class ExchangeNode extends PlanNode {
     // only if mergeInfo_ is non-null, i.e. this is a merging exchange node.
     private long offset;
 
+    private DataPartition dataPartition;

Review Comment:
   removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] Kikyou1997 commented on a diff in pull request #9993: [feature](nereids) Plan Translator

Posted by GitBox <gi...@apache.org>.
Kikyou1997 commented on code in PR #9993:
URL: https://github.com/apache/incubator-doris/pull/9993#discussion_r892595594


##########
fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java:
##########
@@ -140,6 +141,60 @@ public HashJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, TableRef inne
         }
     }
 
+    public HashJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, JoinOperator joinOp, List<Expr> eqJoinConjuncts,

Review Comment:
   > Why this PR include `planner`?
   
   And any other modification to Planner is based on the similiar reason as commented above



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] jackwener commented on a diff in pull request #9993: [feature](nereids) Plan Translator

Posted by GitBox <gi...@apache.org>.
jackwener commented on code in PR #9993:
URL: https://github.com/apache/incubator-doris/pull/9993#discussion_r892465090


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java:
##########
@@ -17,9 +17,20 @@
 
 package org.apache.doris.nereids.properties;
 
+import org.apache.doris.analysis.DistributionDesc;

Review Comment:
   config `style` according to [doc](https://doris.apache.org/zh-CN/developer-guide/java-format-code.html#import-order)



##########
fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java:
##########
@@ -140,6 +141,60 @@ public HashJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, TableRef inne
         }
     }
 
+    public HashJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, JoinOperator joinOp, List<Expr> eqJoinConjuncts,

Review Comment:
   Why this PR include `planner`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] wangshuo128 commented on a diff in pull request #9993: [feature](nereids) Plan Translator

Posted by GitBox <gi...@apache.org>.
wangshuo128 commented on code in PR #9993:
URL: https://github.com/apache/incubator-doris/pull/9993#discussion_r891161963


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanContext.java:
##########
@@ -0,0 +1,49 @@
+package org.apache.doris.nereids.trees.plans;
+
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.common.IdGenerator;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.PlanFragmentId;
+import org.apache.doris.planner.PlanNodeId;
+
+import java.util.List;
+
+public class PlanContext {
+    private List<PlanFragment> planFragmentList;
+
+    private Analyzer analyzer;
+
+    private final IdGenerator<PlanFragmentId> fragmentIdGenerator = PlanFragmentId.createGenerator();
+
+    private final IdGenerator<PlanNodeId> nodeIdGenerator = PlanNodeId.createGenerator();
+
+    public List<PlanFragment> getPlanFragmentList() {
+        return planFragmentList;
+    }
+
+    public TupleDescriptor generateTupleDesc() {
+        return analyzer.getDescTbl().createTupleDescriptor();

Review Comment:
   `DescTable` should not be placed in `Analyzer` anymore.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] wangshuo128 commented on a diff in pull request #9993: [feature](nereids) Plan Translator

Posted by GitBox <gi...@apache.org>.
wangshuo128 commented on code in PR #9993:
URL: https://github.com/apache/incubator-doris/pull/9993#discussion_r894128919


##########
fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java:
##########
@@ -38,49 +38,48 @@
  * Encapsulates all the information needed to compute the aggregate functions of a single
  * Select block, including a possible 2nd phase aggregation step for DISTINCT aggregate
  * functions and merge aggregation steps needed for distributed execution.
- *
+ * <p>
  * The latter requires a tree structure of AggregateInfo objects which express the
  * original aggregate computations as well as the necessary merging aggregate
  * computations.
  * TODO: get rid of this by transforming
  *   SELECT COUNT(DISTINCT a, b, ..) GROUP BY x, y, ...
  * into an equivalent query with a inline view:
  *   SELECT COUNT(*) FROM (SELECT DISTINCT a, b, ..., x, y, ...) GROUP BY x, y, ...
- *
+ * <p>
  * The tree structure looks as follows:
  * - for non-distinct aggregation:
- *   - aggInfo: contains the original aggregation functions and grouping exprs

Review Comment:
   We'd better not change the original comment indents.
   You could wrap the comments with the `<pre></pre>` HTML tag, thus the indents wouldn't be modified when formatting the code style.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] morrySnow commented on a diff in pull request #9993: [feature](nereids) Plan Translator

Posted by GitBox <gi...@apache.org>.
morrySnow commented on code in PR #9993:
URL: https://github.com/apache/incubator-doris/pull/9993#discussion_r891120514


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PhysicalPlanTranslator.java:
##########
@@ -0,0 +1,309 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans;
+
+import org.apache.doris.analysis.AggregateInfo;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.FunctionCallExpr;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SortInfo;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.nereids.PlanOperatorVisitor;
+import org.apache.doris.nereids.operators.AbstractOperator;
+import org.apache.doris.nereids.operators.plans.JoinType;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalAggregation;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalOperator;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalSort;
+import org.apache.doris.nereids.properties.OrderKey;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.ExpressionConverter;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.planner.AggregationNode;
+import org.apache.doris.planner.CrossJoinNode;
+import org.apache.doris.planner.DataPartition;
+import org.apache.doris.planner.ExchangeNode;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.PlanNode;
+import org.apache.doris.planner.SortNode;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@SuppressWarnings("rawtypes")
+public class PhysicalPlanTranslator extends PlanOperatorVisitor<PlanFragment, PlanContext> {
+
+    public void translatePlan(PhysicalPlan<? extends PhysicalPlan, ? extends AbstractOperator> physicalPlan,
+            PlanContext context) {
+        visit(physicalPlan, context);
+    }
+
+    @Override
+    public PlanFragment visit(PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan,
+            PlanContext context) {
+        PhysicalOperator<?> operator = physicalPlan.getOperator();
+        return operator.accept(this, physicalPlan, context);
+    }
+
+    @Override
+    public PlanFragment visitPhysicalAggregationPlan(
+            PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan, PlanContext context) {
+
+        PlanFragment inputPlanFragment = visit(
+                (PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) physicalPlan.child(0), context);
+
+        AggregationNode aggregationNode = null;
+        List<Slot> slotList = physicalPlan.getOutput();
+        TupleDescriptor outputTupleDesc = generateTupleDesc(slotList, context, null);
+        PhysicalAggregation physicalAggregation = (PhysicalAggregation) physicalPlan.getOperator();
+        AggregateInfo.AggPhase phase = physicalAggregation.getAggPhase();
+
+        List<Expression> groupByExpressionList = physicalAggregation.getGroupByExprList();
+        ArrayList<Expr> execGroupingExpressions = groupByExpressionList.stream()
+                .map(e -> ExpressionConverter.converter.convert(e)).collect(Collectors.toCollection(ArrayList::new));
+
+        List<Expression> aggExpressionList = physicalAggregation.getAggExprList();
+        // TODO: agg function could be other expr type either
+        ArrayList<FunctionCallExpr> execAggExpressions = aggExpressionList.stream()
+                .map(e -> (FunctionCallExpr) ExpressionConverter.converter.convert(e))
+                .collect(Collectors.toCollection(ArrayList::new));
+
+        List<Expression> partitionExpressionList = physicalAggregation.getPartitionExprList();
+        List<Expr> execPartitionExpressions = partitionExpressionList.stream()
+                .map(e -> (FunctionCallExpr) ExpressionConverter.converter.convert(e)).collect(Collectors.toList());
+        // todo: support DISTINCT
+        AggregateInfo aggInfo = null;
+        switch (phase) {
+            case FIRST:
+                aggInfo = AggregateInfo.create(execGroupingExpressions, execAggExpressions, outputTupleDesc,
+                        outputTupleDesc, AggregateInfo.AggPhase.FIRST, context.getAnalyzer());
+                aggregationNode = new AggregationNode(context.nextNodeId(), inputPlanFragment.getPlanRoot(), aggInfo);
+                aggregationNode.unsetNeedsFinalize();
+                aggregationNode.setUseStreamingPreagg(physicalAggregation.isUsingStream());
+                aggregationNode.setIntermediateTuple();
+                if (!partitionExpressionList.isEmpty()) {
+                    inputPlanFragment.setOutputPartition(DataPartition.hashPartitioned(execPartitionExpressions));
+                }
+                break;
+            case FIRST_MERGE:
+                aggInfo = AggregateInfo.create(execGroupingExpressions, execAggExpressions, outputTupleDesc,

Review Comment:
   do we need exchange node under agg merge node?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/Plan.java:
##########
@@ -44,4 +44,6 @@
 
     @Override
     Plan child(int index);
+

Review Comment:
   blank lines



##########
fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java:
##########
@@ -64,6 +64,8 @@ public class ExchangeNode extends PlanNode {
     // only if mergeInfo_ is non-null, i.e. this is a merging exchange node.
     private long offset;
 
+    private DataPartition dataPartition;

Review Comment:
   why add this?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PhysicalPlanTranslator.java:
##########
@@ -0,0 +1,309 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans;
+
+import org.apache.doris.analysis.AggregateInfo;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.FunctionCallExpr;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SortInfo;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.nereids.PlanOperatorVisitor;
+import org.apache.doris.nereids.operators.AbstractOperator;
+import org.apache.doris.nereids.operators.plans.JoinType;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalAggregation;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalOperator;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalSort;
+import org.apache.doris.nereids.properties.OrderKey;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.ExpressionConverter;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.planner.AggregationNode;
+import org.apache.doris.planner.CrossJoinNode;
+import org.apache.doris.planner.DataPartition;
+import org.apache.doris.planner.ExchangeNode;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.PlanNode;
+import org.apache.doris.planner.SortNode;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@SuppressWarnings("rawtypes")
+public class PhysicalPlanTranslator extends PlanOperatorVisitor<PlanFragment, PlanContext> {
+
+    public void translatePlan(PhysicalPlan<? extends PhysicalPlan, ? extends AbstractOperator> physicalPlan,
+            PlanContext context) {
+        visit(physicalPlan, context);
+    }
+
+    @Override
+    public PlanFragment visit(PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan,
+            PlanContext context) {
+        PhysicalOperator<?> operator = physicalPlan.getOperator();
+        return operator.accept(this, physicalPlan, context);
+    }
+
+    @Override
+    public PlanFragment visitPhysicalAggregationPlan(
+            PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan, PlanContext context) {
+
+        PlanFragment inputPlanFragment = visit(
+                (PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) physicalPlan.child(0), context);
+
+        AggregationNode aggregationNode = null;
+        List<Slot> slotList = physicalPlan.getOutput();
+        TupleDescriptor outputTupleDesc = generateTupleDesc(slotList, context, null);
+        PhysicalAggregation physicalAggregation = (PhysicalAggregation) physicalPlan.getOperator();
+        AggregateInfo.AggPhase phase = physicalAggregation.getAggPhase();
+
+        List<Expression> groupByExpressionList = physicalAggregation.getGroupByExprList();
+        ArrayList<Expr> execGroupingExpressions = groupByExpressionList.stream()
+                .map(e -> ExpressionConverter.converter.convert(e)).collect(Collectors.toCollection(ArrayList::new));
+
+        List<Expression> aggExpressionList = physicalAggregation.getAggExprList();
+        // TODO: agg function could be other expr type either
+        ArrayList<FunctionCallExpr> execAggExpressions = aggExpressionList.stream()
+                .map(e -> (FunctionCallExpr) ExpressionConverter.converter.convert(e))
+                .collect(Collectors.toCollection(ArrayList::new));
+
+        List<Expression> partitionExpressionList = physicalAggregation.getPartitionExprList();
+        List<Expr> execPartitionExpressions = partitionExpressionList.stream()
+                .map(e -> (FunctionCallExpr) ExpressionConverter.converter.convert(e)).collect(Collectors.toList());
+        // todo: support DISTINCT
+        AggregateInfo aggInfo = null;
+        switch (phase) {
+            case FIRST:
+                aggInfo = AggregateInfo.create(execGroupingExpressions, execAggExpressions, outputTupleDesc,
+                        outputTupleDesc, AggregateInfo.AggPhase.FIRST, context.getAnalyzer());
+                aggregationNode = new AggregationNode(context.nextNodeId(), inputPlanFragment.getPlanRoot(), aggInfo);
+                aggregationNode.unsetNeedsFinalize();
+                aggregationNode.setUseStreamingPreagg(physicalAggregation.isUsingStream());
+                aggregationNode.setIntermediateTuple();
+                if (!partitionExpressionList.isEmpty()) {
+                    inputPlanFragment.setOutputPartition(DataPartition.hashPartitioned(execPartitionExpressions));
+                }
+                break;
+            case FIRST_MERGE:
+                aggInfo = AggregateInfo.create(execGroupingExpressions, execAggExpressions, outputTupleDesc,
+                        outputTupleDesc, AggregateInfo.AggPhase.FIRST_MERGE, context.getAnalyzer());
+                aggregationNode = new AggregationNode(context.nextNodeId(), inputPlanFragment.getPlanRoot(), aggInfo);
+                break;
+            default:
+                throw new RuntimeException("Unsupported yet");
+        }
+        inputPlanFragment.setPlanRoot(aggregationNode);
+        return inputPlanFragment;
+    }
+
+    @Override
+    public PlanFragment visitPhysicalOlapScanPlan(
+            PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan, PlanContext context) {
+        // Create OlapScanNode
+        List<Slot> slotList = physicalPlan.getOutput();
+        PhysicalOlapScan physicalOlapScan = (PhysicalOlapScan) physicalPlan.getOperator();
+        OlapTable olapTable = physicalOlapScan.getTable();
+        TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, context, olapTable);
+        OlapScanNode olapScanNode = new OlapScanNode(context.nextNodeId(), tupleDescriptor, olapTable.getName());
+        // Create PlanFragment
+        PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), olapScanNode, DataPartition.RANDOM);
+        context.addPlanFragment(planFragment);
+        return planFragment;
+    }
+
+    @Override
+    public PlanFragment visitPhysicalSortPlan(
+            PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan, PlanContext context) {
+        PlanFragment childFragment = visit(
+                (PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) physicalPlan.child(0), context);
+        PhysicalSort physicalSort = (PhysicalSort) physicalPlan.getOperator();
+        if (!childFragment.isPartitioned()) {
+            return childFragment;
+        }
+        long limit = physicalSort.getLimit();
+        long offset = physicalSort.getOffset();
+
+        List<Expr> execOrderingExprList = Lists.newArrayList();
+        List<Boolean> ascOrderList = Lists.newArrayList();
+        List<Boolean> nullsFirstParamList = Lists.newArrayList();
+
+        List<OrderKey> orderKeyList = physicalSort.getOrderList();
+        orderKeyList.forEach(k -> {
+            execOrderingExprList.add(ExpressionConverter.converter.convert(k.getExpr()));
+            ascOrderList.add(k.isAsc());
+            nullsFirstParamList.add(k.isNullFirst());
+        });
+
+        List<Slot> outputList = physicalPlan.getOutput();
+        TupleDescriptor tupleDesc = generateTupleDesc(outputList, context, null);
+        SortInfo sortInfo = new SortInfo(execOrderingExprList, ascOrderList, nullsFirstParamList, tupleDesc);
+
+        PlanNode childNode = childFragment.getPlanRoot();
+        SortNode sortNode = new SortNode(context.nextNodeId(), childNode, sortInfo, physicalSort.isUseTopN(),
+                physicalSort.hasLimit(), physicalSort.getOffset());
+
+        PlanFragment mergeFragment = createParentFragment(childFragment, DataPartition.UNPARTITIONED, context);
+        ExchangeNode exchNode = (ExchangeNode) mergeFragment.getPlanRoot();
+        exec(() -> {
+            exchNode.init(context.getAnalyzer());
+        });
+        exchNode.unsetLimit();
+        if (physicalSort.hasLimit()) {
+            exchNode.setLimit(limit);
+        }
+        exchNode.setMergeInfo(sortNode.getSortInfo(), offset);
+
+        // Child nodes should not process the offset. If there is a limit,
+        // the child nodes need only return (offset + limit) rows.
+        SortNode childSortNode = (SortNode) childFragment.getPlanRoot();
+        Preconditions.checkState(sortNode == childSortNode);
+        if (sortNode.hasLimit()) {
+            childSortNode.unsetLimit();
+            childSortNode.setLimit(limit + offset);
+        }
+        childSortNode.setOffset(0);
+        return mergeFragment;
+    }
+
+    @Override
+    public PlanFragment visitPhysicalHashJoinPlan(
+            PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan, PlanContext context) {
+        PlanFragment leftFragment = visit(
+                (PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) physicalPlan.child(0), context);
+        PlanFragment rightFragment = visit(
+                (PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) physicalPlan.child(0), context);
+        PhysicalHashJoin physicalHashJoin = (PhysicalHashJoin) physicalPlan.getOperator();
+        Expression predicateExpr = physicalHashJoin.getPredicate();
+        List<Expression> eqExprList = Utils.getEqConjuncts(physicalPlan.child(0).getOutput(),
+                physicalPlan.child(1).getOutput(), predicateExpr);
+        JoinType joinType = physicalHashJoin.getJoinType();
+
+        PlanNode leftFragmentPlanRoot = leftFragment.getPlanRoot();
+        PlanNode rightFragmentPlanRoot = rightFragment.getPlanRoot();
+
+        if (joinType.equals(JoinType.CROSS_JOIN)
+                || physicalHashJoin.getJoinType().equals(JoinType.INNER_JOIN) && eqExprList.isEmpty()) {
+            CrossJoinNode crossJoinNode = new CrossJoinNode(context.nextNodeId(), leftFragment.getPlanRoot(),
+                    rightFragment.getPlanRoot(), null);
+            crossJoinNode.setLimit(physicalHashJoin.getLimited());
+            List<Expr> conjuncts = Utils.extractConjuncts(predicateExpr).stream()
+                    .map(e -> ExpressionConverter.converter.convert(e))
+                    .collect(Collectors.toCollection(ArrayList::new));
+            crossJoinNode.addConjuncts(conjuncts);
+            ExchangeNode exchangeNode = new ExchangeNode(context.nextNodeId(), rightFragment.getPlanRoot(), false);
+            exchangeNode.setNumInstances(rightFragmentPlanRoot.getNumInstances());
+            exec(() -> {
+                exchangeNode.init(context.getAnalyzer());
+            });
+            exchangeNode.setFragment(leftFragment);
+            leftFragmentPlanRoot.setChild(1, exchangeNode);
+            rightFragment.setDestination(exchangeNode);
+            crossJoinNode.setChild(0, leftFragment.getPlanRoot());
+            leftFragment.setPlanRoot(crossJoinNode);
+            return leftFragment;
+        }
+
+        List<Expression> expressionList = Utils.extractConjuncts(predicateExpr);
+        expressionList.removeAll(eqExprList);
+        List<Expr> execOtherConjunctList = expressionList.stream().map(e -> ExpressionConverter.converter.convert(e))
+                .collect(Collectors.toCollection(ArrayList::new));
+        List<Expr> execEqConjunctList = eqExprList.stream().map(e -> ExpressionConverter.converter.convert(e))
+                .collect(Collectors.toCollection(ArrayList::new));
+
+        HashJoinNode hashJoinNode = new HashJoinNode(context.nextNodeId(), leftFragmentPlanRoot, rightFragmentPlanRoot,

Review Comment:
   add a TODO: add broadcast join, etc. later



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PhysicalPlanTranslator.java:
##########
@@ -0,0 +1,309 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans;
+
+import org.apache.doris.analysis.AggregateInfo;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.FunctionCallExpr;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SortInfo;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.nereids.PlanOperatorVisitor;
+import org.apache.doris.nereids.operators.AbstractOperator;
+import org.apache.doris.nereids.operators.plans.JoinType;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalAggregation;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalOperator;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalSort;
+import org.apache.doris.nereids.properties.OrderKey;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.ExpressionConverter;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.planner.AggregationNode;
+import org.apache.doris.planner.CrossJoinNode;
+import org.apache.doris.planner.DataPartition;
+import org.apache.doris.planner.ExchangeNode;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.PlanNode;
+import org.apache.doris.planner.SortNode;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@SuppressWarnings("rawtypes")
+public class PhysicalPlanTranslator extends PlanOperatorVisitor<PlanFragment, PlanContext> {
+
+    public void translatePlan(PhysicalPlan<? extends PhysicalPlan, ? extends AbstractOperator> physicalPlan,
+            PlanContext context) {
+        visit(physicalPlan, context);
+    }
+
+    @Override
+    public PlanFragment visit(PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan,
+            PlanContext context) {
+        PhysicalOperator<?> operator = physicalPlan.getOperator();
+        return operator.accept(this, physicalPlan, context);
+    }
+
+    @Override
+    public PlanFragment visitPhysicalAggregationPlan(
+            PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan, PlanContext context) {
+
+        PlanFragment inputPlanFragment = visit(
+                (PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) physicalPlan.child(0), context);
+
+        AggregationNode aggregationNode = null;
+        List<Slot> slotList = physicalPlan.getOutput();
+        TupleDescriptor outputTupleDesc = generateTupleDesc(slotList, context, null);
+        PhysicalAggregation physicalAggregation = (PhysicalAggregation) physicalPlan.getOperator();
+        AggregateInfo.AggPhase phase = physicalAggregation.getAggPhase();
+
+        List<Expression> groupByExpressionList = physicalAggregation.getGroupByExprList();
+        ArrayList<Expr> execGroupingExpressions = groupByExpressionList.stream()
+                .map(e -> ExpressionConverter.converter.convert(e)).collect(Collectors.toCollection(ArrayList::new));
+
+        List<Expression> aggExpressionList = physicalAggregation.getAggExprList();
+        // TODO: agg function could be other expr type either
+        ArrayList<FunctionCallExpr> execAggExpressions = aggExpressionList.stream()
+                .map(e -> (FunctionCallExpr) ExpressionConverter.converter.convert(e))
+                .collect(Collectors.toCollection(ArrayList::new));
+
+        List<Expression> partitionExpressionList = physicalAggregation.getPartitionExprList();
+        List<Expr> execPartitionExpressions = partitionExpressionList.stream()
+                .map(e -> (FunctionCallExpr) ExpressionConverter.converter.convert(e)).collect(Collectors.toList());
+        // todo: support DISTINCT
+        AggregateInfo aggInfo = null;
+        switch (phase) {
+            case FIRST:
+                aggInfo = AggregateInfo.create(execGroupingExpressions, execAggExpressions, outputTupleDesc,
+                        outputTupleDesc, AggregateInfo.AggPhase.FIRST, context.getAnalyzer());
+                aggregationNode = new AggregationNode(context.nextNodeId(), inputPlanFragment.getPlanRoot(), aggInfo);
+                aggregationNode.unsetNeedsFinalize();
+                aggregationNode.setUseStreamingPreagg(physicalAggregation.isUsingStream());
+                aggregationNode.setIntermediateTuple();
+                if (!partitionExpressionList.isEmpty()) {
+                    inputPlanFragment.setOutputPartition(DataPartition.hashPartitioned(execPartitionExpressions));
+                }
+                break;
+            case FIRST_MERGE:
+                aggInfo = AggregateInfo.create(execGroupingExpressions, execAggExpressions, outputTupleDesc,
+                        outputTupleDesc, AggregateInfo.AggPhase.FIRST_MERGE, context.getAnalyzer());
+                aggregationNode = new AggregationNode(context.nextNodeId(), inputPlanFragment.getPlanRoot(), aggInfo);
+                break;
+            default:
+                throw new RuntimeException("Unsupported yet");
+        }
+        inputPlanFragment.setPlanRoot(aggregationNode);
+        return inputPlanFragment;
+    }
+
+    @Override
+    public PlanFragment visitPhysicalOlapScanPlan(
+            PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan, PlanContext context) {
+        // Create OlapScanNode
+        List<Slot> slotList = physicalPlan.getOutput();
+        PhysicalOlapScan physicalOlapScan = (PhysicalOlapScan) physicalPlan.getOperator();
+        OlapTable olapTable = physicalOlapScan.getTable();
+        TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, context, olapTable);
+        OlapScanNode olapScanNode = new OlapScanNode(context.nextNodeId(), tupleDescriptor, olapTable.getName());
+        // Create PlanFragment
+        PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), olapScanNode, DataPartition.RANDOM);
+        context.addPlanFragment(planFragment);
+        return planFragment;
+    }
+
+    @Override
+    public PlanFragment visitPhysicalSortPlan(
+            PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan, PlanContext context) {
+        PlanFragment childFragment = visit(
+                (PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) physicalPlan.child(0), context);
+        PhysicalSort physicalSort = (PhysicalSort) physicalPlan.getOperator();
+        if (!childFragment.isPartitioned()) {

Review Comment:
   why could just return child?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PhysicalPlanTranslator.java:
##########
@@ -0,0 +1,309 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans;
+
+import org.apache.doris.analysis.AggregateInfo;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.FunctionCallExpr;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SortInfo;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.nereids.PlanOperatorVisitor;
+import org.apache.doris.nereids.operators.AbstractOperator;
+import org.apache.doris.nereids.operators.plans.JoinType;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalAggregation;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalOperator;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalSort;
+import org.apache.doris.nereids.properties.OrderKey;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.ExpressionConverter;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.planner.AggregationNode;
+import org.apache.doris.planner.CrossJoinNode;
+import org.apache.doris.planner.DataPartition;
+import org.apache.doris.planner.ExchangeNode;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.PlanNode;
+import org.apache.doris.planner.SortNode;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@SuppressWarnings("rawtypes")
+public class PhysicalPlanTranslator extends PlanOperatorVisitor<PlanFragment, PlanContext> {
+
+    public void translatePlan(PhysicalPlan<? extends PhysicalPlan, ? extends AbstractOperator> physicalPlan,
+            PlanContext context) {
+        visit(physicalPlan, context);
+    }
+
+    @Override
+    public PlanFragment visit(PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan,
+            PlanContext context) {
+        PhysicalOperator<?> operator = physicalPlan.getOperator();
+        return operator.accept(this, physicalPlan, context);
+    }
+
+    @Override
+    public PlanFragment visitPhysicalAggregationPlan(
+            PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan, PlanContext context) {
+
+        PlanFragment inputPlanFragment = visit(
+                (PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) physicalPlan.child(0), context);
+
+        AggregationNode aggregationNode = null;
+        List<Slot> slotList = physicalPlan.getOutput();
+        TupleDescriptor outputTupleDesc = generateTupleDesc(slotList, context, null);
+        PhysicalAggregation physicalAggregation = (PhysicalAggregation) physicalPlan.getOperator();
+        AggregateInfo.AggPhase phase = physicalAggregation.getAggPhase();
+
+        List<Expression> groupByExpressionList = physicalAggregation.getGroupByExprList();
+        ArrayList<Expr> execGroupingExpressions = groupByExpressionList.stream()
+                .map(e -> ExpressionConverter.converter.convert(e)).collect(Collectors.toCollection(ArrayList::new));
+
+        List<Expression> aggExpressionList = physicalAggregation.getAggExprList();
+        // TODO: agg function could be other expr type either
+        ArrayList<FunctionCallExpr> execAggExpressions = aggExpressionList.stream()
+                .map(e -> (FunctionCallExpr) ExpressionConverter.converter.convert(e))
+                .collect(Collectors.toCollection(ArrayList::new));
+
+        List<Expression> partitionExpressionList = physicalAggregation.getPartitionExprList();
+        List<Expr> execPartitionExpressions = partitionExpressionList.stream()
+                .map(e -> (FunctionCallExpr) ExpressionConverter.converter.convert(e)).collect(Collectors.toList());
+        // todo: support DISTINCT
+        AggregateInfo aggInfo = null;
+        switch (phase) {
+            case FIRST:
+                aggInfo = AggregateInfo.create(execGroupingExpressions, execAggExpressions, outputTupleDesc,
+                        outputTupleDesc, AggregateInfo.AggPhase.FIRST, context.getAnalyzer());
+                aggregationNode = new AggregationNode(context.nextNodeId(), inputPlanFragment.getPlanRoot(), aggInfo);
+                aggregationNode.unsetNeedsFinalize();
+                aggregationNode.setUseStreamingPreagg(physicalAggregation.isUsingStream());
+                aggregationNode.setIntermediateTuple();
+                if (!partitionExpressionList.isEmpty()) {
+                    inputPlanFragment.setOutputPartition(DataPartition.hashPartitioned(execPartitionExpressions));
+                }
+                break;
+            case FIRST_MERGE:
+                aggInfo = AggregateInfo.create(execGroupingExpressions, execAggExpressions, outputTupleDesc,
+                        outputTupleDesc, AggregateInfo.AggPhase.FIRST_MERGE, context.getAnalyzer());
+                aggregationNode = new AggregationNode(context.nextNodeId(), inputPlanFragment.getPlanRoot(), aggInfo);
+                break;
+            default:
+                throw new RuntimeException("Unsupported yet");
+        }
+        inputPlanFragment.setPlanRoot(aggregationNode);
+        return inputPlanFragment;
+    }
+
+    @Override
+    public PlanFragment visitPhysicalOlapScanPlan(
+            PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan, PlanContext context) {
+        // Create OlapScanNode
+        List<Slot> slotList = physicalPlan.getOutput();
+        PhysicalOlapScan physicalOlapScan = (PhysicalOlapScan) physicalPlan.getOperator();
+        OlapTable olapTable = physicalOlapScan.getTable();
+        TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, context, olapTable);
+        OlapScanNode olapScanNode = new OlapScanNode(context.nextNodeId(), tupleDescriptor, olapTable.getName());
+        // Create PlanFragment
+        PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), olapScanNode, DataPartition.RANDOM);
+        context.addPlanFragment(planFragment);
+        return planFragment;
+    }
+
+    @Override
+    public PlanFragment visitPhysicalSortPlan(
+            PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan, PlanContext context) {
+        PlanFragment childFragment = visit(
+                (PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) physicalPlan.child(0), context);
+        PhysicalSort physicalSort = (PhysicalSort) physicalPlan.getOperator();
+        if (!childFragment.isPartitioned()) {
+            return childFragment;
+        }
+        long limit = physicalSort.getLimit();
+        long offset = physicalSort.getOffset();
+
+        List<Expr> execOrderingExprList = Lists.newArrayList();
+        List<Boolean> ascOrderList = Lists.newArrayList();
+        List<Boolean> nullsFirstParamList = Lists.newArrayList();
+
+        List<OrderKey> orderKeyList = physicalSort.getOrderList();
+        orderKeyList.forEach(k -> {
+            execOrderingExprList.add(ExpressionConverter.converter.convert(k.getExpr()));
+            ascOrderList.add(k.isAsc());
+            nullsFirstParamList.add(k.isNullFirst());
+        });
+
+        List<Slot> outputList = physicalPlan.getOutput();
+        TupleDescriptor tupleDesc = generateTupleDesc(outputList, context, null);
+        SortInfo sortInfo = new SortInfo(execOrderingExprList, ascOrderList, nullsFirstParamList, tupleDesc);
+
+        PlanNode childNode = childFragment.getPlanRoot();
+        SortNode sortNode = new SortNode(context.nextNodeId(), childNode, sortInfo, physicalSort.isUseTopN(),
+                physicalSort.hasLimit(), physicalSort.getOffset());
+
+        PlanFragment mergeFragment = createParentFragment(childFragment, DataPartition.UNPARTITIONED, context);
+        ExchangeNode exchNode = (ExchangeNode) mergeFragment.getPlanRoot();
+        exec(() -> {
+            exchNode.init(context.getAnalyzer());
+        });
+        exchNode.unsetLimit();
+        if (physicalSort.hasLimit()) {
+            exchNode.setLimit(limit);
+        }
+        exchNode.setMergeInfo(sortNode.getSortInfo(), offset);
+
+        // Child nodes should not process the offset. If there is a limit,
+        // the child nodes need only return (offset + limit) rows.
+        SortNode childSortNode = (SortNode) childFragment.getPlanRoot();
+        Preconditions.checkState(sortNode == childSortNode);
+        if (sortNode.hasLimit()) {
+            childSortNode.unsetLimit();
+            childSortNode.setLimit(limit + offset);
+        }
+        childSortNode.setOffset(0);
+        return mergeFragment;
+    }
+
+    @Override
+    public PlanFragment visitPhysicalHashJoinPlan(
+            PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan, PlanContext context) {
+        PlanFragment leftFragment = visit(
+                (PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) physicalPlan.child(0), context);
+        PlanFragment rightFragment = visit(
+                (PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) physicalPlan.child(0), context);
+        PhysicalHashJoin physicalHashJoin = (PhysicalHashJoin) physicalPlan.getOperator();
+        Expression predicateExpr = physicalHashJoin.getPredicate();
+        List<Expression> eqExprList = Utils.getEqConjuncts(physicalPlan.child(0).getOutput(),
+                physicalPlan.child(1).getOutput(), predicateExpr);
+        JoinType joinType = physicalHashJoin.getJoinType();
+
+        PlanNode leftFragmentPlanRoot = leftFragment.getPlanRoot();
+        PlanNode rightFragmentPlanRoot = rightFragment.getPlanRoot();
+
+        if (joinType.equals(JoinType.CROSS_JOIN)
+                || physicalHashJoin.getJoinType().equals(JoinType.INNER_JOIN) && eqExprList.isEmpty()) {
+            CrossJoinNode crossJoinNode = new CrossJoinNode(context.nextNodeId(), leftFragment.getPlanRoot(),
+                    rightFragment.getPlanRoot(), null);
+            crossJoinNode.setLimit(physicalHashJoin.getLimited());
+            List<Expr> conjuncts = Utils.extractConjuncts(predicateExpr).stream()
+                    .map(e -> ExpressionConverter.converter.convert(e))
+                    .collect(Collectors.toCollection(ArrayList::new));
+            crossJoinNode.addConjuncts(conjuncts);
+            ExchangeNode exchangeNode = new ExchangeNode(context.nextNodeId(), rightFragment.getPlanRoot(), false);
+            exchangeNode.setNumInstances(rightFragmentPlanRoot.getNumInstances());
+            exec(() -> {
+                exchangeNode.init(context.getAnalyzer());
+            });
+            exchangeNode.setFragment(leftFragment);
+            leftFragmentPlanRoot.setChild(1, exchangeNode);
+            rightFragment.setDestination(exchangeNode);
+            crossJoinNode.setChild(0, leftFragment.getPlanRoot());
+            leftFragment.setPlanRoot(crossJoinNode);
+            return leftFragment;
+        }
+
+        List<Expression> expressionList = Utils.extractConjuncts(predicateExpr);
+        expressionList.removeAll(eqExprList);
+        List<Expr> execOtherConjunctList = expressionList.stream().map(e -> ExpressionConverter.converter.convert(e))
+                .collect(Collectors.toCollection(ArrayList::new));
+        List<Expr> execEqConjunctList = eqExprList.stream().map(e -> ExpressionConverter.converter.convert(e))
+                .collect(Collectors.toCollection(ArrayList::new));
+
+        HashJoinNode hashJoinNode = new HashJoinNode(context.nextNodeId(), leftFragmentPlanRoot, rightFragmentPlanRoot,
+                JoinType.toJoinOperator(physicalHashJoin.getJoinType()), execEqConjunctList, execOtherConjunctList);
+
+        ExchangeNode leftExch = new ExchangeNode(context.nextNodeId(), leftFragmentPlanRoot, false);
+        leftExch.setNumInstances(leftFragmentPlanRoot.getNumInstances());
+        ExchangeNode rightExch = new ExchangeNode(context.nextNodeId(), leftFragmentPlanRoot, false);
+        rightExch.setNumInstances(rightFragmentPlanRoot.getNumInstances());
+        exec(() -> {
+            leftExch.init(context.getAnalyzer());
+            rightExch.init(context.getAnalyzer());
+        });
+        hashJoinNode.setChild(0, leftFragmentPlanRoot);
+        hashJoinNode.setChild(1, leftFragmentPlanRoot);
+        hashJoinNode.setDistributionMode(HashJoinNode.DistributionMode.PARTITIONED);
+        hashJoinNode.setLimit(physicalHashJoin.getLimited());
+        leftFragment.setDestination((ExchangeNode) rightFragment.getPlanRoot());
+        rightFragment.setDestination((ExchangeNode) leftFragmentPlanRoot);
+        return new PlanFragment(context.nextFragmentId(), hashJoinNode, leftFragment.getDataPartition());
+    }
+
+    @Override
+    public PlanFragment visitPhysicalProject(
+            PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan, PlanContext context) {
+        return visit((PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) physicalPlan.child(0), context);
+    }
+
+    @Override
+    public PlanFragment visitPhysicalFilter(
+            PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan, PlanContext context) {
+        return visit((PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) physicalPlan.child(0), context);

Review Comment:
   add prdicate to child node's conjuncts



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalOperator.java:
##########
@@ -29,4 +29,6 @@
  */
 public interface PhysicalOperator<TYPE extends PhysicalOperator<TYPE>> extends PlanOperator<TYPE> {
     List<Slot> computeOutputs(LogicalProperties logicalProperties, Plan... inputs);
+

Review Comment:
   blank lines



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalSort.java:
##########
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.operators.plans.physical;
+
+import org.apache.doris.nereids.operators.OperatorType;
+import org.apache.doris.nereids.properties.OrderKey;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+
+import java.util.List;
+
+public class PhysicalSort extends PhysicalUnaryOperator<PhysicalSort, PhysicalPlan> {
+
+    private int offset;
+
+    private int limit = -1;
+
+    private List<OrderKey> orderList;
+
+    // if true, the output of this node feeds an AnalyticNode
+    private boolean isAnalyticSort;

Review Comment:
   current, we do not support analytic functions. In old planner, this is a trick flag. In Nereids, this should no longer exists



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PhysicalPlanTranslator.java:
##########
@@ -0,0 +1,309 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans;
+
+import org.apache.doris.analysis.AggregateInfo;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.FunctionCallExpr;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SortInfo;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.nereids.PlanOperatorVisitor;
+import org.apache.doris.nereids.operators.AbstractOperator;
+import org.apache.doris.nereids.operators.plans.JoinType;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalAggregation;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalOperator;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalSort;
+import org.apache.doris.nereids.properties.OrderKey;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.ExpressionConverter;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.planner.AggregationNode;
+import org.apache.doris.planner.CrossJoinNode;
+import org.apache.doris.planner.DataPartition;
+import org.apache.doris.planner.ExchangeNode;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.PlanNode;
+import org.apache.doris.planner.SortNode;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@SuppressWarnings("rawtypes")
+public class PhysicalPlanTranslator extends PlanOperatorVisitor<PlanFragment, PlanContext> {
+
+    public void translatePlan(PhysicalPlan<? extends PhysicalPlan, ? extends AbstractOperator> physicalPlan,
+            PlanContext context) {
+        visit(physicalPlan, context);
+    }
+
+    @Override
+    public PlanFragment visit(PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan,
+            PlanContext context) {
+        PhysicalOperator<?> operator = physicalPlan.getOperator();
+        return operator.accept(this, physicalPlan, context);
+    }
+
+    @Override
+    public PlanFragment visitPhysicalAggregationPlan(
+            PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan, PlanContext context) {
+
+        PlanFragment inputPlanFragment = visit(
+                (PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) physicalPlan.child(0), context);
+
+        AggregationNode aggregationNode = null;
+        List<Slot> slotList = physicalPlan.getOutput();
+        TupleDescriptor outputTupleDesc = generateTupleDesc(slotList, context, null);
+        PhysicalAggregation physicalAggregation = (PhysicalAggregation) physicalPlan.getOperator();
+        AggregateInfo.AggPhase phase = physicalAggregation.getAggPhase();
+
+        List<Expression> groupByExpressionList = physicalAggregation.getGroupByExprList();
+        ArrayList<Expr> execGroupingExpressions = groupByExpressionList.stream()
+                .map(e -> ExpressionConverter.converter.convert(e)).collect(Collectors.toCollection(ArrayList::new));
+
+        List<Expression> aggExpressionList = physicalAggregation.getAggExprList();
+        // TODO: agg function could be other expr type either
+        ArrayList<FunctionCallExpr> execAggExpressions = aggExpressionList.stream()
+                .map(e -> (FunctionCallExpr) ExpressionConverter.converter.convert(e))
+                .collect(Collectors.toCollection(ArrayList::new));
+
+        List<Expression> partitionExpressionList = physicalAggregation.getPartitionExprList();
+        List<Expr> execPartitionExpressions = partitionExpressionList.stream()
+                .map(e -> (FunctionCallExpr) ExpressionConverter.converter.convert(e)).collect(Collectors.toList());
+        // todo: support DISTINCT
+        AggregateInfo aggInfo = null;
+        switch (phase) {
+            case FIRST:
+                aggInfo = AggregateInfo.create(execGroupingExpressions, execAggExpressions, outputTupleDesc,
+                        outputTupleDesc, AggregateInfo.AggPhase.FIRST, context.getAnalyzer());
+                aggregationNode = new AggregationNode(context.nextNodeId(), inputPlanFragment.getPlanRoot(), aggInfo);
+                aggregationNode.unsetNeedsFinalize();
+                aggregationNode.setUseStreamingPreagg(physicalAggregation.isUsingStream());
+                aggregationNode.setIntermediateTuple();
+                if (!partitionExpressionList.isEmpty()) {
+                    inputPlanFragment.setOutputPartition(DataPartition.hashPartitioned(execPartitionExpressions));
+                }
+                break;
+            case FIRST_MERGE:
+                aggInfo = AggregateInfo.create(execGroupingExpressions, execAggExpressions, outputTupleDesc,
+                        outputTupleDesc, AggregateInfo.AggPhase.FIRST_MERGE, context.getAnalyzer());
+                aggregationNode = new AggregationNode(context.nextNodeId(), inputPlanFragment.getPlanRoot(), aggInfo);
+                break;
+            default:
+                throw new RuntimeException("Unsupported yet");
+        }
+        inputPlanFragment.setPlanRoot(aggregationNode);
+        return inputPlanFragment;
+    }
+
+    @Override
+    public PlanFragment visitPhysicalOlapScanPlan(
+            PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan, PlanContext context) {
+        // Create OlapScanNode
+        List<Slot> slotList = physicalPlan.getOutput();
+        PhysicalOlapScan physicalOlapScan = (PhysicalOlapScan) physicalPlan.getOperator();
+        OlapTable olapTable = physicalOlapScan.getTable();
+        TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, context, olapTable);
+        OlapScanNode olapScanNode = new OlapScanNode(context.nextNodeId(), tupleDescriptor, olapTable.getName());
+        // Create PlanFragment
+        PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), olapScanNode, DataPartition.RANDOM);
+        context.addPlanFragment(planFragment);
+        return planFragment;
+    }
+
+    @Override
+    public PlanFragment visitPhysicalSortPlan(
+            PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan, PlanContext context) {
+        PlanFragment childFragment = visit(
+                (PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) physicalPlan.child(0), context);
+        PhysicalSort physicalSort = (PhysicalSort) physicalPlan.getOperator();
+        if (!childFragment.isPartitioned()) {
+            return childFragment;
+        }
+        long limit = physicalSort.getLimit();
+        long offset = physicalSort.getOffset();
+
+        List<Expr> execOrderingExprList = Lists.newArrayList();
+        List<Boolean> ascOrderList = Lists.newArrayList();
+        List<Boolean> nullsFirstParamList = Lists.newArrayList();
+
+        List<OrderKey> orderKeyList = physicalSort.getOrderList();
+        orderKeyList.forEach(k -> {
+            execOrderingExprList.add(ExpressionConverter.converter.convert(k.getExpr()));
+            ascOrderList.add(k.isAsc());
+            nullsFirstParamList.add(k.isNullFirst());
+        });
+
+        List<Slot> outputList = physicalPlan.getOutput();
+        TupleDescriptor tupleDesc = generateTupleDesc(outputList, context, null);
+        SortInfo sortInfo = new SortInfo(execOrderingExprList, ascOrderList, nullsFirstParamList, tupleDesc);
+
+        PlanNode childNode = childFragment.getPlanRoot();
+        SortNode sortNode = new SortNode(context.nextNodeId(), childNode, sortInfo, physicalSort.isUseTopN(),
+                physicalSort.hasLimit(), physicalSort.getOffset());
+
+        PlanFragment mergeFragment = createParentFragment(childFragment, DataPartition.UNPARTITIONED, context);
+        ExchangeNode exchNode = (ExchangeNode) mergeFragment.getPlanRoot();
+        exec(() -> {
+            exchNode.init(context.getAnalyzer());
+        });
+        exchNode.unsetLimit();
+        if (physicalSort.hasLimit()) {
+            exchNode.setLimit(limit);
+        }
+        exchNode.setMergeInfo(sortNode.getSortInfo(), offset);
+
+        // Child nodes should not process the offset. If there is a limit,
+        // the child nodes need only return (offset + limit) rows.
+        SortNode childSortNode = (SortNode) childFragment.getPlanRoot();
+        Preconditions.checkState(sortNode == childSortNode);
+        if (sortNode.hasLimit()) {
+            childSortNode.unsetLimit();
+            childSortNode.setLimit(limit + offset);
+        }
+        childSortNode.setOffset(0);
+        return mergeFragment;
+    }
+
+    @Override
+    public PlanFragment visitPhysicalHashJoinPlan(
+            PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan, PlanContext context) {
+        PlanFragment leftFragment = visit(
+                (PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) physicalPlan.child(0), context);
+        PlanFragment rightFragment = visit(
+                (PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) physicalPlan.child(0), context);
+        PhysicalHashJoin physicalHashJoin = (PhysicalHashJoin) physicalPlan.getOperator();
+        Expression predicateExpr = physicalHashJoin.getPredicate();
+        List<Expression> eqExprList = Utils.getEqConjuncts(physicalPlan.child(0).getOutput(),
+                physicalPlan.child(1).getOutput(), predicateExpr);
+        JoinType joinType = physicalHashJoin.getJoinType();
+
+        PlanNode leftFragmentPlanRoot = leftFragment.getPlanRoot();
+        PlanNode rightFragmentPlanRoot = rightFragment.getPlanRoot();
+
+        if (joinType.equals(JoinType.CROSS_JOIN)
+                || physicalHashJoin.getJoinType().equals(JoinType.INNER_JOIN) && eqExprList.isEmpty()) {
+            CrossJoinNode crossJoinNode = new CrossJoinNode(context.nextNodeId(), leftFragment.getPlanRoot(),
+                    rightFragment.getPlanRoot(), null);
+            crossJoinNode.setLimit(physicalHashJoin.getLimited());
+            List<Expr> conjuncts = Utils.extractConjuncts(predicateExpr).stream()
+                    .map(e -> ExpressionConverter.converter.convert(e))
+                    .collect(Collectors.toCollection(ArrayList::new));
+            crossJoinNode.addConjuncts(conjuncts);
+            ExchangeNode exchangeNode = new ExchangeNode(context.nextNodeId(), rightFragment.getPlanRoot(), false);
+            exchangeNode.setNumInstances(rightFragmentPlanRoot.getNumInstances());
+            exec(() -> {
+                exchangeNode.init(context.getAnalyzer());
+            });
+            exchangeNode.setFragment(leftFragment);
+            leftFragmentPlanRoot.setChild(1, exchangeNode);
+            rightFragment.setDestination(exchangeNode);
+            crossJoinNode.setChild(0, leftFragment.getPlanRoot());
+            leftFragment.setPlanRoot(crossJoinNode);
+            return leftFragment;
+        }
+
+        List<Expression> expressionList = Utils.extractConjuncts(predicateExpr);
+        expressionList.removeAll(eqExprList);
+        List<Expr> execOtherConjunctList = expressionList.stream().map(e -> ExpressionConverter.converter.convert(e))
+                .collect(Collectors.toCollection(ArrayList::new));
+        List<Expr> execEqConjunctList = eqExprList.stream().map(e -> ExpressionConverter.converter.convert(e))
+                .collect(Collectors.toCollection(ArrayList::new));
+
+        HashJoinNode hashJoinNode = new HashJoinNode(context.nextNodeId(), leftFragmentPlanRoot, rightFragmentPlanRoot,
+                JoinType.toJoinOperator(physicalHashJoin.getJoinType()), execEqConjunctList, execOtherConjunctList);
+
+        ExchangeNode leftExch = new ExchangeNode(context.nextNodeId(), leftFragmentPlanRoot, false);
+        leftExch.setNumInstances(leftFragmentPlanRoot.getNumInstances());
+        ExchangeNode rightExch = new ExchangeNode(context.nextNodeId(), leftFragmentPlanRoot, false);
+        rightExch.setNumInstances(rightFragmentPlanRoot.getNumInstances());
+        exec(() -> {
+            leftExch.init(context.getAnalyzer());
+            rightExch.init(context.getAnalyzer());
+        });
+        hashJoinNode.setChild(0, leftFragmentPlanRoot);
+        hashJoinNode.setChild(1, leftFragmentPlanRoot);
+        hashJoinNode.setDistributionMode(HashJoinNode.DistributionMode.PARTITIONED);
+        hashJoinNode.setLimit(physicalHashJoin.getLimited());
+        leftFragment.setDestination((ExchangeNode) rightFragment.getPlanRoot());
+        rightFragment.setDestination((ExchangeNode) leftFragmentPlanRoot);
+        return new PlanFragment(context.nextFragmentId(), hashJoinNode, leftFragment.getDataPartition());
+    }
+
+    @Override
+    public PlanFragment visitPhysicalProject(
+            PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan, PlanContext context) {
+        return visit((PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) physicalPlan.child(0), context);

Review Comment:
   we need to discuss how to process this 



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalOlapScan.java:
##########
@@ -34,6 +35,8 @@ public class PhysicalOlapScan extends PhysicalScan<PhysicalOlapScan> {
     private final List<Long> selectedTabletId;
     private final List<Long> selectedPartitionId;
 
+    private OlapTable olapTable;

Review Comment:
   why move this from parent class to sub class



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalAggregation.java:
##########
@@ -0,0 +1,82 @@
+package org.apache.doris.nereids.operators.plans.physical;
+
+import org.apache.doris.analysis.AggregateInfo;
+import org.apache.doris.nereids.operators.OperatorType;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+
+import java.util.List;
+
+public class PhysicalAggregation extends PhysicalUnaryOperator<PhysicalAggregation, PhysicalPlan>{
+
+    private List<Expression> groupByExprList;
+
+    private List<Expression> aggExprList;
+
+    private List<Expression> partitionExprList;
+
+    private AggregateInfo.AggPhase aggPhase;

Review Comment:
   maybe a new enum is better



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] Kikyou1997 commented on a diff in pull request #9993: [feature](nereids) Plan Translator

Posted by GitBox <gi...@apache.org>.
Kikyou1997 commented on code in PR #9993:
URL: https://github.com/apache/incubator-doris/pull/9993#discussion_r891903091


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalOperator.java:
##########
@@ -29,4 +29,6 @@
  */
 public interface PhysicalOperator<TYPE extends PhysicalOperator<TYPE>> extends PlanOperator<TYPE> {
     List<Slot> computeOutputs(LogicalProperties logicalProperties, Plan... inputs);
+

Review Comment:
   removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] Kikyou1997 commented on a diff in pull request #9993: [feature](nereids) Plan Translator

Posted by GitBox <gi...@apache.org>.
Kikyou1997 commented on code in PR #9993:
URL: https://github.com/apache/incubator-doris/pull/9993#discussion_r893072065


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/PlanOperatorVisitor.java:
##########
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids;
+
+import org.apache.doris.nereids.operators.plans.physical.PhysicalOperator;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+
+@SuppressWarnings("rawtypes")
+public abstract class PlanOperatorVisitor<R, C> {
+    public abstract R visit(PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan, C context);

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] morrySnow commented on pull request #9993: [feature](nereids) Plan Translator

Posted by GitBox <gi...@apache.org>.
morrySnow commented on PR #9993:
URL: https://github.com/apache/incubator-doris/pull/9993#issuecomment-1148583970

   fix check style~


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] Kikyou1997 commented on a diff in pull request #9993: [feature](nereids) Plan Translator

Posted by GitBox <gi...@apache.org>.
Kikyou1997 commented on code in PR #9993:
URL: https://github.com/apache/incubator-doris/pull/9993#discussion_r892593582


##########
fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java:
##########
@@ -140,6 +141,60 @@ public HashJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, TableRef inne
         }
     }
 
+    public HashJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, JoinOperator joinOp, List<Expr> eqJoinConjuncts,

Review Comment:
   > Why this PR include `planner`?
   
   `tableRef` is useless in the new optimizer, and we can't get and no need to construct a such object from nereids, so add a new constructor which invoked by PlanTranslator



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] Kikyou1997 commented on a diff in pull request #9993: [feature](nereids) Plan Translator

Posted by GitBox <gi...@apache.org>.
Kikyou1997 commented on code in PR #9993:
URL: https://github.com/apache/incubator-doris/pull/9993#discussion_r891913159


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalAggregation.java:
##########
@@ -0,0 +1,82 @@
+package org.apache.doris.nereids.operators.plans.physical;
+
+import org.apache.doris.analysis.AggregateInfo;
+import org.apache.doris.nereids.operators.OperatorType;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+
+import java.util.List;
+
+public class PhysicalAggregation extends PhysicalUnaryOperator<PhysicalAggregation, PhysicalPlan>{
+
+    private List<Expression> groupByExprList;
+
+    private List<Expression> aggExprList;
+
+    private List<Expression> partitionExprList;
+
+    private AggregateInfo.AggPhase aggPhase;

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] EmmyMiao87 merged pull request #9993: [feature](nereids) Plan Translator

Posted by GitBox <gi...@apache.org>.
EmmyMiao87 merged PR #9993:
URL: https://github.com/apache/incubator-doris/pull/9993


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] Kikyou1997 commented on a diff in pull request #9993: [feature](nereids) Plan Translator

Posted by GitBox <gi...@apache.org>.
Kikyou1997 commented on code in PR #9993:
URL: https://github.com/apache/incubator-doris/pull/9993#discussion_r891867087


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalSort.java:
##########
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.operators.plans.physical;
+
+import org.apache.doris.nereids.operators.OperatorType;
+import org.apache.doris.nereids.properties.OrderKey;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+
+import java.util.List;
+
+public class PhysicalSort extends PhysicalUnaryOperator<PhysicalSort, PhysicalPlan> {
+
+    private int offset;
+
+    private int limit = -1;
+
+    private List<OrderKey> orderList;
+
+    // if true, the output of this node feeds an AnalyticNode
+    private boolean isAnalyticSort;

Review Comment:
   removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] Kikyou1997 commented on a diff in pull request #9993: [feature](nereids) Plan Translator

Posted by GitBox <gi...@apache.org>.
Kikyou1997 commented on code in PR #9993:
URL: https://github.com/apache/incubator-doris/pull/9993#discussion_r892593582


##########
fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java:
##########
@@ -140,6 +141,60 @@ public HashJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, TableRef inne
         }
     }
 
+    public HashJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, JoinOperator joinOp, List<Expr> eqJoinConjuncts,

Review Comment:
   > Why this PR include `planner`?
   
   `tableRef` is useless in the new optimizer, and we can't get and no need to a such object from nereids, so add a new constructor which invoked by PlanTranslator



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] Kikyou1997 commented on a diff in pull request #9993: [feature](nereids) Plan Translator

Posted by GitBox <gi...@apache.org>.
Kikyou1997 commented on code in PR #9993:
URL: https://github.com/apache/incubator-doris/pull/9993#discussion_r893072339


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/PlanOperatorVisitor.java:
##########
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids;
+
+import org.apache.doris.nereids.operators.plans.physical.PhysicalOperator;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+
+@SuppressWarnings("rawtypes")
+public abstract class PlanOperatorVisitor<R, C> {
+    public abstract R visit(PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan, C context);
+
+    public R visitPhysicalAggregationPlan(PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan,

Review Comment:
   done



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/operators/AbstractOperator.java:
##########
@@ -17,13 +17,18 @@
 
 package org.apache.doris.nereids.operators;
 
+import org.apache.doris.nereids.PlanOperatorVisitor;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+
 import java.util.Objects;
 
 /**
  * Abstract class for all concrete operator.
  */
 public abstract class AbstractOperator<TYPE extends AbstractOperator<TYPE>> implements Operator<TYPE> {
     protected final OperatorType type;
+    protected long limited;

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] 924060929 commented on a diff in pull request #9993: [feature](nereids) Plan Translator

Posted by GitBox <gi...@apache.org>.
924060929 commented on code in PR #9993:
URL: https://github.com/apache/incubator-doris/pull/9993#discussion_r893038269


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/operators/AbstractOperator.java:
##########
@@ -17,13 +17,18 @@
 
 package org.apache.doris.nereids.operators;
 
+import org.apache.doris.nereids.PlanOperatorVisitor;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+
 import java.util.Objects;
 
 /**
  * Abstract class for all concrete operator.
  */
 public abstract class AbstractOperator<TYPE extends AbstractOperator<TYPE>> implements Operator<TYPE> {
     protected final OperatorType type;
+    protected long limited;

Review Comment:
   ```suggestion
       protected final long limited;
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/operators/AbstractOperator.java:
##########
@@ -33,4 +38,17 @@ public AbstractOperator(OperatorType type) {
     public OperatorType getType() {
         return type;
     }
+
+    public <R, C> R accept(PlanOperatorVisitor<R, C> visitor, PhysicalPlan<?, ?> physicalPlan, C context) {
+        return null;
+    }
+
+    public long getLimited() {
+        return limited;
+    }
+
+    public void setLimited(long limited) {

Review Comment:
   Operator and Plan's properties should be immutable. So no setter.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/PlanOperatorVisitor.java:
##########
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids;
+
+import org.apache.doris.nereids.operators.plans.physical.PhysicalOperator;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+
+@SuppressWarnings("rawtypes")
+public abstract class PlanOperatorVisitor<R, C> {
+    public abstract R visit(PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan, C context);
+
+    public R visitPhysicalAggregationPlan(PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan,

Review Comment:
   ```suggestion
       public R visitPhysicalAggregation(PhysicalPlan<? extends PhysicalPlan, PhysicalAggregation> aggregationPlan, C context);
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalAggregation.java:
##########
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.operators.plans.physical;
+
+import org.apache.doris.nereids.operators.OperatorType;
+import org.apache.doris.nereids.operators.plans.AggPhase;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+
+import java.util.List;
+
+public class PhysicalAggregation extends PhysicalUnaryOperator<PhysicalAggregation, PhysicalPlan> {
+
+    private List<Expression> groupByExprList;

Review Comment:
   operator's properties should be final



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalHashJoin.java:
##########
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.operators.plans.physical;
+
+import org.apache.doris.nereids.operators.OperatorType;
+import org.apache.doris.nereids.operators.plans.JoinType;
+import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+
+public class PhysicalHashJoin extends PhysicalBinaryOperator<PhysicalHashJoin, PhysicalPlan, PhysicalPlan> {
+
+    private JoinType joinType;

Review Comment:
   ditto



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalSort.java:
##########
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.operators.plans.physical;
+
+import org.apache.doris.nereids.operators.OperatorType;
+import org.apache.doris.nereids.properties.OrderKey;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+
+import java.util.List;
+
+public class PhysicalSort extends PhysicalUnaryOperator<PhysicalSort, PhysicalPlan> {
+
+    private int offset;

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] wangshuo128 commented on a diff in pull request #9993: [feature](nereids) Plan Translator

Posted by GitBox <gi...@apache.org>.
wangshuo128 commented on code in PR #9993:
URL: https://github.com/apache/incubator-doris/pull/9993#discussion_r893045887


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/PlanOperatorVisitor.java:
##########
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids;
+
+import org.apache.doris.nereids.operators.plans.physical.PhysicalOperator;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+
+@SuppressWarnings("rawtypes")
+public abstract class PlanOperatorVisitor<R, C> {

Review Comment:
   Is this class a generic visitor for all the operators, i.e., both logical and physical, or it's a dedicated visitor only for physical operators?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] Kikyou1997 commented on a diff in pull request #9993: [feature](nereids) Plan Translator

Posted by GitBox <gi...@apache.org>.
Kikyou1997 commented on code in PR #9993:
URL: https://github.com/apache/incubator-doris/pull/9993#discussion_r891852178


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PhysicalPlanTranslator.java:
##########
@@ -0,0 +1,309 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans;
+
+import org.apache.doris.analysis.AggregateInfo;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.FunctionCallExpr;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SortInfo;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.nereids.PlanOperatorVisitor;
+import org.apache.doris.nereids.operators.AbstractOperator;
+import org.apache.doris.nereids.operators.plans.JoinType;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalAggregation;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalOperator;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalSort;
+import org.apache.doris.nereids.properties.OrderKey;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.ExpressionConverter;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.planner.AggregationNode;
+import org.apache.doris.planner.CrossJoinNode;
+import org.apache.doris.planner.DataPartition;
+import org.apache.doris.planner.ExchangeNode;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.PlanNode;
+import org.apache.doris.planner.SortNode;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@SuppressWarnings("rawtypes")
+public class PhysicalPlanTranslator extends PlanOperatorVisitor<PlanFragment, PlanContext> {
+
+    public void translatePlan(PhysicalPlan<? extends PhysicalPlan, ? extends AbstractOperator> physicalPlan,
+            PlanContext context) {
+        visit(physicalPlan, context);
+    }
+
+    @Override
+    public PlanFragment visit(PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan,
+            PlanContext context) {
+        PhysicalOperator<?> operator = physicalPlan.getOperator();
+        return operator.accept(this, physicalPlan, context);
+    }
+
+    @Override
+    public PlanFragment visitPhysicalAggregationPlan(
+            PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan, PlanContext context) {
+
+        PlanFragment inputPlanFragment = visit(
+                (PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) physicalPlan.child(0), context);
+
+        AggregationNode aggregationNode = null;
+        List<Slot> slotList = physicalPlan.getOutput();
+        TupleDescriptor outputTupleDesc = generateTupleDesc(slotList, context, null);
+        PhysicalAggregation physicalAggregation = (PhysicalAggregation) physicalPlan.getOperator();
+        AggregateInfo.AggPhase phase = physicalAggregation.getAggPhase();
+
+        List<Expression> groupByExpressionList = physicalAggregation.getGroupByExprList();
+        ArrayList<Expr> execGroupingExpressions = groupByExpressionList.stream()
+                .map(e -> ExpressionConverter.converter.convert(e)).collect(Collectors.toCollection(ArrayList::new));
+
+        List<Expression> aggExpressionList = physicalAggregation.getAggExprList();
+        // TODO: agg function could be other expr type either
+        ArrayList<FunctionCallExpr> execAggExpressions = aggExpressionList.stream()
+                .map(e -> (FunctionCallExpr) ExpressionConverter.converter.convert(e))
+                .collect(Collectors.toCollection(ArrayList::new));
+
+        List<Expression> partitionExpressionList = physicalAggregation.getPartitionExprList();
+        List<Expr> execPartitionExpressions = partitionExpressionList.stream()
+                .map(e -> (FunctionCallExpr) ExpressionConverter.converter.convert(e)).collect(Collectors.toList());
+        // todo: support DISTINCT
+        AggregateInfo aggInfo = null;
+        switch (phase) {
+            case FIRST:
+                aggInfo = AggregateInfo.create(execGroupingExpressions, execAggExpressions, outputTupleDesc,
+                        outputTupleDesc, AggregateInfo.AggPhase.FIRST, context.getAnalyzer());
+                aggregationNode = new AggregationNode(context.nextNodeId(), inputPlanFragment.getPlanRoot(), aggInfo);
+                aggregationNode.unsetNeedsFinalize();
+                aggregationNode.setUseStreamingPreagg(physicalAggregation.isUsingStream());
+                aggregationNode.setIntermediateTuple();
+                if (!partitionExpressionList.isEmpty()) {
+                    inputPlanFragment.setOutputPartition(DataPartition.hashPartitioned(execPartitionExpressions));
+                }
+                break;
+            case FIRST_MERGE:
+                aggInfo = AggregateInfo.create(execGroupingExpressions, execAggExpressions, outputTupleDesc,

Review Comment:
   exactly



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org