You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by "xzj7019 (via GitHub)" <gi...@apache.org> on 2023/06/27 14:57:51 UTC

[GitHub] [doris] xzj7019 commented on a diff in pull request #21114: [improvement](nereids) Support rf into cte

xzj7019 commented on code in PR #21114:
URL: https://github.com/apache/doris/pull/21114#discussion_r1243898489


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -294,8 +221,466 @@ public PhysicalRelation visitPhysicalScan(PhysicalRelation scan, CascadesContext
         return scan;
     }
 
+    /**
+     * Estimates the number of distinct values of the right operand of {@code equalTo}, using the
+     * statistics of the join's right child.
+     *
+     * @param join hash join whose right child supplies the statistics
+     * @param equalTo join condition whose right operand is estimated
+     * @return -1 when the right child has no statistics or the column statistic is unknown,
+     *         otherwise the ndv truncated to long and clamped to at least 1
+     */
+    private long getBuildSideNdv(PhysicalHashJoin<? extends Plan, ? extends Plan> join, EqualTo equalTo) {
+        AbstractPlan buildSide = (AbstractPlan) join.right();
+        // plans built in unit tests may carry no statistics
+        if (buildSide.getStats() == null) {
+            return -1L;
+        }
+        ExpressionEstimation ndvEstimator = new ExpressionEstimation();
+        ColumnStatistic buildColStats = equalTo.right().accept(ndvEstimator, buildSide.getStats());
+        if (buildColStats.isUnKnown) {
+            return -1L;
+        }
+        return Math.max(1L, (long) buildColStats.ndv);
+    }
+
     /**
      * Returns the expression obtained from ExpressionUtils.getExpressionCoveredByCast for
      * {@code leftChild} when that expression is a Slot, otherwise null.
      */
     private static Slot checkTargetChild(Expression leftChild) {
         Expression uncovered = ExpressionUtils.getExpressionCoveredByCast(leftChild);
         if (uncovered instanceof Slot) {
             return (Slot) uncovered;
         }
         return null;
     }
