You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/04/23 16:20:40 UTC
[doris] 07/08: [fix](Nereids) two-phase read for TopN only supports the simple case (#18955)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.0-alpha
in repository https://gitbox.apache.org/repos/asf/doris.git
commit eb329be48606600241a6903be9349f818496901f
Author: morrySnow <10...@users.noreply.github.com>
AuthorDate: Sun Apr 23 21:32:23 2023 +0800
[fix](Nereids) two-phase read for TopN only supports the simple case (#18955)
1. the TopN must have a merge node
2. the TopN must be the top node of the plan
---
.../glue/translator/PhysicalPlanTranslator.java | 6 +--
.../nereids/processor/post/PlanPostProcessor.java | 5 ++
.../nereids/processor/post/PlanPostProcessors.java | 2 +-
.../doris/nereids/processor/post/TopNScanOpt.java | 2 +-
.../nereids/processor/post/TwoPhaseReadOpt.java | 55 +++++++++++++++-------
.../nereids/trees/plans/physical/PhysicalTopN.java | 2 +-
.../nereids/postprocess/TopNRuntimeFilterTest.java | 4 +-
7 files changed, 49 insertions(+), 27 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 4b80f64036..aa0dc972c1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -884,7 +884,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
SortNode sortNode = translateSortNode(topN, inputFragment.getPlanRoot(), context);
sortNode.setOffset(topN.getOffset());
sortNode.setLimit(topN.getLimit());
- if (topN.getMutableState(PhysicalTopN.TOPN_OPT).isPresent()) {
+ if (topN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent()) {
sortNode.setUseTopnOpt(true);
PlanNode child = sortNode.getChild(0);
Preconditions.checkArgument(child instanceof OlapScanNode,
@@ -928,9 +928,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
});
List<Expr> sortTupleOutputList = new ArrayList<>();
List<Slot> outputList = sort.getOutput();
- outputList.forEach(k -> {
- sortTupleOutputList.add(ExpressionTranslator.translate(k, context));
- });
+ outputList.forEach(k -> sortTupleOutputList.add(ExpressionTranslator.translate(k, context)));
// 2. Generate new Tuple and get current slotRef for newOrderingExprList
List<Expr> newOrderingExprList = Lists.newArrayList();
TupleDescriptor tupleDesc = generateTupleDesc(outputList, orderKeyList, newOrderingExprList, context, null);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessor.java
index fa6a9deaa9..5090acedf4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessor.java
@@ -18,10 +18,15 @@
package org.apache.doris.nereids.processor.post;
import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
/**
* PlanPostprocessor: a PlanVisitor to rewrite PhysicalPlan to new PhysicalPlan.
*/
public class PlanPostProcessor extends DefaultPlanRewriter<CascadesContext> {
+
+ public Plan processRoot(Plan plan, CascadesContext ctx) {
+ return plan.accept(this, ctx);
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
index b96e7bbf3e..0843d1e04b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
@@ -47,7 +47,7 @@ public class PlanPostProcessors {
public PhysicalPlan process(PhysicalPlan physicalPlan) {
PhysicalPlan resultPlan = physicalPlan;
for (PlanPostProcessor processor : getProcessors()) {
- resultPlan = (PhysicalPlan) resultPlan.accept(processor, cascadesContext);
+ resultPlan = (PhysicalPlan) processor.processRoot(resultPlan, cascadesContext);
}
return resultPlan;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java
index bb00983d2d..a938a231ed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java
@@ -78,7 +78,7 @@ public class TopNScanOpt extends PlanPostProcessor {
olapScan = (PhysicalOlapScan) child;
if (olapScan.getTable().isDupKeysOrMergeOnWrite()) {
- topN.setMutableState(PhysicalTopN.TOPN_OPT, true);
+ topN.setMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER, true);
}
return topN;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TwoPhaseReadOpt.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TwoPhaseReadOpt.java
index 454caae435..543f908456 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TwoPhaseReadOpt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TwoPhaseReadOpt.java
@@ -28,6 +28,7 @@ import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.SortPhase;
import org.apache.doris.nereids.trees.plans.algebra.Filter;
import org.apache.doris.nereids.trees.plans.algebra.Project;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
@@ -52,23 +53,40 @@ import java.util.Set;
public class TwoPhaseReadOpt extends PlanPostProcessor {
@Override
- public PhysicalTopN visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, CascadesContext ctx) {
- topN.child().accept(this, ctx);
- Plan child = topN.child();
- if (topN.getSortPhase() != SortPhase.LOCAL_SORT) {
- return topN;
+ public Plan processRoot(Plan plan, CascadesContext ctx) {
+ if (plan instanceof PhysicalTopN) {
+ PhysicalTopN<Plan> physicalTopN = (PhysicalTopN<Plan>) plan;
+ if (physicalTopN.getSortPhase() == SortPhase.MERGE_SORT) {
+ return plan.accept(this, ctx);
+ }
}
- if (topN.getOrderKeys().isEmpty()) {
- return topN;
+ return plan;
+ }
+
+ @Override
+ public PhysicalTopN visitPhysicalTopN(PhysicalTopN<? extends Plan> mergeTopN, CascadesContext ctx) {
+ mergeTopN.child().accept(this, ctx);
+ Plan child = mergeTopN.child();
+ if (mergeTopN.getSortPhase() != SortPhase.MERGE_SORT || !(mergeTopN.child() instanceof PhysicalDistribute)) {
+ return mergeTopN;
+ }
+ PhysicalDistribute<Plan> distribute = (PhysicalDistribute<Plan>) mergeTopN.child();
+ if (!(distribute.child() instanceof PhysicalTopN)) {
+ return mergeTopN;
+ }
+ PhysicalTopN<Plan> localTopN = (PhysicalTopN<Plan>) distribute.child();
+
+ if (localTopN.getOrderKeys().isEmpty()) {
+ return mergeTopN;
}
// topn opt
long topNOptLimitThreshold = getTopNOptLimitThreshold();
- if (topNOptLimitThreshold < 0 || topN.getLimit() > topNOptLimitThreshold) {
- return topN;
+ if (topNOptLimitThreshold < 0 || mergeTopN.getLimit() > topNOptLimitThreshold) {
+ return mergeTopN;
}
- if (!topN.getOrderKeys().stream().map(OrderKey::getExpr).allMatch(Expression::isColumnFromTable)) {
- return topN;
+ if (!localTopN.getOrderKeys().stream().map(OrderKey::getExpr).allMatch(Expression::isColumnFromTable)) {
+ return mergeTopN;
}
PhysicalOlapScan olapScan;
@@ -81,18 +99,18 @@ public class TwoPhaseReadOpt extends PlanPostProcessor {
if (child instanceof Project) {
project = (PhysicalProject<Plan>) child;
// TODO: remove this after fix two phase read on project core
- return topN;
+ return mergeTopN;
}
child = child.child(0);
}
if (!(child instanceof PhysicalOlapScan)) {
- return topN;
+ return mergeTopN;
}
olapScan = (PhysicalOlapScan) child;
// all order key must column from table
if (!olapScan.getTable().getEnableLightSchemaChange()) {
- return topN;
+ return mergeTopN;
}
Map<ExprId, ExprId> projectRevertedMap = Maps.newHashMap();
@@ -114,22 +132,23 @@ public class TwoPhaseReadOpt extends PlanPostProcessor {
if (filter != null) {
filter.getConjuncts().forEach(e -> deferredMaterializedExprIds.removeAll(e.getInputSlotExprIds()));
}
- topN.getOrderKeys().stream()
+ localTopN.getOrderKeys().stream()
.map(OrderKey::getExpr)
.map(Slot.class::cast)
.map(NamedExpression::getExprId)
.map(projectRevertedMap::get)
.filter(Objects::nonNull)
.forEach(deferredMaterializedExprIds::remove);
- topN.getOrderKeys().stream()
+ localTopN.getOrderKeys().stream()
.map(OrderKey::getExpr)
.map(Slot.class::cast)
.map(NamedExpression::getExprId)
.forEach(deferredMaterializedExprIds::remove);
- topN.setMutableState(PhysicalTopN.TWO_PHASE_READ_OPT, true);
+ localTopN.setMutableState(PhysicalTopN.TWO_PHASE_READ_OPT, true);
+ mergeTopN.setMutableState(PhysicalTopN.TWO_PHASE_READ_OPT, true);
olapScan.setMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS, deferredMaterializedExprIds);
- return topN;
+ return mergeTopN;
}
private long getTopNOptLimitThreshold() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
index 2ca9a4c51c..4a58f5d9e9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
@@ -40,7 +40,7 @@ import java.util.Optional;
*/
public class PhysicalTopN<CHILD_TYPE extends Plan> extends AbstractPhysicalSort<CHILD_TYPE> implements TopN {
- public static final String TOPN_OPT = "topn_opt";
+ public static final String TOPN_RUNTIME_FILTER = "topn_runtime_filter";
public static final String TWO_PHASE_READ_OPT = "two_phase_read_opt";
private final long limit;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java
index 9da544143a..6cbbcbf071 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java
@@ -42,7 +42,7 @@ public class TopNRuntimeFilterTest extends SSBTestBase {
new PlanPostProcessors(checker.getCascadesContext()).process(plan);
Assertions.assertTrue(plan.children().get(0) instanceof PhysicalTopN);
PhysicalTopN localTopN = (PhysicalTopN) plan.children().get(0);
- Assertions.assertTrue(localTopN.getMutableState(PhysicalTopN.TOPN_OPT).isPresent());
+ Assertions.assertTrue(localTopN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent());
}
// topn rf do not apply on string-like and float column
@@ -56,6 +56,6 @@ public class TopNRuntimeFilterTest extends SSBTestBase {
new PlanPostProcessors(checker.getCascadesContext()).process(plan);
Assertions.assertTrue(plan.children().get(0) instanceof PhysicalTopN);
PhysicalTopN localTopN = (PhysicalTopN) plan.children().get(0);
- Assertions.assertFalse(localTopN.getMutableState(PhysicalTopN.TOPN_OPT).isPresent());
+ Assertions.assertFalse(localTopN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org