Posted to commits@doris.apache.org by mo...@apache.org on 2022/05/18 08:34:39 UTC

[incubator-doris] 04/09: [improvement](planner) push down predicate past two-phase aggregate (#9498)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit db2d9d160d727775fc92692418d4ea1a17de05af
Author: morrySnow <10...@users.noreply.github.com>
AuthorDate: Wed May 18 10:09:39 2022 +0800

    [improvement](planner) push down predicate past two-phase aggregate (#9498)
    
    The "push down predicate past aggregate" rule could not push predicates past a two-phase (DISTINCT) aggregate.
    
    The original plan looks like this:
    ```
    second phase agg (conjuncts on olap scan node tuples)
    |
    first phase agg
    |
    olap scan node
    ```
    It should be optimized to:
    ```
    second phase agg
    |
    first phase agg
    |
    olap scan node (conjuncts on olap scan node tuples)
    ```
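    
    For example, a minimal sketch of a query this change targets (hypothetical
    table and column names; the DISTINCT aggregate is what forces the two-phase
    plan, and the HAVING conjunct references only a grouping column sourced
    directly from the scan tuple, so it is safe to push down):
    ```
    -- k1 is a grouping column that comes straight from the scan tuple;
    -- count(distinct v1) triggers the two-phase aggregate plan.
    -- Before this patch the conjunct k1 = 1 stays on the second phase
    -- aggregate; with it, the conjunct lands on the olap scan node.
    select k1, count(distinct v1)
    from t
    group by k1
    having k1 = 1;
    ```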
---
 .../apache/doris/planner/SingleNodePlanner.java    | 299 ++++++++++++---------
 .../java/org/apache/doris/planner/PlannerTest.java |   1 -
 2 files changed, 176 insertions(+), 124 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index b4ab81b17d..0f92c4d803 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -25,7 +25,6 @@ import org.apache.doris.analysis.BaseTableRef;
 import org.apache.doris.analysis.BinaryPredicate;
 import org.apache.doris.analysis.CaseExpr;
 import org.apache.doris.analysis.CastExpr;
-import org.apache.doris.analysis.DescriptorTable;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.ExprSubstitutionMap;
 import org.apache.doris.analysis.FunctionCallExpr;
@@ -68,6 +67,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -88,18 +88,18 @@ import java.util.stream.Collectors;
  * The single-node plan needs to be wrapped in a plan fragment for it to be executable.
  */
 public class SingleNodePlanner {
-    private final static Logger LOG = LogManager.getLogger(SingleNodePlanner.class);
+    private static final Logger LOG = LogManager.getLogger(SingleNodePlanner.class);
 
-    private final PlannerContext ctx_;
+    private final PlannerContext ctx;
     private final ArrayList<ScanNode> scanNodes = Lists.newArrayList();
     private Map<Analyzer, List<ScanNode>> selectStmtToScanNodes = Maps.newHashMap();
 
     public SingleNodePlanner(PlannerContext ctx) {
-        ctx_ = ctx;
+        this.ctx = ctx;
     }
 
     public PlannerContext getPlannerContext() {
-        return ctx_;
+        return ctx;
     }
 
     public ArrayList<ScanNode> getScanNodes() {
@@ -137,7 +137,7 @@ public class SingleNodePlanner {
      * re-maps its input, set a substitution map to be applied by parents.
      */
     public PlanNode createSingleNodePlan() throws UserException, AnalysisException {
-        QueryStmt queryStmt = ctx_.getQueryStmt();
+        QueryStmt queryStmt = ctx.getQueryStmt();
         // Use the stmt's analyzer which is not necessarily the root analyzer
         // to detect empty result sets.
         Analyzer analyzer = queryStmt.getAnalyzer();
@@ -162,7 +162,7 @@ public class SingleNodePlanner {
             LOG.trace("desctbl: " + analyzer.getDescTbl().debugString());
         }
         PlanNode singleNodePlan = createQueryPlan(queryStmt, analyzer,
-                ctx_.getQueryOptions().getDefaultOrderByLimit());
+                ctx.getQueryOptions().getDefaultOrderByLimit());
         Preconditions.checkNotNull(singleNodePlan);
         return singleNodePlan;
     }
@@ -187,7 +187,7 @@ public class SingleNodePlanner {
             tupleIds.add(createResultTupleDescriptor(selectStmt, "empty", analyzer).getId());
         }
         unmarkCollectionSlots(stmt);
-        EmptySetNode node = new EmptySetNode(ctx_.getNextNodeId(), tupleIds);
+        EmptySetNode node = new EmptySetNode(ctx.getNextNodeId(), tupleIds);
         node.init(analyzer);
         // Set the output smap to resolve exprs referencing inline views within stmt.
         // Not needed for a UnionStmt because it materializes its input operands.
@@ -238,7 +238,7 @@ public class SingleNodePlanner {
             // insert possible AnalyticEvalNode before SortNode
             if (selectStmt.getAnalyticInfo() != null) {
                 AnalyticInfo analyticInfo = selectStmt.getAnalyticInfo();
-                AnalyticPlanner analyticPlanner = new AnalyticPlanner(analyticInfo, analyzer, ctx_);
+                AnalyticPlanner analyticPlanner = new AnalyticPlanner(analyticInfo, analyzer, ctx);
                 List<Expr> inputPartitionExprs = Lists.newArrayList();
                 AggregateInfo aggInfo = selectStmt.getAggInfo();
                 root = analyticPlanner.createSingleNodePlan(root,
@@ -273,7 +273,7 @@ public class SingleNodePlanner {
             if (limit == -1 && analyzer.getContext().getSessionVariable().enableSpilling) {
                 useTopN = false;
             }
-            root = new SortNode(ctx_.getNextNodeId(), root, stmt.getSortInfo(),
+            root = new SortNode(ctx.getNextNodeId(), root, stmt.getSortInfo(),
                     useTopN, limit == -1, stmt.getOffset());
             if (useTopN) {
                 root.setLimit(limit != -1 ? limit : newDefaultOrderByLimit);
@@ -325,7 +325,7 @@ public class SingleNodePlanner {
             return root;
         }
         // evaluate conjuncts in SelectNode
-        SelectNode selectNode = new SelectNode(ctx_.getNextNodeId(), root, conjuncts);
+        SelectNode selectNode = new SelectNode(ctx.getNextNodeId(), root, conjuncts);
         selectNode.init(analyzer);
         Preconditions.checkState(selectNode.hasValidStats());
         return selectNode;
@@ -341,15 +341,11 @@ public class SingleNodePlanner {
         // Gather unassigned conjuncts and generate predicates to enfore
         // slot equivalences for each tuple id.
         List<Expr> conjuncts = analyzer.getUnassignedConjuncts(root);
-        for (TupleId tid : tupleIds) {
-            // TODO(zc)
-            // analyzer.createEquivConjuncts(tid, conjuncts);
-        }
         if (conjuncts.isEmpty()) {
             return root;
         }
         // evaluate conjuncts in SelectNode
-        SelectNode selectNode = new SelectNode(ctx_.getNextNodeId(), root, conjuncts);
+        SelectNode selectNode = new SelectNode(ctx.getNextNodeId(), root, conjuncts);
         // init() marks conjuncts as assigned
         selectNode.init(analyzer);
         Preconditions.checkState(selectNode.hasValidStats());
@@ -382,8 +378,8 @@ public class SingleNodePlanner {
                     final JoinOperator joinOperator = selectStmt.getTableRefs().get(i).getJoinOp();
                     // TODO chenhao , right out join ?
                     if (joinOperator.isRightOuterJoin() || joinOperator.isFullOuterJoin()) {
-                        turnOffReason = selectStmt.getTableRefs().get(i) +
-                                " joinOp is full outer join or right outer join.";
+                        turnOffReason = selectStmt.getTableRefs().get(i)
+                                + " joinOp is full outer join or right outer join.";
                         aggTableValidate = false;
                         break;
                     }
@@ -442,8 +438,8 @@ public class SingleNodePlanner {
                 for (SlotDescriptor slot : selectStmt.getTableRefs().get(0).getDesc().getSlots()) {
                     if (!slot.getColumn().isKey()) {
                         if (conjunctSlotIds.contains(slot.getId())) {
-                            turnOffReason = "conjunct on `" + slot.getColumn().getName() +
-                                    "` which is StorageEngine value column";
+                            turnOffReason = "conjunct on `" + slot.getColumn().getName()
+                                    + "` which is StorageEngine value column";
                             valueColumnValidate = false;
                             break;
                         }
@@ -471,7 +467,9 @@ public class SingleNodePlanner {
                                 && child.getChild(0).getType().isNumericType()) {
                             returnColumns.add(((SlotRef) child.getChild(0)).getDesc().getColumn());
                         } else {
-                            turnOffReason = "aggExpr.getChild(0)[" + aggExpr.getChild(0).toSql() + "] is not Numeric CastExpr";
+                            turnOffReason = "aggExpr.getChild(0)["
+                                    + aggExpr.getChild(0).toSql()
+                                    + "] is not Numeric CastExpr";
                             aggExprValidate = false;
                             break;
                         }
@@ -484,8 +482,6 @@ public class SingleNodePlanner {
                             conditionExpr.getIds(conditionTupleIds, conditionSlotIds);
 
                             for (SlotId conditionSlotId : conditionSlotIds) {
-                                DescriptorTable descTable = analyzer.getDescTbl();
-                                SlotDescriptor slotDesc = descTable.getSlotDesc(conditionSlotId);
                                 conditionColumns.add(analyzer.getDescTbl().getSlotDesc(conditionSlotId).getColumn());
                             }
                         }
@@ -677,7 +673,8 @@ public class SingleNodePlanner {
      * required tuple ids of one or more TableRefs in subplanRefs are materialized
      * Returns null if we can't create an executable plan.
      */
-    private PlanNode createCheapestJoinPlan(Analyzer analyzer, List<Pair<TableRef, PlanNode>> refPlans) throws UserException {
+    private PlanNode createCheapestJoinPlan(Analyzer analyzer,
+            List<Pair<TableRef, PlanNode>> refPlans) throws UserException {
         if (refPlans.size() == 1) {
             return refPlans.get(0).second;
         }
@@ -729,7 +726,9 @@ public class SingleNodePlanner {
 
         for (Pair<TableRef, Long> candidate : candidates) {
             PlanNode result = createJoinPlan(analyzer, candidate.first, refPlans);
-            if (result != null) return result;
+            if (result != null) {
+                return result;
+            }
         }
         return null;
     }
@@ -780,16 +779,15 @@ public class SingleNodePlanner {
         // join in the original query still remain to the left/right after join ordering.
         // This prevents join re-ordering across outer/semi joins which is generally wrong.
 
-        /**
-         * Key of precedingRefs: the right table ref of outer or semi join
-         * Value of precedingRefs: the preceding refs of this key
-         * For example:
-         * select * from t1, t2, t3 left join t4, t5, t6 right semi join t7, t8, t9
-         * Map:
-         * { t4: [t1, t2, t3],
-         *   t7: [t1, t2, t3, t4, t5, t6]
-         * }
-         */
+
+        // Key of precedingRefs: the right table ref of outer or semi join
+        // Value of precedingRefs: the preceding refs of this key
+        // For example:
+        // select * from t1, t2, t3 left join t4, t5, t6 right semi join t7, t8, t9
+        // Map:
+        // { t4: [t1, t2, t3],
+        //   t7: [t1, t2, t3, t4, t5, t6]
+        // }
         Map<TableRef, Set<TableRef>> precedingRefs = new HashMap<>();
         List<TableRef> tmpTblRefs = new ArrayList<>();
         for (Pair<TableRef, PlanNode> entry : refPlans) {
@@ -824,21 +822,20 @@ public class SingleNodePlanner {
                 if (requiredRefs != null) {
                     Preconditions.checkState(joinOp.isOuterJoin()
                             || joinOp.isSemiJoin());
-                    /**
-                     * The semi and outer join nodes are similar to the stop nodes in each round of the algorithm.
-                     * If the stop node is encountered during the current round of optimal selection,
-                     * it means that the following nodes do not need to be referred to.
-                     * This round has been completed.
-                     * There are two situation in here.
-                     * Situation 1: required table refs have not been placed yet
-                     * t1, t2, t3 left join t4, t5
-                     *     Round 1: t3, t1(new root) meets t4(stop)
-                     *              stop this round and begin next round
-                     * Situation 2: the remaining table refs to prevent incorrect re-ordering of tables across outer/semi joins
-                     *     Round 1: t5, t1, t2, t3(root) meets t4(stop)
-                     *              stop this round while the new root is null
-                     *              planning failed and return null
-                     */
+                    // The semi and outer join nodes are similar to the stop nodes in each round of the algorithm.
+                    // If the stop node is encountered during the current round of optimal selection,
+                    // it means that the following nodes do not need to be referred to.
+                    // This round has been completed.
+                    // There are two situations here.
+                    // Situation 1: required table refs have not been placed yet
+                    // t1, t2, t3 left join t4, t5
+                    //     Round 1: t3, t1(new root) meets t4(stop)
+                    //              stop this round and begin next round
+                    // Situation 2: the remaining table refs to prevent incorrect re-ordering
+                    //              of tables across outer/semi joins
+                    //     Round 1: t5, t1, t2, t3(root) meets t4(stop)
+                    //              stop this round while the new root is null
+                    //              planning failed and return null
                     if (!requiredRefs.equals(joinedRefs)) {
                         break;
                     }
@@ -872,14 +869,16 @@ public class SingleNodePlanner {
 
                 // Always prefer Hash Join over Nested-Loop Join due to limited costing
                 // infrastructure.
-                /**
-                 * The following three conditions are met while the candidate is better.
-                 * 1. The first candidate
-                 * 2. The candidate is better than new root: [t3, t2] pk [t3, t1] => [t3, t1]
-                 * 3. The hash join is better than cross join: [t3 cross t1] pk [t3 hash t2] => t3 hash t2
-                 */
+                //
+                // The candidate is considered better if any of the following three conditions holds:
+                // 1. The first candidate
+                // 2. The candidate is better than new root: [t3, t2] pk [t3, t1] => [t3, t1]
+                // 3. The hash join is better than cross join: [t3 cross t1] pk [t3 hash t2] => t3 hash t2
                 if (newRoot == null
-                        || ((candidate.getClass().equals(newRoot.getClass()) && candidateCardinalityIsSmaller(candidate, tblRefToPlanNodeOfCandidate.second.getCardinality(), newRoot, newRootRightChildCardinality)))
+                        || ((candidate.getClass().equals(newRoot.getClass())
+                        && candidateCardinalityIsSmaller(
+                                candidate, tblRefToPlanNodeOfCandidate.second.getCardinality(),
+                                newRoot, newRootRightChildCardinality)))
                         || (candidate instanceof HashJoinNode && newRoot instanceof CrossJoinNode)) {
                     newRoot = candidate;
                     minEntry = tblRefToPlanNodeOfCandidate;
@@ -887,19 +886,18 @@ public class SingleNodePlanner {
                 }
             }
 
-            /**
-             * The table after the outer or semi join is wrongly planned to the front,
-             * causing the first tblRefToPlanNodeOfCandidate (outer or semi tbl ref) in this round of loop to fail and exit the loop.
-             * This means that the current leftmost node must be wrong, and the correct result cannot be planned.
-             *
-             * For example:
-             * Query: t1 left join t2 inner join t3
-             * Input params: t3(left most tbl ref), [t1,t2] (remaining refs)
-             *     Round 1: t3, t1 (joined refs) t2 (remaining refs)
-             *     Round 2: requiredRefs.equals(joinedRefs) is false and break, the newRoot is null
-             * Result: null
-             * The t3 should not appear before t2 so planning is fail
-             */
+            // The table after the outer or semi join is wrongly planned to the front,
+            // causing the first tblRefToPlanNodeOfCandidate (outer or semi tbl ref)
+            // in this round of loop to fail and exit the loop.
+            // This means that the current leftmost node must be wrong, and the correct result cannot be planned.
+            //
+            // For example:
+            // Query: t1 left join t2 inner join t3
+            // Input params: t3(left most tbl ref), [t1,t2] (remaining refs)
+            //     Round 1: t3, t1 (joined refs) t2 (remaining refs)
+            //     Round 2: requiredRefs.equals(joinedRefs) is false and break, the newRoot is null
+            // Result: null
+            // t3 should not appear before t2, so planning fails
             if (newRoot == null) {
                 // Could not generate a valid plan.
                 // for example: the biggest table is the last table
@@ -960,7 +958,7 @@ public class SingleNodePlanner {
         }
 
         if (analyzer.hasEmptySpjResultSet() && selectStmt.getAggInfo() != null) {
-            final PlanNode emptySetNode = new EmptySetNode(ctx_.getNextNodeId(), rowTuples);
+            final PlanNode emptySetNode = new EmptySetNode(ctx.getNextNodeId(), rowTuples);
             emptySetNode.init(analyzer);
             emptySetNode.setOutputSmap(selectStmt.getBaseTblSmap());
             return createAggregationPlan(selectStmt, analyzer, emptySetNode);
@@ -1018,7 +1016,6 @@ public class SingleNodePlanner {
             }
 
             for (int i = 1; i < selectStmt.getTableRefs().size(); ++i) {
-                TableRef outerRef = selectStmt.getTableRefs().get(i - 1);
                 TableRef innerRef = selectStmt.getTableRefs().get(i);
                 root = createJoinNode(analyzer, root, innerRef, selectStmt);
                 // Have the build side of a join copy data to a compact representation
@@ -1067,7 +1064,7 @@ public class SingleNodePlanner {
         GroupingInfo groupingInfo = selectStmt.getGroupingInfo();
         Preconditions.checkState(groupByClause != null && groupByClause.isGroupByExtension()
                 && groupingInfo != null);
-        root = new RepeatNode(ctx_.getNextNodeId(), root, groupingInfo, groupByClause);
+        root = new RepeatNode(ctx.getNextNodeId(), root, groupingInfo, groupByClause);
         root.init(analyzer);
         return root;
     }
@@ -1099,7 +1096,8 @@ public class SingleNodePlanner {
                 // select index by the old Rollup selector
                 olapScanNode.selectBestRollupByRollupSelector(analyzer);
                 // select index by the new Materialized selector
-                MaterializedViewSelector.BestIndexInfo bestIndexInfo = materializedViewSelector.selectBestMV(olapScanNode);
+                MaterializedViewSelector.BestIndexInfo bestIndexInfo
+                        = materializedViewSelector.selectBestMV(olapScanNode);
                 if (bestIndexInfo == null) {
                     selectFailed |= true;
                     TupleId tupleId = olapScanNode.getTupleId();
@@ -1135,7 +1133,7 @@ public class SingleNodePlanner {
         // add aggregation, if required
         AggregateInfo aggInfo = selectStmt.getAggInfo();
         // aggInfo.substitueGroupingExpr(analyzer);
-        PlanNode newRoot = new AggregationNode(ctx_.getNextNodeId(), root, aggInfo);
+        PlanNode newRoot = new AggregationNode(ctx.getNextNodeId(), root, aggInfo);
         newRoot.init(analyzer);
         Preconditions.checkState(newRoot.hasValidStats());
         // if we're computing DISTINCT agg fns, the analyzer already created the
@@ -1144,7 +1142,7 @@ public class SingleNodePlanner {
             ((AggregationNode) newRoot).unsetNeedsFinalize();
             // The output of the 1st phase agg is the 1st phase intermediate.
             ((AggregationNode) newRoot).setIntermediateTuple();
-            newRoot = new AggregationNode(ctx_.getNextNodeId(), newRoot,
+            newRoot = new AggregationNode(ctx.getNextNodeId(), newRoot,
                     aggInfo.getSecondPhaseDistinctAggInfo());
             newRoot.init(analyzer);
             Preconditions.checkState(newRoot.hasValidStats());
@@ -1163,7 +1161,7 @@ public class SingleNodePlanner {
         ArrayList<Expr> resultExprs = selectStmt.getResultExprs();
         // Create tuple descriptor for materialized tuple.
         TupleDescriptor tupleDesc = createResultTupleDescriptor(selectStmt, "union", analyzer);
-        UnionNode unionNode = new UnionNode(ctx_.getNextNodeId(), tupleDesc.getId());
+        UnionNode unionNode = new UnionNode(ctx.getNextNodeId(), tupleDesc.getId());
 
         // Analysis guarantees that selects without a FROM clause only have constant exprs.
         if (selectStmt.getValueList() != null) {
@@ -1340,7 +1338,7 @@ public class SingleNodePlanner {
                 Preconditions.checkState(inlineViewRef.getMaterializedTupleIds().size() == 1);
                 // we need to materialize all slots of our inline view tuple
                 analyzer.getTupleDesc(inlineViewRef.getId()).materializeSlots();
-                UnionNode unionNode = new UnionNode(ctx_.getNextNodeId(),
+                UnionNode unionNode = new UnionNode(ctx.getNextNodeId(),
                         inlineViewRef.getMaterializedTupleIds().get(0));
                 if (analyzer.hasEmptyResultSet()) {
                     return unionNode;
@@ -1672,33 +1670,33 @@ public class SingleNodePlanner {
 
         switch (tblRef.getTable().getType()) {
             case OLAP:
-                OlapScanNode olapNode = new OlapScanNode(ctx_.getNextNodeId(), tblRef.getDesc(), 
+                OlapScanNode olapNode = new OlapScanNode(ctx.getNextNodeId(), tblRef.getDesc(),
                         "OlapScanNode");
                 olapNode.setForceOpenPreAgg(tblRef.isForcePreAggOpened());
                 scanNode = olapNode;
                 break;
             case ODBC:
-                scanNode = new OdbcScanNode(ctx_.getNextNodeId(), tblRef.getDesc(), (OdbcTable) tblRef.getTable());
+                scanNode = new OdbcScanNode(ctx.getNextNodeId(), tblRef.getDesc(), (OdbcTable) tblRef.getTable());
                 break;
             case MYSQL:
-                scanNode = new MysqlScanNode(ctx_.getNextNodeId(), tblRef.getDesc(), (MysqlTable) tblRef.getTable());
+                scanNode = new MysqlScanNode(ctx.getNextNodeId(), tblRef.getDesc(), (MysqlTable) tblRef.getTable());
                 break;
             case SCHEMA:
-                scanNode = new SchemaScanNode(ctx_.getNextNodeId(), tblRef.getDesc());
+                scanNode = new SchemaScanNode(ctx.getNextNodeId(), tblRef.getDesc());
                 break;
             case BROKER:
-                scanNode = new BrokerScanNode(ctx_.getNextNodeId(), tblRef.getDesc(), "BrokerScanNode",
+                scanNode = new BrokerScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "BrokerScanNode",
                         null, -1);
                 break;
             case ELASTICSEARCH:
-                scanNode = new EsScanNode(ctx_.getNextNodeId(), tblRef.getDesc(), "EsScanNode");
+                scanNode = new EsScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "EsScanNode");
                 break;
             case HIVE:
-                scanNode = new HiveScanNode(ctx_.getNextNodeId(), tblRef.getDesc(), "HiveScanNode",
+                scanNode = new HiveScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "HiveScanNode",
                         null, -1);
                 break;
             case ICEBERG:
-                scanNode = new IcebergScanNode(ctx_.getNextNodeId(), tblRef.getDesc(), "IcebergScanNode",
+                scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "IcebergScanNode",
                         null, -1);
                 break;
             default:
@@ -1710,7 +1708,8 @@ public class SingleNodePlanner {
 
         scanNodes.add(scanNode);
         // now we put the selectStmtToScanNodes's init before the scanNode.init
-        List<ScanNode> scanNodeList = selectStmtToScanNodes.computeIfAbsent(selectStmt.getAnalyzer(), k -> Lists.newArrayList());
+        List<ScanNode> scanNodeList = selectStmtToScanNodes.computeIfAbsent(
+                selectStmt.getAnalyzer(), k -> Lists.newArrayList());
         scanNodeList.add(scanNode);
 
         scanNode.init(analyzer);
@@ -1803,11 +1802,12 @@ public class SingleNodePlanner {
             }
 
             // construct cross join node
-            // LOG.debug("Join between {} and {} requires at least one conjunctive equality predicate between the two tables",
+            // LOG.debug("Join between {} and {} requires at least one conjunctive"
+            //        + " equality predicate between the two tables",
             //        outerRef.getAliasAsName(), innerRef.getAliasAsName());
             // TODO If there are eq join predicates then we should construct a hash join
             CrossJoinNode result =
-                    new CrossJoinNode(ctx_.getNextNodeId(), outer, inner, innerRef);
+                    new CrossJoinNode(ctx.getNextNodeId(), outer, inner, innerRef);
             result.init(analyzer);
             return result;
         }
@@ -1826,7 +1826,7 @@ public class SingleNodePlanner {
         }
 
         HashJoinNode result =
-                new HashJoinNode(ctx_.getNextNodeId(), outer, inner, innerRef, eqJoinConjuncts,
+                new HashJoinNode(ctx.getNextNodeId(), outer, inner, innerRef, eqJoinConjuncts,
                         ojConjuncts);
         result.init(analyzer);
         return result;
@@ -1889,7 +1889,7 @@ public class SingleNodePlanner {
             throws UserException {
         Preconditions.checkNotNull(lateralViewRefs);
         Preconditions.checkState(lateralViewRefs.size() > 0);
-        TableFunctionNode tableFunctionNode = new TableFunctionNode(ctx_.getNextNodeId(), inputNode,
+        TableFunctionNode tableFunctionNode = new TableFunctionNode(ctx.getNextNodeId(), inputNode,
                 lateralViewRefs);
         tableFunctionNode.init(analyzer);
         tableFunctionNode.projectSlots(analyzer, selectStmt);
@@ -1921,15 +1921,15 @@ public class SingleNodePlanner {
         }
         switch (operation) {
             case UNION:
-                setOpNode = new UnionNode(ctx_.getNextNodeId(), setOperationStmt.getTupleId(),
+                setOpNode = new UnionNode(ctx.getNextNodeId(), setOperationStmt.getTupleId(),
                         setOperationStmt.getSetOpsResultExprs(), false);
                 break;
             case INTERSECT:
-                setOpNode = new IntersectNode(ctx_.getNextNodeId(), setOperationStmt.getTupleId(),
+                setOpNode = new IntersectNode(ctx.getNextNodeId(), setOperationStmt.getTupleId(),
                         setOperationStmt.getSetOpsResultExprs(), false);
                 break;
             case EXCEPT:
-                setOpNode = new ExceptNode(ctx_.getNextNodeId(), setOperationStmt.getTupleId(),
+                setOpNode = new ExceptNode(ctx.getNextNodeId(), setOperationStmt.getTupleId(),
                         setOperationStmt.getSetOpsResultExprs(), false);
                 break;
             default:
@@ -2122,7 +2122,7 @@ public class SingleNodePlanner {
         if (hasDistinctOps) {
             result = createSetOperationPlan(
                     analyzer, setOperationStmt, distinctOps, result, defaultOrderByLimit);
-            result = new AggregationNode(ctx_.getNextNodeId(), result,
+            result = new AggregationNode(ctx.getNextNodeId(), result,
                     setOperationStmt.getDistinctAggInfo());
             result.init(analyzer);
         }
@@ -2136,7 +2136,7 @@ public class SingleNodePlanner {
 
     private PlanNode createAssertRowCountNode(PlanNode input, AssertNumRowsElement assertNumRowsElement,
                                               Analyzer analyzer) throws UserException {
-        AssertNumRowsNode root = new AssertNumRowsNode(ctx_.getNextNodeId(), input, assertNumRowsElement);
+        AssertNumRowsNode root = new AssertNumRowsNode(ctx.getNextNodeId(), input, assertNumRowsElement);
         root.init(analyzer);
         return root;
     }
@@ -2298,7 +2298,7 @@ public class SingleNodePlanner {
         if (putPredicatesOnAggregation(stmt, analyzer, pushDownPredicates)) {
             return;
         }
-        putPredicatesOnFrom(stmt, analyzer, pushDownPredicates);
+        putPredicatesOnTargetTupleIds(stmt.getTableRefIds(), analyzer, predicates);
     }
 
     private void pushDownPredicatesPastWindows(Analyzer analyzer, SelectStmt stmt) throws AnalysisException {
@@ -2319,28 +2319,58 @@ public class SingleNodePlanner {
         if (putPredicatesOnAggregation(stmt, analyzer, pushDownPredicates)) {
             return;
         }
-        putPredicatesOnFrom(stmt, analyzer, pushDownPredicates);
+        putPredicatesOnTargetTupleIds(stmt.getTableRefIds(), analyzer, predicates);
     }
 
-    private void pushDownPredicatesPastAggregation(Analyzer analyzer, SelectStmt stmt) throws AnalysisException {
-        final AggregateInfo aggregateInfo = stmt.getAggInfo();
-        if (aggregateInfo == null || aggregateInfo.getGroupingExprs().size() <= 0) {
+    /**
+     * Push down predicates past one aggregation phase.
+     *
+     * @param aggregateInfo aggregate info of the phase being processed, either the first or the second
+     * @param analyzer current statement's analyzer
+     * @param stmt current stmt
+     * @param targetTupleIds tuples to register the predicates on:
+     *                      the table tuple ids when processing the first phase agg,
+     *                      the first phase aggregate's output tuple id when processing the second phase agg
+     * @throws AnalysisException if registering a predicate on a tuple fails
+     */
+    private void pushDownPredicatesPastAggregationOnePhase(AggregateInfo aggregateInfo,
+            Analyzer analyzer, SelectStmt stmt, List<TupleId> targetTupleIds) throws AnalysisException {
+        if (aggregateInfo == null || aggregateInfo.getGroupingExprs().isEmpty()) {
             return;
         }
         final List<Expr> predicates = getBoundPredicates(analyzer, aggregateInfo.getOutputTupleDesc());
-        if (predicates.size() <= 0) {
+        if (predicates.isEmpty()) {
             return;
         }
-
         // Push down predicates to aggregation's child until they are assigned successfully.
         final List<Expr> pushDownPredicates = getPredicatesBoundedByGroupbysSourceExpr(predicates, analyzer, stmt);
-        if (pushDownPredicates.size() <= 0) {
+        if (CollectionUtils.isEmpty(pushDownPredicates)) {
             return;
         }
-        putPredicatesOnFrom(stmt, analyzer, pushDownPredicates);
+        putPredicatesOnTargetTupleIds(targetTupleIds, analyzer, pushDownPredicates);
     }
 
-    private List<Expr> getPredicatesBoundedByGroupbysSourceExpr(List<Expr> predicates, Analyzer analyzer, SelectStmt stmt) {
+    /**
+     * Push down predicates past the whole aggregate stage, including both the first and second phases.
+     *
+     * @param analyzer current statement's analyzer
+     * @param stmt current stmt
+     * @throws AnalysisException if registering a predicate on a tuple fails
+     */
+    private void pushDownPredicatesPastAggregation(Analyzer analyzer, SelectStmt stmt) throws AnalysisException {
+        final AggregateInfo firstPhaseAggInfo = stmt.getAggInfo();
+        if (firstPhaseAggInfo == null) {
+            return;
+        }
+        final AggregateInfo secondPhaseAggInfo = firstPhaseAggInfo.getSecondPhaseDistinctAggInfo();
+
+        final List<TupleId> firstPhaseTupleIds = Lists.newArrayList(firstPhaseAggInfo.getOutputTupleId());
+        pushDownPredicatesPastAggregationOnePhase(secondPhaseAggInfo, analyzer, stmt, firstPhaseTupleIds);
+        pushDownPredicatesPastAggregationOnePhase(firstPhaseAggInfo, analyzer, stmt, stmt.getTableRefIds());
+    }
+
+    private List<Expr> getPredicatesBoundedByGroupbysSourceExpr(
+            List<Expr> predicates, Analyzer analyzer, SelectStmt stmt) {
         final List<Expr> predicatesCanPushDown = Lists.newArrayList();
         for (Expr predicate : predicates) {
             if (predicate.isConstant()) {
@@ -2352,10 +2382,21 @@ public class SingleNodePlanner {
             final List<SlotId> slotIds = Lists.newArrayList();
             predicate.getIds(tupleIds, slotIds);
 
-            boolean isAllSlotReferingGroupBys = true;
+            boolean isAllSlotReferToGroupBys = true;
             for (SlotId slotId : slotIds) {
-                final SlotDescriptor slotDesc = analyzer.getDescTbl().getSlotDesc(slotId);
-                Expr sourceExpr = slotDesc.getSourceExprs().get(0);
+                Expr sourceExpr = new SlotRef(analyzer.getDescTbl().getSlotDesc(slotId));
+                // Every aggregate phase wraps the expression in a SlotRef.
+                // For a one-phase aggregate we only need to unwrap once,
+                // but for a two-phase aggregate we need to unwrap twice,
+                // so loop until we reach the underlying source expression.
+                while (sourceExpr instanceof SlotRef) {
+                    SlotRef slotRef = (SlotRef) sourceExpr;
+                    SlotDescriptor slotDesc = slotRef.getDesc();
+                    if (slotDesc.getSourceExprs().isEmpty()) {
+                        break;
+                    }
+                    sourceExpr = slotDesc.getSourceExprs().get(0);
+                }
                 // if grouping set is given and column is not in all grouping set list
                 // we cannot push the predicate since the column value can be null
                 if (stmt.getGroupByClause() == null) {
@@ -2368,24 +2409,24 @@ public class SingleNodePlanner {
                     // if grouping type is CUBE or ROLLUP will definitely produce null
                     if (stmt.getGroupByClause().getGroupingType() == GroupByClause.GroupingType.CUBE
                             || stmt.getGroupByClause().getGroupingType() == GroupByClause.GroupingType.ROLLUP) {
-                        isAllSlotReferingGroupBys = false;
+                        isAllSlotReferToGroupBys = false;
                     } else {
                         // if grouping type is GROUPING_SETS and the predicate not in all grouping list,
                         // the predicate cannot be push down
                         for (List<Expr> exprs : stmt.getGroupByClause().getGroupingSetList()) {
                             if (!exprs.contains(sourceExpr)) {
-                                isAllSlotReferingGroupBys = false;
+                                isAllSlotReferToGroupBys = false;
                                 break;
                             }
                         }
                     }
                 }
                 if (sourceExpr.getFn() instanceof AggregateFunction) {
-                    isAllSlotReferingGroupBys = false;
+                    isAllSlotReferToGroupBys = false;
                 }
             }
 
-            if (isAllSlotReferingGroupBys) {
+            if (isAllSlotReferToGroupBys) {
                 predicatesCanPushDown.add(predicate);
             }
         }
@@ -2444,16 +2485,22 @@ public class SingleNodePlanner {
         return false;
     }
 
-    // Register predicates with TableRef's tuple id.
-    private void putPredicatesOnFrom(SelectStmt stmt, Analyzer analyzer, List<Expr> predicates)
+    /**
+     * Register predicates on the target tuple ids.
+     *
+     * @param analyzer current stmt analyzer
+     * @param predicates predicates to try to register
+     * @param tupleIds target tuple ids
+     * @throws AnalysisException if registration fails
+     */
+    private void putPredicatesOnTargetTupleIds(List<TupleId> tupleIds,
+            Analyzer analyzer, List<Expr> predicates)
             throws AnalysisException {
-        final List<TupleId> tableTupleIds = Lists.newArrayList();
-        for (TableRef tableRef : stmt.getTableRefs()) {
-            tableTupleIds.add(tableRef.getId());
+        if (CollectionUtils.isEmpty(tupleIds)) {
+            return;
         }
-
         for (Expr predicate : predicates) {
-            Preconditions.checkArgument(predicate.isBoundByTupleIds(tableTupleIds),
+            Preconditions.checkArgument(predicate.isBoundByTupleIds(tupleIds),
                     "Predicate:" + predicate.toSql() + " can't be assigned to some PlanNode.");
             final List<TupleId> predicateTupleIds = Lists.newArrayList();
             predicate.getIds(predicateTupleIds, null);
@@ -2486,18 +2533,24 @@ public class SingleNodePlanner {
      */
     public static BinaryPredicate getNormalizedEqPred(Expr expr, List<TupleId> lhsTids,
                                                       List<TupleId> rhsTids, Analyzer analyzer) {
-        if (!(expr instanceof BinaryPredicate)) return null;
+        if (!(expr instanceof BinaryPredicate)) {
+            return null;
+        }
         BinaryPredicate pred = (BinaryPredicate) expr;
         if (!pred.getOp().isEquivalence()) {
             return null;
         }
-        if (pred.getChild(0).isConstant() || pred.getChild(1).isConstant()) return null;
+        if (pred.getChild(0).isConstant() || pred.getChild(1).isConstant()) {
+            return null;
+        }
 
         // Use the child that contains lhsTids as lhsExpr, for example, A join B on B.k = A.k,
         // where lhsExpr=A.k, rhsExpr=B.k, changed the order, A.k = B.k
         Expr lhsExpr = Expr.getFirstBoundChild(pred, lhsTids);
         Expr rhsExpr = Expr.getFirstBoundChild(pred, rhsTids);
-        if (lhsExpr == null || rhsExpr == null || lhsExpr == rhsExpr) return null;
+        if (lhsExpr == null || rhsExpr == null || lhsExpr == rhsExpr) {
+            return null;
+        }
 
         BinaryPredicate result = new BinaryPredicate(pred.getOp(), lhsExpr, rhsExpr);
         result.analyzeNoThrow(analyzer);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java
index 8548ba689a..bb30785eef 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java
@@ -448,5 +448,4 @@ public class PlannerTest {
         expectedEx.expectMessage("String Type should not be used in key column[k1].");
         UtFrameUtils.parseAndAnalyzeStmt(createTbl1, ctx);
     }
-
 }
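
A rough way to sanity-check this change (assuming the usual Doris EXPLAIN output, where conjuncts pushed to the scan show up under PREDICATES on the OlapScanNode rather than as conjuncts on the second-phase aggregation node) is to compare plans for a DISTINCT-aggregate query like the one in the commit message:

```
-- Hypothetical table t with grouping column k1, as sketched above.
-- With the fix applied, the k1 = 1 conjunct should appear on the scan:
--   0:OlapScanNode
--      TABLE: t
--      PREDICATES: `k1` = 1
-- rather than attached to the second-phase aggregation node.
explain select k1, count(distinct v1) from t group by k1 having k1 = 1;
```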

