You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/04/21 04:05:29 UTC
[doris] branch master updated: [enhancement](Nereids) two phase read for topn (#18829)
This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b84bd156fb [enhancement](Nereids) two phase read for topn (#18829)
b84bd156fb is described below
commit b84bd156fb749d454cbfc94ee204dfbdca87b0f0
Author: morrySnow <10...@users.noreply.github.com>
AuthorDate: Fri Apr 21 12:05:22 2023 +0800
[enhancement](Nereids) two phase read for topn (#18829)
Add the two-phase read optimization for TopN. The corresponding legacy planner PRs are:
- #15642
- #16460
- #16848
TODO:
We forbid limit(sort(project(scan))) because the BE cores (crashes) when the plan has a project on the scan.
We need to remove this restriction after the BE bug is fixed.
---
.../org/apache/doris/analysis/SlotDescriptor.java | 2 +-
.../glue/translator/PhysicalPlanTranslator.java | 130 ++++++++++++++++---
.../glue/translator/PlanTranslatorContext.java | 4 -
.../nereids/processor/post/PlanPostProcessors.java | 1 +
.../doris/nereids/processor/post/TopNScanOpt.java | 32 +++--
.../nereids/processor/post/TwoPhaseReadOpt.java | 144 +++++++++++++++++++++
.../nereids/trees/expressions/Expression.java | 4 +
.../trees/plans/physical/PhysicalOlapScan.java | 2 +
.../nereids/trees/plans/physical/PhysicalTopN.java | 5 +-
.../nereids/postprocess/RuntimeFilterTest.java | 2 +-
.../nereids/postprocess/TopNRuntimeFilterTest.java | 4 +-
.../rules/analysis/AnalyzeSubQueryTest.java | 2 +-
.../rules/analysis/AnalyzeWhereSubqueryTest.java | 2 +-
.../nereids/rules/analysis/RegisterCTETest.java | 2 +-
.../doris/nereids/trees/expressions/ViewTest.java | 2 +-
15 files changed, 292 insertions(+), 46 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
index cc19f94fa7..bc4c4bc37f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
@@ -68,7 +68,7 @@ public class SlotDescriptor {
private boolean isAgg;
private boolean isMultiRef;
// If set to false, then such slots will be ignored during
- // materialize them.Used to optmize to read less data and less memory usage
+ // materialize them.Used to optimize to read less data and less memory usage
private boolean needMaterialize = true;
public SlotDescriptor(SlotId id, TupleDescriptor parent) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 6a22f11c72..4b80f64036 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -176,21 +176,23 @@ import java.util.stream.Stream;
* </STRONG>
*/
public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, PlanTranslatorContext> {
+
private static final Logger LOG = LogManager.getLogger(PhysicalPlanTranslator.class);
- protected StatsErrorEstimator statsErrorEstimator;
- PlanTranslatorContext context;
+
+ private final StatsErrorEstimator statsErrorEstimator;
+ private final PlanTranslatorContext context;
public PhysicalPlanTranslator() {
+ this(null, null);
}
- public PhysicalPlanTranslator(PlanTranslatorContext context, StatsErrorEstimator statsErrorEstimator) {
- this.context = context;
- this.statsErrorEstimator = statsErrorEstimator;
+ public PhysicalPlanTranslator(PlanTranslatorContext context) {
+ this(context, null);
}
- public PlanFragment translatePlan(PhysicalPlan physicalPlan, PlanTranslatorContext context) {
+ public PhysicalPlanTranslator(PlanTranslatorContext context, StatsErrorEstimator statsErrorEstimator) {
this.context = context;
- return translatePlan(physicalPlan);
+ this.statsErrorEstimator = statsErrorEstimator;
}
/**
@@ -497,8 +499,16 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
.addAll(olapScan.getOutput())
.addAll(filterSlotsOfSelectedIndex(olapScan.getNonUserVisibleOutput(), olapScan))
.build();
+ Set<ExprId> deferredMaterializedExprIds = Collections.emptySet();
+ if (olapScan.getMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS).isPresent()) {
+ deferredMaterializedExprIds = (Set<ExprId>) (olapScan
+ .getMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS).get());
+ }
OlapTable olapTable = olapScan.getTable();
- TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, olapTable, context);
+ TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, olapTable, deferredMaterializedExprIds, context);
+ if (olapScan.getMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS).isPresent()) {
+ injectRowIdColumnSlot(tupleDescriptor);
+ }
// Use column with the same name in selected materialized index meta for slot desc,
// to get the correct col unique id.
@@ -513,7 +523,6 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
}
});
}
-
tupleDescriptor.setTable(olapTable);
OlapScanNode olapScanNode = new OlapScanNode(context.nextPlanNodeId(), tupleDescriptor, "OlapScanNode");
@@ -875,7 +884,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
SortNode sortNode = translateSortNode(topN, inputFragment.getPlanRoot(), context);
sortNode.setOffset(topN.getOffset());
sortNode.setLimit(topN.getLimit());
- if (topN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent()) {
+ if (topN.getMutableState(PhysicalTopN.TOPN_OPT).isPresent()) {
sortNode.setUseTopnOpt(true);
PlanNode child = sortNode.getChild(0);
Preconditions.checkArgument(child instanceof OlapScanNode,
@@ -929,6 +938,15 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
SortInfo sortInfo = new SortInfo(newOrderingExprList, ascOrderList, nullsFirstParamList, tupleDesc);
SortNode sortNode = new SortNode(context.nextPlanNodeId(), childNode, sortInfo, true);
sortNode.finalizeForNereids(tupleDesc, sortTupleOutputList, oldOrderingExprList);
+ if (sort.getMutableState(PhysicalTopN.TWO_PHASE_READ_OPT).isPresent()) {
+ sortNode.setUseTwoPhaseReadOpt(true);
+ sortNode.getSortInfo().setUseTwoPhaseRead();
+ injectRowIdColumnSlot(sortNode.getSortInfo().getSortTupleDescriptor());
+ TupleDescriptor childTuple = childNode.getOutputTupleDesc() != null
+ ? childNode.getOutputTupleDesc() : context.getTupleDesc(childNode.getTupleIds().get(0));
+ SlotDescriptor childRowIdDesc = childTuple.getSlots().get(childTuple.getSlots().size() - 1);
+ sortNode.getResolvedTupleExprs().add(new SlotRef(childRowIdDesc));
+ }
if (sort.getStats() != null) {
sortNode.setCardinality((long) sort.getStats().getRowCount());
}
@@ -1093,7 +1111,13 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
continue;
}
SlotReference sf = leftChildOutputMap.get(context.findExprId(leftSlotDescriptor.getId()));
- SlotDescriptor sd = context.createSlotDesc(intermediateDescriptor, sf);
+ SlotDescriptor sd;
+ if (sf == null && leftSlotDescriptor.getColumn().getName().equals(Column.ROWID_COL)) {
+ // TODO: temporary code for two phase read, should remove it after refactor
+ sd = context.getDescTable().copySlotDescriptor(intermediateDescriptor, leftSlotDescriptor);
+ } else {
+ sd = context.createSlotDesc(intermediateDescriptor, sf);
+ }
leftIntermediateSlotDescriptor.add(sd);
}
} else if (hashJoin.getOtherJoinConjuncts().isEmpty()
@@ -1103,7 +1127,13 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
continue;
}
SlotReference sf = rightChildOutputMap.get(context.findExprId(rightSlotDescriptor.getId()));
- SlotDescriptor sd = context.createSlotDesc(intermediateDescriptor, sf);
+ SlotDescriptor sd;
+ if (sf == null && rightSlotDescriptor.getColumn().getName().equals(Column.ROWID_COL)) {
+ // TODO: temporary code for two phase read, should remove it after refactor
+ sd = context.getDescTable().copySlotDescriptor(intermediateDescriptor, rightSlotDescriptor);
+ } else {
+ sd = context.createSlotDesc(intermediateDescriptor, sf);
+ }
rightIntermediateSlotDescriptor.add(sd);
}
} else {
@@ -1112,9 +1142,15 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
continue;
}
SlotReference sf = leftChildOutputMap.get(context.findExprId(leftSlotDescriptor.getId()));
- SlotDescriptor sd = context.createSlotDesc(intermediateDescriptor, sf);
- if (hashOutputSlotReferenceMap.get(sf.getExprId()) != null) {
- hashJoinNode.addSlotIdToHashOutputSlotIds(leftSlotDescriptor.getId());
+ SlotDescriptor sd;
+ if (sf == null && leftSlotDescriptor.getColumn().getName().equals(Column.ROWID_COL)) {
+ // TODO: temporary code for two phase read, should remove it after refactor
+ sd = context.getDescTable().copySlotDescriptor(intermediateDescriptor, leftSlotDescriptor);
+ } else {
+ sd = context.createSlotDesc(intermediateDescriptor, sf);
+ if (hashOutputSlotReferenceMap.get(sf.getExprId()) != null) {
+ hashJoinNode.addSlotIdToHashOutputSlotIds(leftSlotDescriptor.getId());
+ }
}
leftIntermediateSlotDescriptor.add(sd);
}
@@ -1123,9 +1159,15 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
continue;
}
SlotReference sf = rightChildOutputMap.get(context.findExprId(rightSlotDescriptor.getId()));
- SlotDescriptor sd = context.createSlotDesc(intermediateDescriptor, sf);
- if (hashOutputSlotReferenceMap.get(sf.getExprId()) != null) {
- hashJoinNode.addSlotIdToHashOutputSlotIds(rightSlotDescriptor.getId());
+ SlotDescriptor sd;
+ if (sf == null && rightSlotDescriptor.getColumn().getName().equals(Column.ROWID_COL)) {
+ // TODO: temporary code for two phase read, should remove it after refactor
+ sd = context.getDescTable().copySlotDescriptor(intermediateDescriptor, rightSlotDescriptor);
+ } else {
+ sd = context.createSlotDesc(intermediateDescriptor, sf);
+ if (hashOutputSlotReferenceMap.get(sf.getExprId()) != null) {
+ hashJoinNode.addSlotIdToHashOutputSlotIds(rightSlotDescriptor.getId());
+ }
}
rightIntermediateSlotDescriptor.add(sd);
}
@@ -1280,7 +1322,13 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
continue;
}
SlotReference sf = leftChildOutputMap.get(context.findExprId(leftSlotDescriptor.getId()));
- SlotDescriptor sd = context.createSlotDesc(intermediateDescriptor, sf);
+ SlotDescriptor sd;
+ if (sf == null && leftSlotDescriptor.getColumn().getName().equals(Column.ROWID_COL)) {
+ // TODO: temporary code for two phase read, should remove it after refactor
+ sd = context.getDescTable().copySlotDescriptor(intermediateDescriptor, leftSlotDescriptor);
+ } else {
+ sd = context.createSlotDesc(intermediateDescriptor, sf);
+ }
leftIntermediateSlotDescriptor.add(sd);
}
for (SlotDescriptor rightSlotDescriptor : rightSlotDescriptors) {
@@ -1288,7 +1336,13 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
continue;
}
SlotReference sf = rightChildOutputMap.get(context.findExprId(rightSlotDescriptor.getId()));
- SlotDescriptor sd = context.createSlotDesc(intermediateDescriptor, sf);
+ SlotDescriptor sd;
+ if (sf == null && rightSlotDescriptor.getColumn().getName().equals(Column.ROWID_COL)) {
+ // TODO: temporary code for two phase read, should remove it after refactor
+ sd = context.getDescTable().copySlotDescriptor(intermediateDescriptor, rightSlotDescriptor);
+ } else {
+ sd = context.createSlotDesc(intermediateDescriptor, sf);
+ }
rightIntermediateSlotDescriptor.add(sd);
}
@@ -1405,6 +1459,17 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, null, context);
inputPlanNode.setProjectList(execExprList);
inputPlanNode.setOutputTupleDesc(tupleDescriptor);
+ // TODO: this is a temporary scheme to support two phase read when has project.
+ // we need to refactor all topn opt into rbo stage.
+ if (inputPlanNode instanceof OlapScanNode) {
+ ArrayList<SlotDescriptor> slots = context.getTupleDesc(inputPlanNode.getTupleIds().get(0)).getSlots();
+ SlotDescriptor lastSlot = slots.get(slots.size() - 1);
+ if (lastSlot.getColumn() != null && lastSlot.getColumn().getName().equals(Column.ROWID_COL)) {
+ inputPlanNode.getProjectList().add(new SlotRef(lastSlot));
+ injectRowIdColumnSlot(tupleDescriptor);
+ requiredSlotIdSet.add(lastSlot.getId());
+ }
+ }
if (inputPlanNode instanceof ScanNode) {
updateChildSlotsMaterialization(inputPlanNode, requiredSlotIdSet, requiredByProjectSlotIdSet, context);
@@ -1739,6 +1804,19 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
}
}
+ private TupleDescriptor generateTupleDesc(List<Slot> slotList, TableIf table,
+ Set<ExprId> deferredMaterializedExprIds, PlanTranslatorContext context) {
+ TupleDescriptor tupleDescriptor = context.generateTupleDesc();
+ tupleDescriptor.setTable(table);
+ for (Slot slot : slotList) {
+ SlotDescriptor slotDescriptor = context.createSlotDesc(tupleDescriptor, (SlotReference) slot, table);
+ if (deferredMaterializedExprIds.contains(slot.getExprId())) {
+ slotDescriptor.setNeedMaterialize(false);
+ }
+ }
+ return tupleDescriptor;
+ }
+
private TupleDescriptor generateTupleDesc(List<Slot> slotList, TableIf table, PlanTranslatorContext context) {
TupleDescriptor tupleDescriptor = context.generateTupleDesc();
tupleDescriptor.setTable(table);
@@ -2153,4 +2231,16 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
statsErrorEstimator.updateLegacyPlanIdToPhysicalPlan(planNode, physicalPlan);
}
}
+
+ private SlotDescriptor injectRowIdColumnSlot(TupleDescriptor tupleDesc) {
+ SlotDescriptor slotDesc = context.addSlotDesc(tupleDesc);
+ LOG.debug("inject slot {}", slotDesc);
+ String name = Column.ROWID_COL;
+ Column col = new Column(name, Type.STRING, false, null, false, "", "rowid column");
+ slotDesc.setType(Type.STRING);
+ slotDesc.setColumn(col);
+ slotDesc.setIsNullable(false);
+ slotDesc.setIsMaterialized(true);
+ return slotDesc;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
index b7b7792775..7edb4b6349 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
@@ -159,10 +159,6 @@ public class PlanTranslatorContext {
return bufferedTupleForWindow;
}
- public void setBufferedTupleForWindow(TupleDescriptor bufferedTupleForWindow) {
- this.bufferedTupleForWindow = bufferedTupleForWindow;
- }
-
/**
* Create SlotDesc and add it to the mappings from expression to the stales expr.
*/
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
index b282f94e5b..b96e7bbf3e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
@@ -68,6 +68,7 @@ public class PlanPostProcessors {
}
builder.add(new Validator());
builder.add(new TopNScanOpt());
+ builder.add(new TwoPhaseReadOpt());
return builder.build();
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java
index 7a2d93f7c1..bb00983d2d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java
@@ -19,7 +19,6 @@ package org.apache.doris.nereids.processor.post;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.trees.expressions.Expression;
-import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.SortPhase;
import org.apache.doris.nereids.trees.plans.algebra.Filter;
@@ -36,6 +35,7 @@ import org.apache.doris.qe.ConnectContext;
*/
public class TopNScanOpt extends PlanPostProcessor {
+
@Override
public PhysicalTopN visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, CascadesContext ctx) {
topN.child().accept(this, ctx);
@@ -43,22 +43,23 @@ public class TopNScanOpt extends PlanPostProcessor {
if (topN.getSortPhase() != SortPhase.LOCAL_SORT) {
return topN;
}
- long threshold = getTopNOptLimitThreshold();
- if (threshold == -1 || topN.getLimit() > threshold) {
- return topN;
- }
if (topN.getOrderKeys().isEmpty()) {
return topN;
}
- Expression firstKey = topN.getOrderKeys().get(0).getExpr();
+ // topn opt
+ long topNOptLimitThreshold = getTopNOptLimitThreshold();
+ if (topNOptLimitThreshold == -1 || topN.getLimit() > topNOptLimitThreshold) {
+ return topN;
+ }
// if firstKey's column is not present, it means the firstKey is not a original column from scan node
// for example: "select cast(k1 as INT) as id from tbl1 order by id limit 2;" the firstKey "id" is
// a cast expr which is not from tbl1 and its column is not present.
// On the other hand "select k1 as id from tbl1 order by id limit 2;" the firstKey "id" is just an alias of k1
// so its column is present which is valid for topN optimize
// see Alias::toSlot() method to get how column info is passed around by alias of slotReference
- if (!(firstKey instanceof SlotReference) || !((SlotReference) firstKey).getColumn().isPresent()) {
+ Expression firstKey = topN.getOrderKeys().get(0).getExpr();
+ if (!firstKey.isColumnFromTable()) {
return topN;
}
if (firstKey.getDataType().isStringLikeType()
@@ -66,15 +67,20 @@ public class TopNScanOpt extends PlanPostProcessor {
|| firstKey.getDataType().isDoubleType()) {
return topN;
}
- while (child != null && (child instanceof Project || child instanceof Filter)) {
+
+ PhysicalOlapScan olapScan;
+ while (child instanceof Project || child instanceof Filter) {
child = child.child(0);
}
- if (child instanceof PhysicalOlapScan) {
- PhysicalOlapScan scan = (PhysicalOlapScan) child;
- if (scan.getTable().isDupKeysOrMergeOnWrite()) {
- topN.setMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER, true);
- }
+ if (!(child instanceof PhysicalOlapScan)) {
+ return topN;
+ }
+ olapScan = (PhysicalOlapScan) child;
+
+ if (olapScan.getTable().isDupKeysOrMergeOnWrite()) {
+ topN.setMutableState(PhysicalTopN.TOPN_OPT, true);
}
+
return topN;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TwoPhaseReadOpt.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TwoPhaseReadOpt.java
new file mode 100644
index 0000000000..454caae435
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TwoPhaseReadOpt.java
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.processor.post;
+
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.properties.OrderKey;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.SortPhase;
+import org.apache.doris.nereids.trees.plans.algebra.Filter;
+import org.apache.doris.nereids.trees.plans.algebra.Project;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * two phase read opt
+ * refer to:
+ * https://github.com/apache/doris/pull/15642
+ * https://github.com/apache/doris/pull/16460
+ * https://github.com/apache/doris/pull/16848
+ */
+
+public class TwoPhaseReadOpt extends PlanPostProcessor {
+
+ @Override
+ public PhysicalTopN visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, CascadesContext ctx) {
+ topN.child().accept(this, ctx);
+ Plan child = topN.child();
+ if (topN.getSortPhase() != SortPhase.LOCAL_SORT) {
+ return topN;
+ }
+ if (topN.getOrderKeys().isEmpty()) {
+ return topN;
+ }
+
+ // topn opt
+ long topNOptLimitThreshold = getTopNOptLimitThreshold();
+ if (topNOptLimitThreshold < 0 || topN.getLimit() > topNOptLimitThreshold) {
+ return topN;
+ }
+ if (!topN.getOrderKeys().stream().map(OrderKey::getExpr).allMatch(Expression::isColumnFromTable)) {
+ return topN;
+ }
+
+ PhysicalOlapScan olapScan;
+ PhysicalProject<Plan> project = null;
+ PhysicalFilter<Plan> filter = null;
+ while (child instanceof Project || child instanceof Filter) {
+ if (child instanceof Filter) {
+ filter = (PhysicalFilter<Plan>) child;
+ }
+ if (child instanceof Project) {
+ project = (PhysicalProject<Plan>) child;
+ // TODO: remove this after fix two phase read on project core
+ return topN;
+ }
+ child = child.child(0);
+ }
+ if (!(child instanceof PhysicalOlapScan)) {
+ return topN;
+ }
+ olapScan = (PhysicalOlapScan) child;
+
+ // all order key must column from table
+ if (!olapScan.getTable().getEnableLightSchemaChange()) {
+ return topN;
+ }
+
+ Map<ExprId, ExprId> projectRevertedMap = Maps.newHashMap();
+ if (project != null) {
+ for (Expression e : project.getProjects()) {
+ if (e.isSlot()) {
+ Slot slot = (Slot) e;
+ projectRevertedMap.put(slot.getExprId(), slot.getExprId());
+ } else if (e instanceof Alias) {
+ Alias alias = (Alias) e;
+ if (alias.child().isSlot()) {
+ Slot slot = (Slot) alias.child();
+ projectRevertedMap.put(alias.getExprId(), slot.getExprId());
+ }
+ }
+ }
+ }
+ Set<ExprId> deferredMaterializedExprIds = Sets.newHashSet(olapScan.getOutputExprIdSet());
+ if (filter != null) {
+ filter.getConjuncts().forEach(e -> deferredMaterializedExprIds.removeAll(e.getInputSlotExprIds()));
+ }
+ topN.getOrderKeys().stream()
+ .map(OrderKey::getExpr)
+ .map(Slot.class::cast)
+ .map(NamedExpression::getExprId)
+ .map(projectRevertedMap::get)
+ .filter(Objects::nonNull)
+ .forEach(deferredMaterializedExprIds::remove);
+ topN.getOrderKeys().stream()
+ .map(OrderKey::getExpr)
+ .map(Slot.class::cast)
+ .map(NamedExpression::getExprId)
+ .forEach(deferredMaterializedExprIds::remove);
+ topN.setMutableState(PhysicalTopN.TWO_PHASE_READ_OPT, true);
+ olapScan.setMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS, deferredMaterializedExprIds);
+
+ return topN;
+ }
+
+ private long getTopNOptLimitThreshold() {
+ if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) {
+ if (!ConnectContext.get().getSessionVariable().enableTwoPhaseReadOpt) {
+ return -1;
+ }
+ return ConnectContext.get().getSessionVariable().topnOptLimitThreshold;
+ }
+ return -1;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java
index a217d44c29..8a4d509507 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java
@@ -178,6 +178,10 @@ public abstract class Expression extends AbstractTreeNode<Expression> implements
return this instanceof Slot;
}
+ public boolean isColumnFromTable() {
+ return (this instanceof SlotReference) && ((SlotReference) this).getColumn().isPresent();
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java
index 2985ab8df4..fac75df009 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java
@@ -41,6 +41,8 @@ import java.util.Optional;
*/
public class PhysicalOlapScan extends PhysicalRelation implements OlapScan {
+ public static final String DEFERRED_MATERIALIZED_SLOTS = "deferred_materialized_slots";
+
private final OlapTable olapTable;
private final DistributionSpec distributionSpec;
private final long selectedIndexId;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
index e22b75078c..2ca9a4c51c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
@@ -39,7 +39,10 @@ import java.util.Optional;
* Physical top-N plan.
*/
public class PhysicalTopN<CHILD_TYPE extends Plan> extends AbstractPhysicalSort<CHILD_TYPE> implements TopN {
- public static String TOPN_RUNTIME_FILTER = "topn_runtime_filter";
+
+ public static final String TOPN_OPT = "topn_opt";
+ public static final String TWO_PHASE_READ_OPT = "two_phase_read_opt";
+
private final long limit;
private final long offset;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
index af09a4f5d4..e4dd754be9 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
@@ -240,7 +240,7 @@ public class RuntimeFilterTest extends SSBTestBase {
PhysicalPlan plan = checker.getPhysicalPlan();
new PlanPostProcessors(checker.getCascadesContext()).process(plan);
System.out.println(plan.treeString());
- new PhysicalPlanTranslator().translatePlan(plan, new PlanTranslatorContext(checker.getCascadesContext()));
+ new PhysicalPlanTranslator(new PlanTranslatorContext(checker.getCascadesContext())).translatePlan(plan);
RuntimeFilterContext context = checker.getCascadesContext().getRuntimeFilterContext();
List<RuntimeFilter> filters = context.getNereidsRuntimeFilter();
Assertions.assertEquals(filters.size(), context.getLegacyFilters().size() + context.getTargetNullCount());
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java
index 6cbbcbf071..9da544143a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java
@@ -42,7 +42,7 @@ public class TopNRuntimeFilterTest extends SSBTestBase {
new PlanPostProcessors(checker.getCascadesContext()).process(plan);
Assertions.assertTrue(plan.children().get(0) instanceof PhysicalTopN);
PhysicalTopN localTopN = (PhysicalTopN) plan.children().get(0);
- Assertions.assertTrue(localTopN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent());
+ Assertions.assertTrue(localTopN.getMutableState(PhysicalTopN.TOPN_OPT).isPresent());
}
// topn rf do not apply on string-like and float column
@@ -56,6 +56,6 @@ public class TopNRuntimeFilterTest extends SSBTestBase {
new PlanPostProcessors(checker.getCascadesContext()).process(plan);
Assertions.assertTrue(plan.children().get(0) instanceof PhysicalTopN);
PhysicalTopN localTopN = (PhysicalTopN) plan.children().get(0);
- Assertions.assertFalse(localTopN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent());
+ Assertions.assertFalse(localTopN.getMutableState(PhysicalTopN.TOPN_OPT).isPresent());
}
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/AnalyzeSubQueryTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/AnalyzeSubQueryTest.java
index 844a847b09..e73713bc7d 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/AnalyzeSubQueryTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/AnalyzeSubQueryTest.java
@@ -96,7 +96,7 @@ public class AnalyzeSubQueryTest extends TestWithFeService implements MemoPatter
PhysicalProperties.ANY
);
// Just to check whether translate will throw exception
- new PhysicalPlanTranslator().translatePlan(plan, new PlanTranslatorContext(planner.getCascadesContext()));
+ new PhysicalPlanTranslator(new PlanTranslatorContext(planner.getCascadesContext())).translatePlan(plan);
}
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/AnalyzeWhereSubqueryTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/AnalyzeWhereSubqueryTest.java
index 488036f461..677fb816d7 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/AnalyzeWhereSubqueryTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/AnalyzeWhereSubqueryTest.java
@@ -141,7 +141,7 @@ public class AnalyzeWhereSubqueryTest extends TestWithFeService implements MemoP
PhysicalProperties.ANY
);
// Just to check whether translate will throw exception
- new PhysicalPlanTranslator().translatePlan(plan, new PlanTranslatorContext());
+ new PhysicalPlanTranslator(new PlanTranslatorContext()).translatePlan(plan);
} catch (Throwable t) {
throw new IllegalStateException("Test sql failed: " + t.getMessage() + ", sql:\n" + sql, t);
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/RegisterCTETest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/RegisterCTETest.java
index 78760b1f7e..9b1a954a2a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/RegisterCTETest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/RegisterCTETest.java
@@ -133,7 +133,7 @@ public class RegisterCTETest extends TestWithFeService implements MemoPatternMat
PhysicalProperties.ANY
);
// Just to check whether translate will throw exception
- new PhysicalPlanTranslator().translatePlan(plan, new PlanTranslatorContext());
+ new PhysicalPlanTranslator(new PlanTranslatorContext()).translatePlan(plan);
}
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/expressions/ViewTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/expressions/ViewTest.java
index 8e9f69fcd8..524a540311 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/expressions/ViewTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/expressions/ViewTest.java
@@ -105,7 +105,7 @@ public class ViewTest extends TestWithFeService implements MemoPatternMatchSuppo
PhysicalProperties.ANY
);
// Just to check whether translate will throw exception
- new PhysicalPlanTranslator().translatePlan(plan, new PlanTranslatorContext(planner.getCascadesContext()));
+ new PhysicalPlanTranslator(new PlanTranslatorContext(planner.getCascadesContext())).translatePlan(plan);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org