You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/04/23 16:20:40 UTC

[doris] 07/08: [fix](Nereids) two phase read for topn only support simple case (#18955)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.0-alpha
in repository https://gitbox.apache.org/repos/asf/doris.git

commit eb329be48606600241a6903be9349f818496901f
Author: morrySnow <10...@users.noreply.github.com>
AuthorDate: Sun Apr 23 21:32:23 2023 +0800

    [fix](Nereids) two phase read for topn only support simple case (#18955)
    
    1. the topN must have a merge-sort node (MERGE_SORT topN over a distribute over a local topN)
    2. the topN must be the top (root) node of the plan
---
 .../glue/translator/PhysicalPlanTranslator.java    |  6 +--
 .../nereids/processor/post/PlanPostProcessor.java  |  5 ++
 .../nereids/processor/post/PlanPostProcessors.java |  2 +-
 .../doris/nereids/processor/post/TopNScanOpt.java  |  2 +-
 .../nereids/processor/post/TwoPhaseReadOpt.java    | 55 +++++++++++++++-------
 .../nereids/trees/plans/physical/PhysicalTopN.java |  2 +-
 .../nereids/postprocess/TopNRuntimeFilterTest.java |  4 +-
 7 files changed, 49 insertions(+), 27 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 4b80f64036..aa0dc972c1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -884,7 +884,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
             SortNode sortNode = translateSortNode(topN, inputFragment.getPlanRoot(), context);
             sortNode.setOffset(topN.getOffset());
             sortNode.setLimit(topN.getLimit());
-            if (topN.getMutableState(PhysicalTopN.TOPN_OPT).isPresent()) {
+            if (topN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent()) {
                 sortNode.setUseTopnOpt(true);
                 PlanNode child = sortNode.getChild(0);
                 Preconditions.checkArgument(child instanceof OlapScanNode,
@@ -928,9 +928,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
         });
         List<Expr> sortTupleOutputList = new ArrayList<>();
         List<Slot> outputList = sort.getOutput();
-        outputList.forEach(k -> {
-            sortTupleOutputList.add(ExpressionTranslator.translate(k, context));
-        });
+        outputList.forEach(k -> sortTupleOutputList.add(ExpressionTranslator.translate(k, context)));
         // 2. Generate new Tuple and get current slotRef for newOrderingExprList
         List<Expr> newOrderingExprList = Lists.newArrayList();
         TupleDescriptor tupleDesc = generateTupleDesc(outputList, orderKeyList, newOrderingExprList, context, null);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessor.java
index fa6a9deaa9..5090acedf4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessor.java
@@ -18,10 +18,15 @@
 package org.apache.doris.nereids.processor.post;
 
 import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
 
 /**
  * PlanPostprocessor: a PlanVisitor to rewrite PhysicalPlan to new PhysicalPlan.
  */
 public class PlanPostProcessor extends DefaultPlanRewriter<CascadesContext> {
+
+    public Plan processRoot(Plan plan, CascadesContext ctx) {
+        return plan.accept(this, ctx);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
index b96e7bbf3e..0843d1e04b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
@@ -47,7 +47,7 @@ public class PlanPostProcessors {
     public PhysicalPlan process(PhysicalPlan physicalPlan) {
         PhysicalPlan resultPlan = physicalPlan;
         for (PlanPostProcessor processor : getProcessors()) {
-            resultPlan = (PhysicalPlan) resultPlan.accept(processor, cascadesContext);
+            resultPlan = (PhysicalPlan) processor.processRoot(resultPlan, cascadesContext);
         }
         return resultPlan;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java
index bb00983d2d..a938a231ed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java
@@ -78,7 +78,7 @@ public class TopNScanOpt extends PlanPostProcessor {
         olapScan = (PhysicalOlapScan) child;
 
         if (olapScan.getTable().isDupKeysOrMergeOnWrite()) {
-            topN.setMutableState(PhysicalTopN.TOPN_OPT, true);
+            topN.setMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER, true);
         }
 
         return topN;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TwoPhaseReadOpt.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TwoPhaseReadOpt.java
index 454caae435..543f908456 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TwoPhaseReadOpt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TwoPhaseReadOpt.java
@@ -28,6 +28,7 @@ import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.SortPhase;
 import org.apache.doris.nereids.trees.plans.algebra.Filter;
 import org.apache.doris.nereids.trees.plans.algebra.Project;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
@@ -52,23 +53,40 @@ import java.util.Set;
 public class TwoPhaseReadOpt extends PlanPostProcessor {
 
     @Override
-    public PhysicalTopN visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, CascadesContext ctx) {
-        topN.child().accept(this, ctx);
-        Plan child = topN.child();
-        if (topN.getSortPhase() != SortPhase.LOCAL_SORT) {
-            return topN;
+    public Plan processRoot(Plan plan, CascadesContext ctx) {
+        if (plan instanceof PhysicalTopN) {
+            PhysicalTopN<Plan> physicalTopN = (PhysicalTopN<Plan>) plan;
+            if (physicalTopN.getSortPhase() == SortPhase.MERGE_SORT) {
+                return plan.accept(this, ctx);
+            }
         }
-        if (topN.getOrderKeys().isEmpty()) {
-            return topN;
+        return plan;
+    }
+
+    @Override
+    public PhysicalTopN visitPhysicalTopN(PhysicalTopN<? extends Plan> mergeTopN, CascadesContext ctx) {
+        mergeTopN.child().accept(this, ctx);
+        Plan child = mergeTopN.child();
+        if (mergeTopN.getSortPhase() != SortPhase.MERGE_SORT || !(mergeTopN.child() instanceof PhysicalDistribute)) {
+            return mergeTopN;
+        }
+        PhysicalDistribute<Plan> distribute = (PhysicalDistribute<Plan>) mergeTopN.child();
+        if (!(distribute.child() instanceof PhysicalTopN)) {
+            return mergeTopN;
+        }
+        PhysicalTopN<Plan> localTopN = (PhysicalTopN<Plan>) distribute.child();
+
+        if (localTopN.getOrderKeys().isEmpty()) {
+            return mergeTopN;
         }
 
         // topn opt
         long topNOptLimitThreshold = getTopNOptLimitThreshold();
-        if (topNOptLimitThreshold < 0 || topN.getLimit() > topNOptLimitThreshold) {
-            return topN;
+        if (topNOptLimitThreshold < 0 || mergeTopN.getLimit() > topNOptLimitThreshold) {
+            return mergeTopN;
         }
-        if (!topN.getOrderKeys().stream().map(OrderKey::getExpr).allMatch(Expression::isColumnFromTable)) {
-            return topN;
+        if (!localTopN.getOrderKeys().stream().map(OrderKey::getExpr).allMatch(Expression::isColumnFromTable)) {
+            return mergeTopN;
         }
 
         PhysicalOlapScan olapScan;
@@ -81,18 +99,18 @@ public class TwoPhaseReadOpt extends PlanPostProcessor {
             if (child instanceof Project) {
                 project = (PhysicalProject<Plan>) child;
                 // TODO: remove this after fix two phase read on project core
-                return topN;
+                return mergeTopN;
             }
             child = child.child(0);
         }
         if (!(child instanceof PhysicalOlapScan)) {
-            return topN;
+            return mergeTopN;
         }
         olapScan = (PhysicalOlapScan) child;
 
         // all order key must column from table
         if (!olapScan.getTable().getEnableLightSchemaChange()) {
-            return topN;
+            return mergeTopN;
         }
 
         Map<ExprId, ExprId> projectRevertedMap = Maps.newHashMap();
@@ -114,22 +132,23 @@ public class TwoPhaseReadOpt extends PlanPostProcessor {
         if (filter != null) {
             filter.getConjuncts().forEach(e -> deferredMaterializedExprIds.removeAll(e.getInputSlotExprIds()));
         }
-        topN.getOrderKeys().stream()
+        localTopN.getOrderKeys().stream()
                 .map(OrderKey::getExpr)
                 .map(Slot.class::cast)
                 .map(NamedExpression::getExprId)
                 .map(projectRevertedMap::get)
                 .filter(Objects::nonNull)
                 .forEach(deferredMaterializedExprIds::remove);
-        topN.getOrderKeys().stream()
+        localTopN.getOrderKeys().stream()
                 .map(OrderKey::getExpr)
                 .map(Slot.class::cast)
                 .map(NamedExpression::getExprId)
                 .forEach(deferredMaterializedExprIds::remove);
-        topN.setMutableState(PhysicalTopN.TWO_PHASE_READ_OPT, true);
+        localTopN.setMutableState(PhysicalTopN.TWO_PHASE_READ_OPT, true);
+        mergeTopN.setMutableState(PhysicalTopN.TWO_PHASE_READ_OPT, true);
         olapScan.setMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS, deferredMaterializedExprIds);
 
-        return topN;
+        return mergeTopN;
     }
 
     private long getTopNOptLimitThreshold() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
index 2ca9a4c51c..4a58f5d9e9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
@@ -40,7 +40,7 @@ import java.util.Optional;
  */
 public class PhysicalTopN<CHILD_TYPE extends Plan> extends AbstractPhysicalSort<CHILD_TYPE> implements TopN {
 
-    public static final String TOPN_OPT = "topn_opt";
+    public static final String TOPN_RUNTIME_FILTER = "topn_runtime_filter";
     public static final String TWO_PHASE_READ_OPT = "two_phase_read_opt";
 
     private final long limit;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java
index 9da544143a..6cbbcbf071 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java
@@ -42,7 +42,7 @@ public class TopNRuntimeFilterTest extends SSBTestBase {
         new PlanPostProcessors(checker.getCascadesContext()).process(plan);
         Assertions.assertTrue(plan.children().get(0) instanceof PhysicalTopN);
         PhysicalTopN localTopN = (PhysicalTopN) plan.children().get(0);
-        Assertions.assertTrue(localTopN.getMutableState(PhysicalTopN.TOPN_OPT).isPresent());
+        Assertions.assertTrue(localTopN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent());
     }
 
     // topn rf do not apply on string-like and float column
@@ -56,6 +56,6 @@ public class TopNRuntimeFilterTest extends SSBTestBase {
         new PlanPostProcessors(checker.getCascadesContext()).process(plan);
         Assertions.assertTrue(plan.children().get(0) instanceof PhysicalTopN);
         PhysicalTopN localTopN = (PhysicalTopN) plan.children().get(0);
-        Assertions.assertFalse(localTopN.getMutableState(PhysicalTopN.TOPN_OPT).isPresent());
+        Assertions.assertFalse(localTopN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent());
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org