You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/04/21 04:05:29 UTC

[doris] branch master updated: [enhancement](Nereids) two phase read for topn (#18829)

This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b84bd156fb [enhancement](Nereids) two phase read for topn (#18829)
b84bd156fb is described below

commit b84bd156fb749d454cbfc94ee204dfbdca87b0f0
Author: morrySnow <10...@users.noreply.github.com>
AuthorDate: Fri Apr 21 12:05:22 2023 +0800

    [enhancement](Nereids) two phase read for topn (#18829)
    
    Add the two-phase read optimization for TopN. The corresponding legacy planner PRs are:
    - #15642
    - #16460
    - #16848
    
    TODO:
    We currently forbid limit(sort(project(scan))) because the BE crashes (core dump) when the plan has a project on the scan.
    We need to remove this restriction after the BE bug is fixed.
---
 .../org/apache/doris/analysis/SlotDescriptor.java  |   2 +-
 .../glue/translator/PhysicalPlanTranslator.java    | 130 ++++++++++++++++---
 .../glue/translator/PlanTranslatorContext.java     |   4 -
 .../nereids/processor/post/PlanPostProcessors.java |   1 +
 .../doris/nereids/processor/post/TopNScanOpt.java  |  32 +++--
 .../nereids/processor/post/TwoPhaseReadOpt.java    | 144 +++++++++++++++++++++
 .../nereids/trees/expressions/Expression.java      |   4 +
 .../trees/plans/physical/PhysicalOlapScan.java     |   2 +
 .../nereids/trees/plans/physical/PhysicalTopN.java |   5 +-
 .../nereids/postprocess/RuntimeFilterTest.java     |   2 +-
 .../nereids/postprocess/TopNRuntimeFilterTest.java |   4 +-
 .../rules/analysis/AnalyzeSubQueryTest.java        |   2 +-
 .../rules/analysis/AnalyzeWhereSubqueryTest.java   |   2 +-
 .../nereids/rules/analysis/RegisterCTETest.java    |   2 +-
 .../doris/nereids/trees/expressions/ViewTest.java  |   2 +-
 15 files changed, 292 insertions(+), 46 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
index cc19f94fa7..bc4c4bc37f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
@@ -68,7 +68,7 @@ public class SlotDescriptor {
     private boolean isAgg;
     private boolean isMultiRef;
     // If set to false, then such slots will be ignored during
-    // materialize them.Used to optmize to read less data and less memory usage
+    // materialize them.Used to optimize to read less data and less memory usage
     private boolean needMaterialize = true;
 
     public SlotDescriptor(SlotId id, TupleDescriptor parent) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 6a22f11c72..4b80f64036 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -176,21 +176,23 @@ import java.util.stream.Stream;
  * </STRONG>
  */
 public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, PlanTranslatorContext> {
+
     private static final Logger LOG = LogManager.getLogger(PhysicalPlanTranslator.class);
-    protected StatsErrorEstimator statsErrorEstimator;
-    PlanTranslatorContext context;
+
+    private final StatsErrorEstimator statsErrorEstimator;
+    private final PlanTranslatorContext context;
 
     public PhysicalPlanTranslator() {
+        this(null, null);
     }
 
-    public PhysicalPlanTranslator(PlanTranslatorContext context, StatsErrorEstimator statsErrorEstimator) {
-        this.context = context;
-        this.statsErrorEstimator = statsErrorEstimator;
+    public PhysicalPlanTranslator(PlanTranslatorContext context) {
+        this(context, null);
     }
 
-    public PlanFragment translatePlan(PhysicalPlan physicalPlan, PlanTranslatorContext context) {
+    public PhysicalPlanTranslator(PlanTranslatorContext context, StatsErrorEstimator statsErrorEstimator) {
         this.context = context;
-        return translatePlan(physicalPlan);
+        this.statsErrorEstimator = statsErrorEstimator;
     }
 
     /**
@@ -497,8 +499,16 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
                 .addAll(olapScan.getOutput())
                 .addAll(filterSlotsOfSelectedIndex(olapScan.getNonUserVisibleOutput(), olapScan))
                 .build();
+        Set<ExprId> deferredMaterializedExprIds = Collections.emptySet();
+        if (olapScan.getMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS).isPresent()) {
+            deferredMaterializedExprIds = (Set<ExprId>) (olapScan
+                    .getMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS).get());
+        }
         OlapTable olapTable = olapScan.getTable();
-        TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, olapTable, context);
+        TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, olapTable, deferredMaterializedExprIds, context);
+        if (olapScan.getMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS).isPresent()) {
+            injectRowIdColumnSlot(tupleDescriptor);
+        }
 
         // Use column with the same name in selected materialized index meta for slot desc,
         // to get the correct col unique id.
@@ -513,7 +523,6 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
                 }
             });
         }
-
         tupleDescriptor.setTable(olapTable);
 
         OlapScanNode olapScanNode = new OlapScanNode(context.nextPlanNodeId(), tupleDescriptor, "OlapScanNode");
@@ -875,7 +884,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
             SortNode sortNode = translateSortNode(topN, inputFragment.getPlanRoot(), context);
             sortNode.setOffset(topN.getOffset());
             sortNode.setLimit(topN.getLimit());
-            if (topN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent()) {
+            if (topN.getMutableState(PhysicalTopN.TOPN_OPT).isPresent()) {
                 sortNode.setUseTopnOpt(true);
                 PlanNode child = sortNode.getChild(0);
                 Preconditions.checkArgument(child instanceof OlapScanNode,
@@ -929,6 +938,15 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
         SortInfo sortInfo = new SortInfo(newOrderingExprList, ascOrderList, nullsFirstParamList, tupleDesc);
         SortNode sortNode = new SortNode(context.nextPlanNodeId(), childNode, sortInfo, true);
         sortNode.finalizeForNereids(tupleDesc, sortTupleOutputList, oldOrderingExprList);
+        if (sort.getMutableState(PhysicalTopN.TWO_PHASE_READ_OPT).isPresent()) {
+            sortNode.setUseTwoPhaseReadOpt(true);
+            sortNode.getSortInfo().setUseTwoPhaseRead();
+            injectRowIdColumnSlot(sortNode.getSortInfo().getSortTupleDescriptor());
+            TupleDescriptor childTuple = childNode.getOutputTupleDesc() != null
+                    ? childNode.getOutputTupleDesc() : context.getTupleDesc(childNode.getTupleIds().get(0));
+            SlotDescriptor childRowIdDesc = childTuple.getSlots().get(childTuple.getSlots().size() - 1);
+            sortNode.getResolvedTupleExprs().add(new SlotRef(childRowIdDesc));
+        }
         if (sort.getStats() != null) {
             sortNode.setCardinality((long) sort.getStats().getRowCount());
         }
@@ -1093,7 +1111,13 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
                     continue;
                 }
                 SlotReference sf = leftChildOutputMap.get(context.findExprId(leftSlotDescriptor.getId()));
-                SlotDescriptor sd = context.createSlotDesc(intermediateDescriptor, sf);
+                SlotDescriptor sd;
+                if (sf == null && leftSlotDescriptor.getColumn().getName().equals(Column.ROWID_COL)) {
+                    // TODO: temporary code for two phase read, should remove it after refactor
+                    sd = context.getDescTable().copySlotDescriptor(intermediateDescriptor, leftSlotDescriptor);
+                } else {
+                    sd = context.createSlotDesc(intermediateDescriptor, sf);
+                }
                 leftIntermediateSlotDescriptor.add(sd);
             }
         } else if (hashJoin.getOtherJoinConjuncts().isEmpty()
@@ -1103,7 +1127,13 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
                     continue;
                 }
                 SlotReference sf = rightChildOutputMap.get(context.findExprId(rightSlotDescriptor.getId()));
-                SlotDescriptor sd = context.createSlotDesc(intermediateDescriptor, sf);
+                SlotDescriptor sd;
+                if (sf == null && rightSlotDescriptor.getColumn().getName().equals(Column.ROWID_COL)) {
+                    // TODO: temporary code for two phase read, should remove it after refactor
+                    sd = context.getDescTable().copySlotDescriptor(intermediateDescriptor, rightSlotDescriptor);
+                } else {
+                    sd = context.createSlotDesc(intermediateDescriptor, sf);
+                }
                 rightIntermediateSlotDescriptor.add(sd);
             }
         } else {
@@ -1112,9 +1142,15 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
                     continue;
                 }
                 SlotReference sf = leftChildOutputMap.get(context.findExprId(leftSlotDescriptor.getId()));
-                SlotDescriptor sd = context.createSlotDesc(intermediateDescriptor, sf);
-                if (hashOutputSlotReferenceMap.get(sf.getExprId()) != null) {
-                    hashJoinNode.addSlotIdToHashOutputSlotIds(leftSlotDescriptor.getId());
+                SlotDescriptor sd;
+                if (sf == null && leftSlotDescriptor.getColumn().getName().equals(Column.ROWID_COL)) {
+                    // TODO: temporary code for two phase read, should remove it after refactor
+                    sd = context.getDescTable().copySlotDescriptor(intermediateDescriptor, leftSlotDescriptor);
+                } else {
+                    sd = context.createSlotDesc(intermediateDescriptor, sf);
+                    if (hashOutputSlotReferenceMap.get(sf.getExprId()) != null) {
+                        hashJoinNode.addSlotIdToHashOutputSlotIds(leftSlotDescriptor.getId());
+                    }
                 }
                 leftIntermediateSlotDescriptor.add(sd);
             }
@@ -1123,9 +1159,15 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
                     continue;
                 }
                 SlotReference sf = rightChildOutputMap.get(context.findExprId(rightSlotDescriptor.getId()));
-                SlotDescriptor sd = context.createSlotDesc(intermediateDescriptor, sf);
-                if (hashOutputSlotReferenceMap.get(sf.getExprId()) != null) {
-                    hashJoinNode.addSlotIdToHashOutputSlotIds(rightSlotDescriptor.getId());
+                SlotDescriptor sd;
+                if (sf == null && rightSlotDescriptor.getColumn().getName().equals(Column.ROWID_COL)) {
+                    // TODO: temporary code for two phase read, should remove it after refactor
+                    sd = context.getDescTable().copySlotDescriptor(intermediateDescriptor, rightSlotDescriptor);
+                } else {
+                    sd = context.createSlotDesc(intermediateDescriptor, sf);
+                    if (hashOutputSlotReferenceMap.get(sf.getExprId()) != null) {
+                        hashJoinNode.addSlotIdToHashOutputSlotIds(rightSlotDescriptor.getId());
+                    }
                 }
                 rightIntermediateSlotDescriptor.add(sd);
             }
@@ -1280,7 +1322,13 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
                     continue;
                 }
                 SlotReference sf = leftChildOutputMap.get(context.findExprId(leftSlotDescriptor.getId()));
-                SlotDescriptor sd = context.createSlotDesc(intermediateDescriptor, sf);
+                SlotDescriptor sd;
+                if (sf == null && leftSlotDescriptor.getColumn().getName().equals(Column.ROWID_COL)) {
+                    // TODO: temporary code for two phase read, should remove it after refactor
+                    sd = context.getDescTable().copySlotDescriptor(intermediateDescriptor, leftSlotDescriptor);
+                } else {
+                    sd = context.createSlotDesc(intermediateDescriptor, sf);
+                }
                 leftIntermediateSlotDescriptor.add(sd);
             }
             for (SlotDescriptor rightSlotDescriptor : rightSlotDescriptors) {
@@ -1288,7 +1336,13 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
                     continue;
                 }
                 SlotReference sf = rightChildOutputMap.get(context.findExprId(rightSlotDescriptor.getId()));
-                SlotDescriptor sd = context.createSlotDesc(intermediateDescriptor, sf);
+                SlotDescriptor sd;
+                if (sf == null && rightSlotDescriptor.getColumn().getName().equals(Column.ROWID_COL)) {
+                    // TODO: temporary code for two phase read, should remove it after refactor
+                    sd = context.getDescTable().copySlotDescriptor(intermediateDescriptor, rightSlotDescriptor);
+                } else {
+                    sd = context.createSlotDesc(intermediateDescriptor, sf);
+                }
                 rightIntermediateSlotDescriptor.add(sd);
             }
 
@@ -1405,6 +1459,17 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
         TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, null, context);
         inputPlanNode.setProjectList(execExprList);
         inputPlanNode.setOutputTupleDesc(tupleDescriptor);
+        // TODO: this is a temporary scheme to support two phase read when has project.
+        //  we need to refactor all topn opt into rbo stage.
+        if (inputPlanNode instanceof OlapScanNode) {
+            ArrayList<SlotDescriptor> slots = context.getTupleDesc(inputPlanNode.getTupleIds().get(0)).getSlots();
+            SlotDescriptor lastSlot = slots.get(slots.size() - 1);
+            if (lastSlot.getColumn() != null && lastSlot.getColumn().getName().equals(Column.ROWID_COL)) {
+                inputPlanNode.getProjectList().add(new SlotRef(lastSlot));
+                injectRowIdColumnSlot(tupleDescriptor);
+                requiredSlotIdSet.add(lastSlot.getId());
+            }
+        }
 
         if (inputPlanNode instanceof ScanNode) {
             updateChildSlotsMaterialization(inputPlanNode, requiredSlotIdSet, requiredByProjectSlotIdSet, context);
@@ -1739,6 +1804,19 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
         }
     }
 
+    private TupleDescriptor generateTupleDesc(List<Slot> slotList, TableIf table,
+            Set<ExprId> deferredMaterializedExprIds, PlanTranslatorContext context) {
+        TupleDescriptor tupleDescriptor = context.generateTupleDesc();
+        tupleDescriptor.setTable(table);
+        for (Slot slot : slotList) {
+            SlotDescriptor slotDescriptor = context.createSlotDesc(tupleDescriptor, (SlotReference) slot, table);
+            if (deferredMaterializedExprIds.contains(slot.getExprId())) {
+                slotDescriptor.setNeedMaterialize(false);
+            }
+        }
+        return tupleDescriptor;
+    }
+
     private TupleDescriptor generateTupleDesc(List<Slot> slotList, TableIf table, PlanTranslatorContext context) {
         TupleDescriptor tupleDescriptor = context.generateTupleDesc();
         tupleDescriptor.setTable(table);
@@ -2153,4 +2231,16 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
             statsErrorEstimator.updateLegacyPlanIdToPhysicalPlan(planNode, physicalPlan);
         }
     }
+
+    private SlotDescriptor injectRowIdColumnSlot(TupleDescriptor tupleDesc) {
+        SlotDescriptor slotDesc = context.addSlotDesc(tupleDesc);
+        LOG.debug("inject slot {}", slotDesc);
+        String name = Column.ROWID_COL;
+        Column col = new Column(name, Type.STRING, false, null, false, "", "rowid column");
+        slotDesc.setType(Type.STRING);
+        slotDesc.setColumn(col);
+        slotDesc.setIsNullable(false);
+        slotDesc.setIsMaterialized(true);
+        return slotDesc;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
index b7b7792775..7edb4b6349 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
@@ -159,10 +159,6 @@ public class PlanTranslatorContext {
         return bufferedTupleForWindow;
     }
 
-    public void setBufferedTupleForWindow(TupleDescriptor bufferedTupleForWindow) {
-        this.bufferedTupleForWindow = bufferedTupleForWindow;
-    }
-
     /**
      * Create SlotDesc and add it to the mappings from expression to the stales expr.
      */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
index b282f94e5b..b96e7bbf3e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
@@ -68,6 +68,7 @@ public class PlanPostProcessors {
         }
         builder.add(new Validator());
         builder.add(new TopNScanOpt());
+        builder.add(new TwoPhaseReadOpt());
         return builder.build();
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java
index 7a2d93f7c1..bb00983d2d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java
@@ -19,7 +19,6 @@ package org.apache.doris.nereids.processor.post;
 
 import org.apache.doris.nereids.CascadesContext;
 import org.apache.doris.nereids.trees.expressions.Expression;
-import org.apache.doris.nereids.trees.expressions.SlotReference;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.SortPhase;
 import org.apache.doris.nereids.trees.plans.algebra.Filter;
@@ -36,6 +35,7 @@ import org.apache.doris.qe.ConnectContext;
  */
 
 public class TopNScanOpt extends PlanPostProcessor {
+
     @Override
     public PhysicalTopN visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, CascadesContext ctx) {
         topN.child().accept(this, ctx);
@@ -43,22 +43,23 @@ public class TopNScanOpt extends PlanPostProcessor {
         if (topN.getSortPhase() != SortPhase.LOCAL_SORT) {
             return topN;
         }
-        long threshold = getTopNOptLimitThreshold();
-        if (threshold == -1 || topN.getLimit() > threshold) {
-            return topN;
-        }
         if (topN.getOrderKeys().isEmpty()) {
             return topN;
         }
-        Expression firstKey = topN.getOrderKeys().get(0).getExpr();
 
+        // topn opt
+        long topNOptLimitThreshold = getTopNOptLimitThreshold();
+        if (topNOptLimitThreshold == -1 || topN.getLimit() > topNOptLimitThreshold) {
+            return topN;
+        }
         // if firstKey's column is not present, it means the firstKey is not a original column from scan node
         // for example: "select cast(k1 as INT) as id from tbl1 order by id limit 2;" the firstKey "id" is
         // a cast expr which is not from tbl1 and its column is not present.
         // On the other hand "select k1 as id from tbl1 order by id limit 2;" the firstKey "id" is just an alias of k1
         // so its column is present which is valid for topN optimize
         // see Alias::toSlot() method to get how column info is passed around by alias of slotReference
-        if (!(firstKey instanceof SlotReference) || !((SlotReference) firstKey).getColumn().isPresent()) {
+        Expression firstKey = topN.getOrderKeys().get(0).getExpr();
+        if (!firstKey.isColumnFromTable()) {
             return topN;
         }
         if (firstKey.getDataType().isStringLikeType()
@@ -66,15 +67,20 @@ public class TopNScanOpt extends PlanPostProcessor {
                 || firstKey.getDataType().isDoubleType()) {
             return topN;
         }
-        while (child != null && (child instanceof Project || child instanceof Filter)) {
+
+        PhysicalOlapScan olapScan;
+        while (child instanceof Project || child instanceof Filter) {
             child = child.child(0);
         }
-        if (child instanceof PhysicalOlapScan) {
-            PhysicalOlapScan scan = (PhysicalOlapScan) child;
-            if (scan.getTable().isDupKeysOrMergeOnWrite()) {
-                topN.setMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER, true);
-            }
+        if (!(child instanceof PhysicalOlapScan)) {
+            return topN;
+        }
+        olapScan = (PhysicalOlapScan) child;
+
+        if (olapScan.getTable().isDupKeysOrMergeOnWrite()) {
+            topN.setMutableState(PhysicalTopN.TOPN_OPT, true);
         }
+
         return topN;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TwoPhaseReadOpt.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TwoPhaseReadOpt.java
new file mode 100644
index 0000000000..454caae435
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TwoPhaseReadOpt.java
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.processor.post;
+
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.properties.OrderKey;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.SortPhase;
+import org.apache.doris.nereids.trees.plans.algebra.Filter;
+import org.apache.doris.nereids.trees.plans.algebra.Project;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * two phase read opt
+ * refer to:
+ * https://github.com/apache/doris/pull/15642
+ * https://github.com/apache/doris/pull/16460
+ * https://github.com/apache/doris/pull/16848
+ */
+
+public class TwoPhaseReadOpt extends PlanPostProcessor {
+
+    @Override
+    public PhysicalTopN visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, CascadesContext ctx) {
+        topN.child().accept(this, ctx);
+        Plan child = topN.child();
+        if (topN.getSortPhase() != SortPhase.LOCAL_SORT) {
+            return topN;
+        }
+        if (topN.getOrderKeys().isEmpty()) {
+            return topN;
+        }
+
+        // topn opt
+        long topNOptLimitThreshold = getTopNOptLimitThreshold();
+        if (topNOptLimitThreshold < 0 || topN.getLimit() > topNOptLimitThreshold) {
+            return topN;
+        }
+        if (!topN.getOrderKeys().stream().map(OrderKey::getExpr).allMatch(Expression::isColumnFromTable)) {
+            return topN;
+        }
+
+        PhysicalOlapScan olapScan;
+        PhysicalProject<Plan> project = null;
+        PhysicalFilter<Plan> filter = null;
+        while (child instanceof Project || child instanceof Filter) {
+            if (child instanceof Filter) {
+                filter = (PhysicalFilter<Plan>) child;
+            }
+            if (child instanceof Project) {
+                project = (PhysicalProject<Plan>) child;
+                // TODO: remove this after fix two phase read on project core
+                return topN;
+            }
+            child = child.child(0);
+        }
+        if (!(child instanceof PhysicalOlapScan)) {
+            return topN;
+        }
+        olapScan = (PhysicalOlapScan) child;
+
+        // all order key must column from table
+        if (!olapScan.getTable().getEnableLightSchemaChange()) {
+            return topN;
+        }
+
+        Map<ExprId, ExprId> projectRevertedMap = Maps.newHashMap();
+        if (project != null) {
+            for (Expression e : project.getProjects()) {
+                if (e.isSlot()) {
+                    Slot slot = (Slot) e;
+                    projectRevertedMap.put(slot.getExprId(), slot.getExprId());
+                } else if (e instanceof Alias) {
+                    Alias alias = (Alias) e;
+                    if (alias.child().isSlot()) {
+                        Slot slot = (Slot) alias.child();
+                        projectRevertedMap.put(alias.getExprId(), slot.getExprId());
+                    }
+                }
+            }
+        }
+        Set<ExprId> deferredMaterializedExprIds = Sets.newHashSet(olapScan.getOutputExprIdSet());
+        if (filter != null) {
+            filter.getConjuncts().forEach(e -> deferredMaterializedExprIds.removeAll(e.getInputSlotExprIds()));
+        }
+        topN.getOrderKeys().stream()
+                .map(OrderKey::getExpr)
+                .map(Slot.class::cast)
+                .map(NamedExpression::getExprId)
+                .map(projectRevertedMap::get)
+                .filter(Objects::nonNull)
+                .forEach(deferredMaterializedExprIds::remove);
+        topN.getOrderKeys().stream()
+                .map(OrderKey::getExpr)
+                .map(Slot.class::cast)
+                .map(NamedExpression::getExprId)
+                .forEach(deferredMaterializedExprIds::remove);
+        topN.setMutableState(PhysicalTopN.TWO_PHASE_READ_OPT, true);
+        olapScan.setMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS, deferredMaterializedExprIds);
+
+        return topN;
+    }
+
+    private long getTopNOptLimitThreshold() {
+        if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) {
+            if (!ConnectContext.get().getSessionVariable().enableTwoPhaseReadOpt) {
+                return -1;
+            }
+            return ConnectContext.get().getSessionVariable().topnOptLimitThreshold;
+        }
+        return -1;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java
index a217d44c29..8a4d509507 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java
@@ -178,6 +178,10 @@ public abstract class Expression extends AbstractTreeNode<Expression> implements
         return this instanceof Slot;
     }
 
+    public boolean isColumnFromTable() {
+        return (this instanceof SlotReference) && ((SlotReference) this).getColumn().isPresent();
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java
index 2985ab8df4..fac75df009 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java
@@ -41,6 +41,8 @@ import java.util.Optional;
  */
 public class PhysicalOlapScan extends PhysicalRelation implements OlapScan {
 
+    public static final String DEFERRED_MATERIALIZED_SLOTS = "deferred_materialized_slots";
+
     private final OlapTable olapTable;
     private final DistributionSpec distributionSpec;
     private final long selectedIndexId;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
index e22b75078c..2ca9a4c51c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
@@ -39,7 +39,10 @@ import java.util.Optional;
  * Physical top-N plan.
  */
 public class PhysicalTopN<CHILD_TYPE extends Plan> extends AbstractPhysicalSort<CHILD_TYPE> implements TopN {
-    public static String TOPN_RUNTIME_FILTER = "topn_runtime_filter";
+
+    public static final String TOPN_OPT = "topn_opt";
+    public static final String TWO_PHASE_READ_OPT = "two_phase_read_opt";
+
     private final long limit;
     private final long offset;
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
index af09a4f5d4..e4dd754be9 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
@@ -240,7 +240,7 @@ public class RuntimeFilterTest extends SSBTestBase {
         PhysicalPlan plan = checker.getPhysicalPlan();
         new PlanPostProcessors(checker.getCascadesContext()).process(plan);
         System.out.println(plan.treeString());
-        new PhysicalPlanTranslator().translatePlan(plan, new PlanTranslatorContext(checker.getCascadesContext()));
+        new PhysicalPlanTranslator(new PlanTranslatorContext(checker.getCascadesContext())).translatePlan(plan);
         RuntimeFilterContext context = checker.getCascadesContext().getRuntimeFilterContext();
         List<RuntimeFilter> filters = context.getNereidsRuntimeFilter();
         Assertions.assertEquals(filters.size(), context.getLegacyFilters().size() + context.getTargetNullCount());
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java
index 6cbbcbf071..9da544143a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java
@@ -42,7 +42,7 @@ public class TopNRuntimeFilterTest extends SSBTestBase {
         new PlanPostProcessors(checker.getCascadesContext()).process(plan);
         Assertions.assertTrue(plan.children().get(0) instanceof PhysicalTopN);
         PhysicalTopN localTopN = (PhysicalTopN) plan.children().get(0);
-        Assertions.assertTrue(localTopN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent());
+        Assertions.assertTrue(localTopN.getMutableState(PhysicalTopN.TOPN_OPT).isPresent());
     }
 
     // topn rf do not apply on string-like and float column
@@ -56,6 +56,6 @@ public class TopNRuntimeFilterTest extends SSBTestBase {
         new PlanPostProcessors(checker.getCascadesContext()).process(plan);
         Assertions.assertTrue(plan.children().get(0) instanceof PhysicalTopN);
         PhysicalTopN localTopN = (PhysicalTopN) plan.children().get(0);
-        Assertions.assertFalse(localTopN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent());
+        Assertions.assertFalse(localTopN.getMutableState(PhysicalTopN.TOPN_OPT).isPresent());
     }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/AnalyzeSubQueryTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/AnalyzeSubQueryTest.java
index 844a847b09..e73713bc7d 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/AnalyzeSubQueryTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/AnalyzeSubQueryTest.java
@@ -96,7 +96,7 @@ public class AnalyzeSubQueryTest extends TestWithFeService implements MemoPatter
                     PhysicalProperties.ANY
             );
             // Just to check whether translate will throw exception
-            new PhysicalPlanTranslator().translatePlan(plan, new PlanTranslatorContext(planner.getCascadesContext()));
+            new PhysicalPlanTranslator(new PlanTranslatorContext(planner.getCascadesContext())).translatePlan(plan);
         }
     }
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/AnalyzeWhereSubqueryTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/AnalyzeWhereSubqueryTest.java
index 488036f461..677fb816d7 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/AnalyzeWhereSubqueryTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/AnalyzeWhereSubqueryTest.java
@@ -141,7 +141,7 @@ public class AnalyzeWhereSubqueryTest extends TestWithFeService implements MemoP
                         PhysicalProperties.ANY
                 );
                 // Just to check whether translate will throw exception
-                new PhysicalPlanTranslator().translatePlan(plan, new PlanTranslatorContext());
+                new PhysicalPlanTranslator(new PlanTranslatorContext()).translatePlan(plan);
             } catch (Throwable t) {
                 throw new IllegalStateException("Test sql failed: " + t.getMessage() + ", sql:\n" + sql, t);
             }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/RegisterCTETest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/RegisterCTETest.java
index 78760b1f7e..9b1a954a2a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/RegisterCTETest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/RegisterCTETest.java
@@ -133,7 +133,7 @@ public class RegisterCTETest extends TestWithFeService implements MemoPatternMat
                     PhysicalProperties.ANY
             );
             // Just to check whether translate will throw exception
-            new PhysicalPlanTranslator().translatePlan(plan, new PlanTranslatorContext());
+            new PhysicalPlanTranslator(new PlanTranslatorContext()).translatePlan(plan);
         }
     }
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/expressions/ViewTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/expressions/ViewTest.java
index 8e9f69fcd8..524a540311 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/expressions/ViewTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/expressions/ViewTest.java
@@ -105,7 +105,7 @@ public class ViewTest extends TestWithFeService implements MemoPatternMatchSuppo
                     PhysicalProperties.ANY
             );
             // Just to check whether translate will throw exception
-            new PhysicalPlanTranslator().translatePlan(plan, new PlanTranslatorContext(planner.getCascadesContext()));
+            new PhysicalPlanTranslator(new PlanTranslatorContext(planner.getCascadesContext())).translatePlan(plan);
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org