You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/05/18 02:09:45 UTC
[incubator-doris] branch master updated: [improvement](planner) push down predicate past two phase aggregate (#9498)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 94c89e8a37 [improvement](planner) push down predicate past two phase aggregate (#9498)
94c89e8a37 is described below
commit 94c89e8a37a0e057e8ef743ccfe8363fb9a24b6e
Author: morrySnow <10...@users.noreply.github.com>
AuthorDate: Wed May 18 10:09:39 2022 +0800
[improvement](planner) push down predicate past two phase aggregate (#9498)
The "push down predicate past aggregate" optimization could not push predicates past a two-phase aggregate.
The original plan looked like this:
```
second phase agg (conjuncts on olap scan node tuples)
|
first phase agg
|
olap scan node
```
It should be optimized to:
```
second phase agg
|
first phase agg
|
olap scan node (conjuncts on olap scan node tuples)
```
---
.../apache/doris/planner/SingleNodePlanner.java | 302 ++++++++++++---------
.../java/org/apache/doris/planner/PlannerTest.java | 59 +++-
2 files changed, 235 insertions(+), 126 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index b02b0ad5af..8d125c1204 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -28,7 +28,6 @@ import org.apache.doris.analysis.BaseTableRef;
import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.CaseExpr;
import org.apache.doris.analysis.CastExpr;
-import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ExprSubstitutionMap;
import org.apache.doris.analysis.FunctionCallExpr;
@@ -70,6 +69,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -90,18 +90,18 @@ import java.util.stream.Collectors;
* The single-node plan needs to be wrapped in a plan fragment for it to be executable.
*/
public class SingleNodePlanner {
- private final static Logger LOG = LogManager.getLogger(SingleNodePlanner.class);
+ private static final Logger LOG = LogManager.getLogger(SingleNodePlanner.class);
- private final PlannerContext ctx_;
+ private final PlannerContext ctx;
private final ArrayList<ScanNode> scanNodes = Lists.newArrayList();
private Map<Analyzer, List<ScanNode>> selectStmtToScanNodes = Maps.newHashMap();
public SingleNodePlanner(PlannerContext ctx) {
- ctx_ = ctx;
+ this.ctx = ctx;
}
public PlannerContext getPlannerContext() {
- return ctx_;
+ return ctx;
}
public ArrayList<ScanNode> getScanNodes() {
@@ -139,7 +139,7 @@ public class SingleNodePlanner {
* re-maps its input, set a substitution map to be applied by parents.
*/
public PlanNode createSingleNodePlan() throws UserException, AnalysisException {
- QueryStmt queryStmt = ctx_.getQueryStmt();
+ QueryStmt queryStmt = ctx.getQueryStmt();
// Use the stmt's analyzer which is not necessarily the root analyzer
// to detect empty result sets.
Analyzer analyzer = queryStmt.getAnalyzer();
@@ -164,7 +164,7 @@ public class SingleNodePlanner {
LOG.trace("desctbl: " + analyzer.getDescTbl().debugString());
}
PlanNode singleNodePlan = createQueryPlan(queryStmt, analyzer,
- ctx_.getQueryOptions().getDefaultOrderByLimit());
+ ctx.getQueryOptions().getDefaultOrderByLimit());
Preconditions.checkNotNull(singleNodePlan);
return singleNodePlan;
}
@@ -189,7 +189,7 @@ public class SingleNodePlanner {
tupleIds.add(createResultTupleDescriptor(selectStmt, "empty", analyzer).getId());
}
unmarkCollectionSlots(stmt);
- EmptySetNode node = new EmptySetNode(ctx_.getNextNodeId(), tupleIds);
+ EmptySetNode node = new EmptySetNode(ctx.getNextNodeId(), tupleIds);
node.init(analyzer);
// Set the output smap to resolve exprs referencing inline views within stmt.
// Not needed for a UnionStmt because it materializes its input operands.
@@ -240,7 +240,7 @@ public class SingleNodePlanner {
// insert possible AnalyticEvalNode before SortNode
if (selectStmt.getAnalyticInfo() != null) {
AnalyticInfo analyticInfo = selectStmt.getAnalyticInfo();
- AnalyticPlanner analyticPlanner = new AnalyticPlanner(analyticInfo, analyzer, ctx_);
+ AnalyticPlanner analyticPlanner = new AnalyticPlanner(analyticInfo, analyzer, ctx);
List<Expr> inputPartitionExprs = Lists.newArrayList();
AggregateInfo aggInfo = selectStmt.getAggInfo();
root = analyticPlanner.createSingleNodePlan(root,
@@ -275,7 +275,7 @@ public class SingleNodePlanner {
if (limit == -1 && analyzer.getContext().getSessionVariable().enableSpilling) {
useTopN = false;
}
- root = new SortNode(ctx_.getNextNodeId(), root, stmt.getSortInfo(),
+ root = new SortNode(ctx.getNextNodeId(), root, stmt.getSortInfo(),
useTopN, limit == -1, stmt.getOffset());
if (useTopN) {
root.setLimit(limit != -1 ? limit : newDefaultOrderByLimit);
@@ -327,7 +327,7 @@ public class SingleNodePlanner {
return root;
}
// evaluate conjuncts in SelectNode
- SelectNode selectNode = new SelectNode(ctx_.getNextNodeId(), root, conjuncts);
+ SelectNode selectNode = new SelectNode(ctx.getNextNodeId(), root, conjuncts);
selectNode.init(analyzer);
Preconditions.checkState(selectNode.hasValidStats());
return selectNode;
@@ -343,15 +343,11 @@ public class SingleNodePlanner {
// Gather unassigned conjuncts and generate predicates to enfore
// slot equivalences for each tuple id.
List<Expr> conjuncts = analyzer.getUnassignedConjuncts(root);
- for (TupleId tid : tupleIds) {
- // TODO(zc)
- // analyzer.createEquivConjuncts(tid, conjuncts);
- }
if (conjuncts.isEmpty()) {
return root;
}
// evaluate conjuncts in SelectNode
- SelectNode selectNode = new SelectNode(ctx_.getNextNodeId(), root, conjuncts);
+ SelectNode selectNode = new SelectNode(ctx.getNextNodeId(), root, conjuncts);
// init() marks conjuncts as assigned
selectNode.init(analyzer);
Preconditions.checkState(selectNode.hasValidStats());
@@ -384,8 +380,8 @@ public class SingleNodePlanner {
final JoinOperator joinOperator = selectStmt.getTableRefs().get(i).getJoinOp();
// TODO chenhao , right out join ?
if (joinOperator.isRightOuterJoin() || joinOperator.isFullOuterJoin()) {
- turnOffReason = selectStmt.getTableRefs().get(i) +
- " joinOp is full outer join or right outer join.";
+ turnOffReason = selectStmt.getTableRefs().get(i)
+ + " joinOp is full outer join or right outer join.";
aggTableValidate = false;
break;
}
@@ -444,8 +440,8 @@ public class SingleNodePlanner {
for (SlotDescriptor slot : selectStmt.getTableRefs().get(0).getDesc().getSlots()) {
if (!slot.getColumn().isKey()) {
if (conjunctSlotIds.contains(slot.getId())) {
- turnOffReason = "conjunct on `" + slot.getColumn().getName() +
- "` which is StorageEngine value column";
+ turnOffReason = "conjunct on `" + slot.getColumn().getName()
+ + "` which is StorageEngine value column";
valueColumnValidate = false;
break;
}
@@ -473,7 +469,9 @@ public class SingleNodePlanner {
&& child.getChild(0).getType().isNumericType()) {
returnColumns.add(((SlotRef) child.getChild(0)).getDesc().getColumn());
} else {
- turnOffReason = "aggExpr.getChild(0)[" + aggExpr.getChild(0).toSql() + "] is not Numeric CastExpr";
+ turnOffReason = "aggExpr.getChild(0)["
+ + aggExpr.getChild(0).toSql()
+ + "] is not Numeric CastExpr";
aggExprValidate = false;
break;
}
@@ -486,8 +484,6 @@ public class SingleNodePlanner {
conditionExpr.getIds(conditionTupleIds, conditionSlotIds);
for (SlotId conditionSlotId : conditionSlotIds) {
- DescriptorTable descTable = analyzer.getDescTbl();
- SlotDescriptor slotDesc = descTable.getSlotDesc(conditionSlotId);
conditionColumns.add(analyzer.getDescTbl().getSlotDesc(conditionSlotId).getColumn());
}
}
@@ -607,8 +603,7 @@ public class SingleNodePlanner {
returnColumnValidate = false;
break;
}
- }
- else if (aggExpr.getFnName().getFunction().equalsIgnoreCase("multi_distinct_count")) {
+ } else if (aggExpr.getFnName().getFunction().equalsIgnoreCase("multi_distinct_count")) {
// count(distinct k1), count(distinct k2) / count(distinct k1,k2) can turn on pre aggregation
if ((!col.isKey())) {
turnOffReason = "Multi count or sum distinct with non-key column: " + col.getName();
@@ -687,7 +682,8 @@ public class SingleNodePlanner {
* required tuple ids of one or more TableRefs in subplanRefs are materialized
* Returns null if we can't create an executable plan.
*/
- private PlanNode createCheapestJoinPlan(Analyzer analyzer, List<Pair<TableRef, PlanNode>> refPlans) throws UserException {
+ private PlanNode createCheapestJoinPlan(Analyzer analyzer,
+ List<Pair<TableRef, PlanNode>> refPlans) throws UserException {
if (refPlans.size() == 1) {
return refPlans.get(0).second;
}
@@ -739,7 +735,9 @@ public class SingleNodePlanner {
for (Pair<TableRef, Long> candidate : candidates) {
PlanNode result = createJoinPlan(analyzer, candidate.first, refPlans);
- if (result != null) return result;
+ if (result != null) {
+ return result;
+ }
}
return null;
}
@@ -790,16 +788,15 @@ public class SingleNodePlanner {
// join in the original query still remain to the left/right after join ordering.
// This prevents join re-ordering across outer/semi joins which is generally wrong.
- /**
- * Key of precedingRefs: the right table ref of outer or semi join
- * Value of precedingRefs: the preceding refs of this key
- * For example:
- * select * from t1, t2, t3 left join t4, t5, t6 right semi join t7, t8, t9
- * Map:
- * { t4: [t1, t2, t3],
- * t7: [t1, t2, t3, t4, t5, t6]
- * }
- */
+
+ // Key of precedingRefs: the right table ref of outer or semi join
+ // Value of precedingRefs: the preceding refs of this key
+ // For example:
+ // select * from t1, t2, t3 left join t4, t5, t6 right semi join t7, t8, t9
+ // Map:
+ // { t4: [t1, t2, t3],
+ // t7: [t1, t2, t3, t4, t5, t6]
+ // }
Map<TableRef, Set<TableRef>> precedingRefs = new HashMap<>();
List<TableRef> tmpTblRefs = new ArrayList<>();
for (Pair<TableRef, PlanNode> entry : refPlans) {
@@ -834,21 +831,20 @@ public class SingleNodePlanner {
if (requiredRefs != null) {
Preconditions.checkState(joinOp.isOuterJoin()
|| joinOp.isSemiJoin());
- /**
- * The semi and outer join nodes are similar to the stop nodes in each round of the algorithm.
- * If the stop node is encountered during the current round of optimal selection,
- * it means that the following nodes do not need to be referred to.
- * This round has been completed.
- * There are two situation in here.
- * Situation 1: required table refs have not been placed yet
- * t1, t2, t3 left join t4, t5
- * Round 1: t3, t1(new root) meets t4(stop)
- * stop this round and begin next round
- * Situation 2: the remaining table refs to prevent incorrect re-ordering of tables across outer/semi joins
- * Round 1: t5, t1, t2, t3(root) meets t4(stop)
- * stop this round while the new root is null
- * planning failed and return null
- */
+ // The semi and outer join nodes are similar to the stop nodes in each round of the algorithm.
+ // If the stop node is encountered during the current round of optimal selection,
+ // it means that the following nodes do not need to be referred to.
+ // This round has been completed.
+ // There are two situation in here.
+ // Situation 1: required table refs have not been placed yet
+ // t1, t2, t3 left join t4, t5
+ // Round 1: t3, t1(new root) meets t4(stop)
+ // stop this round and begin next round
+ // Situation 2: the remaining table refs to prevent incorrect re-ordering
+ // of tables across outer/semi joins
+ // Round 1: t5, t1, t2, t3(root) meets t4(stop)
+ // stop this round while the new root is null
+ // planning failed and return null
if (!requiredRefs.equals(joinedRefs)) {
break;
}
@@ -882,14 +878,16 @@ public class SingleNodePlanner {
// Always prefer Hash Join over Nested-Loop Join due to limited costing
// infrastructure.
- /**
- * The following three conditions are met while the candidate is better.
- * 1. The first candidate
- * 2. The candidate is better than new root: [t3, t2] pk [t3, t1] => [t3, t1]
- * 3. The hash join is better than cross join: [t3 cross t1] pk [t3 hash t2] => t3 hash t2
- */
+ //
+ // The following three conditions are met while the candidate is better.
+ // 1. The first candidate
+ // 2. The candidate is better than new root: [t3, t2] pk [t3, t1] => [t3, t1]
+ // 3. The hash join is better than cross join: [t3 cross t1] pk [t3 hash t2] => t3 hash t2
if (newRoot == null
- || ((candidate.getClass().equals(newRoot.getClass()) && candidateCardinalityIsSmaller(candidate, tblRefToPlanNodeOfCandidate.second.getCardinality(), newRoot, newRootRightChildCardinality)))
+ || ((candidate.getClass().equals(newRoot.getClass())
+ && candidateCardinalityIsSmaller(
+ candidate, tblRefToPlanNodeOfCandidate.second.getCardinality(),
+ newRoot, newRootRightChildCardinality)))
|| (candidate instanceof HashJoinNode && newRoot instanceof CrossJoinNode)) {
newRoot = candidate;
minEntry = tblRefToPlanNodeOfCandidate;
@@ -897,19 +895,18 @@ public class SingleNodePlanner {
}
}
- /**
- * The table after the outer or semi join is wrongly planned to the front,
- * causing the first tblRefToPlanNodeOfCandidate (outer or semi tbl ref) in this round of loop to fail and exit the loop.
- * This means that the current leftmost node must be wrong, and the correct result cannot be planned.
- *
- * For example:
- * Query: t1 left join t2 inner join t3
- * Input params: t3(left most tbl ref), [t1,t2] (remaining refs)
- * Round 1: t3, t1 (joined refs) t2 (remaining refs)
- * Round 2: requiredRefs.equals(joinedRefs) is false and break, the newRoot is null
- * Result: null
- * The t3 should not appear before t2 so planning is fail
- */
+ // The table after the outer or semi join is wrongly planned to the front,
+ // causing the first tblRefToPlanNodeOfCandidate (outer or semi tbl ref)
+ // in this round of loop to fail and exit the loop.
+ // This means that the current leftmost node must be wrong, and the correct result cannot be planned.
+ //
+ // For example:
+ // Query: t1 left join t2 inner join t3
+ // Input params: t3(left most tbl ref), [t1,t2] (remaining refs)
+ // Round 1: t3, t1 (joined refs) t2 (remaining refs)
+ // Round 2: requiredRefs.equals(joinedRefs) is false and break, the newRoot is null
+ // Result: null
+ // The t3 should not appear before t2 so planning is fail
if (newRoot == null) {
// Could not generate a valid plan.
// for example: the biggest table is the last table
@@ -970,7 +967,7 @@ public class SingleNodePlanner {
}
if (analyzer.hasEmptySpjResultSet() && selectStmt.getAggInfo() != null) {
- final PlanNode emptySetNode = new EmptySetNode(ctx_.getNextNodeId(), rowTuples);
+ final PlanNode emptySetNode = new EmptySetNode(ctx.getNextNodeId(), rowTuples);
emptySetNode.init(analyzer);
emptySetNode.setOutputSmap(selectStmt.getBaseTblSmap());
return createAggregationPlan(selectStmt, analyzer, emptySetNode);
@@ -1028,7 +1025,6 @@ public class SingleNodePlanner {
}
for (int i = 1; i < selectStmt.getTableRefs().size(); ++i) {
- TableRef outerRef = selectStmt.getTableRefs().get(i - 1);
TableRef innerRef = selectStmt.getTableRefs().get(i);
root = createJoinNode(analyzer, root, innerRef, selectStmt);
// Have the build side of a join copy data to a compact representation
@@ -1077,7 +1073,7 @@ public class SingleNodePlanner {
GroupingInfo groupingInfo = selectStmt.getGroupingInfo();
Preconditions.checkState(groupByClause != null && groupByClause.isGroupByExtension()
&& groupingInfo != null);
- root = new RepeatNode(ctx_.getNextNodeId(), root, groupingInfo, groupByClause);
+ root = new RepeatNode(ctx.getNextNodeId(), root, groupingInfo, groupByClause);
root.init(analyzer);
return root;
}
@@ -1109,7 +1105,8 @@ public class SingleNodePlanner {
// select index by the old Rollup selector
olapScanNode.selectBestRollupByRollupSelector(analyzer);
// select index by the new Materialized selector
- MaterializedViewSelector.BestIndexInfo bestIndexInfo = materializedViewSelector.selectBestMV(olapScanNode);
+ MaterializedViewSelector.BestIndexInfo bestIndexInfo
+ = materializedViewSelector.selectBestMV(olapScanNode);
if (bestIndexInfo == null) {
selectFailed |= true;
TupleId tupleId = olapScanNode.getTupleId();
@@ -1145,7 +1142,7 @@ public class SingleNodePlanner {
// add aggregation, if required
AggregateInfo aggInfo = selectStmt.getAggInfo();
// aggInfo.substitueGroupingExpr(analyzer);
- PlanNode newRoot = new AggregationNode(ctx_.getNextNodeId(), root, aggInfo);
+ PlanNode newRoot = new AggregationNode(ctx.getNextNodeId(), root, aggInfo);
newRoot.init(analyzer);
Preconditions.checkState(newRoot.hasValidStats());
// if we're computing DISTINCT agg fns, the analyzer already created the
@@ -1154,7 +1151,7 @@ public class SingleNodePlanner {
((AggregationNode) newRoot).unsetNeedsFinalize();
// The output of the 1st phase agg is the 1st phase intermediate.
((AggregationNode) newRoot).setIntermediateTuple();
- newRoot = new AggregationNode(ctx_.getNextNodeId(), newRoot,
+ newRoot = new AggregationNode(ctx.getNextNodeId(), newRoot,
aggInfo.getSecondPhaseDistinctAggInfo());
newRoot.init(analyzer);
Preconditions.checkState(newRoot.hasValidStats());
@@ -1173,7 +1170,7 @@ public class SingleNodePlanner {
ArrayList<Expr> resultExprs = selectStmt.getResultExprs();
// Create tuple descriptor for materialized tuple.
TupleDescriptor tupleDesc = createResultTupleDescriptor(selectStmt, "union", analyzer);
- UnionNode unionNode = new UnionNode(ctx_.getNextNodeId(), tupleDesc.getId());
+ UnionNode unionNode = new UnionNode(ctx.getNextNodeId(), tupleDesc.getId());
// Analysis guarantees that selects without a FROM clause only have constant exprs.
if (selectStmt.getValueList() != null) {
@@ -1350,7 +1347,7 @@ public class SingleNodePlanner {
Preconditions.checkState(inlineViewRef.getMaterializedTupleIds().size() == 1);
// we need to materialize all slots of our inline view tuple
analyzer.getTupleDesc(inlineViewRef.getId()).materializeSlots();
- UnionNode unionNode = new UnionNode(ctx_.getNextNodeId(),
+ UnionNode unionNode = new UnionNode(ctx.getNextNodeId(),
inlineViewRef.getMaterializedTupleIds().get(0));
if (analyzer.hasEmptyResultSet()) {
return unionNode;
@@ -1682,33 +1679,33 @@ public class SingleNodePlanner {
switch (tblRef.getTable().getType()) {
case OLAP:
- OlapScanNode olapNode = new OlapScanNode(ctx_.getNextNodeId(), tblRef.getDesc(),
+ OlapScanNode olapNode = new OlapScanNode(ctx.getNextNodeId(), tblRef.getDesc(),
"OlapScanNode");
olapNode.setForceOpenPreAgg(tblRef.isForcePreAggOpened());
scanNode = olapNode;
break;
case ODBC:
- scanNode = new OdbcScanNode(ctx_.getNextNodeId(), tblRef.getDesc(), (OdbcTable) tblRef.getTable());
+ scanNode = new OdbcScanNode(ctx.getNextNodeId(), tblRef.getDesc(), (OdbcTable) tblRef.getTable());
break;
case MYSQL:
- scanNode = new MysqlScanNode(ctx_.getNextNodeId(), tblRef.getDesc(), (MysqlTable) tblRef.getTable());
+ scanNode = new MysqlScanNode(ctx.getNextNodeId(), tblRef.getDesc(), (MysqlTable) tblRef.getTable());
break;
case SCHEMA:
- scanNode = new SchemaScanNode(ctx_.getNextNodeId(), tblRef.getDesc());
+ scanNode = new SchemaScanNode(ctx.getNextNodeId(), tblRef.getDesc());
break;
case BROKER:
- scanNode = new BrokerScanNode(ctx_.getNextNodeId(), tblRef.getDesc(), "BrokerScanNode",
+ scanNode = new BrokerScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "BrokerScanNode",
null, -1);
break;
case ELASTICSEARCH:
- scanNode = new EsScanNode(ctx_.getNextNodeId(), tblRef.getDesc(), "EsScanNode");
+ scanNode = new EsScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "EsScanNode");
break;
case HIVE:
- scanNode = new HiveScanNode(ctx_.getNextNodeId(), tblRef.getDesc(), "HiveScanNode",
+ scanNode = new HiveScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "HiveScanNode",
null, -1);
break;
case ICEBERG:
- scanNode = new IcebergScanNode(ctx_.getNextNodeId(), tblRef.getDesc(), "IcebergScanNode",
+ scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "IcebergScanNode",
null, -1);
break;
default:
@@ -1720,7 +1717,8 @@ public class SingleNodePlanner {
scanNodes.add(scanNode);
// now we put the selectStmtToScanNodes's init before the scanNode.init
- List<ScanNode> scanNodeList = selectStmtToScanNodes.computeIfAbsent(selectStmt.getAnalyzer(), k -> Lists.newArrayList());
+ List<ScanNode> scanNodeList = selectStmtToScanNodes.computeIfAbsent(
+ selectStmt.getAnalyzer(), k -> Lists.newArrayList());
scanNodeList.add(scanNode);
scanNode.init(analyzer);
@@ -1812,11 +1810,12 @@ public class SingleNodePlanner {
}
// construct cross join node
- // LOG.debug("Join between {} and {} requires at least one conjunctive equality predicate between the two tables",
+ // LOG.debug("Join between {} and {} requires at least one conjunctive"
+ // + " equality predicate between the two tables",
// outerRef.getAliasAsName(), innerRef.getAliasAsName());
// TODO If there are eq join predicates then we should construct a hash join
CrossJoinNode result =
- new CrossJoinNode(ctx_.getNextNodeId(), outer, inner, innerRef);
+ new CrossJoinNode(ctx.getNextNodeId(), outer, inner, innerRef);
result.init(analyzer);
return result;
}
@@ -1835,7 +1834,7 @@ public class SingleNodePlanner {
}
HashJoinNode result =
- new HashJoinNode(ctx_.getNextNodeId(), outer, inner, innerRef, eqJoinConjuncts,
+ new HashJoinNode(ctx.getNextNodeId(), outer, inner, innerRef, eqJoinConjuncts,
ojConjuncts);
result.init(analyzer);
return result;
@@ -1898,7 +1897,7 @@ public class SingleNodePlanner {
throws UserException {
Preconditions.checkNotNull(lateralViewRefs);
Preconditions.checkState(lateralViewRefs.size() > 0);
- TableFunctionNode tableFunctionNode = new TableFunctionNode(ctx_.getNextNodeId(), inputNode,
+ TableFunctionNode tableFunctionNode = new TableFunctionNode(ctx.getNextNodeId(), inputNode,
lateralViewRefs);
tableFunctionNode.init(analyzer);
tableFunctionNode.projectSlots(analyzer, selectStmt);
@@ -1930,15 +1929,15 @@ public class SingleNodePlanner {
}
switch (operation) {
case UNION:
- setOpNode = new UnionNode(ctx_.getNextNodeId(), setOperationStmt.getTupleId(),
+ setOpNode = new UnionNode(ctx.getNextNodeId(), setOperationStmt.getTupleId(),
setOperationStmt.getSetOpsResultExprs(), false);
break;
case INTERSECT:
- setOpNode = new IntersectNode(ctx_.getNextNodeId(), setOperationStmt.getTupleId(),
+ setOpNode = new IntersectNode(ctx.getNextNodeId(), setOperationStmt.getTupleId(),
setOperationStmt.getSetOpsResultExprs(), false);
break;
case EXCEPT:
- setOpNode = new ExceptNode(ctx_.getNextNodeId(), setOperationStmt.getTupleId(),
+ setOpNode = new ExceptNode(ctx.getNextNodeId(), setOperationStmt.getTupleId(),
setOperationStmt.getSetOpsResultExprs(), false);
break;
default:
@@ -2131,7 +2130,7 @@ public class SingleNodePlanner {
if (hasDistinctOps) {
result = createSetOperationPlan(
analyzer, setOperationStmt, distinctOps, result, defaultOrderByLimit);
- result = new AggregationNode(ctx_.getNextNodeId(), result,
+ result = new AggregationNode(ctx.getNextNodeId(), result,
setOperationStmt.getDistinctAggInfo());
result.init(analyzer);
}
@@ -2145,7 +2144,7 @@ public class SingleNodePlanner {
private PlanNode createAssertRowCountNode(PlanNode input, AssertNumRowsElement assertNumRowsElement,
Analyzer analyzer) throws UserException {
- AssertNumRowsNode root = new AssertNumRowsNode(ctx_.getNextNodeId(), input, assertNumRowsElement);
+ AssertNumRowsNode root = new AssertNumRowsNode(ctx.getNextNodeId(), input, assertNumRowsElement);
root.init(analyzer);
return root;
}
@@ -2307,7 +2306,7 @@ public class SingleNodePlanner {
if (putPredicatesOnAggregation(stmt, analyzer, pushDownPredicates)) {
return;
}
- putPredicatesOnFrom(stmt, analyzer, pushDownPredicates);
+ putPredicatesOnTargetTupleIds(stmt.getTableRefIds(), analyzer, predicates);
}
private void pushDownPredicatesPastWindows(Analyzer analyzer, SelectStmt stmt) throws AnalysisException {
@@ -2328,28 +2327,58 @@ public class SingleNodePlanner {
if (putPredicatesOnAggregation(stmt, analyzer, pushDownPredicates)) {
return;
}
- putPredicatesOnFrom(stmt, analyzer, pushDownPredicates);
+ putPredicatesOnTargetTupleIds(stmt.getTableRefIds(), analyzer, predicates);
}
- private void pushDownPredicatesPastAggregation(Analyzer analyzer, SelectStmt stmt) throws AnalysisException {
- final AggregateInfo aggregateInfo = stmt.getAggInfo();
- if (aggregateInfo == null || aggregateInfo.getGroupingExprs().size() <= 0) {
+ /**
+ * Push down predicates past one phase aggregation.
+ *
+ * @param aggregateInfo one phase aggregate info. Either first phase or second phase
+ * @param analyzer current statement's analyzer
+ * @param stmt current stmt
+ * @param targetTupleIds target tuple to register.
+ * Table tuple ids when process first phase agg.
+ * First aggregate's tuple id when process second phase agg.
+ * @throws AnalysisException throw exception when register predicate to tuple failed
+ */
+ private void pushDownPredicatesPastAggregationOnePhase(AggregateInfo aggregateInfo,
+ Analyzer analyzer, SelectStmt stmt, List<TupleId> targetTupleIds) throws AnalysisException {
+ if (aggregateInfo == null || aggregateInfo.getGroupingExprs().isEmpty()) {
return;
}
final List<Expr> predicates = getBoundPredicates(analyzer, aggregateInfo.getOutputTupleDesc());
- if (predicates.size() <= 0) {
+ if (predicates.isEmpty()) {
return;
}
-
// Push down predicates to aggregation's child until they are assigned successfully.
final List<Expr> pushDownPredicates = getPredicatesBoundedByGroupbysSourceExpr(predicates, analyzer, stmt);
- if (pushDownPredicates.size() <= 0) {
+ if (CollectionUtils.isEmpty(pushDownPredicates)) {
+ return;
+ }
+ putPredicatesOnTargetTupleIds(targetTupleIds, analyzer, pushDownPredicates);
+ }
+
+ /**
+ * Push down predicates past whole aggregate stage. Include first phase and second phase.
+ *
+ * @param analyzer current statement's analyzer
+ * @param stmt current stmt
+ * @throws AnalysisException throw exception when register predicate to tuple failed
+ */
+ private void pushDownPredicatesPastAggregation(Analyzer analyzer, SelectStmt stmt) throws AnalysisException {
+ final AggregateInfo firstPhaseAggInfo = stmt.getAggInfo();
+ if (firstPhaseAggInfo == null) {
return;
}
- putPredicatesOnFrom(stmt, analyzer, pushDownPredicates);
+ final AggregateInfo secondPhaseAggInfo = firstPhaseAggInfo.getSecondPhaseDistinctAggInfo();
+
+ final List<TupleId> firstPhaseTupleIds = Lists.newArrayList(firstPhaseAggInfo.getOutputTupleId());
+ pushDownPredicatesPastAggregationOnePhase(secondPhaseAggInfo, analyzer, stmt, firstPhaseTupleIds);
+ pushDownPredicatesPastAggregationOnePhase(firstPhaseAggInfo, analyzer, stmt, stmt.getTableRefIds());
}
- private List<Expr> getPredicatesBoundedByGroupbysSourceExpr(List<Expr> predicates, Analyzer analyzer, SelectStmt stmt) {
+ private List<Expr> getPredicatesBoundedByGroupbysSourceExpr(
+ List<Expr> predicates, Analyzer analyzer, SelectStmt stmt) {
final List<Expr> predicatesCanPushDown = Lists.newArrayList();
for (Expr predicate : predicates) {
if (predicate.isConstant()) {
@@ -2361,10 +2390,21 @@ public class SingleNodePlanner {
final List<SlotId> slotIds = Lists.newArrayList();
predicate.getIds(tupleIds, slotIds);
- boolean isAllSlotReferingGroupBys = true;
+ boolean isAllSlotReferToGroupBys = true;
for (SlotId slotId : slotIds) {
- final SlotDescriptor slotDesc = analyzer.getDescTbl().getSlotDesc(slotId);
- Expr sourceExpr = slotDesc.getSourceExprs().get(0);
+ Expr sourceExpr = new SlotRef(analyzer.getDescTbl().getSlotDesc(slotId));
+ // Every phase in aggregate will wrap expression with SlotRef.
+ // When we process one phase aggregate, we just need to unwrap once.
+ // But when we process 2 phase aggregate, we need to unwrap twice.
+ // So use loop here to adapt to different situations.
+ while (sourceExpr instanceof SlotRef) {
+ SlotRef slotRef = (SlotRef) sourceExpr;
+ SlotDescriptor slotDesc = slotRef.getDesc();
+ if (slotDesc.getSourceExprs().isEmpty()) {
+ break;
+ }
+ sourceExpr = slotDesc.getSourceExprs().get(0);
+ }
// if grouping set is given and column is not in all grouping set list
// we cannot push the predicate since the column value can be null
if (stmt.getGroupByClause() == null) {
@@ -2377,24 +2417,24 @@ public class SingleNodePlanner {
// if grouping type is CUBE or ROLLUP will definitely produce null
if (stmt.getGroupByClause().getGroupingType() == GroupByClause.GroupingType.CUBE
|| stmt.getGroupByClause().getGroupingType() == GroupByClause.GroupingType.ROLLUP) {
- isAllSlotReferingGroupBys = false;
+ isAllSlotReferToGroupBys = false;
} else {
// if grouping type is GROUPING_SETS and the predicate not in all grouping list,
// the predicate cannot be push down
for (List<Expr> exprs : stmt.getGroupByClause().getGroupingSetList()) {
if (!exprs.contains(sourceExpr)) {
- isAllSlotReferingGroupBys = false;
+ isAllSlotReferToGroupBys = false;
break;
}
}
}
}
if (sourceExpr.getFn() instanceof AggregateFunction) {
- isAllSlotReferingGroupBys = false;
+ isAllSlotReferToGroupBys = false;
}
}
- if (isAllSlotReferingGroupBys) {
+ if (isAllSlotReferToGroupBys) {
predicatesCanPushDown.add(predicate);
}
}
@@ -2453,16 +2493,22 @@ public class SingleNodePlanner {
return false;
}
- // Register predicates with TableRef's tuple id.
- private void putPredicatesOnFrom(SelectStmt stmt, Analyzer analyzer, List<Expr> predicates)
+ /**
+ * Register predicates on target tuple ids.
+ *
+ * @param analyzer current stmt analyzer
+ * @param predicates predicates try to register
+ * @param tupleIds target tupleIds
+ * @throws AnalysisException throw exception when register failed
+ */
+ private void putPredicatesOnTargetTupleIds(List<TupleId> tupleIds,
+ Analyzer analyzer, List<Expr> predicates)
throws AnalysisException {
- final List<TupleId> tableTupleIds = Lists.newArrayList();
- for (TableRef tableRef : stmt.getTableRefs()) {
- tableTupleIds.add(tableRef.getId());
+ if (CollectionUtils.isEmpty(tupleIds)) {
+ return;
}
-
for (Expr predicate : predicates) {
- Preconditions.checkArgument(predicate.isBoundByTupleIds(tableTupleIds),
+ Preconditions.checkArgument(predicate.isBoundByTupleIds(tupleIds),
"Predicate:" + predicate.toSql() + " can't be assigned to some PlanNode.");
final List<TupleId> predicateTupleIds = Lists.newArrayList();
predicate.getIds(predicateTupleIds, null);
@@ -2495,18 +2541,24 @@ public class SingleNodePlanner {
*/
public static BinaryPredicate getNormalizedEqPred(Expr expr, List<TupleId> lhsTids,
List<TupleId> rhsTids, Analyzer analyzer) {
- if (!(expr instanceof BinaryPredicate)) return null;
+ if (!(expr instanceof BinaryPredicate)) {
+ return null;
+ }
BinaryPredicate pred = (BinaryPredicate) expr;
if (!pred.getOp().isEquivalence()) {
return null;
}
- if (pred.getChild(0).isConstant() || pred.getChild(1).isConstant()) return null;
+ if (pred.getChild(0).isConstant() || pred.getChild(1).isConstant()) {
+ return null;
+ }
// Use the child that contains lhsTids as lhsExpr, for example, A join B on B.k = A.k,
// where lhsExpr=A.k, rhsExpr=B.k, changed the order, A.k = B.k
Expr lhsExpr = Expr.getFirstBoundChild(pred, lhsTids);
Expr rhsExpr = Expr.getFirstBoundChild(pred, rhsTids);
- if (lhsExpr == null || rhsExpr == null || lhsExpr == rhsExpr) return null;
+ if (lhsExpr == null || rhsExpr == null || lhsExpr == rhsExpr) {
+ return null;
+ }
BinaryPredicate result = new BinaryPredicate(pred.getOp(), lhsExpr, rhsExpr);
result.analyzeNoThrow(analyzer);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java
index edff8da8bc..7cc148064e 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java
@@ -62,7 +62,15 @@ public class PlannerTest extends TestWithFeService {
"DUPLICATE KEY(k1, k2, k3) " +
"distributed by hash(k1) buckets 1 " +
"properties ('replication_num' = '1');";
- createTables(tbl1, tbl2, tbl3);
+
+ String tbl4 = "create table db1.tbl4("
+ + "k1 int,"
+ + " k2 int,"
+ + " v1 int)"
+ + " distributed by hash(k1)"
+ + " properties('replication_num' = '1');";
+
+ createTables(tbl1, tbl2, tbl3, tbl4);
}
@Test
@@ -437,4 +445,53 @@ public class PlannerTest extends TestWithFeService {
Assertions.assertThrows(AnalysisException.class, () -> parseAndAnalyzeStmt(createTbl1));
Assertions.assertTrue(exception.getMessage().contains("String Type should not be used in key column[k1]."));
}
+
+ @Test
+ public void testPushDownPredicateOnGroupingSetAggregate() throws Exception {
+ String sql = "explain select k1, k2, count(distinct v1) from db1.tbl4"
+ + " group by grouping sets((k1), (k1, k2)) having k1 = 1 and k2 = 1";
+ StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql);
+ stmtExecutor.execute();
+ Planner planner = stmtExecutor.planner();
+ List<PlanFragment> fragments = planner.getFragments();
+ String plan = planner.getExplainString(fragments, new ExplainOptions(false, false));
+ Assertions.assertTrue(plan.contains("PREDICATES: `k1` = 1\n"));
+ }
+
+ @Test
+ public void testPushDownPredicateOnRollupAggregate() throws Exception {
+ String sql = "explain select k1, k2, count(distinct v1) from db1.tbl4"
+ + " group by rollup(k1, k2) having k1 = 1 and k2 = 1";
+ StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql);
+ stmtExecutor.execute();
+ Planner planner = stmtExecutor.planner();
+ List<PlanFragment> fragments = planner.getFragments();
+ String plan = planner.getExplainString(fragments, new ExplainOptions(false, false));
+ Assertions.assertFalse(plan.contains("PREDICATES:"));
+ }
+
+ @Test
+ public void testPushDownPredicateOnNormalAggregate() throws Exception {
+ String sql = "explain select k1, k2, count(distinct v1) from db1.tbl4"
+ + " group by k1, k2 having k1 = 1 and k2 = 1";
+ StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql);
+ stmtExecutor.execute();
+ Planner planner = stmtExecutor.planner();
+ List<PlanFragment> fragments = planner.getFragments();
+ String plan = planner.getExplainString(fragments, new ExplainOptions(false, false));
+ Assertions.assertTrue(plan.contains("PREDICATES: `k1` = 1, `k2` = 1\n"));
+ }
+
+ @Test
+ public void testPushDownPredicateOnWindowFunction() throws Exception {
+ String sql = "explain select v1, k1,"
+ + " sum(v1) over (partition by k1 order by v1 rows between 1 preceding and 1 following)"
+ + " as 'moving total' from db1.tbl4 where k1 = 1";
+ StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql);
+ stmtExecutor.execute();
+ Planner planner = stmtExecutor.planner();
+ List<PlanFragment> fragments = planner.getFragments();
+ String plan = planner.getExplainString(fragments, new ExplainOptions(false, false));
+ Assertions.assertTrue(plan.contains("PREDICATES: `k1` = 1\n"));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org