+
+    /**
+     * Walks every hash join conjunct of {@code join} and, for each runtime filter type enabled in
+     * the session variable (BITMAP excluded), pushes a runtime filter down.
+     * Joins whose left child is a union/intersect/except go through doPushDownIntoSetOperation,
+     * all others through doPushDownBasic.
+     */
+    private void pushDownRuntimeFilterCommon(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        // bitmap rf is generated by nested loop join, so it is filtered out here.
+        List<TRuntimeFilterType> enabledTypes = Arrays.stream(TRuntimeFilterType.values())
+                .filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0)
+                .filter(type -> type != TRuntimeFilterType.BITMAP)
+                .collect(Collectors.toList());
+        boolean leftIsSetOperation = join.left() instanceof PhysicalUnion
+                || join.left() instanceof PhysicalIntersect
+                || join.left() instanceof PhysicalExcept;
+        // TODO: some complex situation cannot be handled now, see testPushDownThroughJoin.
+        //   we will support it in later version.
+        int exprOrder = 0;
+        for (Expression conjunct : join.getHashJoinConjuncts()) {
+            EqualTo equalTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(
+                    (EqualTo) conjunct, join.left().getOutputSet()));
+            for (TRuntimeFilterType type : enabledTypes) {
+                if (leftIsSetOperation) {
+                    doPushDownIntoSetOperation(join, ctx, equalTo, type, exprOrder);
+                } else {
+                    doPushDownBasic(join, context, ctx, equalTo, type, exprOrder);
+                }
+            }
+            exprOrder++;
+        }
+    }
+
+    /**
+     * Handles one join condition whose left side resolves, through the alias transfer map, to a
+     * slot of a relation.
+     * When that relation is a CTE consumer, the plan under the matching CTE producer is visited
+     * once (tracked through the processed-CTE set) and no filter is created here. Otherwise a
+     * RuntimeFilter targeting the relation's slot is created and registered in {@code ctx}.
+     *
+     * @param exprOrder position of {@code equalTo} among the join's hash conjuncts
+     */
+    private void doPushDownBasic(PhysicalHashJoin<? extends Plan, ? extends Plan> join, CascadesContext context,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
+        // currently, we can ensure children in the two side are corresponding to the equal_to's.
+        // so right maybe an expression and left is a slot
+        Slot probeSlot = checkTargetChild(equalTo.left());
+        // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join
+        // contains join with denied join type. for example: a left join b on a.id = b.id
+        if (probeSlot == null || !aliasTransferMap.containsKey(probeSlot)) {
+            return;
+        }
+        Pair<PhysicalRelation, Slot> target = aliasTransferMap.get(probeSlot);
+        Slot olapScanSlot = target.second;
+        PhysicalRelation scan = target.first;
+
+        Preconditions.checkState(olapScanSlot != null && scan != null);
+
+        if (scan instanceof PhysicalCTEConsumer) {
+            RuntimeFilterContext rfContext = context.getRuntimeFilterContext();
+            Set<CTEId> processedCTE = rfContext.getProcessedCTE();
+            CTEId cteId = ((PhysicalCTEConsumer) scan).getCteId();
+            if (processedCTE.contains(cteId)) {
+                return;
+            }
+            PhysicalCTEProducer cteProducer = rfContext.getCteProduceMap().get(cteId);
+            PhysicalPlan producerInput = (PhysicalPlan) cteProducer.child(0);
+            // process cte producer self recursively
+            producerInput.accept(this, context);
+            processedCTE.add(cteId);
+            return;
+        }
+
+        // in-filter is not friendly to pipeline
+        TRuntimeFilterType filterType = type;
+        if (filterType == TRuntimeFilterType.IN_OR_BLOOM
+                && ctx.getSessionVariable().enablePipelineEngine()
+                && hasRemoteTarget(join, scan)) {
+            filterType = TRuntimeFilterType.BLOOM;
+        }
+        long buildSideNdv = getBuildSideNdv(join, equalTo);
+        RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                equalTo.right(), ImmutableList.of(olapScanSlot), filterType, exprOrder, join, buildSideNdv);
+        ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+        ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
+        ctx.setTargetsOnScanNode(scan.getId(), olapScanSlot);
+    }
+
+    /**
+     * Pushes a runtime filter for one join condition into the project children of a set operation
+     * that is the join's left child. One RuntimeFilter is created with one target slot per
+     * project child that can be resolved through the alias transfer map.
+     *
+     * <p>A project child is skipped when the matched projection is not a slot (or an alias
+     * directly over a slot) or when the slot has no entry in the alias transfer map. This mirrors
+     * the containsKey guard in doPushDownBasic instead of failing with a
+     * ClassCastException / NullPointerException.
+     *
+     * @param exprOrder position of {@code equalTo} among the join's hash conjuncts
+     */
+    private void doPushDownIntoSetOperation(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
+        // the left-side slot does not depend on the child being visited, resolve it once
+        Slot leftSlot = checkTargetChild(equalTo.left());
+        if (leftSlot == null) {
+            return;
+        }
+        List<Slot> targetList = new ArrayList<>();
+        // once downgraded to BLOOM for one child, the type stays BLOOM for the whole filter
+        TRuntimeFilterType filterType = type;
+        // index of the projection named like leftSlot; located in the first project child and
+        // then reused for the following children
+        int projIndex = -1;
+        for (int j = 0; j < join.left().children().size(); j++) {
+            PhysicalPlan child = (PhysicalPlan) join.left().child(j);
+            if (!(child instanceof PhysicalProject)) {
+                continue;
+            }
+            PhysicalProject project = (PhysicalProject) child;
+            for (int k = 0; projIndex < 0 && k < project.getProjects().size(); k++) {
+                NamedExpression expr = (NamedExpression) project.getProjects().get(k);
+                if (expr.getName().equals(leftSlot.getName())) {
+                    projIndex = k;
+                    break;
+                }
+            }
+            Preconditions.checkState(projIndex >= 0
+                    && projIndex < project.getProjects().size());
+
+            NamedExpression targetExpr = (NamedExpression) project.getProjects().get(projIndex);
+            Expression origExpr = targetExpr instanceof Alias ? targetExpr.child(0) : targetExpr;
+            // only a plain slot that is known to the alias transfer map can become a target
+            Pair<PhysicalRelation, Slot> target = origExpr instanceof SlotReference
+                    ? aliasTransferMap.get(origExpr) : null;
+            if (target == null) {
+                continue;
+            }
+            Slot olapScanSlot = target.second;
+            PhysicalRelation scan = target.first;
+            // in-filter is not friendly to pipeline
+            if (filterType == TRuntimeFilterType.IN_OR_BLOOM
+                    && ctx.getSessionVariable().enablePipelineEngine()
+                    && hasRemoteTarget(join, scan)) {
+                filterType = TRuntimeFilterType.BLOOM;
+            }
+            targetList.add(olapScanSlot);
+            ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+            ctx.setTargetsOnScanNode(scan.getId(), olapScanSlot);
+        }
+        if (!targetList.isEmpty()) {
+            long buildSideNdv = getBuildSideNdv(join, equalTo);
+            RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                    equalTo.right(), targetList, filterType, exprOrder, join, buildSideNdv);
+            for (Slot targetSlot : targetList) {
+                ctx.setTargetExprIdToFilter(targetSlot.getExprId(), filter);
+            }
+        }
+    }
+
+    /**
+     * Records {@code join} under every CTE id found beneath exactly one of its children, then
+     * re-runs the CTE push-down analysis whenever at least one CTE has recorded joins.
+     */
+    private void collectPushDownCTEInfos(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        PhysicalPlan leftChild = (PhysicalPlan) join.left();
+        PhysicalPlan rightChild = (PhysicalPlan) join.right();
+
+        Preconditions.checkState(leftChild != null && rightChild != null);
+
+        Set<CTEId> cteIds = new HashSet<>();
+        boolean leftHasCTE = hasCTEConsumerUnderJoin(leftChild, cteIds);
+        boolean rightHasCTE = hasCTEConsumerUnderJoin(rightChild, cteIds);
+        // only one side cte is allowed currently
+        if (leftHasCTE != rightHasCTE) {
+            for (CTEId id : cteIds) {
+                ctx.getCteToJoinsMap().computeIfAbsent(id, key -> new HashSet<>()).add(join);
+            }
+        }
+        if (!ctx.getCteToJoinsMap().isEmpty()) {
+            analyzeRuntimeFilterPushDownIntoCTEInfos(join, context);
+        }
+    }
+
+    /**
+     * Lists the ids of all CTE producers present in the context's cteRFPushDownMap that are not
+     * yet contained in its pushed-down CTE set.
+     */
+    private List<CTEId> getPushDownCTECandidates(RuntimeFilterContext ctx) {
+        List<CTEId> candidates = new ArrayList<>();
+        for (PhysicalCTEProducer producer : ctx.getCteRFPushDownMap().keySet()) {
+            CTEId cteId = producer.getCteId();
+            if (!ctx.getPushedDownCTE().contains(cteId)) {
+                candidates.add(cteId);
+            }
+        }
+        return candidates;
+    }
+
+    /**
+     * Follows the single-child chain of distribute / filter / project nodes starting at
+     * {@code root}. If the chain ends at a CTE consumer, its CTE id is added to {@code cteIds}
+     * and true is returned; any other node kind or a node without exactly one child yields false.
+     */
+    private boolean hasCTEConsumerUnderJoin(PhysicalPlan root, Set<CTEId> cteIds) {
+        PhysicalPlan current = root;
+        while (true) {
+            if (current instanceof PhysicalCTEConsumer) {
+                cteIds.add(((PhysicalCTEConsumer) current).getCteId());
+                return true;
+            }
+            // only collect cte in one side
+            if (current.children().size() != 1) {
+                return false;
+            }
+            boolean passThrough = current instanceof PhysicalDistribute
+                    || current instanceof PhysicalFilter
+                    || current instanceof PhysicalProject;
+            if (!passThrough) {
+                return false;
+            }
+            current = (PhysicalPlan) current.child(0);
+        }
+    }
+
+    /**
+     * Inspects the joins recorded for every CTE (except CTEs that {@code curJoin} itself is
+     * recorded under) and registers the CTE producer in the context's cteRFPushDownMap when
+     * runtime filters of those joins can be pushed into the producer.
+     *
+     * @param curJoin join currently being visited; its hash conjuncts are used to detect that
+     *                different "other side" slots are linked by an equality
+     * @param context cascades context providing the runtime filter context and the CTE consumers
+     */
+    private void analyzeRuntimeFilterPushDownIntoCTEInfos(PhysicalHashJoin<? extends Plan, ? extends Plan> curJoin,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        Map<CTEId, Set<PhysicalHashJoin>> cteToJoinsMap = ctx.getCteToJoinsMap();
+        for (Map.Entry<CTEId, Set<PhysicalHashJoin>> entry : cteToJoinsMap.entrySet()) {
+            CTEId cteId = entry.getKey();
+            Set<PhysicalHashJoin> joinSet = entry.getValue();
+            if (joinSet.contains(curJoin)) {
+                // skip current join
+                continue;
+            }
+            Set<LogicalCTEConsumer> cteSet = context.getCteIdToConsumers().get(cteId);
+            // a missing consumer set is reported as a state error instead of a bare NPE
+            Preconditions.checkState(cteSet != null && !cteSet.isEmpty());
+            String cteName = cteSet.iterator().next().getName();
+            // preconditions for rf pushing into cte producer:
+            // multiple joins whose join condition is on the same cte's column of the same cte
+            // the other side of these join conditions are the same column of the same table, or
+            // they in the same equal sets, such as under an equal join condition
+            // case 1: two joins with t1.c1 = cte1_consumer1.c1 and t1.c1 = cte1_consumer2.c1 conditions
+            //         rf of t1.c1 can be pushed down into cte1 producer
+            // case 2: two joins with t1.c1 = cte2_consumer1.c1 and t2.c2 = cte2_consumer2.c1 conditions
+            //         and another equal join condition with t1.c1 = t2.c2
+            //         rf of t1.c1 and t2.c2 can be pushed down into cte2 producer
+            if (joinSet.size() != cteSet.size()) {
+                continue;
+            }
+            List<EqualTo> equalTos = new ArrayList<>();
+            Map<EqualTo, PhysicalHashJoin> equalCondToJoinMap = new LinkedHashMap<>();
+            for (PhysicalHashJoin join : joinSet) {
+                // precondition:
+                // 1. no non-equal join condition
+                // 2. only equalTo and slotReference both sides
+                // 3. only support one join condition (will be refined further)
+                // NOTE(review): "size() > 1" still admits one other join conjunct, which does not
+                // match precondition 1 above - confirm whether "> 0" was intended.
+                if (join.getOtherJoinConjuncts().size() > 1
+                        || join.getHashJoinConjuncts().size() != 1
+                        || !(join.getHashJoinConjuncts().get(0) instanceof EqualTo)
+                        || !(((EqualTo) join.getHashJoinConjuncts().get(0)).child(0) instanceof SlotReference)
+                        || !(((EqualTo) join.getHashJoinConjuncts().get(0)).child(1) instanceof SlotReference)) {
+                    break;
+                } else {
+                    EqualTo equalTo = (EqualTo) join.getHashJoinConjuncts().get(0);
+                    equalTos.add(equalTo);
+                    equalCondToJoinMap.put(equalTo, join);
+                }
+            }
+            if (joinSet.size() == equalTos.size()) {
+                int matchNum = 0;
+                Set<String> cteNameSet = new HashSet<>();
+                Set<SlotReference> anotherSideSlotSet = new HashSet<>();
+                for (EqualTo equalTo : equalTos) {
+                    // has been checked above, SlotReference for both sides
+                    SlotReference left = (SlotReference) equalTo.left();
+                    SlotReference right = (SlotReference) equalTo.right();
+                    if (left.getQualifier().size() == 1 && left.getQualifier().get(0).equals(cteName)) {
+                        matchNum += 1;
+                        anotherSideSlotSet.add(right);
+                        cteNameSet.add(left.getQualifiedName());
+                    } else if (right.getQualifier().size() == 1 && right.getQualifier().get(0).equals(cteName)) {
+                        matchNum += 1;
+                        anotherSideSlotSet.add(left);
+                        cteNameSet.add(right.getQualifiedName());
+                    }
+                }
+                if (matchNum == equalTos.size() && cteNameSet.size() == 1) {
+                    // means all join condition points to the same cte on the same cte column.
+                    // collect the other side columns besides cte column side.
+                    Preconditions.checkState(equalTos.size() == equalCondToJoinMap.size());
+
+                    PhysicalCTEProducer cteProducer = context.getRuntimeFilterContext().getCteProduceMap().get(cteId);
+                    if (anotherSideSlotSet.size() == 1) {
+                        // meet requirement for pushing down into cte producer
+                        ctx.getCteRFPushDownMap().put(cteProducer, equalCondToJoinMap);
+                    } else {
+                        // check further whether the join upper side can bring equal set, which
+                        // indicating actually the same runtime filter build side
+                        List<Expression> conditions = curJoin.getHashJoinConjuncts();
+                        boolean inSameEqualSet = false;
+                        for (Expression e : conditions) {
+                            if (e instanceof EqualTo) {
+                                // curJoin's operands were never verified to be slot references, so
+                                // compare them as plain expressions instead of casting; an operand
+                                // that is not a slot simply is not contained in the set
+                                Expression oneSide = ((EqualTo) e).left();
+                                Expression anotherSide = ((EqualTo) e).right();
+                                if (anotherSideSlotSet.contains(oneSide) && anotherSideSlotSet.contains(anotherSide)) {
+                                    inSameEqualSet = true;
+                                    break;
+                                }
+                            }
+                        }
+                        if (inSameEqualSet) {
+                            ctx.getCteRFPushDownMap().put(cteProducer, equalCondToJoinMap);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * For every CTE producer in the context's cteRFPushDownMap that has not been handled yet,
+     * pushes one runtime filter per recorded (join condition, join) pair into the producer and
+     * then marks the producer's CTE id as pushed down.
+     * The filter type is BLOOM when the pipeline engine is enabled, IN_OR_BLOOM otherwise.
+     *
+     * @throws IllegalStateException if an entry carries no producer or no join
+     */
+    private void pushDownRuntimeFilterIntoCTE(RuntimeFilterContext ctx) {
+        Map<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> cteRFPushDownMap = ctx.getCteRFPushDownMap();
+        for (Map.Entry<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> entry : cteRFPushDownMap.entrySet()) {
+            PhysicalCTEProducer cteProducer = entry.getKey();
+            // validate the producer before its first dereference; the map is filled from a
+            // lookup that may yield null
+            Preconditions.checkState(cteProducer != null, "cte producer is missing in cte rf push down map");
+            CTEId cteId = cteProducer.getCteId();
+            if (ctx.getPushedDownCTE().contains(cteId)) {
+                continue;
+            }
+            Map<EqualTo, PhysicalHashJoin> equalCondToJoinMap = entry.getValue();
+            int exprOrder = 0;
+            for (Map.Entry<EqualTo, PhysicalHashJoin> innerEntry : equalCondToJoinMap.entrySet()) {
+                EqualTo equalTo = innerEntry.getKey();
+                PhysicalHashJoin join = innerEntry.getValue();
+                Preconditions.checkState(join != null, "join is missing in cte rf push down map");
+                TRuntimeFilterType type = TRuntimeFilterType.IN_OR_BLOOM;
+                if (ctx.getSessionVariable().enablePipelineEngine()) {
+                    type = TRuntimeFilterType.BLOOM;
+                }
+                // orient the condition by the output of the join's first child
+                EqualTo newEqualTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(
+                        equalTo, join.child(0).getOutputSet()));
+                doPushDownIntoCTEProducerInternal(join, ctx, newEqualTo, type, exprOrder++, cteProducer);
+            }
+            ctx.getPushedDownCTE().add(cteId);
+        }
+    }
+
+    private void doPushDownIntoCTEProducerInternal(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder,
+            PhysicalCTEProducer cteProducer) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
+        PhysicalPlan inputPlanNode = (PhysicalPlan) cteProducer.child(0);
+
+        Preconditions.checkState(inputPlanNode != null);

Review Comment:
   done



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -294,8 +221,466 @@ public PhysicalRelation visitPhysicalScan(PhysicalRelation scan, CascadesContext
         return scan;
     }
 
+    /**
+     * Estimates the number of distinct values of the right operand of {@code equalTo}, using the
+     * statistics of the join's right child.
+     *
+     * @param join hash join whose right child supplies the statistics
+     * @param equalTo join condition whose right operand is estimated
+     * @return -1 when the right child has no statistics or the column statistic is unknown,
+     *         otherwise the ndv truncated to long and clamped to at least 1
+     */
+    private long getBuildSideNdv(PhysicalHashJoin<? extends Plan, ? extends Plan> join, EqualTo equalTo) {
+        AbstractPlan buildSide = (AbstractPlan) join.right();
+        // plans built in unit tests may carry no statistics
+        if (buildSide.getStats() == null) {
+            return -1L;
+        }
+        ExpressionEstimation ndvEstimator = new ExpressionEstimation();
+        ColumnStatistic buildColStats = equalTo.right().accept(ndvEstimator, buildSide.getStats());
+        if (buildColStats.isUnKnown) {
+            return -1L;
+        }
+        return Math.max(1L, (long) buildColStats.ndv);
+    }
+
     /**
      * Returns the expression obtained from ExpressionUtils.getExpressionCoveredByCast for
      * {@code leftChild} when that expression is a Slot, otherwise null.
      */
     private static Slot checkTargetChild(Expression leftChild) {
         Expression uncovered = ExpressionUtils.getExpressionCoveredByCast(leftChild);
         if (uncovered instanceof Slot) {
             return (Slot) uncovered;
         }
         return null;
     }
+
+    /**
+     * Walks every hash join conjunct of {@code join} and, for each runtime filter type enabled in
+     * the session variable (BITMAP excluded), pushes a runtime filter down.
+     * Joins whose left child is a union/intersect/except go through doPushDownIntoSetOperation,
+     * all others through doPushDownBasic.
+     */
+    private void pushDownRuntimeFilterCommon(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        // bitmap rf is generated by nested loop join, so it is filtered out here.
+        List<TRuntimeFilterType> enabledTypes = Arrays.stream(TRuntimeFilterType.values())
+                .filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0)
+                .filter(type -> type != TRuntimeFilterType.BITMAP)
+                .collect(Collectors.toList());
+        boolean leftIsSetOperation = join.left() instanceof PhysicalUnion
+                || join.left() instanceof PhysicalIntersect
+                || join.left() instanceof PhysicalExcept;
+        // TODO: some complex situation cannot be handled now, see testPushDownThroughJoin.
+        //   we will support it in later version.
+        int exprOrder = 0;
+        for (Expression conjunct : join.getHashJoinConjuncts()) {
+            EqualTo equalTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(
+                    (EqualTo) conjunct, join.left().getOutputSet()));
+            for (TRuntimeFilterType type : enabledTypes) {
+                if (leftIsSetOperation) {
+                    doPushDownIntoSetOperation(join, ctx, equalTo, type, exprOrder);
+                } else {
+                    doPushDownBasic(join, context, ctx, equalTo, type, exprOrder);
+                }
+            }
+            exprOrder++;
+        }
+    }
+
+    /**
+     * Handles one join condition whose left side resolves, through the alias transfer map, to a
+     * slot of a relation.
+     * When that relation is a CTE consumer, the plan under the matching CTE producer is visited
+     * once (tracked through the processed-CTE set) and no filter is created here. Otherwise a
+     * RuntimeFilter targeting the relation's slot is created and registered in {@code ctx}.
+     *
+     * @param exprOrder position of {@code equalTo} among the join's hash conjuncts
+     */
+    private void doPushDownBasic(PhysicalHashJoin<? extends Plan, ? extends Plan> join, CascadesContext context,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
+        // currently, we can ensure children in the two side are corresponding to the equal_to's.
+        // so right maybe an expression and left is a slot
+        Slot probeSlot = checkTargetChild(equalTo.left());
+        // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join
+        // contains join with denied join type. for example: a left join b on a.id = b.id
+        if (probeSlot == null || !aliasTransferMap.containsKey(probeSlot)) {
+            return;
+        }
+        Pair<PhysicalRelation, Slot> target = aliasTransferMap.get(probeSlot);
+        Slot olapScanSlot = target.second;
+        PhysicalRelation scan = target.first;
+
+        Preconditions.checkState(olapScanSlot != null && scan != null);
+
+        if (scan instanceof PhysicalCTEConsumer) {
+            RuntimeFilterContext rfContext = context.getRuntimeFilterContext();
+            Set<CTEId> processedCTE = rfContext.getProcessedCTE();
+            CTEId cteId = ((PhysicalCTEConsumer) scan).getCteId();
+            if (processedCTE.contains(cteId)) {
+                return;
+            }
+            PhysicalCTEProducer cteProducer = rfContext.getCteProduceMap().get(cteId);
+            PhysicalPlan producerInput = (PhysicalPlan) cteProducer.child(0);
+            // process cte producer self recursively
+            producerInput.accept(this, context);
+            processedCTE.add(cteId);
+            return;
+        }
+
+        // in-filter is not friendly to pipeline
+        TRuntimeFilterType filterType = type;
+        if (filterType == TRuntimeFilterType.IN_OR_BLOOM
+                && ctx.getSessionVariable().enablePipelineEngine()
+                && hasRemoteTarget(join, scan)) {
+            filterType = TRuntimeFilterType.BLOOM;
+        }
+        long buildSideNdv = getBuildSideNdv(join, equalTo);
+        RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                equalTo.right(), ImmutableList.of(olapScanSlot), filterType, exprOrder, join, buildSideNdv);
+        ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+        ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
+        ctx.setTargetsOnScanNode(scan.getId(), olapScanSlot);
+    }
+
+    /**
+     * Pushes a runtime filter for one join condition into the project children of a set operation
+     * that is the join's left child. One RuntimeFilter is created with one target slot per
+     * project child that can be resolved through the alias transfer map.
+     *
+     * <p>A project child is skipped when the matched projection is not a slot (or an alias
+     * directly over a slot) or when the slot has no entry in the alias transfer map. This mirrors
+     * the containsKey guard in doPushDownBasic instead of failing with a
+     * ClassCastException / NullPointerException.
+     *
+     * @param exprOrder position of {@code equalTo} among the join's hash conjuncts
+     */
+    private void doPushDownIntoSetOperation(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
+        // the left-side slot does not depend on the child being visited, resolve it once
+        Slot leftSlot = checkTargetChild(equalTo.left());
+        if (leftSlot == null) {
+            return;
+        }
+        List<Slot> targetList = new ArrayList<>();
+        // once downgraded to BLOOM for one child, the type stays BLOOM for the whole filter
+        TRuntimeFilterType filterType = type;
+        // index of the projection named like leftSlot; located in the first project child and
+        // then reused for the following children
+        int projIndex = -1;
+        for (int j = 0; j < join.left().children().size(); j++) {
+            PhysicalPlan child = (PhysicalPlan) join.left().child(j);
+            if (!(child instanceof PhysicalProject)) {
+                continue;
+            }
+            PhysicalProject project = (PhysicalProject) child;
+            for (int k = 0; projIndex < 0 && k < project.getProjects().size(); k++) {
+                NamedExpression expr = (NamedExpression) project.getProjects().get(k);
+                if (expr.getName().equals(leftSlot.getName())) {
+                    projIndex = k;
+                    break;
+                }
+            }
+            Preconditions.checkState(projIndex >= 0
+                    && projIndex < project.getProjects().size());
+
+            NamedExpression targetExpr = (NamedExpression) project.getProjects().get(projIndex);
+            Expression origExpr = targetExpr instanceof Alias ? targetExpr.child(0) : targetExpr;
+            // only a plain slot that is known to the alias transfer map can become a target
+            Pair<PhysicalRelation, Slot> target = origExpr instanceof SlotReference
+                    ? aliasTransferMap.get(origExpr) : null;
+            if (target == null) {
+                continue;
+            }
+            Slot olapScanSlot = target.second;
+            PhysicalRelation scan = target.first;
+            // in-filter is not friendly to pipeline
+            if (filterType == TRuntimeFilterType.IN_OR_BLOOM
+                    && ctx.getSessionVariable().enablePipelineEngine()
+                    && hasRemoteTarget(join, scan)) {
+                filterType = TRuntimeFilterType.BLOOM;
+            }
+            targetList.add(olapScanSlot);
+            ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+            ctx.setTargetsOnScanNode(scan.getId(), olapScanSlot);
+        }
+        if (!targetList.isEmpty()) {
+            long buildSideNdv = getBuildSideNdv(join, equalTo);
+            RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                    equalTo.right(), targetList, filterType, exprOrder, join, buildSideNdv);
+            for (Slot targetSlot : targetList) {
+                ctx.setTargetExprIdToFilter(targetSlot.getExprId(), filter);
+            }
+        }
+    }
+
+    /**
+     * Records {@code join} under every CTE id found beneath exactly one of its children, then
+     * re-runs the CTE push-down analysis whenever at least one CTE has recorded joins.
+     */
+    private void collectPushDownCTEInfos(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        PhysicalPlan leftChild = (PhysicalPlan) join.left();
+        PhysicalPlan rightChild = (PhysicalPlan) join.right();
+
+        Preconditions.checkState(leftChild != null && rightChild != null);
+
+        Set<CTEId> cteIds = new HashSet<>();
+        boolean leftHasCTE = hasCTEConsumerUnderJoin(leftChild, cteIds);
+        boolean rightHasCTE = hasCTEConsumerUnderJoin(rightChild, cteIds);
+        // only one side cte is allowed currently
+        if (leftHasCTE != rightHasCTE) {
+            for (CTEId id : cteIds) {
+                ctx.getCteToJoinsMap().computeIfAbsent(id, key -> new HashSet<>()).add(join);
+            }
+        }
+        if (!ctx.getCteToJoinsMap().isEmpty()) {
+            analyzeRuntimeFilterPushDownIntoCTEInfos(join, context);
+        }
+    }
+
+    /**
+     * Lists the ids of all CTE producers present in the context's cteRFPushDownMap that are not
+     * yet contained in its pushed-down CTE set.
+     */
+    private List<CTEId> getPushDownCTECandidates(RuntimeFilterContext ctx) {
+        List<CTEId> candidates = new ArrayList<>();
+        for (PhysicalCTEProducer producer : ctx.getCteRFPushDownMap().keySet()) {
+            CTEId cteId = producer.getCteId();
+            if (!ctx.getPushedDownCTE().contains(cteId)) {
+                candidates.add(cteId);
+            }
+        }
+        return candidates;
+    }
+
+    /**
+     * Follows the single-child chain of distribute / filter / project nodes starting at
+     * {@code root}. If the chain ends at a CTE consumer, its CTE id is added to {@code cteIds}
+     * and true is returned; any other node kind or a node without exactly one child yields false.
+     */
+    private boolean hasCTEConsumerUnderJoin(PhysicalPlan root, Set<CTEId> cteIds) {
+        PhysicalPlan current = root;
+        while (true) {
+            if (current instanceof PhysicalCTEConsumer) {
+                cteIds.add(((PhysicalCTEConsumer) current).getCteId());
+                return true;
+            }
+            // only collect cte in one side
+            if (current.children().size() != 1) {
+                return false;
+            }
+            boolean passThrough = current instanceof PhysicalDistribute
+                    || current instanceof PhysicalFilter
+                    || current instanceof PhysicalProject;
+            if (!passThrough) {
+                return false;
+            }
+            current = (PhysicalPlan) current.child(0);
+        }
+    }
+
+    private void analyzeRuntimeFilterPushDownIntoCTEInfos(PhysicalHashJoin<? extends Plan, ? extends Plan> curJoin,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        Map<CTEId, Set<PhysicalHashJoin>> cteToJoinsMap = ctx.getCteToJoinsMap();
+        for (Map.Entry<CTEId, Set<PhysicalHashJoin>> entry : cteToJoinsMap.entrySet()) {
+            CTEId cteId = entry.getKey();
+            Set<PhysicalHashJoin> joinSet = entry.getValue();
+            if (joinSet.contains(curJoin)) {
+                // skip current join
+                continue;
+            }
+            Set<LogicalCTEConsumer> cteSet = context.getCteIdToConsumers().get(cteId);
+            Preconditions.checkState(!cteSet.isEmpty());
+            String cteName = cteSet.iterator().next().getName();
+            // preconditions for rf pushing into cte producer:
+            // multiple joins whose join condition is on the same cte's column of the same cte
+            // the other side of these join conditions are the same column of the same table, or
+            // they in the same equal sets, such as under an equal join condition
+            // case 1: two joins with t1.c1 = cte1_consumer1.c1 and t1.c1 = cte1_consumer2.c1 conditions
+            //         rf of t1.c1 can be pushed down into cte1 producer
+            // case 2: two joins with t1.c1 = cte2_consumer1.c1 and t2.c2 = cte2_consumer2.c1 conditions
+            //         and another equal join condition with t1.c1 = t2.c2
+            //         rf of t1.c1 and t2.c2 can be pushed down into cte2 producer
+            if (joinSet.size() != cteSet.size()) {
+                continue;
+            }
+            List<EqualTo> equalTos = new ArrayList<>();
+            Map<EqualTo, PhysicalHashJoin> equalCondToJoinMap = new LinkedHashMap<>();
+            for (PhysicalHashJoin join : joinSet) {
+                // precondition:
+                // 1. no non-equal join condition
+                // 2. only equalTo and slotReference both sides
+                // 3. only support one join condition (will be refined further)
+                if (join.getOtherJoinConjuncts().size() > 1
+                        || join.getHashJoinConjuncts().size() != 1
+                        || !(join.getHashJoinConjuncts().get(0) instanceof EqualTo)
+                        || !(((EqualTo) join.getHashJoinConjuncts().get(0)).child(0) instanceof SlotReference)
+                        || !(((EqualTo) join.getHashJoinConjuncts().get(0)).child(1) instanceof SlotReference)) {
+                    break;
+                } else {
+                    EqualTo equalTo = (EqualTo) join.getHashJoinConjuncts().get(0);
+                    equalTos.add(equalTo);
+                    equalCondToJoinMap.put(equalTo, join);
+                }
+            }
+            if (joinSet.size() == equalTos.size()) {
+                int matchNum = 0;
+                Set<String> cteNameSet = new HashSet<>();
+                Set<SlotReference> anotherSideSlotSet = new HashSet<>();
+                for (EqualTo equalTo : equalTos) {
+                    // has been checked above, SlotReference for both sides
+                    SlotReference left = (SlotReference) equalTo.left();
+                    SlotReference right = (SlotReference) equalTo.right();
+                    if (left.getQualifier().size() == 1 && left.getQualifier().get(0).equals(cteName)) {
+                        matchNum += 1;
+                        anotherSideSlotSet.add(right);
+                        cteNameSet.add(left.getQualifiedName());
+                    } else if (right.getQualifier().size() == 1 && right.getQualifier().get(0).equals(cteName)) {
+                        matchNum += 1;
+                        anotherSideSlotSet.add(left);
+                        cteNameSet.add(right.getQualifiedName());
+                    }
+                }
+                if (matchNum == equalTos.size() && cteNameSet.size() == 1) {
+                    // all join conditions point to the same column of the same cte.
+                    // collect the columns on the other (non-cte) side.
+                    Preconditions.checkState(equalTos.size() == equalCondToJoinMap.size());
+
+                    PhysicalCTEProducer cteProducer = context.getRuntimeFilterContext().getCteProduceMap().get(cteId);
+                    if (anotherSideSlotSet.size() == 1) {
+                        // meet requirement for pushing down into cte producer
+                        ctx.getCteRFPushDownMap().put(cteProducer, equalCondToJoinMap);
+                    } else {
+                        // check further whether the upper join's conditions put these slots in the
+                        // same equal set, which means they are actually the same runtime filter build side
+                        List<Expression> conditions = curJoin.getHashJoinConjuncts();
+                        boolean inSameEqualSet = false;
+                        for (Expression e : conditions) {
+                            if (e instanceof EqualTo) {
+                                SlotReference oneSide = (SlotReference) ((EqualTo) e).left();
+                                SlotReference anotherSide = (SlotReference) ((EqualTo) e).right();
+                                if (anotherSideSlotSet.contains(oneSide) && anotherSideSlotSet.contains(anotherSide)) {
+                                    inSameEqualSet = true;
+                                    break;
+                                }
+                            }
+                        }
+                        if (inSameEqualSet) {
+                            ctx.getCteRFPushDownMap().put(cteProducer, equalCondToJoinMap);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private void pushDownRuntimeFilterIntoCTE(RuntimeFilterContext ctx) {
+        Map<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> cteRFPushDownMap = ctx.getCteRFPushDownMap();
+        for (Map.Entry<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> entry : cteRFPushDownMap.entrySet()) {
+            PhysicalCTEProducer cteProducer = entry.getKey();
+            if (ctx.getPushedDownCTE().contains(cteProducer.getCteId())) {
+                continue;
+            }
+            Map<EqualTo, PhysicalHashJoin> equalCondToJoinMap = entry.getValue();
+            int exprOrder = 0;
+            for (Map.Entry<EqualTo, PhysicalHashJoin> innerEntry : equalCondToJoinMap.entrySet()) {
+                EqualTo equalTo = innerEntry.getKey();
+                PhysicalHashJoin join = innerEntry.getValue();
+                Preconditions.checkState(cteProducer != null && join != null);

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org