Posted to commits@doris.apache.org by "morrySnow (via GitHub)" <gi...@apache.org> on 2023/06/27 13:22:56 UTC

[GitHub] [doris] morrySnow commented on a diff in pull request #21114: [improvement](nereids) Support rf into cte

morrySnow commented on code in PR #21114:
URL: https://github.com/apache/doris/pull/21114#discussion_r1243387805


##########
fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java:
##########
@@ -162,6 +163,19 @@ public PlanFragment(PlanFragmentId id, PlanNode root, DataPartition partition) {
         setFragmentInPlanTree(planRoot);
     }
 
+    public PlanFragment(PlanFragmentId id, PlanNode root, DataPartition partition,

Review Comment:
   Use this(...) in the new constructor to delegate to the existing one:
   ```java
   public PlanFragment(PlanFragmentId id, PlanNode root, DataPartition partition)
   ```
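A minimal sketch of the constructor chaining suggested above. `FragmentSketch`, its field types, and the `extra` parameter are hypothetical stand-ins, since the diff hunk cuts off before showing the extra parameters of the new `PlanFragment` constructor.

```java
// Hypothetical stand-in for PlanFragment; names and types are illustrative only.
final class FragmentSketch {
    private final int id;
    private final String root;
    private final String partition;
    private String extra = ""; // placeholder for whatever the new overload carries

    // Existing three-argument constructor: owns the common initialization.
    FragmentSketch(int id, String root, String partition) {
        this.id = id;
        this.root = root;
        this.partition = partition;
    }

    // New overload: delegates with this(...) instead of repeating the body above.
    FragmentSketch(int id, String root, String partition, String extra) {
        this(id, root, partition);
        this.extra = extra;
    }

    String describe() {
        return id + ":" + root + ":" + partition + ":" + extra;
    }
}
```

With this shape, any later change to the common initialization only has to be made in one constructor.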



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCTEConsumer.java:
##########
@@ -77,12 +79,25 @@ public PhysicalCTEConsumer(CTEId cteId, Map<Slot, Slot> consumerToProducerSlotMa
                                LogicalProperties logicalProperties,
                                PhysicalProperties physicalProperties,
                                Statistics statistics) {
-        super(PlanType.PHYSICAL_CTE_CONSUME, groupExpression, logicalProperties, physicalProperties, statistics);
+        super(RelationUtil.newRelationId(), PlanType.PHYSICAL_CTE_CONSUME, ImmutableList.of(), groupExpression,
+                logicalProperties, physicalProperties, statistics);
         this.cteId = cteId;
         this.consumerToProducerSlotMap = ImmutableMap.copyOf(consumerToProducerSlotMap);
         this.producerToConsumerSlotMap = ImmutableMap.copyOf(producerToConsumerSlotMap);
     }
 
+    @Override
+    public OlapTable getTable() {
+        Preconditions.checkState(false);

Review Comment:
   Add a check message, or throw an exception directly.
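The two options in this comment could look roughly like the sketch below. `CteConsumerSketch` and the message text are assumptions made for illustration. The local `checkState` helper mirrors the message-carrying overload of Guava's `Preconditions.checkState(boolean, Object)`, so the sketch does not need Guava on the classpath.

```java
// Hypothetical stand-in for PhysicalCTEConsumer; only the failing accessor is modeled.
final class CteConsumerSketch {
    // Option 1: throw directly, so the failure names the unsupported call.
    Object getTable() {
        throw new UnsupportedOperationException(
                "getTable() is not supported on a CTE consumer"); // message text is an assumption
    }

    // Option 2: a checkState-style helper that carries a message.
    static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }
}
```

Throwing directly has one practical advantage in a non-void method: the compiler treats the `throw` as terminating the method, whereas a `checkState(false, ...)` call still needs a `return` statement after it.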



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java:
##########
@@ -59,6 +62,10 @@ public class RuntimeFilterContext {
 
     private final Map<Plan, List<ExprId>> joinToTargetExprId = Maps.newHashMap();
 
+    private final Map<CTEId, Set<PhysicalHashJoin>> cteToJoinsMap = Maps.newHashMap();

Review Comment:
   Put all the CTE-related variables together.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java:
##########
@@ -2114,6 +2116,9 @@ public PlanFragment visitPhysicalCTEProducer(PhysicalCTEProducer<? extends Plan>
         cteProduce.setOutputExprs(outputs);
         context.getCteProduceFragments().put(cteId, cteProduce);
         context.getCteProduceMap().put(cteId, cteProducer);
+        if (context.getRuntimeTranslator().isPresent()) {
+            context.getRuntimeTranslator().get().getContext().getCteProduceMap().put(cteId, cteProducer);
+        }

Review Comment:
   I think this is unnecessary, because the map returned by getCteProduceMap is already populated in RFGenerator.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -294,8 +221,466 @@ public PhysicalRelation visitPhysicalScan(PhysicalRelation scan, CascadesContext
         return scan;
     }
 
+    private long getBuildSideNdv(PhysicalHashJoin<? extends Plan, ? extends Plan> join, EqualTo equalTo) {
+        AbstractPlan right = (AbstractPlan) join.right();
+        //make ut test friendly
+        if (right.getStats() == null) {
+            return -1L;
+        }
+        ExpressionEstimation estimator = new ExpressionEstimation();
+        ColumnStatistic buildColStats = equalTo.right().accept(estimator, right.getStats());
+        return buildColStats.isUnKnown ? -1 : Math.max(1, (long) buildColStats.ndv);
+    }
+
     private static Slot checkTargetChild(Expression leftChild) {
         Expression expression = ExpressionUtils.getExpressionCoveredByCast(leftChild);
         return expression instanceof Slot ? ((Slot) expression) : null;
     }
+
+    private void pushDownRuntimeFilterCommon(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        List<TRuntimeFilterType> legalTypes = Arrays.stream(TRuntimeFilterType.values())
+                .filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0)
+                .collect(Collectors.toList());
+        // TODO: some complex situation cannot be handled now, see testPushDownThroughJoin.
+        //   we will support it in later version.
+        for (int i = 0; i < join.getHashJoinConjuncts().size(); i++) {
+            EqualTo equalTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(
+                    (EqualTo) join.getHashJoinConjuncts().get(i), join.left().getOutputSet()));
+            for (TRuntimeFilterType type : legalTypes) {
+                //bitmap rf is generated by nested loop join.
+                if (type == TRuntimeFilterType.BITMAP) {
+                    continue;
+                }
+                if (join.left() instanceof PhysicalUnion
+                        || join.left() instanceof PhysicalIntersect
+                        || join.left() instanceof PhysicalExcept) {
+                    doPushDownIntoSetOperation(join, ctx, equalTo, type, i);
+                } else {
+                    doPushDownBasic(join, context, ctx, equalTo, type, i);
+                }
+            }
+        }
+    }
+
+    private void doPushDownBasic(PhysicalHashJoin<? extends Plan, ? extends Plan> join, CascadesContext context,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
+        // currently, we can ensure children in the two side are corresponding to the equal_to's.
+        // so right maybe an expression and left is a slot
+        Slot unwrappedSlot = checkTargetChild(equalTo.left());
+        // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join
+        // contains join with denied join type. for example: a left join b on a.id = b.id
+        if (unwrappedSlot == null || !aliasTransferMap.containsKey(unwrappedSlot)) {
+            return;
+        }
+        Slot olapScanSlot = aliasTransferMap.get(unwrappedSlot).second;
+        PhysicalRelation scan = aliasTransferMap.get(unwrappedSlot).first;
+
+        Preconditions.checkState(olapScanSlot != null && scan != null);
+
+        if (scan instanceof PhysicalCTEConsumer) {
+            Set<CTEId> processedCTE = context.getRuntimeFilterContext().getProcessedCTE();
+            CTEId cteId = ((PhysicalCTEConsumer) scan).getCteId();
+            if (!processedCTE.contains(cteId)) {
+                PhysicalCTEProducer cteProducer = context.getRuntimeFilterContext()
+                        .getCteProduceMap().get(cteId);
+                PhysicalPlan inputPlanNode = (PhysicalPlan) cteProducer.child(0);
+                // process cte producer self recursively
+                inputPlanNode.accept(this, context);
+                processedCTE.add(cteId);
+            }
+        } else {
+            // in-filter is not friendly to pipeline
+            if (type == TRuntimeFilterType.IN_OR_BLOOM
+                    && ctx.getSessionVariable().enablePipelineEngine()
+                    && hasRemoteTarget(join, scan)) {
+                type = TRuntimeFilterType.BLOOM;
+            }
+            long buildSideNdv = getBuildSideNdv(join, equalTo);
+            RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                    equalTo.right(), ImmutableList.of(olapScanSlot), type, exprOrder, join, buildSideNdv);
+            ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+            ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
+            ctx.setTargetsOnScanNode(aliasTransferMap.get(unwrappedSlot).first.getId(), olapScanSlot);
+        }
+    }
+
+    private void doPushDownIntoSetOperation(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
+        List<Slot> targetList = new ArrayList<>();
+        int projIndex = -1;
+        for (int j = 0; j < join.left().children().size(); j++) {
+            PhysicalPlan child = (PhysicalPlan) join.left().child(j);
+            if (child instanceof PhysicalProject) {
+                PhysicalProject project = (PhysicalProject) child;
+                Slot leftSlot = checkTargetChild(equalTo.left());
+                if (leftSlot == null) {
+                    break;
+                }
+                for (int k = 0; projIndex < 0 && k < project.getProjects().size(); k++) {
+                    NamedExpression expr = (NamedExpression) project.getProjects().get(k);
+                    if (expr.getName().equals(leftSlot.getName())) {
+                        projIndex = k;
+                        break;
+                    }
+                }
+                Preconditions.checkState(projIndex >= 0
+                        && projIndex < project.getProjects().size());
+
+                NamedExpression targetExpr = (NamedExpression) project.getProjects().get(projIndex);
+
+                SlotReference origSlot = null;
+                if (targetExpr instanceof Alias) {
+                    origSlot = (SlotReference) targetExpr.child(0);
+                } else {
+                    origSlot = (SlotReference) targetExpr;
+                }
+                Slot olapScanSlot = aliasTransferMap.get(origSlot).second;
+                PhysicalRelation scan = aliasTransferMap.get(origSlot).first;
+                if (type == TRuntimeFilterType.IN_OR_BLOOM
+                        && ctx.getSessionVariable().enablePipelineEngine()
+                        && hasRemoteTarget(join, scan)) {
+                    type = TRuntimeFilterType.BLOOM;
+                }
+                targetList.add(olapScanSlot);
+                ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+                ctx.setTargetsOnScanNode(aliasTransferMap.get(origSlot).first.getId(), olapScanSlot);
+            }
+        }
+        if (!targetList.isEmpty()) {
+            long buildSideNdv = getBuildSideNdv(join, equalTo);
+            RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                    equalTo.right(), targetList, type, exprOrder, join, buildSideNdv);
+            for (int j = 0; j < targetList.size(); j++) {
+                ctx.setTargetExprIdToFilter(targetList.get(j).getExprId(), filter);
+            }
+        }
+    }
+
+    private void collectPushDownCTEInfos(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        Set<CTEId> cteIds = new HashSet<>();
+        PhysicalPlan leftChild = (PhysicalPlan) join.left();
+        PhysicalPlan rightChild = (PhysicalPlan) join.right();
+
+        Preconditions.checkState(leftChild != null && rightChild != null);
+
+        boolean leftHasCTE = hasCTEConsumerUnderJoin(leftChild, cteIds);
+        boolean rightHasCTE = hasCTEConsumerUnderJoin(rightChild, cteIds);
+        // only one side cte is allowed currently
+        if ((leftHasCTE && !rightHasCTE) || (!leftHasCTE && rightHasCTE)) {
+            for (CTEId id : cteIds) {
+                if (ctx.getCteToJoinsMap().get(id) == null) {
+                    Set<PhysicalHashJoin> newJoin = new HashSet<>();
+                    newJoin.add(join);
+                    ctx.getCteToJoinsMap().put(id, newJoin);
+                } else {
+                    ctx.getCteToJoinsMap().get(id).add(join);
+                }
+            }
+        }
+        if (!ctx.getCteToJoinsMap().isEmpty()) {
+            analyzeRuntimeFilterPushDownIntoCTEInfos(join, context);
+        }
+    }
+
+    private List<CTEId> getPushDownCTECandidates(RuntimeFilterContext ctx) {
+        List<CTEId> candidates = new ArrayList<>();
+        Map<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> cteRFPushDownMap = ctx.getCteRFPushDownMap();
+        for (Map.Entry<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> entry : cteRFPushDownMap.entrySet()) {
+            CTEId cteId = entry.getKey().getCteId();
+            if (ctx.getPushedDownCTE().contains(cteId)) {
+                continue;
+            }
+            candidates.add(cteId);
+        }
+        return candidates;
+    }
+
+    private boolean hasCTEConsumerUnderJoin(PhysicalPlan root, Set<CTEId> cteIds) {
+        if (root instanceof PhysicalCTEConsumer) {
+            cteIds.add(((PhysicalCTEConsumer) root).getCteId());
+            return true;
+        } else if (root.children().size() != 1) {
+            // only collect cte in one side
+            return false;
+        } else if (root instanceof PhysicalDistribute
+                || root instanceof PhysicalFilter
+                || root instanceof PhysicalProject) {
+            return hasCTEConsumerUnderJoin((PhysicalPlan) root.child(0), cteIds);
+        } else {
+            return false;
+        }
+    }
+
+    private void analyzeRuntimeFilterPushDownIntoCTEInfos(PhysicalHashJoin<? extends Plan, ? extends Plan> curJoin,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        Map<CTEId, Set<PhysicalHashJoin>> cteToJoinsMap = ctx.getCteToJoinsMap();
+        for (Map.Entry<CTEId, Set<PhysicalHashJoin>> entry : cteToJoinsMap.entrySet()) {
+            CTEId cteId = entry.getKey();
+            Set<PhysicalHashJoin> joinSet = entry.getValue();
+            if (joinSet.contains(curJoin)) {
+                // skip current join
+                continue;
+            }
+            Set<LogicalCTEConsumer> cteSet = context.getCteIdToConsumers().get(cteId);
+            Preconditions.checkState(!cteSet.isEmpty());
+            String cteName = cteSet.iterator().next().getName();
+            // preconditions for rf pushing into cte producer:
+            // multiple joins whose join condition is on the same cte's column of the same cte
+            // the other side of these join conditions are the same column of the same table, or
+            // they in the same equal sets, such as under an equal join condition
+            // case 1: two joins with t1.c1 = cte1_consumer1.c1 and t1.c1 = cte1_consumer2.c1 conditions
+            //         rf of t1.c1 can be pushed down into cte1 producer
+            // case 2: two joins with t1.c1 = cte2_consumer1.c1 and t2.c2 = cte2_consumer2.c1 conditions
+            //         and another equal join condition with t1.c1 = t2.c2
+            //         rf of t1.c1 and t2.c2 can be pushed down into cte2 producer
+            if (joinSet.size() != cteSet.size()) {
+                continue;
+            }
+            List<EqualTo> equalTos = new ArrayList<>();
+            Map<EqualTo, PhysicalHashJoin> equalCondToJoinMap = new LinkedHashMap<>();
+            for (PhysicalHashJoin join : joinSet) {
+                // precondition:
+                // 1. no non-equal join condition
+                // 2. only equalTo and slotReference both sides
+                // 3. only support one join condition (will be refined further)
+                if (join.getOtherJoinConjuncts().size() > 1
+                        || join.getHashJoinConjuncts().size() != 1
+                        || !(join.getHashJoinConjuncts().get(0) instanceof EqualTo)
+                        || !(((EqualTo) join.getHashJoinConjuncts().get(0)).child(0) instanceof SlotReference)
+                        || !(((EqualTo) join.getHashJoinConjuncts().get(0)).child(1) instanceof SlotReference)) {

Review Comment:
   The code does not match the comment "no non-equal join condition".
   Also, why do we need this restriction? Please add a comment to explain.
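To illustrate the mismatch: the condition in the hunk rejects a join only when `getOtherJoinConjuncts().size() > 1`, so a join with exactly one non-equal conjunct still passes. If the comment states the intended behavior (an assumption, since the author may instead mean to relax the comment), the check would need to require an empty list. The sketch below uses plain string lists as hypothetical stand-ins for the conjunct lists.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical model of the precondition check; conjuncts are plain strings here.
final class JoinPreconditionSketch {
    // Mirrors the commented preconditions 1 and 3 as interpreted in this sketch:
    // no non-equal conjunct at all, and exactly one equality (hash) conjunct.
    static boolean qualifies(List<String> hashConjuncts, List<String> otherConjuncts) {
        return otherConjuncts.isEmpty() && hashConjuncts.size() == 1;
    }

    // Mirrors the condition as written in the diff: one non-equal conjunct is tolerated.
    static boolean qualifiesAsWritten(List<String> hashConjuncts, List<String> otherConjuncts) {
        return otherConjuncts.size() <= 1 && hashConjuncts.size() == 1;
    }

    public static void main(String[] args) {
        List<String> hash = Collections.singletonList("t1.c1 = cte.c1");
        List<String> other = Arrays.asList("t1.c2 < cte.c2");
        // The two variants disagree exactly on the single non-equal conjunct case.
        System.out.println(qualifies(hash, other));
        System.out.println(qualifiesAsWritten(hash, other));
    }
}
```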



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -294,8 +221,466 @@ public PhysicalRelation visitPhysicalScan(PhysicalRelation scan, CascadesContext
         return scan;
     }
 
+    private long getBuildSideNdv(PhysicalHashJoin<? extends Plan, ? extends Plan> join, EqualTo equalTo) {
+        AbstractPlan right = (AbstractPlan) join.right();
+        //make ut test friendly
+        if (right.getStats() == null) {
+            return -1L;
+        }
+        ExpressionEstimation estimator = new ExpressionEstimation();
+        ColumnStatistic buildColStats = equalTo.right().accept(estimator, right.getStats());
+        return buildColStats.isUnKnown ? -1 : Math.max(1, (long) buildColStats.ndv);
+    }
+
     private static Slot checkTargetChild(Expression leftChild) {
         Expression expression = ExpressionUtils.getExpressionCoveredByCast(leftChild);
         return expression instanceof Slot ? ((Slot) expression) : null;
     }
+
+    private void pushDownRuntimeFilterCommon(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        List<TRuntimeFilterType> legalTypes = Arrays.stream(TRuntimeFilterType.values())
+                .filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0)
+                .collect(Collectors.toList());
+        // TODO: some complex situation cannot be handled now, see testPushDownThroughJoin.
+        //   we will support it in later version.
+        for (int i = 0; i < join.getHashJoinConjuncts().size(); i++) {
+            EqualTo equalTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(
+                    (EqualTo) join.getHashJoinConjuncts().get(i), join.left().getOutputSet()));
+            for (TRuntimeFilterType type : legalTypes) {
+                //bitmap rf is generated by nested loop join.
+                if (type == TRuntimeFilterType.BITMAP) {
+                    continue;
+                }
+                if (join.left() instanceof PhysicalUnion
+                        || join.left() instanceof PhysicalIntersect
+                        || join.left() instanceof PhysicalExcept) {
+                    doPushDownIntoSetOperation(join, ctx, equalTo, type, i);
+                } else {
+                    doPushDownBasic(join, context, ctx, equalTo, type, i);
+                }
+            }
+        }
+    }
+
+    private void doPushDownBasic(PhysicalHashJoin<? extends Plan, ? extends Plan> join, CascadesContext context,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
+        // currently, we can ensure children in the two side are corresponding to the equal_to's.
+        // so right maybe an expression and left is a slot
+        Slot unwrappedSlot = checkTargetChild(equalTo.left());
+        // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join
+        // contains join with denied join type. for example: a left join b on a.id = b.id
+        if (unwrappedSlot == null || !aliasTransferMap.containsKey(unwrappedSlot)) {
+            return;
+        }
+        Slot olapScanSlot = aliasTransferMap.get(unwrappedSlot).second;
+        PhysicalRelation scan = aliasTransferMap.get(unwrappedSlot).first;
+
+        Preconditions.checkState(olapScanSlot != null && scan != null);
+
+        if (scan instanceof PhysicalCTEConsumer) {
+            Set<CTEId> processedCTE = context.getRuntimeFilterContext().getProcessedCTE();
+            CTEId cteId = ((PhysicalCTEConsumer) scan).getCteId();
+            if (!processedCTE.contains(cteId)) {
+                PhysicalCTEProducer cteProducer = context.getRuntimeFilterContext()
+                        .getCteProduceMap().get(cteId);
+                PhysicalPlan inputPlanNode = (PhysicalPlan) cteProducer.child(0);
+                // process cte producer self recursively
+                inputPlanNode.accept(this, context);
+                processedCTE.add(cteId);
+            }
+        } else {
+            // in-filter is not friendly to pipeline
+            if (type == TRuntimeFilterType.IN_OR_BLOOM
+                    && ctx.getSessionVariable().enablePipelineEngine()
+                    && hasRemoteTarget(join, scan)) {
+                type = TRuntimeFilterType.BLOOM;
+            }
+            long buildSideNdv = getBuildSideNdv(join, equalTo);
+            RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                    equalTo.right(), ImmutableList.of(olapScanSlot), type, exprOrder, join, buildSideNdv);
+            ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+            ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
+            ctx.setTargetsOnScanNode(aliasTransferMap.get(unwrappedSlot).first.getId(), olapScanSlot);
+        }
+    }
+
+    private void doPushDownIntoSetOperation(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
+        List<Slot> targetList = new ArrayList<>();
+        int projIndex = -1;
+        for (int j = 0; j < join.left().children().size(); j++) {
+            PhysicalPlan child = (PhysicalPlan) join.left().child(j);
+            if (child instanceof PhysicalProject) {
+                PhysicalProject project = (PhysicalProject) child;
+                Slot leftSlot = checkTargetChild(equalTo.left());
+                if (leftSlot == null) {
+                    break;
+                }
+                for (int k = 0; projIndex < 0 && k < project.getProjects().size(); k++) {
+                    NamedExpression expr = (NamedExpression) project.getProjects().get(k);
+                    if (expr.getName().equals(leftSlot.getName())) {
+                        projIndex = k;
+                        break;
+                    }
+                }
+                Preconditions.checkState(projIndex >= 0
+                        && projIndex < project.getProjects().size());
+
+                NamedExpression targetExpr = (NamedExpression) project.getProjects().get(projIndex);
+
+                SlotReference origSlot = null;
+                if (targetExpr instanceof Alias) {
+                    origSlot = (SlotReference) targetExpr.child(0);
+                } else {
+                    origSlot = (SlotReference) targetExpr;
+                }
+                Slot olapScanSlot = aliasTransferMap.get(origSlot).second;
+                PhysicalRelation scan = aliasTransferMap.get(origSlot).first;
+                if (type == TRuntimeFilterType.IN_OR_BLOOM
+                        && ctx.getSessionVariable().enablePipelineEngine()
+                        && hasRemoteTarget(join, scan)) {
+                    type = TRuntimeFilterType.BLOOM;
+                }
+                targetList.add(olapScanSlot);
+                ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+                ctx.setTargetsOnScanNode(aliasTransferMap.get(origSlot).first.getId(), olapScanSlot);
+            }
+        }
+        if (!targetList.isEmpty()) {
+            long buildSideNdv = getBuildSideNdv(join, equalTo);
+            RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                    equalTo.right(), targetList, type, exprOrder, join, buildSideNdv);
+            for (int j = 0; j < targetList.size(); j++) {
+                ctx.setTargetExprIdToFilter(targetList.get(j).getExprId(), filter);
+            }
+        }
+    }
+
+    private void collectPushDownCTEInfos(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        Set<CTEId> cteIds = new HashSet<>();
+        PhysicalPlan leftChild = (PhysicalPlan) join.left();
+        PhysicalPlan rightChild = (PhysicalPlan) join.right();
+
+        Preconditions.checkState(leftChild != null && rightChild != null);
+
+        boolean leftHasCTE = hasCTEConsumerUnderJoin(leftChild, cteIds);
+        boolean rightHasCTE = hasCTEConsumerUnderJoin(rightChild, cteIds);
+        // only one side cte is allowed currently

Review Comment:
   Why do we need this restriction? If we plan to lift it later, add a TODO comment explaining why it is needed now and what the next step is.
   



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -294,8 +221,466 @@ public PhysicalRelation visitPhysicalScan(PhysicalRelation scan, CascadesContext
         return scan;
     }
 
+    private long getBuildSideNdv(PhysicalHashJoin<? extends Plan, ? extends Plan> join, EqualTo equalTo) {
+        AbstractPlan right = (AbstractPlan) join.right();
+        //make ut test friendly
+        if (right.getStats() == null) {
+            return -1L;
+        }
+        ExpressionEstimation estimator = new ExpressionEstimation();
+        ColumnStatistic buildColStats = equalTo.right().accept(estimator, right.getStats());
+        return buildColStats.isUnKnown ? -1 : Math.max(1, (long) buildColStats.ndv);
+    }
+
     private static Slot checkTargetChild(Expression leftChild) {
         Expression expression = ExpressionUtils.getExpressionCoveredByCast(leftChild);
         return expression instanceof Slot ? ((Slot) expression) : null;
     }
+
+    private void pushDownRuntimeFilterCommon(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        List<TRuntimeFilterType> legalTypes = Arrays.stream(TRuntimeFilterType.values())
+                .filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0)
+                .collect(Collectors.toList());
+        // TODO: some complex situation cannot be handled now, see testPushDownThroughJoin.
+        //   we will support it in later version.
+        for (int i = 0; i < join.getHashJoinConjuncts().size(); i++) {
+            EqualTo equalTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(
+                    (EqualTo) join.getHashJoinConjuncts().get(i), join.left().getOutputSet()));
+            for (TRuntimeFilterType type : legalTypes) {
+                //bitmap rf is generated by nested loop join.
+                if (type == TRuntimeFilterType.BITMAP) {
+                    continue;
+                }
+                if (join.left() instanceof PhysicalUnion
+                        || join.left() instanceof PhysicalIntersect
+                        || join.left() instanceof PhysicalExcept) {
+                    doPushDownIntoSetOperation(join, ctx, equalTo, type, i);
+                } else {
+                    doPushDownBasic(join, context, ctx, equalTo, type, i);
+                }
+            }
+        }
+    }
+
+    private void doPushDownBasic(PhysicalHashJoin<? extends Plan, ? extends Plan> join, CascadesContext context,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
+        // currently, we can ensure children in the two side are corresponding to the equal_to's.
+        // so right maybe an expression and left is a slot
+        Slot unwrappedSlot = checkTargetChild(equalTo.left());
+        // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join
+        // contains join with denied join type. for example: a left join b on a.id = b.id
+        if (unwrappedSlot == null || !aliasTransferMap.containsKey(unwrappedSlot)) {
+            return;
+        }
+        Slot olapScanSlot = aliasTransferMap.get(unwrappedSlot).second;
+        PhysicalRelation scan = aliasTransferMap.get(unwrappedSlot).first;
+
+        Preconditions.checkState(olapScanSlot != null && scan != null);
+
+        if (scan instanceof PhysicalCTEConsumer) {
+            Set<CTEId> processedCTE = context.getRuntimeFilterContext().getProcessedCTE();
+            CTEId cteId = ((PhysicalCTEConsumer) scan).getCteId();
+            if (!processedCTE.contains(cteId)) {
+                PhysicalCTEProducer cteProducer = context.getRuntimeFilterContext()
+                        .getCteProduceMap().get(cteId);
+                PhysicalPlan inputPlanNode = (PhysicalPlan) cteProducer.child(0);
+                // process cte producer self recursively
+                inputPlanNode.accept(this, context);
+                processedCTE.add(cteId);
+            }
+        } else {
+            // in-filter is not friendly to pipeline
+            if (type == TRuntimeFilterType.IN_OR_BLOOM
+                    && ctx.getSessionVariable().enablePipelineEngine()
+                    && hasRemoteTarget(join, scan)) {
+                type = TRuntimeFilterType.BLOOM;
+            }
+            long buildSideNdv = getBuildSideNdv(join, equalTo);
+            RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                    equalTo.right(), ImmutableList.of(olapScanSlot), type, exprOrder, join, buildSideNdv);
+            ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+            ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
+            ctx.setTargetsOnScanNode(aliasTransferMap.get(unwrappedSlot).first.getId(), olapScanSlot);
+        }
+    }
+
+    private void doPushDownIntoSetOperation(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
+        List<Slot> targetList = new ArrayList<>();
+        int projIndex = -1;
+        for (int j = 0; j < join.left().children().size(); j++) {
+            PhysicalPlan child = (PhysicalPlan) join.left().child(j);
+            if (child instanceof PhysicalProject) {
+                PhysicalProject project = (PhysicalProject) child;
+                Slot leftSlot = checkTargetChild(equalTo.left());
+                if (leftSlot == null) {
+                    break;
+                }
+                for (int k = 0; projIndex < 0 && k < project.getProjects().size(); k++) {
+                    NamedExpression expr = (NamedExpression) project.getProjects().get(k);
+                    if (expr.getName().equals(leftSlot.getName())) {
+                        projIndex = k;
+                        break;
+                    }
+                }
+                Preconditions.checkState(projIndex >= 0
+                        && projIndex < project.getProjects().size());
+
+                NamedExpression targetExpr = (NamedExpression) project.getProjects().get(projIndex);
+
+                SlotReference origSlot = null;
+                if (targetExpr instanceof Alias) {
+                    origSlot = (SlotReference) targetExpr.child(0);
+                } else {
+                    origSlot = (SlotReference) targetExpr;
+                }
+                Slot olapScanSlot = aliasTransferMap.get(origSlot).second;
+                PhysicalRelation scan = aliasTransferMap.get(origSlot).first;
+                if (type == TRuntimeFilterType.IN_OR_BLOOM
+                        && ctx.getSessionVariable().enablePipelineEngine()
+                        && hasRemoteTarget(join, scan)) {
+                    type = TRuntimeFilterType.BLOOM;
+                }
+                targetList.add(olapScanSlot);
+                ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+                ctx.setTargetsOnScanNode(aliasTransferMap.get(origSlot).first.getId(), olapScanSlot);
+            }
+        }
+        if (!targetList.isEmpty()) {
+            long buildSideNdv = getBuildSideNdv(join, equalTo);
+            RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                    equalTo.right(), targetList, type, exprOrder, join, buildSideNdv);
+            for (int j = 0; j < targetList.size(); j++) {
+                ctx.setTargetExprIdToFilter(targetList.get(j).getExprId(), filter);
+            }
+        }
+    }
+
+    private void collectPushDownCTEInfos(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        Set<CTEId> cteIds = new HashSet<>();
+        PhysicalPlan leftChild = (PhysicalPlan) join.left();
+        PhysicalPlan rightChild = (PhysicalPlan) join.right();
+
+        Preconditions.checkState(leftChild != null && rightChild != null);
+
+        boolean leftHasCTE = hasCTEConsumerUnderJoin(leftChild, cteIds);
+        boolean rightHasCTE = hasCTEConsumerUnderJoin(rightChild, cteIds);
+        // only one side cte is allowed currently
+        if ((leftHasCTE && !rightHasCTE) || (!leftHasCTE && rightHasCTE)) {
+            for (CTEId id : cteIds) {
+                if (ctx.getCteToJoinsMap().get(id) == null) {
+                    Set<PhysicalHashJoin> newJoin = new HashSet<>();
+                    newJoin.add(join);
+                    ctx.getCteToJoinsMap().put(id, newJoin);
+                } else {
+                    ctx.getCteToJoinsMap().get(id).add(join);
+                }
+            }
+        }
+        if (!ctx.getCteToJoinsMap().isEmpty()) {
+            analyzeRuntimeFilterPushDownIntoCTEInfos(join, context);

Review Comment:
   I think this statement could move into the `if` above, because the current join is only added to `CteToJoinsMap` there.
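
   A minimal sketch of the shape this suggestion points at, not the PR's code. It assumes plain strings in place of `CTEId` and `PhysicalHashJoin`; the class name `CteJoinCollectorSketch` and the `analyze` placeholder are hypothetical.

   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;

   // Simplified stand-in: CTE ids and joins are plain strings, not the Doris types.
   class CteJoinCollectorSketch {
       final Map<String, Set<String>> cteToJoinsMap = new HashMap<>();
       int analyzeCalls = 0;

       void collect(String join, Set<String> cteIds, boolean leftHasCte, boolean rightHasCte) {
           // Exactly one side may contain a CTE consumer.
           if (leftHasCte != rightHasCte) {
               for (String id : cteIds) {
                   cteToJoinsMap.computeIfAbsent(id, k -> new HashSet<>()).add(join);
               }
               // Moved inside the guard. In the real code an id was just added,
               // so a separate emptiness check on the map would be redundant here.
               analyze(join);
           }
       }

       // Placeholder for analyzeRuntimeFilterPushDownIntoCTEInfos.
       void analyze(String join) {
           analyzeCalls++;
       }

       public static void main(String[] args) {
           CteJoinCollectorSketch sketch = new CteJoinCollectorSketch();
           Set<String> ids = new HashSet<>();
           ids.add("cte1");
           sketch.collect("join1", ids, true, false);
           sketch.collect("join2", new HashSet<>(), false, false);
           System.out.println(sketch.cteToJoinsMap + " analyzeCalls=" + sketch.analyzeCalls);
       }
   }
   ```

   One thing to check before applying it: in the hunk as posted, the analysis also runs for joins that add nothing to the map, as long as the map is non-empty. Whether that matters depends on what `analyzeRuntimeFilterPushDownIntoCTEInfos` does with such joins.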



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCTEConsumer.java:
##########
@@ -77,12 +79,25 @@ public PhysicalCTEConsumer(CTEId cteId, Map<Slot, Slot> consumerToProducerSlotMa
                                LogicalProperties logicalProperties,
                                PhysicalProperties physicalProperties,
                                Statistics statistics) {
-        super(PlanType.PHYSICAL_CTE_CONSUME, groupExpression, logicalProperties, physicalProperties, statistics);
+        super(RelationUtil.newRelationId(), PlanType.PHYSICAL_CTE_CONSUME, ImmutableList.of(), groupExpression,
+                logicalProperties, physicalProperties, statistics);
         this.cteId = cteId;
         this.consumerToProducerSlotMap = ImmutableMap.copyOf(consumerToProducerSlotMap);
         this.producerToConsumerSlotMap = ImmutableMap.copyOf(producerToConsumerSlotMap);
     }
 
+    @Override
+    public OlapTable getTable() {
+        Preconditions.checkState(false);
+        return null;
+    }
+
+    @Override
+    public List<String> getQualifier() {
+        Preconditions.checkState(false);

Review Comment:
   Add a check message, or throw an exception directly.
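
   For the "throw directly" option, a minimal sketch could look like the following. The class name, the exception type and the message wording are assumptions for illustration only. For the other option, Guava's `Preconditions.checkState` has an overload that takes an error message as its second argument.

   ```java
   import java.util.List;

   // Hypothetical stand-in for PhysicalCTEConsumer; only the unsupported accessors are shown.
   class CteConsumerAccessorSketch {
       public Object getTable() {
           // Fails with a descriptive message instead of checkState(false) plus "return null".
           throw new UnsupportedOperationException("getTable() is not supported on a CTE consumer");
       }

       public List<String> getQualifier() {
           throw new UnsupportedOperationException("getQualifier() is not supported on a CTE consumer");
       }

       public static void main(String[] args) {
           try {
               new CteConsumerAccessorSketch().getQualifier();
           } catch (UnsupportedOperationException e) {
               System.out.println("rejected: " + e.getMessage());
           }
       }
   }
   ```

   Throwing directly also removes the unreachable `return null;` that the `checkState(false)` form needs to satisfy the compiler.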



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -294,8 +221,466 @@ public PhysicalRelation visitPhysicalScan(PhysicalRelation scan, CascadesContext
         return scan;
     }
 
+    private long getBuildSideNdv(PhysicalHashJoin<? extends Plan, ? extends Plan> join, EqualTo equalTo) {
+        AbstractPlan right = (AbstractPlan) join.right();
+        //make ut test friendly
+        if (right.getStats() == null) {
+            return -1L;
+        }
+        ExpressionEstimation estimator = new ExpressionEstimation();
+        ColumnStatistic buildColStats = equalTo.right().accept(estimator, right.getStats());
+        return buildColStats.isUnKnown ? -1 : Math.max(1, (long) buildColStats.ndv);
+    }
+
     private static Slot checkTargetChild(Expression leftChild) {
         Expression expression = ExpressionUtils.getExpressionCoveredByCast(leftChild);
         return expression instanceof Slot ? ((Slot) expression) : null;
     }
+
+    private void pushDownRuntimeFilterCommon(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        List<TRuntimeFilterType> legalTypes = Arrays.stream(TRuntimeFilterType.values())
+                .filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0)
+                .collect(Collectors.toList());
+        // TODO: some complex situation cannot be handled now, see testPushDownThroughJoin.
+        //   we will support it in later version.
+        for (int i = 0; i < join.getHashJoinConjuncts().size(); i++) {
+            EqualTo equalTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(
+                    (EqualTo) join.getHashJoinConjuncts().get(i), join.left().getOutputSet()));
+            for (TRuntimeFilterType type : legalTypes) {
+                //bitmap rf is generated by nested loop join.
+                if (type == TRuntimeFilterType.BITMAP) {
+                    continue;
+                }
+                if (join.left() instanceof PhysicalUnion
+                        || join.left() instanceof PhysicalIntersect
+                        || join.left() instanceof PhysicalExcept) {
+                    doPushDownIntoSetOperation(join, ctx, equalTo, type, i);
+                } else {
+                    doPushDownBasic(join, context, ctx, equalTo, type, i);
+                }
+            }
+        }
+    }
+
+    private void doPushDownBasic(PhysicalHashJoin<? extends Plan, ? extends Plan> join, CascadesContext context,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
+        // currently, we can ensure children in the two side are corresponding to the equal_to's.
+        // so right maybe an expression and left is a slot
+        Slot unwrappedSlot = checkTargetChild(equalTo.left());
+        // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join
+        // contains join with denied join type. for example: a left join b on a.id = b.id
+        if (unwrappedSlot == null || !aliasTransferMap.containsKey(unwrappedSlot)) {
+            return;
+        }
+        Slot olapScanSlot = aliasTransferMap.get(unwrappedSlot).second;
+        PhysicalRelation scan = aliasTransferMap.get(unwrappedSlot).first;
+
+        Preconditions.checkState(olapScanSlot != null && scan != null);
+
+        if (scan instanceof PhysicalCTEConsumer) {
+            Set<CTEId> processedCTE = context.getRuntimeFilterContext().getProcessedCTE();
+            CTEId cteId = ((PhysicalCTEConsumer) scan).getCteId();
+            if (!processedCTE.contains(cteId)) {
+                PhysicalCTEProducer cteProducer = context.getRuntimeFilterContext()
+                        .getCteProduceMap().get(cteId);
+                PhysicalPlan inputPlanNode = (PhysicalPlan) cteProducer.child(0);
+                // process cte producer self recursively
+                inputPlanNode.accept(this, context);
+                processedCTE.add(cteId);
+            }
+        } else {
+            // in-filter is not friendly to pipeline
+            if (type == TRuntimeFilterType.IN_OR_BLOOM
+                    && ctx.getSessionVariable().enablePipelineEngine()
+                    && hasRemoteTarget(join, scan)) {
+                type = TRuntimeFilterType.BLOOM;
+            }
+            long buildSideNdv = getBuildSideNdv(join, equalTo);
+            RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                    equalTo.right(), ImmutableList.of(olapScanSlot), type, exprOrder, join, buildSideNdv);
+            ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+            ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
+            ctx.setTargetsOnScanNode(aliasTransferMap.get(unwrappedSlot).first.getId(), olapScanSlot);
+        }
+    }
+
+    private void doPushDownIntoSetOperation(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
+        List<Slot> targetList = new ArrayList<>();
+        int projIndex = -1;
+        for (int j = 0; j < join.left().children().size(); j++) {
+            PhysicalPlan child = (PhysicalPlan) join.left().child(j);
+            if (child instanceof PhysicalProject) {
+                PhysicalProject project = (PhysicalProject) child;
+                Slot leftSlot = checkTargetChild(equalTo.left());
+                if (leftSlot == null) {
+                    break;
+                }
+                for (int k = 0; projIndex < 0 && k < project.getProjects().size(); k++) {
+                    NamedExpression expr = (NamedExpression) project.getProjects().get(k);
+                    if (expr.getName().equals(leftSlot.getName())) {
+                        projIndex = k;
+                        break;
+                    }
+                }
+                Preconditions.checkState(projIndex >= 0
+                        && projIndex < project.getProjects().size());
+
+                NamedExpression targetExpr = (NamedExpression) project.getProjects().get(projIndex);
+
+                SlotReference origSlot = null;
+                if (targetExpr instanceof Alias) {
+                    origSlot = (SlotReference) targetExpr.child(0);
+                } else {
+                    origSlot = (SlotReference) targetExpr;
+                }
+                Slot olapScanSlot = aliasTransferMap.get(origSlot).second;
+                PhysicalRelation scan = aliasTransferMap.get(origSlot).first;
+                if (type == TRuntimeFilterType.IN_OR_BLOOM
+                        && ctx.getSessionVariable().enablePipelineEngine()
+                        && hasRemoteTarget(join, scan)) {
+                    type = TRuntimeFilterType.BLOOM;
+                }
+                targetList.add(olapScanSlot);
+                ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+                ctx.setTargetsOnScanNode(aliasTransferMap.get(origSlot).first.getId(), olapScanSlot);
+            }
+        }
+        if (!targetList.isEmpty()) {
+            long buildSideNdv = getBuildSideNdv(join, equalTo);
+            RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                    equalTo.right(), targetList, type, exprOrder, join, buildSideNdv);
+            for (int j = 0; j < targetList.size(); j++) {
+                ctx.setTargetExprIdToFilter(targetList.get(j).getExprId(), filter);
+            }
+        }
+    }
+
+    private void collectPushDownCTEInfos(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        Set<CTEId> cteIds = new HashSet<>();
+        PhysicalPlan leftChild = (PhysicalPlan) join.left();
+        PhysicalPlan rightChild = (PhysicalPlan) join.right();
+
+        Preconditions.checkState(leftChild != null && rightChild != null);
+
+        boolean leftHasCTE = hasCTEConsumerUnderJoin(leftChild, cteIds);
+        boolean rightHasCTE = hasCTEConsumerUnderJoin(rightChild, cteIds);
+        // only one side cte is allowed currently
+        if ((leftHasCTE && !rightHasCTE) || (!leftHasCTE && rightHasCTE)) {
+            for (CTEId id : cteIds) {
+                if (ctx.getCteToJoinsMap().get(id) == null) {
+                    Set<PhysicalHashJoin> newJoin = new HashSet<>();
+                    newJoin.add(join);
+                    ctx.getCteToJoinsMap().put(id, newJoin);
+                } else {
+                    ctx.getCteToJoinsMap().get(id).add(join);
+                }
+            }
+        }
+        if (!ctx.getCteToJoinsMap().isEmpty()) {
+            analyzeRuntimeFilterPushDownIntoCTEInfos(join, context);
+        }
+    }
+
+    private List<CTEId> getPushDownCTECandidates(RuntimeFilterContext ctx) {
+        List<CTEId> candidates = new ArrayList<>();
+        Map<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> cteRFPushDownMap = ctx.getCteRFPushDownMap();
+        for (Map.Entry<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> entry : cteRFPushDownMap.entrySet()) {
+            CTEId cteId = entry.getKey().getCteId();
+            if (ctx.getPushedDownCTE().contains(cteId)) {
+                continue;
+            }
+            candidates.add(cteId);
+        }
+        return candidates;
+    }
+
+    private boolean hasCTEConsumerUnderJoin(PhysicalPlan root, Set<CTEId> cteIds) {
+        if (root instanceof PhysicalCTEConsumer) {
+            cteIds.add(((PhysicalCTEConsumer) root).getCteId());
+            return true;
+        } else if (root.children().size() != 1) {
+            // only collect cte in one side
+            return false;
+        } else if (root instanceof PhysicalDistribute
+                || root instanceof PhysicalFilter
+                || root instanceof PhysicalProject) {
+            return hasCTEConsumerUnderJoin((PhysicalPlan) root.child(0), cteIds);
+        } else {
+            return false;
+        }
+    }
+
+    private void analyzeRuntimeFilterPushDownIntoCTEInfos(PhysicalHashJoin<? extends Plan, ? extends Plan> curJoin,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        Map<CTEId, Set<PhysicalHashJoin>> cteToJoinsMap = ctx.getCteToJoinsMap();
+        for (Map.Entry<CTEId, Set<PhysicalHashJoin>> entry : cteToJoinsMap.entrySet()) {
+            CTEId cteId = entry.getKey();
+            Set<PhysicalHashJoin> joinSet = entry.getValue();
+            if (joinSet.contains(curJoin)) {
+                // skip current join
+                continue;
+            }
+            Set<LogicalCTEConsumer> cteSet = context.getCteIdToConsumers().get(cteId);
+            Preconditions.checkState(!cteSet.isEmpty());
+            String cteName = cteSet.iterator().next().getName();
+            // preconditions for rf pushing into cte producer:
+            // multiple joins whose join condition is on the same cte's column of the same cte
+            // the other side of these join conditions are the same column of the same table, or
+            // they in the same equal sets, such as under an equal join condition
+            // case 1: two joins with t1.c1 = cte1_consumer1.c1 and t1.c1 = cte1_consumer2.c1 conditions
+            //         rf of t1.c1 can be pushed down into cte1 producer
+            // case 2: two joins with t1.c1 = cte2_consumer1.c1 and t2.c2 = cte2_consumer2.c1 conditions
+            //         and another equal join condition with t1.c1 = t2.c2
+            //         rf of t1.c1 and t2.c2 can be pushed down into cte2 producer
+            if (joinSet.size() != cteSet.size()) {
+                continue;
+            }
+            List<EqualTo> equalTos = new ArrayList<>();
+            Map<EqualTo, PhysicalHashJoin> equalCondToJoinMap = new LinkedHashMap<>();
+            for (PhysicalHashJoin join : joinSet) {
+                // precondition:
+                // 1. no non-equal join condition
+                // 2. only equalTo and slotReference both sides
+                // 3. only support one join condition (will be refined further)
+                if (join.getOtherJoinConjuncts().size() > 1
+                        || join.getHashJoinConjuncts().size() != 1
+                        || !(join.getHashJoinConjuncts().get(0) instanceof EqualTo)
+                        || !(((EqualTo) join.getHashJoinConjuncts().get(0)).child(0) instanceof SlotReference)
+                        || !(((EqualTo) join.getHashJoinConjuncts().get(0)).child(1) instanceof SlotReference)) {
+                    break;

Review Comment:
   Would it be better to return directly here?
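
   To make the difference concrete, here is a small standalone sketch. It does not model the real method, since the code after the `break` is not part of this hunk; lists of integers stand in for the join sets, and a negative value stands in for a join that fails the preconditions. All names are hypothetical.

   ```java
   import java.util.Arrays;
   import java.util.List;

   class EarlyExitSketch {
       // break: leaves only the inner loop, so later groups are still examined.
       static int withBreak(List<List<Integer>> groups) {
           int accepted = 0;
           for (List<Integer> group : groups) {
               boolean ok = true;
               for (int value : group) {
                   if (value < 0) {
                       ok = false;
                       break;
                   }
               }
               if (ok) {
                   accepted++;
               }
           }
           return accepted;
       }

       // return: leaves the whole method, so every remaining group is skipped.
       static int withReturn(List<List<Integer>> groups) {
           int accepted = 0;
           for (List<Integer> group : groups) {
               for (int value : group) {
                   if (value < 0) {
                       return accepted;
                   }
               }
               accepted++;
           }
           return accepted;
       }

       public static void main(String[] args) {
           List<List<Integer>> groups = Arrays.asList(
                   Arrays.asList(1, 2), Arrays.asList(-1), Arrays.asList(3));
           System.out.println("break: " + withBreak(groups) + ", return: " + withReturn(groups));
       }
   }
   ```

   So a direct `return` is only equivalent if nothing after the inner loop, and none of the remaining CTE entries in the outer loop, still needs to be processed once one join fails the check.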



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -294,8 +221,466 @@ public PhysicalRelation visitPhysicalScan(PhysicalRelation scan, CascadesContext
         return scan;
     }
 
+    private long getBuildSideNdv(PhysicalHashJoin<? extends Plan, ? extends Plan> join, EqualTo equalTo) {
+        AbstractPlan right = (AbstractPlan) join.right();
+        //make ut test friendly
+        if (right.getStats() == null) {
+            return -1L;
+        }
+        ExpressionEstimation estimator = new ExpressionEstimation();
+        ColumnStatistic buildColStats = equalTo.right().accept(estimator, right.getStats());
+        return buildColStats.isUnKnown ? -1 : Math.max(1, (long) buildColStats.ndv);
+    }
+
     private static Slot checkTargetChild(Expression leftChild) {
         Expression expression = ExpressionUtils.getExpressionCoveredByCast(leftChild);
         return expression instanceof Slot ? ((Slot) expression) : null;
     }
+
+    private void pushDownRuntimeFilterCommon(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        List<TRuntimeFilterType> legalTypes = Arrays.stream(TRuntimeFilterType.values())
+                .filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0)
+                .collect(Collectors.toList());
+        // TODO: some complex situation cannot be handled now, see testPushDownThroughJoin.
+        //   we will support it in later version.
+        for (int i = 0; i < join.getHashJoinConjuncts().size(); i++) {
+            EqualTo equalTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(
+                    (EqualTo) join.getHashJoinConjuncts().get(i), join.left().getOutputSet()));
+            for (TRuntimeFilterType type : legalTypes) {
+                //bitmap rf is generated by nested loop join.
+                if (type == TRuntimeFilterType.BITMAP) {
+                    continue;
+                }
+                if (join.left() instanceof PhysicalUnion
+                        || join.left() instanceof PhysicalIntersect
+                        || join.left() instanceof PhysicalExcept) {
+                    doPushDownIntoSetOperation(join, ctx, equalTo, type, i);
+                } else {
+                    doPushDownBasic(join, context, ctx, equalTo, type, i);
+                }
+            }
+        }
+    }
+
+    private void doPushDownBasic(PhysicalHashJoin<? extends Plan, ? extends Plan> join, CascadesContext context,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
+        // currently, we can ensure children in the two side are corresponding to the equal_to's.
+        // so right maybe an expression and left is a slot
+        Slot unwrappedSlot = checkTargetChild(equalTo.left());
+        // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join
+        // contains join with denied join type. for example: a left join b on a.id = b.id
+        if (unwrappedSlot == null || !aliasTransferMap.containsKey(unwrappedSlot)) {
+            return;
+        }
+        Slot olapScanSlot = aliasTransferMap.get(unwrappedSlot).second;
+        PhysicalRelation scan = aliasTransferMap.get(unwrappedSlot).first;
+
+        Preconditions.checkState(olapScanSlot != null && scan != null);
+
+        if (scan instanceof PhysicalCTEConsumer) {
+            Set<CTEId> processedCTE = context.getRuntimeFilterContext().getProcessedCTE();
+            CTEId cteId = ((PhysicalCTEConsumer) scan).getCteId();
+            if (!processedCTE.contains(cteId)) {
+                PhysicalCTEProducer cteProducer = context.getRuntimeFilterContext()
+                        .getCteProduceMap().get(cteId);
+                PhysicalPlan inputPlanNode = (PhysicalPlan) cteProducer.child(0);
+                // process cte producer self recursively
+                inputPlanNode.accept(this, context);
+                processedCTE.add(cteId);
+            }
+        } else {
+            // in-filter is not friendly to pipeline
+            if (type == TRuntimeFilterType.IN_OR_BLOOM
+                    && ctx.getSessionVariable().enablePipelineEngine()
+                    && hasRemoteTarget(join, scan)) {
+                type = TRuntimeFilterType.BLOOM;
+            }
+            long buildSideNdv = getBuildSideNdv(join, equalTo);
+            RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                    equalTo.right(), ImmutableList.of(olapScanSlot), type, exprOrder, join, buildSideNdv);
+            ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+            ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
+            ctx.setTargetsOnScanNode(aliasTransferMap.get(unwrappedSlot).first.getId(), olapScanSlot);
+        }
+    }
+
+    private void doPushDownIntoSetOperation(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
+        List<Slot> targetList = new ArrayList<>();
+        int projIndex = -1;
+        for (int j = 0; j < join.left().children().size(); j++) {
+            PhysicalPlan child = (PhysicalPlan) join.left().child(j);
+            if (child instanceof PhysicalProject) {
+                PhysicalProject project = (PhysicalProject) child;
+                Slot leftSlot = checkTargetChild(equalTo.left());
+                if (leftSlot == null) {
+                    break;
+                }
+                for (int k = 0; projIndex < 0 && k < project.getProjects().size(); k++) {
+                    NamedExpression expr = (NamedExpression) project.getProjects().get(k);
+                    if (expr.getName().equals(leftSlot.getName())) {
+                        projIndex = k;
+                        break;
+                    }
+                }
+                Preconditions.checkState(projIndex >= 0
+                        && projIndex < project.getProjects().size());
+
+                NamedExpression targetExpr = (NamedExpression) project.getProjects().get(projIndex);
+
+                SlotReference origSlot = null;
+                if (targetExpr instanceof Alias) {
+                    origSlot = (SlotReference) targetExpr.child(0);
+                } else {
+                    origSlot = (SlotReference) targetExpr;
+                }
+                Slot olapScanSlot = aliasTransferMap.get(origSlot).second;
+                PhysicalRelation scan = aliasTransferMap.get(origSlot).first;
+                if (type == TRuntimeFilterType.IN_OR_BLOOM
+                        && ctx.getSessionVariable().enablePipelineEngine()
+                        && hasRemoteTarget(join, scan)) {
+                    type = TRuntimeFilterType.BLOOM;
+                }
+                targetList.add(olapScanSlot);
+                ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+                ctx.setTargetsOnScanNode(aliasTransferMap.get(origSlot).first.getId(), olapScanSlot);
+            }
+        }
+        if (!targetList.isEmpty()) {
+            long buildSideNdv = getBuildSideNdv(join, equalTo);
+            RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                    equalTo.right(), targetList, type, exprOrder, join, buildSideNdv);
+            for (int j = 0; j < targetList.size(); j++) {
+                ctx.setTargetExprIdToFilter(targetList.get(j).getExprId(), filter);
+            }
+        }
+    }
+
+    private void collectPushDownCTEInfos(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        Set<CTEId> cteIds = new HashSet<>();
+        PhysicalPlan leftChild = (PhysicalPlan) join.left();
+        PhysicalPlan rightChild = (PhysicalPlan) join.right();
+
+        Preconditions.checkState(leftChild != null && rightChild != null);
+
+        boolean leftHasCTE = hasCTEConsumerUnderJoin(leftChild, cteIds);
+        boolean rightHasCTE = hasCTEConsumerUnderJoin(rightChild, cteIds);
+        // only one side cte is allowed currently
+        if ((leftHasCTE && !rightHasCTE) || (!leftHasCTE && rightHasCTE)) {
+            for (CTEId id : cteIds) {
+                if (ctx.getCteToJoinsMap().get(id) == null) {
+                    Set<PhysicalHashJoin> newJoin = new HashSet<>();
+                    newJoin.add(join);
+                    ctx.getCteToJoinsMap().put(id, newJoin);
+                } else {
+                    ctx.getCteToJoinsMap().get(id).add(join);
+                }
+            }
+        }
+        if (!ctx.getCteToJoinsMap().isEmpty()) {
+            analyzeRuntimeFilterPushDownIntoCTEInfos(join, context);
+        }
+    }
+
+    private List<CTEId> getPushDownCTECandidates(RuntimeFilterContext ctx) {
+        List<CTEId> candidates = new ArrayList<>();
+        Map<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> cteRFPushDownMap = ctx.getCteRFPushDownMap();
+        for (Map.Entry<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> entry : cteRFPushDownMap.entrySet()) {
+            CTEId cteId = entry.getKey().getCteId();
+            if (ctx.getPushedDownCTE().contains(cteId)) {

Review Comment:
   Could we just remove the entry from `RFPushDownMap`, to avoid using another ctx attribute, `PushedDownCTE`?
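
   A rough sketch of that idea, assuming string keys in place of `PhysicalCTEProducer` and a hypothetical `markPushedDown` hook that is called once a CTE's runtime filter has been pushed down:

   ```java
   import java.util.ArrayList;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   // Simplified stand-in: producers are keyed by a string id; the value type is irrelevant here.
   class PushDownMapSketch {
       final Map<String, String> cteRfPushDownMap = new LinkedHashMap<>();

       // Everything still in the map is a candidate; no separate "already pushed" set is consulted.
       List<String> getPushDownCandidates() {
           return new ArrayList<>(cteRfPushDownMap.keySet());
       }

       // Removing the entry replaces the bookkeeping done by the extra set.
       void markPushedDown(String cteId) {
           cteRfPushDownMap.remove(cteId);
       }

       public static void main(String[] args) {
           PushDownMapSketch sketch = new PushDownMapSketch();
           sketch.cteRfPushDownMap.put("cte1", "join info");
           sketch.cteRfPushDownMap.put("cte2", "join info");
           sketch.markPushedDown("cte1");
           System.out.println(sketch.getPushDownCandidates());
       }
   }
   ```

   This only works if nothing reads the entry again after the push-down, which I have not verified against the rest of the PR.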



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -94,122 +113,30 @@ public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? ext
         Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
         join.right().accept(this, context);
         join.left().accept(this, context);
+        collectPushDownCTEInfos(join, context);

Review Comment:
   I think we should not collect push-down CTE info when the current join node satisfies `DENIED_JOIN_TYPES.contains(join.getJoinType()) || join.isMarkJoin()`.
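
   As a sketch, the guard could be expressed like this. The enum and the contents of `DENIED_JOIN_TYPES` below are illustrative stand-ins, not the real definitions, and `shouldCollectCteInfo` is a hypothetical helper name.

   ```java
   import java.util.EnumSet;
   import java.util.Set;

   class DeniedJoinGuardSketch {
       // Illustrative subset of join types; the real enum lives in Doris.
       enum JoinType { INNER_JOIN, LEFT_OUTER_JOIN, FULL_OUTER_JOIN }

       // Illustrative contents only; the real DENIED_JOIN_TYPES may differ.
       static final Set<JoinType> DENIED_JOIN_TYPES =
               EnumSet.of(JoinType.LEFT_OUTER_JOIN, JoinType.FULL_OUTER_JOIN);

       // True only when the join is neither of a denied type nor a mark join.
       static boolean shouldCollectCteInfo(JoinType joinType, boolean isMarkJoin) {
           return !(DENIED_JOIN_TYPES.contains(joinType) || isMarkJoin);
       }

       public static void main(String[] args) {
           System.out.println(shouldCollectCteInfo(JoinType.INNER_JOIN, false));
           System.out.println(shouldCollectCteInfo(JoinType.LEFT_OUTER_JOIN, false));
           System.out.println(shouldCollectCteInfo(JoinType.INNER_JOIN, true));
       }
   }
   ```

   In `visitPhysicalHashJoin` this would mean calling `collectPushDownCTEInfos(join, context)` only when the guard passes.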



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -294,8 +221,466 @@ public PhysicalRelation visitPhysicalScan(PhysicalRelation scan, CascadesContext
         return scan;
     }
 
+    private long getBuildSideNdv(PhysicalHashJoin<? extends Plan, ? extends Plan> join, EqualTo equalTo) {
+        AbstractPlan right = (AbstractPlan) join.right();
+        //make ut test friendly
+        if (right.getStats() == null) {
+            return -1L;
+        }
+        ExpressionEstimation estimator = new ExpressionEstimation();
+        ColumnStatistic buildColStats = equalTo.right().accept(estimator, right.getStats());
+        return buildColStats.isUnKnown ? -1 : Math.max(1, (long) buildColStats.ndv);
+    }
+
     private static Slot checkTargetChild(Expression leftChild) {
         Expression expression = ExpressionUtils.getExpressionCoveredByCast(leftChild);
         return expression instanceof Slot ? ((Slot) expression) : null;
     }
+
+    private void pushDownRuntimeFilterCommon(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        List<TRuntimeFilterType> legalTypes = Arrays.stream(TRuntimeFilterType.values())
+                .filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0)
+                .collect(Collectors.toList());
+        // TODO: some complex situation cannot be handled now, see testPushDownThroughJoin.
+        //   we will support it in later version.
+        for (int i = 0; i < join.getHashJoinConjuncts().size(); i++) {
+            EqualTo equalTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(
+                    (EqualTo) join.getHashJoinConjuncts().get(i), join.left().getOutputSet()));
+            for (TRuntimeFilterType type : legalTypes) {
+                //bitmap rf is generated by nested loop join.
+                if (type == TRuntimeFilterType.BITMAP) {
+                    continue;
+                }
+                if (join.left() instanceof PhysicalUnion
+                        || join.left() instanceof PhysicalIntersect
+                        || join.left() instanceof PhysicalExcept) {
+                    doPushDownIntoSetOperation(join, ctx, equalTo, type, i);
+                } else {
+                    doPushDownBasic(join, context, ctx, equalTo, type, i);
+                }
+            }
+        }
+    }
+
+    private void doPushDownBasic(PhysicalHashJoin<? extends Plan, ? extends Plan> join, CascadesContext context,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
+        // currently, we can ensure children in the two side are corresponding to the equal_to's.
+        // so right maybe an expression and left is a slot
+        Slot unwrappedSlot = checkTargetChild(equalTo.left());
+        // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join
+        // contains join with denied join type. for example: a left join b on a.id = b.id
+        if (unwrappedSlot == null || !aliasTransferMap.containsKey(unwrappedSlot)) {
+            return;
+        }
+        Slot olapScanSlot = aliasTransferMap.get(unwrappedSlot).second;
+        PhysicalRelation scan = aliasTransferMap.get(unwrappedSlot).first;
+
+        Preconditions.checkState(olapScanSlot != null && scan != null);
+
+        if (scan instanceof PhysicalCTEConsumer) {
+            Set<CTEId> processedCTE = context.getRuntimeFilterContext().getProcessedCTE();
+            CTEId cteId = ((PhysicalCTEConsumer) scan).getCteId();
+            if (!processedCTE.contains(cteId)) {
+                PhysicalCTEProducer cteProducer = context.getRuntimeFilterContext()
+                        .getCteProduceMap().get(cteId);
+                PhysicalPlan inputPlanNode = (PhysicalPlan) cteProducer.child(0);
+                // process cte producer self recursively
+                inputPlanNode.accept(this, context);
+                processedCTE.add(cteId);
+            }
+        } else {
+            // in-filter is not friendly to pipeline
+            if (type == TRuntimeFilterType.IN_OR_BLOOM
+                    && ctx.getSessionVariable().enablePipelineEngine()
+                    && hasRemoteTarget(join, scan)) {
+                type = TRuntimeFilterType.BLOOM;
+            }
+            long buildSideNdv = getBuildSideNdv(join, equalTo);
+            RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                    equalTo.right(), ImmutableList.of(olapScanSlot), type, exprOrder, join, buildSideNdv);
+            ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+            ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
+            ctx.setTargetsOnScanNode(aliasTransferMap.get(unwrappedSlot).first.getId(), olapScanSlot);
+        }
+    }
+
+    private void doPushDownIntoSetOperation(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
+        List<Slot> targetList = new ArrayList<>();
+        int projIndex = -1;
+        for (int j = 0; j < join.left().children().size(); j++) {
+            PhysicalPlan child = (PhysicalPlan) join.left().child(j);
+            if (child instanceof PhysicalProject) {
+                PhysicalProject project = (PhysicalProject) child;
+                Slot leftSlot = checkTargetChild(equalTo.left());
+                if (leftSlot == null) {
+                    break;
+                }
+                for (int k = 0; projIndex < 0 && k < project.getProjects().size(); k++) {
+                    NamedExpression expr = (NamedExpression) project.getProjects().get(k);
+                    if (expr.getName().equals(leftSlot.getName())) {
+                        projIndex = k;
+                        break;
+                    }
+                }
+                Preconditions.checkState(projIndex >= 0
+                        && projIndex < project.getProjects().size());
+
+                NamedExpression targetExpr = (NamedExpression) project.getProjects().get(projIndex);
+
+                SlotReference origSlot = null;
+                if (targetExpr instanceof Alias) {
+                    origSlot = (SlotReference) targetExpr.child(0);
+                } else {
+                    origSlot = (SlotReference) targetExpr;
+                }
+                Slot olapScanSlot = aliasTransferMap.get(origSlot).second;
+                PhysicalRelation scan = aliasTransferMap.get(origSlot).first;
+                if (type == TRuntimeFilterType.IN_OR_BLOOM
+                        && ctx.getSessionVariable().enablePipelineEngine()
+                        && hasRemoteTarget(join, scan)) {
+                    type = TRuntimeFilterType.BLOOM;
+                }
+                targetList.add(olapScanSlot);
+                ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+                ctx.setTargetsOnScanNode(aliasTransferMap.get(origSlot).first.getId(), olapScanSlot);
+            }
+        }
+        if (!targetList.isEmpty()) {
+            long buildSideNdv = getBuildSideNdv(join, equalTo);
+            RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                    equalTo.right(), targetList, type, exprOrder, join, buildSideNdv);
+            for (int j = 0; j < targetList.size(); j++) {
+                ctx.setTargetExprIdToFilter(targetList.get(j).getExprId(), filter);
+            }
+        }
+    }
+
+    private void collectPushDownCTEInfos(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        Set<CTEId> cteIds = new HashSet<>();
+        PhysicalPlan leftChild = (PhysicalPlan) join.left();
+        PhysicalPlan rightChild = (PhysicalPlan) join.right();
+
+        Preconditions.checkState(leftChild != null && rightChild != null);
+
+        boolean leftHasCTE = hasCTEConsumerUnderJoin(leftChild, cteIds);
+        boolean rightHasCTE = hasCTEConsumerUnderJoin(rightChild, cteIds);
+        // only one side cte is allowed currently
+        if ((leftHasCTE && !rightHasCTE) || (!leftHasCTE && rightHasCTE)) {
+            for (CTEId id : cteIds) {
+                if (ctx.getCteToJoinsMap().get(id) == null) {
+                    Set<PhysicalHashJoin> newJoin = new HashSet<>();
+                    newJoin.add(join);
+                    ctx.getCteToJoinsMap().put(id, newJoin);
+                } else {
+                    ctx.getCteToJoinsMap().get(id).add(join);
+                }
+            }
+        }
+        if (!ctx.getCteToJoinsMap().isEmpty()) {
+            analyzeRuntimeFilterPushDownIntoCTEInfos(join, context);
+        }
+    }
+
+    private List<CTEId> getPushDownCTECandidates(RuntimeFilterContext ctx) {
+        List<CTEId> candidates = new ArrayList<>();
+        Map<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> cteRFPushDownMap = ctx.getCteRFPushDownMap();
+        for (Map.Entry<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> entry : cteRFPushDownMap.entrySet()) {
+            CTEId cteId = entry.getKey().getCteId();
+            if (ctx.getPushedDownCTE().contains(cteId)) {
+                continue;
+            }
+            candidates.add(cteId);
+        }
+        return candidates;
+    }
+
+    private boolean hasCTEConsumerUnderJoin(PhysicalPlan root, Set<CTEId> cteIds) {
+        if (root instanceof PhysicalCTEConsumer) {
+            cteIds.add(((PhysicalCTEConsumer) root).getCteId());
+            return true;
+        } else if (root.children().size() != 1) {
+            // only collect cte in one side
+            return false;
+        } else if (root instanceof PhysicalDistribute
+                || root instanceof PhysicalFilter
+                || root instanceof PhysicalProject) {
+            return hasCTEConsumerUnderJoin((PhysicalPlan) root.child(0), cteIds);
+        } else {
+            return false;
+        }
+    }
+
+    private void analyzeRuntimeFilterPushDownIntoCTEInfos(PhysicalHashJoin<? extends Plan, ? extends Plan> curJoin,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        Map<CTEId, Set<PhysicalHashJoin>> cteToJoinsMap = ctx.getCteToJoinsMap();
+        for (Map.Entry<CTEId, Set<PhysicalHashJoin>> entry : cteToJoinsMap.entrySet()) {
+            CTEId cteId = entry.getKey();
+            Set<PhysicalHashJoin> joinSet = entry.getValue();
+            if (joinSet.contains(curJoin)) {
+                // skip current join
+                continue;
+            }
+            Set<LogicalCTEConsumer> cteSet = context.getCteIdToConsumers().get(cteId);
+            Preconditions.checkState(!cteSet.isEmpty());
+            String cteName = cteSet.iterator().next().getName();
+            // preconditions for rf pushing into cte producer:
+            // multiple joins whose join condition is on the same cte's column of the same cte
+            // the other side of these join conditions are the same column of the same table, or
+            // they in the same equal sets, such as under an equal join condition
+            // case 1: two joins with t1.c1 = cte1_consumer1.c1 and t1.c1 = cte1_consumer2.c1 conditions
+            //         rf of t1.c1 can be pushed down into cte1 producer
+            // case 2: two joins with t1.c1 = cte2_consumer1.c1 and t2.c2 = cte2_consumer2.c1 conditions
+            //         and another equal join condition with t1.c1 = t2.c2
+            //         rf of t1.c1 and t2.c2 can be pushed down into cte2 producer
+            if (joinSet.size() != cteSet.size()) {
+                continue;
+            }
+            List<EqualTo> equalTos = new ArrayList<>();
+            Map<EqualTo, PhysicalHashJoin> equalCondToJoinMap = new LinkedHashMap<>();
+            for (PhysicalHashJoin join : joinSet) {
+                // precondition:
+                // 1. no non-equal join condition
+                // 2. only equalTo and slotReference both sides
+                // 3. only support one join condition (will be refined further)
+                if (join.getOtherJoinConjuncts().size() > 1
+                        || join.getHashJoinConjuncts().size() != 1
+                        || !(join.getHashJoinConjuncts().get(0) instanceof EqualTo)
+                        || !(((EqualTo) join.getHashJoinConjuncts().get(0)).child(0) instanceof SlotReference)
+                        || !(((EqualTo) join.getHashJoinConjuncts().get(0)).child(1) instanceof SlotReference)) {
+                    break;
+                } else {
+                    EqualTo equalTo = (EqualTo) join.getHashJoinConjuncts().get(0);
+                    equalTos.add(equalTo);
+                    equalCondToJoinMap.put(equalTo, join);
+                }
+            }
+            if (joinSet.size() == equalTos.size()) {
+                int matchNum = 0;
+                Set<String> cteNameSet = new HashSet<>();
+                Set<SlotReference> anotherSideSlotSet = new HashSet<>();
+                for (EqualTo equalTo : equalTos) {
+                    // has been checked above, SlotReference for both sides
+                    SlotReference left = (SlotReference) equalTo.left();
+                    SlotReference right = (SlotReference) equalTo.right();
+                    if (left.getQualifier().size() == 1 && left.getQualifier().get(0).equals(cteName)) {
+                        matchNum += 1;
+                        anotherSideSlotSet.add(right);
+                        cteNameSet.add(left.getQualifiedName());
+                    } else if (right.getQualifier().size() == 1 && right.getQualifier().get(0).equals(cteName)) {
+                        matchNum += 1;
+                        anotherSideSlotSet.add(left);
+                        cteNameSet.add(right.getQualifiedName());
+                    }
+                }
+                if (matchNum == equalTos.size() && cteNameSet.size() == 1) {
+                    // means all join condition points to the same cte on the same cte column.
+                    // collect the other side columns besides cte column side.
+                    Preconditions.checkState(equalTos.size() == equalCondToJoinMap.size());
+
+                    PhysicalCTEProducer cteProducer = context.getRuntimeFilterContext().getCteProduceMap().get(cteId);
+                    if (anotherSideSlotSet.size() == 1) {
+                        // meet requirement for pushing down into cte producer
+                        ctx.getCteRFPushDownMap().put(cteProducer, equalCondToJoinMap);
+                    } else {
+                        // check further whether the join upper side can bring equal set, which
+                        // indicating actually the same runtime filter build side
+                        List<Expression> conditions = curJoin.getHashJoinConjuncts();
+                        boolean inSameEqualSet = false;
+                        for (Expression e : conditions) {
+                            if (e instanceof EqualTo) {
+                                SlotReference oneSide = (SlotReference) ((EqualTo) e).left();
+                                SlotReference anotherSide = (SlotReference) ((EqualTo) e).right();
+                                if (anotherSideSlotSet.contains(oneSide) && anotherSideSlotSet.contains(anotherSide)) {
+                                    inSameEqualSet = true;
+                                    break;
+                                }
+                            }
+                        }
+                        if (inSameEqualSet) {
+                            ctx.getCteRFPushDownMap().put(cteProducer, equalCondToJoinMap);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private void pushDownRuntimeFilterIntoCTE(RuntimeFilterContext ctx) {
+        Map<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> cteRFPushDownMap = ctx.getCteRFPushDownMap();
+        for (Map.Entry<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> entry : cteRFPushDownMap.entrySet()) {
+            PhysicalCTEProducer cteProducer = entry.getKey();
+            if (ctx.getPushedDownCTE().contains(cteProducer.getCteId())) {
+                continue;
+            }
+            Map<EqualTo, PhysicalHashJoin> equalCondToJoinMap = entry.getValue();
+            int exprOrder = 0;
+            for (Map.Entry<EqualTo, PhysicalHashJoin> innerEntry : equalCondToJoinMap.entrySet()) {
+                EqualTo equalTo = innerEntry.getKey();
+                PhysicalHashJoin join = innerEntry.getValue();
+                Preconditions.checkState(cteProducer != null && join != null);

Review Comment:
   `cteProducer.getCteId()` is already called before the `cteProducer != null` check, so `cteProducer != null` is always true at this point
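
   A minimal sketch of the ordering this implies, not the PR's actual code: validate the reference before its first dereference, otherwise the later check can never fail. `Producer` is a hypothetical stand-in for `PhysicalCTEProducer`, and `java.util.Objects.requireNonNull` is used in place of Guava's `Preconditions.checkState` only to keep the example self-contained.
   ```java
   import java.util.Objects;

   class NullCheckOrder {
       // Hypothetical stand-in for PhysicalCTEProducer; only the getter matters here.
       static final class Producer {
           private final int cteId;

           Producer(int cteId) {
               this.cteId = cteId;
           }

           int getCteId() {
               return cteId;
           }
       }

       // Check first, dereference second: the check is now meaningful
       // and fails with a descriptive message.
       static int cteIdOf(Producer producer) {
           Objects.requireNonNull(producer, "cteProducer must not be null");
           return producer.getCteId();
       }

       public static void main(String[] args) {
           System.out.println(cteIdOf(new Producer(7)));
           try {
               cteIdOf(null);
           } catch (NullPointerException e) {
               System.out.println("rejected: " + e.getMessage());
           }
       }
   }
   ```
   In the hunk above, the same effect could be had either by moving the check ahead of `cteProducer.getCteId()` or by reducing it to `join != null`.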



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -294,8 +221,466 @@ public PhysicalRelation visitPhysicalScan(PhysicalRelation scan, CascadesContext
         return scan;
     }
 
+    private long getBuildSideNdv(PhysicalHashJoin<? extends Plan, ? extends Plan> join, EqualTo equalTo) {
+        AbstractPlan right = (AbstractPlan) join.right();
+        //make ut test friendly
+        if (right.getStats() == null) {
+            return -1L;
+        }
+        ExpressionEstimation estimator = new ExpressionEstimation();
+        ColumnStatistic buildColStats = equalTo.right().accept(estimator, right.getStats());
+        return buildColStats.isUnKnown ? -1 : Math.max(1, (long) buildColStats.ndv);
+    }
+
     private static Slot checkTargetChild(Expression leftChild) {
         Expression expression = ExpressionUtils.getExpressionCoveredByCast(leftChild);
         return expression instanceof Slot ? ((Slot) expression) : null;
     }
+
+    private void pushDownRuntimeFilterCommon(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        List<TRuntimeFilterType> legalTypes = Arrays.stream(TRuntimeFilterType.values())
+                .filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0)
+                .collect(Collectors.toList());
+        // TODO: some complex situation cannot be handled now, see testPushDownThroughJoin.
+        //   we will support it in later version.
+        for (int i = 0; i < join.getHashJoinConjuncts().size(); i++) {
+            EqualTo equalTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(
+                    (EqualTo) join.getHashJoinConjuncts().get(i), join.left().getOutputSet()));
+            for (TRuntimeFilterType type : legalTypes) {
+                //bitmap rf is generated by nested loop join.
+                if (type == TRuntimeFilterType.BITMAP) {
+                    continue;
+                }
+                if (join.left() instanceof PhysicalUnion
+                        || join.left() instanceof PhysicalIntersect
+                        || join.left() instanceof PhysicalExcept) {
+                    doPushDownIntoSetOperation(join, ctx, equalTo, type, i);
+                } else {
+                    doPushDownBasic(join, context, ctx, equalTo, type, i);
+                }
+            }
+        }
+    }
+
+    private void doPushDownBasic(PhysicalHashJoin<? extends Plan, ? extends Plan> join, CascadesContext context,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
+        // currently, we can ensure children in the two side are corresponding to the equal_to's.
+        // so right maybe an expression and left is a slot
+        Slot unwrappedSlot = checkTargetChild(equalTo.left());
+        // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join
+        // contains join with denied join type. for example: a left join b on a.id = b.id
+        if (unwrappedSlot == null || !aliasTransferMap.containsKey(unwrappedSlot)) {
+            return;
+        }
+        Slot olapScanSlot = aliasTransferMap.get(unwrappedSlot).second;
+        PhysicalRelation scan = aliasTransferMap.get(unwrappedSlot).first;
+
+        Preconditions.checkState(olapScanSlot != null && scan != null);
+
+        if (scan instanceof PhysicalCTEConsumer) {
+            Set<CTEId> processedCTE = context.getRuntimeFilterContext().getProcessedCTE();
+            CTEId cteId = ((PhysicalCTEConsumer) scan).getCteId();
+            if (!processedCTE.contains(cteId)) {
+                PhysicalCTEProducer cteProducer = context.getRuntimeFilterContext()
+                        .getCteProduceMap().get(cteId);
+                PhysicalPlan inputPlanNode = (PhysicalPlan) cteProducer.child(0);
+                // process cte producer self recursively
+                inputPlanNode.accept(this, context);
+                processedCTE.add(cteId);
+            }
+        } else {
+            // in-filter is not friendly to pipeline
+            if (type == TRuntimeFilterType.IN_OR_BLOOM
+                    && ctx.getSessionVariable().enablePipelineEngine()
+                    && hasRemoteTarget(join, scan)) {
+                type = TRuntimeFilterType.BLOOM;
+            }
+            long buildSideNdv = getBuildSideNdv(join, equalTo);
+            RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                    equalTo.right(), ImmutableList.of(olapScanSlot), type, exprOrder, join, buildSideNdv);
+            ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+            ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
+            ctx.setTargetsOnScanNode(aliasTransferMap.get(unwrappedSlot).first.getId(), olapScanSlot);
+        }
+    }
+
+    private void doPushDownIntoSetOperation(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
+        List<Slot> targetList = new ArrayList<>();
+        int projIndex = -1;
+        for (int j = 0; j < join.left().children().size(); j++) {
+            PhysicalPlan child = (PhysicalPlan) join.left().child(j);
+            if (child instanceof PhysicalProject) {
+                PhysicalProject project = (PhysicalProject) child;
+                Slot leftSlot = checkTargetChild(equalTo.left());
+                if (leftSlot == null) {
+                    break;
+                }
+                for (int k = 0; projIndex < 0 && k < project.getProjects().size(); k++) {
+                    NamedExpression expr = (NamedExpression) project.getProjects().get(k);
+                    if (expr.getName().equals(leftSlot.getName())) {
+                        projIndex = k;
+                        break;
+                    }
+                }
+                Preconditions.checkState(projIndex >= 0
+                        && projIndex < project.getProjects().size());
+
+                NamedExpression targetExpr = (NamedExpression) project.getProjects().get(projIndex);
+
+                SlotReference origSlot = null;
+                if (targetExpr instanceof Alias) {
+                    origSlot = (SlotReference) targetExpr.child(0);
+                } else {
+                    origSlot = (SlotReference) targetExpr;
+                }
+                Slot olapScanSlot = aliasTransferMap.get(origSlot).second;
+                PhysicalRelation scan = aliasTransferMap.get(origSlot).first;
+                if (type == TRuntimeFilterType.IN_OR_BLOOM
+                        && ctx.getSessionVariable().enablePipelineEngine()
+                        && hasRemoteTarget(join, scan)) {
+                    type = TRuntimeFilterType.BLOOM;
+                }
+                targetList.add(olapScanSlot);
+                ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+                ctx.setTargetsOnScanNode(aliasTransferMap.get(origSlot).first.getId(), olapScanSlot);
+            }
+        }
+        if (!targetList.isEmpty()) {
+            long buildSideNdv = getBuildSideNdv(join, equalTo);
+            RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                    equalTo.right(), targetList, type, exprOrder, join, buildSideNdv);
+            for (int j = 0; j < targetList.size(); j++) {
+                ctx.setTargetExprIdToFilter(targetList.get(j).getExprId(), filter);
+            }
+        }
+    }
+
+    private void collectPushDownCTEInfos(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        Set<CTEId> cteIds = new HashSet<>();
+        PhysicalPlan leftChild = (PhysicalPlan) join.left();
+        PhysicalPlan rightChild = (PhysicalPlan) join.right();
+
+        Preconditions.checkState(leftChild != null && rightChild != null);
+
+        boolean leftHasCTE = hasCTEConsumerUnderJoin(leftChild, cteIds);
+        boolean rightHasCTE = hasCTEConsumerUnderJoin(rightChild, cteIds);
+        // only one side cte is allowed currently
+        if ((leftHasCTE && !rightHasCTE) || (!leftHasCTE && rightHasCTE)) {
+            for (CTEId id : cteIds) {
+                if (ctx.getCteToJoinsMap().get(id) == null) {
+                    Set<PhysicalHashJoin> newJoin = new HashSet<>();
+                    newJoin.add(join);
+                    ctx.getCteToJoinsMap().put(id, newJoin);
+                } else {
+                    ctx.getCteToJoinsMap().get(id).add(join);
+                }
+            }
+        }
+        if (!ctx.getCteToJoinsMap().isEmpty()) {
+            analyzeRuntimeFilterPushDownIntoCTEInfos(join, context);
+        }
+    }
+
+    private List<CTEId> getPushDownCTECandidates(RuntimeFilterContext ctx) {
+        List<CTEId> candidates = new ArrayList<>();
+        Map<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> cteRFPushDownMap = ctx.getCteRFPushDownMap();
+        for (Map.Entry<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> entry : cteRFPushDownMap.entrySet()) {
+            CTEId cteId = entry.getKey().getCteId();
+            if (ctx.getPushedDownCTE().contains(cteId)) {
+                continue;
+            }
+            candidates.add(cteId);
+        }
+        return candidates;
+    }
+
+    private boolean hasCTEConsumerUnderJoin(PhysicalPlan root, Set<CTEId> cteIds) {
+        if (root instanceof PhysicalCTEConsumer) {
+            cteIds.add(((PhysicalCTEConsumer) root).getCteId());
+            return true;
+        } else if (root.children().size() != 1) {
+            // only collect cte in one side
+            return false;
+        } else if (root instanceof PhysicalDistribute
+                || root instanceof PhysicalFilter
+                || root instanceof PhysicalProject) {
+            return hasCTEConsumerUnderJoin((PhysicalPlan) root.child(0), cteIds);
+        } else {
+            return false;
+        }
+    }
+
+    private void analyzeRuntimeFilterPushDownIntoCTEInfos(PhysicalHashJoin<? extends Plan, ? extends Plan> curJoin,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        Map<CTEId, Set<PhysicalHashJoin>> cteToJoinsMap = ctx.getCteToJoinsMap();
+        for (Map.Entry<CTEId, Set<PhysicalHashJoin>> entry : cteToJoinsMap.entrySet()) {
+            CTEId cteId = entry.getKey();
+            Set<PhysicalHashJoin> joinSet = entry.getValue();
+            if (joinSet.contains(curJoin)) {
+                // skip current join
+                continue;
+            }
+            Set<LogicalCTEConsumer> cteSet = context.getCteIdToConsumers().get(cteId);
+            Preconditions.checkState(!cteSet.isEmpty());
+            String cteName = cteSet.iterator().next().getName();
+            // preconditions for rf pushing into cte producer:
+            // multiple joins whose join condition is on the same cte's column of the same cte
+            // the other side of these join conditions are the same column of the same table, or
+            // they in the same equal sets, such as under an equal join condition
+            // case 1: two joins with t1.c1 = cte1_consumer1.c1 and t1.c1 = cte1_consumer2.c1 conditions
+            //         rf of t1.c1 can be pushed down into cte1 producer
+            // case 2: two joins with t1.c1 = cte2_consumer1.c1 and t2.c2 = cte2_consumer2.c1 conditions
+            //         and another equal join condition with t1.c1 = t2.c2
+            //         rf of t1.c1 and t2.c2 can be pushed down into cte2 producer
+            if (joinSet.size() != cteSet.size()) {
+                continue;
+            }
+            List<EqualTo> equalTos = new ArrayList<>();
+            Map<EqualTo, PhysicalHashJoin> equalCondToJoinMap = new LinkedHashMap<>();
+            for (PhysicalHashJoin join : joinSet) {
+                // precondition:
+                // 1. no non-equal join condition
+                // 2. only equalTo and slotReference both sides
+                // 3. only support one join condition (will be refined further)
+                if (join.getOtherJoinConjuncts().size() > 1
+                        || join.getHashJoinConjuncts().size() != 1
+                        || !(join.getHashJoinConjuncts().get(0) instanceof EqualTo)
+                        || !(((EqualTo) join.getHashJoinConjuncts().get(0)).child(0) instanceof SlotReference)
+                        || !(((EqualTo) join.getHashJoinConjuncts().get(0)).child(1) instanceof SlotReference)) {
+                    break;
+                } else {
+                    EqualTo equalTo = (EqualTo) join.getHashJoinConjuncts().get(0);
+                    equalTos.add(equalTo);
+                    equalCondToJoinMap.put(equalTo, join);
+                }
+            }
+            if (joinSet.size() == equalTos.size()) {
+                int matchNum = 0;
+                Set<String> cteNameSet = new HashSet<>();
+                Set<SlotReference> anotherSideSlotSet = new HashSet<>();
+                for (EqualTo equalTo : equalTos) {
+                    // has been checked above, SlotReference for both sides
+                    SlotReference left = (SlotReference) equalTo.left();
+                    SlotReference right = (SlotReference) equalTo.right();
+                    if (left.getQualifier().size() == 1 && left.getQualifier().get(0).equals(cteName)) {
+                        matchNum += 1;
+                        anotherSideSlotSet.add(right);
+                        cteNameSet.add(left.getQualifiedName());
+                    } else if (right.getQualifier().size() == 1 && right.getQualifier().get(0).equals(cteName)) {
+                        matchNum += 1;
+                        anotherSideSlotSet.add(left);
+                        cteNameSet.add(right.getQualifiedName());
+                    }
+                }
+                if (matchNum == equalTos.size() && cteNameSet.size() == 1) {
+                    // means all join condition points to the same cte on the same cte column.
+                    // collect the other side columns besides cte column side.
+                    Preconditions.checkState(equalTos.size() == equalCondToJoinMap.size());
+
+                    PhysicalCTEProducer cteProducer = context.getRuntimeFilterContext().getCteProduceMap().get(cteId);
+                    if (anotherSideSlotSet.size() == 1) {
+                        // meet requirement for pushing down into cte producer
+                        ctx.getCteRFPushDownMap().put(cteProducer, equalCondToJoinMap);
+                    } else {
+                        // check further whether the join upper side can bring equal set, which
+                        // indicating actually the same runtime filter build side
+                        List<Expression> conditions = curJoin.getHashJoinConjuncts();
+                        boolean inSameEqualSet = false;
+                        for (Expression e : conditions) {
+                            if (e instanceof EqualTo) {
+                                SlotReference oneSide = (SlotReference) ((EqualTo) e).left();
+                                SlotReference anotherSide = (SlotReference) ((EqualTo) e).right();
+                                if (anotherSideSlotSet.contains(oneSide) && anotherSideSlotSet.contains(anotherSide)) {
+                                    inSameEqualSet = true;
+                                    break;
+                                }
+                            }
+                        }
+                        if (inSameEqualSet) {
+                            ctx.getCteRFPushDownMap().put(cteProducer, equalCondToJoinMap);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private void pushDownRuntimeFilterIntoCTE(RuntimeFilterContext ctx) {
+        Map<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> cteRFPushDownMap = ctx.getCteRFPushDownMap();
+        for (Map.Entry<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> entry : cteRFPushDownMap.entrySet()) {
+            PhysicalCTEProducer cteProducer = entry.getKey();
+            if (ctx.getPushedDownCTE().contains(cteProducer.getCteId())) {
+                continue;
+            }
+            Map<EqualTo, PhysicalHashJoin> equalCondToJoinMap = entry.getValue();
+            int exprOrder = 0;
+            for (Map.Entry<EqualTo, PhysicalHashJoin> innerEntry : equalCondToJoinMap.entrySet()) {
+                EqualTo equalTo = innerEntry.getKey();
+                PhysicalHashJoin join = innerEntry.getValue();
+                Preconditions.checkState(cteProducer != null && join != null);
+                TRuntimeFilterType type = TRuntimeFilterType.IN_OR_BLOOM;
+                if (ctx.getSessionVariable().enablePipelineEngine()) {
+                    type = TRuntimeFilterType.BLOOM;
+                }

Review Comment:
   Add a comment explaining why the type is changed to BLOOM
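
   A hedged sketch of what such a comment might look like. It assumes the reason is the one already stated in `doPushDownBasic` ("in-filter is not friendly to pipeline"); whether that is the author's reason here is not confirmed by the diff. `FilterType` and `chooseType` are hypothetical names that mirror only the two `TRuntimeFilterType` values used in the hunk.
   ```java
   class FilterTypeChoice {
       // Hypothetical mirror of the two TRuntimeFilterType values used in the hunk.
       enum FilterType { IN_OR_BLOOM, BLOOM }

       static FilterType chooseType(boolean pipelineEngineEnabled) {
           // Assumed rationale, copied from the comment in doPushDownBasic:
           // in-filter is not friendly to pipeline, so when the pipeline engine
           // is enabled use a plain bloom filter instead of IN_OR_BLOOM.
           return pipelineEngineEnabled ? FilterType.BLOOM : FilterType.IN_OR_BLOOM;
       }

       public static void main(String[] args) {
           System.out.println(chooseType(true));
           System.out.println(chooseType(false));
       }
   }
   ```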



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -294,8 +221,466 @@ public PhysicalRelation visitPhysicalScan(PhysicalRelation scan, CascadesContext
         return scan;
     }
 
+    private long getBuildSideNdv(PhysicalHashJoin<? extends Plan, ? extends Plan> join, EqualTo equalTo) {
+        AbstractPlan right = (AbstractPlan) join.right();
+        //make ut test friendly
+        if (right.getStats() == null) {
+            return -1L;
+        }
+        ExpressionEstimation estimator = new ExpressionEstimation();
+        ColumnStatistic buildColStats = equalTo.right().accept(estimator, right.getStats());
+        return buildColStats.isUnKnown ? -1 : Math.max(1, (long) buildColStats.ndv);
+    }
+
     private static Slot checkTargetChild(Expression leftChild) {
         Expression expression = ExpressionUtils.getExpressionCoveredByCast(leftChild);
         return expression instanceof Slot ? ((Slot) expression) : null;
     }
+
+    private void pushDownRuntimeFilterCommon(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        List<TRuntimeFilterType> legalTypes = Arrays.stream(TRuntimeFilterType.values())
+                .filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0)
+                .collect(Collectors.toList());
+        // TODO: some complex situation cannot be handled now, see testPushDownThroughJoin.
+        //   we will support it in later version.
+        for (int i = 0; i < join.getHashJoinConjuncts().size(); i++) {
+            EqualTo equalTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(
+                    (EqualTo) join.getHashJoinConjuncts().get(i), join.left().getOutputSet()));
+            for (TRuntimeFilterType type : legalTypes) {
+                //bitmap rf is generated by nested loop join.
+                if (type == TRuntimeFilterType.BITMAP) {
+                    continue;
+                }
+                if (join.left() instanceof PhysicalUnion
+                        || join.left() instanceof PhysicalIntersect
+                        || join.left() instanceof PhysicalExcept) {
+                    doPushDownIntoSetOperation(join, ctx, equalTo, type, i);
+                } else {
+                    doPushDownBasic(join, context, ctx, equalTo, type, i);
+                }
+            }
+        }
+    }
+
+    private void doPushDownBasic(PhysicalHashJoin<? extends Plan, ? extends Plan> join, CascadesContext context,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
+        // currently, we can ensure children in the two side are corresponding to the equal_to's.
+        // so right maybe an expression and left is a slot
+        Slot unwrappedSlot = checkTargetChild(equalTo.left());
+        // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join
+        // contains join with denied join type. for example: a left join b on a.id = b.id
+        if (unwrappedSlot == null || !aliasTransferMap.containsKey(unwrappedSlot)) {
+            return;
+        }
+        Slot olapScanSlot = aliasTransferMap.get(unwrappedSlot).second;
+        PhysicalRelation scan = aliasTransferMap.get(unwrappedSlot).first;
+
+        Preconditions.checkState(olapScanSlot != null && scan != null);
+
+        if (scan instanceof PhysicalCTEConsumer) {
+            Set<CTEId> processedCTE = context.getRuntimeFilterContext().getProcessedCTE();
+            CTEId cteId = ((PhysicalCTEConsumer) scan).getCteId();
+            if (!processedCTE.contains(cteId)) {
+                PhysicalCTEProducer cteProducer = context.getRuntimeFilterContext()
+                        .getCteProduceMap().get(cteId);
+                PhysicalPlan inputPlanNode = (PhysicalPlan) cteProducer.child(0);
+                // process cte producer self recursively
+                inputPlanNode.accept(this, context);
+                processedCTE.add(cteId);
+            }
+        } else {
+            // in-filter is not friendly to pipeline
+            if (type == TRuntimeFilterType.IN_OR_BLOOM
+                    && ctx.getSessionVariable().enablePipelineEngine()
+                    && hasRemoteTarget(join, scan)) {
+                type = TRuntimeFilterType.BLOOM;
+            }
+            long buildSideNdv = getBuildSideNdv(join, equalTo);
+            RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                    equalTo.right(), ImmutableList.of(olapScanSlot), type, exprOrder, join, buildSideNdv);
+            ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+            ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
+            ctx.setTargetsOnScanNode(aliasTransferMap.get(unwrappedSlot).first.getId(), olapScanSlot);
+        }
+    }
+
+    private void doPushDownIntoSetOperation(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
+        List<Slot> targetList = new ArrayList<>();
+        int projIndex = -1;
+        for (int j = 0; j < join.left().children().size(); j++) {
+            PhysicalPlan child = (PhysicalPlan) join.left().child(j);
+            if (child instanceof PhysicalProject) {
+                PhysicalProject project = (PhysicalProject) child;
+                Slot leftSlot = checkTargetChild(equalTo.left());
+                if (leftSlot == null) {
+                    break;
+                }
+                for (int k = 0; projIndex < 0 && k < project.getProjects().size(); k++) {
+                    NamedExpression expr = (NamedExpression) project.getProjects().get(k);
+                    if (expr.getName().equals(leftSlot.getName())) {
+                        projIndex = k;
+                        break;
+                    }
+                }
+                Preconditions.checkState(projIndex >= 0
+                        && projIndex < project.getProjects().size());
+
+                NamedExpression targetExpr = (NamedExpression) project.getProjects().get(projIndex);
+
+                SlotReference origSlot = null;
+                if (targetExpr instanceof Alias) {
+                    origSlot = (SlotReference) targetExpr.child(0);
+                } else {
+                    origSlot = (SlotReference) targetExpr;
+                }
+                Slot olapScanSlot = aliasTransferMap.get(origSlot).second;
+                PhysicalRelation scan = aliasTransferMap.get(origSlot).first;
+                if (type == TRuntimeFilterType.IN_OR_BLOOM
+                        && ctx.getSessionVariable().enablePipelineEngine()
+                        && hasRemoteTarget(join, scan)) {
+                    type = TRuntimeFilterType.BLOOM;
+                }
+                targetList.add(olapScanSlot);
+                ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+                ctx.setTargetsOnScanNode(aliasTransferMap.get(origSlot).first.getId(), olapScanSlot);
+            }
+        }
+        if (!targetList.isEmpty()) {
+            long buildSideNdv = getBuildSideNdv(join, equalTo);
+            RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                    equalTo.right(), targetList, type, exprOrder, join, buildSideNdv);
+            for (int j = 0; j < targetList.size(); j++) {
+                ctx.setTargetExprIdToFilter(targetList.get(j).getExprId(), filter);
+            }
+        }
+    }
+
+    private void collectPushDownCTEInfos(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        Set<CTEId> cteIds = new HashSet<>();
+        PhysicalPlan leftChild = (PhysicalPlan) join.left();
+        PhysicalPlan rightChild = (PhysicalPlan) join.right();
+
+        Preconditions.checkState(leftChild != null && rightChild != null);
+
+        boolean leftHasCTE = hasCTEConsumerUnderJoin(leftChild, cteIds);
+        boolean rightHasCTE = hasCTEConsumerUnderJoin(rightChild, cteIds);
+        // only one side cte is allowed currently
+        if ((leftHasCTE && !rightHasCTE) || (!leftHasCTE && rightHasCTE)) {
+            for (CTEId id : cteIds) {
+                if (ctx.getCteToJoinsMap().get(id) == null) {
+                    Set<PhysicalHashJoin> newJoin = new HashSet<>();
+                    newJoin.add(join);
+                    ctx.getCteToJoinsMap().put(id, newJoin);
+                } else {
+                    ctx.getCteToJoinsMap().get(id).add(join);
+                }
+            }
+        }
+        if (!ctx.getCteToJoinsMap().isEmpty()) {
+            analyzeRuntimeFilterPushDownIntoCTEInfos(join, context);
+        }
+    }
+
+    private List<CTEId> getPushDownCTECandidates(RuntimeFilterContext ctx) {
+        List<CTEId> candidates = new ArrayList<>();
+        Map<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> cteRFPushDownMap = ctx.getCteRFPushDownMap();
+        for (Map.Entry<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> entry : cteRFPushDownMap.entrySet()) {
+            CTEId cteId = entry.getKey().getCteId();
+            if (ctx.getPushedDownCTE().contains(cteId)) {
+                continue;
+            }
+            candidates.add(cteId);
+        }
+        return candidates;
+    }
+
+    private boolean hasCTEConsumerUnderJoin(PhysicalPlan root, Set<CTEId> cteIds) {
+        if (root instanceof PhysicalCTEConsumer) {
+            cteIds.add(((PhysicalCTEConsumer) root).getCteId());
+            return true;
+        } else if (root.children().size() != 1) {
+            // only collect cte in one side
+            return false;
+        } else if (root instanceof PhysicalDistribute
+                || root instanceof PhysicalFilter
+                || root instanceof PhysicalProject) {
+            return hasCTEConsumerUnderJoin((PhysicalPlan) root.child(0), cteIds);
+        } else {
+            return false;
+        }
+    }
+
+    private void analyzeRuntimeFilterPushDownIntoCTEInfos(PhysicalHashJoin<? extends Plan, ? extends Plan> curJoin,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        Map<CTEId, Set<PhysicalHashJoin>> cteToJoinsMap = ctx.getCteToJoinsMap();
+        for (Map.Entry<CTEId, Set<PhysicalHashJoin>> entry : cteToJoinsMap.entrySet()) {
+            CTEId cteId = entry.getKey();
+            Set<PhysicalHashJoin> joinSet = entry.getValue();
+            if (joinSet.contains(curJoin)) {
+                // skip current join
+                continue;
+            }
+            Set<LogicalCTEConsumer> cteSet = context.getCteIdToConsumers().get(cteId);
+            Preconditions.checkState(!cteSet.isEmpty());
+            String cteName = cteSet.iterator().next().getName();
+            // preconditions for rf pushing into cte producer:
+            // multiple joins whose join condition is on the same cte's column of the same cte
+            // the other side of these join conditions are the same column of the same table, or
+            // they in the same equal sets, such as under an equal join condition
+            // case 1: two joins with t1.c1 = cte1_consumer1.c1 and t1.c1 = cte1_consumer2.c1 conditions
+            //         rf of t1.c1 can be pushed down into cte1 producer
+            // case 2: two joins with t1.c1 = cte2_consumer1.c1 and t2.c2 = cte2_consumer2.c1 conditions
+            //         and another equal join condition with t1.c1 = t2.c2
+            //         rf of t1.c1 and t2.c2 can be pushed down into cte2 producer
+            if (joinSet.size() != cteSet.size()) {
+                continue;
+            }
+            List<EqualTo> equalTos = new ArrayList<>();
+            Map<EqualTo, PhysicalHashJoin> equalCondToJoinMap = new LinkedHashMap<>();
+            for (PhysicalHashJoin join : joinSet) {
+                // precondition:
+                // 1. no non-equal join condition
+                // 2. only equalTo and slotReference both sides
+                // 3. only support one join condition (will be refined further)
+                if (join.getOtherJoinConjuncts().size() > 1
+                        || join.getHashJoinConjuncts().size() != 1
+                        || !(join.getHashJoinConjuncts().get(0) instanceof EqualTo)
+                        || !(((EqualTo) join.getHashJoinConjuncts().get(0)).child(0) instanceof SlotReference)
+                        || !(((EqualTo) join.getHashJoinConjuncts().get(0)).child(1) instanceof SlotReference)) {
+                    break;
+                } else {
+                    EqualTo equalTo = (EqualTo) join.getHashJoinConjuncts().get(0);
+                    equalTos.add(equalTo);
+                    equalCondToJoinMap.put(equalTo, join);
+                }
+            }
+            if (joinSet.size() == equalTos.size()) {
+                int matchNum = 0;
+                Set<String> cteNameSet = new HashSet<>();
+                Set<SlotReference> anotherSideSlotSet = new HashSet<>();
+                for (EqualTo equalTo : equalTos) {
+                    // has been checked above, SlotReference for both sides
+                    SlotReference left = (SlotReference) equalTo.left();
+                    SlotReference right = (SlotReference) equalTo.right();
+                    if (left.getQualifier().size() == 1 && left.getQualifier().get(0).equals(cteName)) {
+                        matchNum += 1;
+                        anotherSideSlotSet.add(right);
+                        cteNameSet.add(left.getQualifiedName());
+                    } else if (right.getQualifier().size() == 1 && right.getQualifier().get(0).equals(cteName)) {
+                        matchNum += 1;
+                        anotherSideSlotSet.add(left);
+                        cteNameSet.add(right.getQualifiedName());
+                    }
+                }
+                if (matchNum == equalTos.size() && cteNameSet.size() == 1) {
+                    // means all join condition points to the same cte on the same cte column.
+                    // collect the other side columns besides cte column side.
+                    Preconditions.checkState(equalTos.size() == equalCondToJoinMap.size());
+
+                    PhysicalCTEProducer cteProducer = context.getRuntimeFilterContext().getCteProduceMap().get(cteId);
+                    if (anotherSideSlotSet.size() == 1) {
+                        // meet requirement for pushing down into cte producer
+                        ctx.getCteRFPushDownMap().put(cteProducer, equalCondToJoinMap);
+                    } else {
+                        // check further whether the join upper side can bring equal set, which
+                        // indicating actually the same runtime filter build side
+                        List<Expression> conditions = curJoin.getHashJoinConjuncts();
+                        boolean inSameEqualSet = false;
+                        for (Expression e : conditions) {
+                            if (e instanceof EqualTo) {
+                                SlotReference oneSide = (SlotReference) ((EqualTo) e).left();
+                                SlotReference anotherSide = (SlotReference) ((EqualTo) e).right();
+                                if (anotherSideSlotSet.contains(oneSide) && anotherSideSlotSet.contains(anotherSide)) {
+                                    inSameEqualSet = true;
+                                    break;
+                                }
+                            }
+                        }
+                        if (inSameEqualSet) {
+                            ctx.getCteRFPushDownMap().put(cteProducer, equalCondToJoinMap);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private void pushDownRuntimeFilterIntoCTE(RuntimeFilterContext ctx) {
+        Map<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> cteRFPushDownMap = ctx.getCteRFPushDownMap();
+        for (Map.Entry<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> entry : cteRFPushDownMap.entrySet()) {
+            PhysicalCTEProducer cteProducer = entry.getKey();
+            if (ctx.getPushedDownCTE().contains(cteProducer.getCteId())) {
+                continue;
+            }
+            Map<EqualTo, PhysicalHashJoin> equalCondToJoinMap = entry.getValue();
+            int exprOrder = 0;
+            for (Map.Entry<EqualTo, PhysicalHashJoin> innerEntry : equalCondToJoinMap.entrySet()) {
+                EqualTo equalTo = innerEntry.getKey();
+                PhysicalHashJoin join = innerEntry.getValue();
+                Preconditions.checkState(cteProducer != null && join != null);
+                TRuntimeFilterType type = TRuntimeFilterType.IN_OR_BLOOM;
+                if (ctx.getSessionVariable().enablePipelineEngine()) {
+                    type = TRuntimeFilterType.BLOOM;
+                }
+                EqualTo newEqualTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(
+                        equalTo, join.child(0).getOutputSet()));
+                doPushDownIntoCTEProducerInternal(join, ctx, newEqualTo, type, exprOrder++, cteProducer);
+            }
+            ctx.getPushedDownCTE().add(cteProducer.getCteId());
+        }
+    }
+
+    private void doPushDownIntoCTEProducerInternal(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder,
+            PhysicalCTEProducer cteProducer) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
+        PhysicalPlan inputPlanNode = (PhysicalPlan) cteProducer.child(0);
+
+        Preconditions.checkState(inputPlanNode != null);

Review Comment:
   This check is not needed.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java:
##########
@@ -2087,7 +2088,8 @@ public PlanFragment visitPhysicalCTEConsumer(PhysicalCTEConsumer cteConsumer,
         DataPartition dataPartition = new DataPartition(TPartitionType.HASH_PARTITIONED, execExprList);
         consumeFragment.setDataPartition(dataPartition);
 
-        SelectNode projectNode = new SelectNode(context.nextPlanNodeId(), inputPlanNode);
+        PlanNodeId consumerPlanId = context.nextPlanNodeId();
+        SelectNode projectNode = new SelectNode(consumerPlanId, inputPlanNode);

Review Comment:
   This change looks unnecessary.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java:
##########
@@ -59,6 +62,10 @@ public class RuntimeFilterContext {
 
     private final Map<Plan, List<ExprId>> joinToTargetExprId = Maps.newHashMap();
 
+    private final Map<CTEId, Set<PhysicalHashJoin>> cteToJoinsMap = Maps.newHashMap();

Review Comment:
   Add comments to this map and the sets explaining what they hold and where they are used.
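
   As a rough sketch of the kind of comment I mean: `CteJoinRegistry`, `addJoin`, and `joinsOf` are made-up names, and `Integer`/`String` stand in for `CTEId`/`PhysicalHashJoin`. The Javadoc wording is only my reading of how `RuntimeFilterGenerator` fills and reads the map in this diff, so please correct it if it is wrong.
   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.LinkedHashSet;
   import java.util.Map;
   import java.util.Set;

   // Hypothetical stand-in for RuntimeFilterContext, for illustration only.
   class CteJoinRegistry {
       /**
        * CTE id -> hash joins that have a consumer of that CTE under exactly one side.
        * Filled while visiting hash joins (collectPushDownCTEInfos in the diff) and
        * read when deciding whether a runtime filter can be pushed into the CTE producer.
        */
       private final Map<Integer, Set<String>> cteToJoinsMap = new HashMap<>();

       // Registers a join for a CTE, creating the set on first use.
       void addJoin(int cteId, String join) {
           cteToJoinsMap.computeIfAbsent(cteId, k -> new LinkedHashSet<>()).add(join);
       }

       // Returns the joins recorded for a CTE, or an empty set if none were recorded.
       Set<String> joinsOf(int cteId) {
           return cteToJoinsMap.getOrDefault(cteId, Collections.emptySet());
       }
   }
   ```
   `computeIfAbsent` could also replace the `get(id) == null` branch in `collectPushDownCTEInfos`.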



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -294,8 +221,466 @@ public PhysicalRelation visitPhysicalScan(PhysicalRelation scan, CascadesContext
         return scan;
     }
 
+    private long getBuildSideNdv(PhysicalHashJoin<? extends Plan, ? extends Plan> join, EqualTo equalTo) {
+        AbstractPlan right = (AbstractPlan) join.right();
+        //make ut test friendly
+        if (right.getStats() == null) {
+            return -1L;
+        }
+        ExpressionEstimation estimator = new ExpressionEstimation();
+        ColumnStatistic buildColStats = equalTo.right().accept(estimator, right.getStats());
+        return buildColStats.isUnKnown ? -1 : Math.max(1, (long) buildColStats.ndv);
+    }
+
     private static Slot checkTargetChild(Expression leftChild) {
         Expression expression = ExpressionUtils.getExpressionCoveredByCast(leftChild);
         return expression instanceof Slot ? ((Slot) expression) : null;
     }
+
+    private void pushDownRuntimeFilterCommon(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        List<TRuntimeFilterType> legalTypes = Arrays.stream(TRuntimeFilterType.values())
+                .filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0)
+                .collect(Collectors.toList());
+        // TODO: some complex situation cannot be handled now, see testPushDownThroughJoin.
+        //   we will support it in later version.
+        for (int i = 0; i < join.getHashJoinConjuncts().size(); i++) {
+            EqualTo equalTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(
+                    (EqualTo) join.getHashJoinConjuncts().get(i), join.left().getOutputSet()));
+            for (TRuntimeFilterType type : legalTypes) {
+                //bitmap rf is generated by nested loop join.
+                if (type == TRuntimeFilterType.BITMAP) {
+                    continue;
+                }
+                if (join.left() instanceof PhysicalUnion
+                        || join.left() instanceof PhysicalIntersect
+                        || join.left() instanceof PhysicalExcept) {
+                    doPushDownIntoSetOperation(join, ctx, equalTo, type, i);
+                } else {
+                    doPushDownBasic(join, context, ctx, equalTo, type, i);
+                }
+            }
+        }
+    }
+
+    private void doPushDownBasic(PhysicalHashJoin<? extends Plan, ? extends Plan> join, CascadesContext context,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
+        // currently, we can ensure children in the two side are corresponding to the equal_to's.
+        // so right maybe an expression and left is a slot
+        Slot unwrappedSlot = checkTargetChild(equalTo.left());
+        // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join
+        // contains join with denied join type. for example: a left join b on a.id = b.id
+        if (unwrappedSlot == null || !aliasTransferMap.containsKey(unwrappedSlot)) {
+            return;
+        }
+        Slot olapScanSlot = aliasTransferMap.get(unwrappedSlot).second;
+        PhysicalRelation scan = aliasTransferMap.get(unwrappedSlot).first;
+
+        Preconditions.checkState(olapScanSlot != null && scan != null);
+
+        if (scan instanceof PhysicalCTEConsumer) {
+            Set<CTEId> processedCTE = context.getRuntimeFilterContext().getProcessedCTE();
+            CTEId cteId = ((PhysicalCTEConsumer) scan).getCteId();
+            if (!processedCTE.contains(cteId)) {
+                PhysicalCTEProducer cteProducer = context.getRuntimeFilterContext()
+                        .getCteProduceMap().get(cteId);
+                PhysicalPlan inputPlanNode = (PhysicalPlan) cteProducer.child(0);
+                // process cte producer self recursively
+                inputPlanNode.accept(this, context);
+                processedCTE.add(cteId);
+            }
+        } else {
+            // in-filter is not friendly to pipeline
+            if (type == TRuntimeFilterType.IN_OR_BLOOM
+                    && ctx.getSessionVariable().enablePipelineEngine()
+                    && hasRemoteTarget(join, scan)) {
+                type = TRuntimeFilterType.BLOOM;
+            }
+            long buildSideNdv = getBuildSideNdv(join, equalTo);
+            RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                    equalTo.right(), ImmutableList.of(olapScanSlot), type, exprOrder, join, buildSideNdv);
+            ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+            ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
+            ctx.setTargetsOnScanNode(aliasTransferMap.get(unwrappedSlot).first.getId(), olapScanSlot);
+        }
+    }
+
+    private void doPushDownIntoSetOperation(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
+        List<Slot> targetList = new ArrayList<>();
+        int projIndex = -1;
+        for (int j = 0; j < join.left().children().size(); j++) {
+            PhysicalPlan child = (PhysicalPlan) join.left().child(j);
+            if (child instanceof PhysicalProject) {
+                PhysicalProject project = (PhysicalProject) child;
+                Slot leftSlot = checkTargetChild(equalTo.left());
+                if (leftSlot == null) {
+                    break;
+                }
+                for (int k = 0; projIndex < 0 && k < project.getProjects().size(); k++) {
+                    NamedExpression expr = (NamedExpression) project.getProjects().get(k);
+                    if (expr.getName().equals(leftSlot.getName())) {
+                        projIndex = k;
+                        break;
+                    }
+                }
+                Preconditions.checkState(projIndex >= 0
+                        && projIndex < project.getProjects().size());
+
+                NamedExpression targetExpr = (NamedExpression) project.getProjects().get(projIndex);
+
+                SlotReference origSlot = null;
+                if (targetExpr instanceof Alias) {
+                    origSlot = (SlotReference) targetExpr.child(0);
+                } else {
+                    origSlot = (SlotReference) targetExpr;
+                }
+                Slot olapScanSlot = aliasTransferMap.get(origSlot).second;
+                PhysicalRelation scan = aliasTransferMap.get(origSlot).first;
+                if (type == TRuntimeFilterType.IN_OR_BLOOM
+                        && ctx.getSessionVariable().enablePipelineEngine()
+                        && hasRemoteTarget(join, scan)) {
+                    type = TRuntimeFilterType.BLOOM;
+                }
+                targetList.add(olapScanSlot);
+                ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+                ctx.setTargetsOnScanNode(aliasTransferMap.get(origSlot).first.getId(), olapScanSlot);
+            }
+        }
+        if (!targetList.isEmpty()) {
+            long buildSideNdv = getBuildSideNdv(join, equalTo);
+            RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                    equalTo.right(), targetList, type, exprOrder, join, buildSideNdv);
+            for (int j = 0; j < targetList.size(); j++) {
+                ctx.setTargetExprIdToFilter(targetList.get(j).getExprId(), filter);
+            }
+        }
+    }
+
+    private void collectPushDownCTEInfos(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        Set<CTEId> cteIds = new HashSet<>();
+        PhysicalPlan leftChild = (PhysicalPlan) join.left();
+        PhysicalPlan rightChild = (PhysicalPlan) join.right();
+
+        Preconditions.checkState(leftChild != null && rightChild != null);
+
+        boolean leftHasCTE = hasCTEConsumerUnderJoin(leftChild, cteIds);
+        boolean rightHasCTE = hasCTEConsumerUnderJoin(rightChild, cteIds);
+        // only one side cte is allowed currently
+        if ((leftHasCTE && !rightHasCTE) || (!leftHasCTE && rightHasCTE)) {
+            for (CTEId id : cteIds) {
+                if (ctx.getCteToJoinsMap().get(id) == null) {
+                    Set<PhysicalHashJoin> newJoin = new HashSet<>();
+                    newJoin.add(join);
+                    ctx.getCteToJoinsMap().put(id, newJoin);
+                } else {
+                    ctx.getCteToJoinsMap().get(id).add(join);
+                }
+            }
+        }
+        if (!ctx.getCteToJoinsMap().isEmpty()) {
+            analyzeRuntimeFilterPushDownIntoCTEInfos(join, context);
+        }
+    }
+
+    private List<CTEId> getPushDownCTECandidates(RuntimeFilterContext ctx) {
+        List<CTEId> candidates = new ArrayList<>();
+        Map<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> cteRFPushDownMap = ctx.getCteRFPushDownMap();
+        for (Map.Entry<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> entry : cteRFPushDownMap.entrySet()) {
+            CTEId cteId = entry.getKey().getCteId();
+            if (ctx.getPushedDownCTE().contains(cteId)) {
+                continue;
+            }
+            candidates.add(cteId);
+        }
+        return candidates;
+    }
+
+    private boolean hasCTEConsumerUnderJoin(PhysicalPlan root, Set<CTEId> cteIds) {
+        if (root instanceof PhysicalCTEConsumer) {
+            cteIds.add(((PhysicalCTEConsumer) root).getCteId());
+            return true;
+        } else if (root.children().size() != 1) {
+            // only collect cte in one side

Review Comment:
   What does this mean?
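
   For context, here is how I read the surrounding branches, as a toy sketch. `CteSearchSketch`, `Node`, `kind`, and `hasConsumer` are hypothetical stand-ins, not Doris classes. The search seems to follow only single-child distribute/filter/project chains and to stop at any node that does not have exactly one child. If that is the intent, the comment should say so.
   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.Set;

   class CteSearchSketch {
       // Toy plan node; kind is e.g. "consumer", "project", "filter", "distribute", "join", "scan".
       static final class Node {
           final String kind;
           final int cteId; // only meaningful when kind is "consumer"
           final List<Node> children;

           Node(String kind, int cteId, Node... children) {
               this.kind = kind;
               this.cteId = cteId;
               this.children = Arrays.asList(children);
           }
       }

       // Same branch order as hasCTEConsumerUnderJoin in the diff.
       static boolean hasConsumer(Node root, Set<Integer> cteIds) {
           if (root.kind.equals("consumer")) {
               cteIds.add(root.cteId);
               return true;
           } else if (root.children.size() != 1) {
               // leaves and multi-child nodes (joins, set operations) end the search
               return false;
           } else if (root.kind.equals("distribute")
                   || root.kind.equals("filter")
                   || root.kind.equals("project")) {
               return hasConsumer(root.children.get(0), cteIds);
           } else {
               return false;
           }
       }
   }
   ```
   In this sketch a consumer below a nested join is never collected, because the walk returns `false` at the join before reaching it.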



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -294,8 +221,466 @@ public PhysicalRelation visitPhysicalScan(PhysicalRelation scan, CascadesContext
         return scan;
     }
 
+    private long getBuildSideNdv(PhysicalHashJoin<? extends Plan, ? extends Plan> join, EqualTo equalTo) {
+        AbstractPlan right = (AbstractPlan) join.right();
+        //make ut test friendly
+        if (right.getStats() == null) {
+            return -1L;
+        }
+        ExpressionEstimation estimator = new ExpressionEstimation();
+        ColumnStatistic buildColStats = equalTo.right().accept(estimator, right.getStats());
+        return buildColStats.isUnKnown ? -1 : Math.max(1, (long) buildColStats.ndv);
+    }
+
     private static Slot checkTargetChild(Expression leftChild) {
         Expression expression = ExpressionUtils.getExpressionCoveredByCast(leftChild);
         return expression instanceof Slot ? ((Slot) expression) : null;
     }
+
+    private void pushDownRuntimeFilterCommon(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        List<TRuntimeFilterType> legalTypes = Arrays.stream(TRuntimeFilterType.values())
+                .filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0)
+                .collect(Collectors.toList());
+        // TODO: some complex situation cannot be handled now, see testPushDownThroughJoin.
+        //   we will support it in later version.
+        for (int i = 0; i < join.getHashJoinConjuncts().size(); i++) {
+            EqualTo equalTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(
+                    (EqualTo) join.getHashJoinConjuncts().get(i), join.left().getOutputSet()));
+            for (TRuntimeFilterType type : legalTypes) {
+                //bitmap rf is generated by nested loop join.
+                if (type == TRuntimeFilterType.BITMAP) {
+                    continue;
+                }
+                if (join.left() instanceof PhysicalUnion
+                        || join.left() instanceof PhysicalIntersect
+                        || join.left() instanceof PhysicalExcept) {
+                    doPushDownIntoSetOperation(join, ctx, equalTo, type, i);
+                } else {
+                    doPushDownBasic(join, context, ctx, equalTo, type, i);
+                }
+            }
+        }
+    }
+
+    private void doPushDownBasic(PhysicalHashJoin<? extends Plan, ? extends Plan> join, CascadesContext context,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
+        // currently, we can ensure children in the two side are corresponding to the equal_to's.
+        // so right maybe an expression and left is a slot
+        Slot unwrappedSlot = checkTargetChild(equalTo.left());
+        // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join
+        // contains join with denied join type. for example: a left join b on a.id = b.id
+        if (unwrappedSlot == null || !aliasTransferMap.containsKey(unwrappedSlot)) {
+            return;
+        }
+        Slot olapScanSlot = aliasTransferMap.get(unwrappedSlot).second;
+        PhysicalRelation scan = aliasTransferMap.get(unwrappedSlot).first;
+
+        Preconditions.checkState(olapScanSlot != null && scan != null);
+
+        if (scan instanceof PhysicalCTEConsumer) {
+            Set<CTEId> processedCTE = context.getRuntimeFilterContext().getProcessedCTE();
+            CTEId cteId = ((PhysicalCTEConsumer) scan).getCteId();
+            if (!processedCTE.contains(cteId)) {
+                PhysicalCTEProducer cteProducer = context.getRuntimeFilterContext()
+                        .getCteProduceMap().get(cteId);
+                PhysicalPlan inputPlanNode = (PhysicalPlan) cteProducer.child(0);
+                // process cte producer self recursively
+                inputPlanNode.accept(this, context);
+                processedCTE.add(cteId);
+            }
+        } else {
+            // in-filter is not friendly to pipeline
+            if (type == TRuntimeFilterType.IN_OR_BLOOM
+                    && ctx.getSessionVariable().enablePipelineEngine()
+                    && hasRemoteTarget(join, scan)) {
+                type = TRuntimeFilterType.BLOOM;
+            }
+            long buildSideNdv = getBuildSideNdv(join, equalTo);
+            RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                    equalTo.right(), ImmutableList.of(olapScanSlot), type, exprOrder, join, buildSideNdv);
+            ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+            ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
+            ctx.setTargetsOnScanNode(aliasTransferMap.get(unwrappedSlot).first.getId(), olapScanSlot);
+        }
+    }
+
+    private void doPushDownIntoSetOperation(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
+        List<Slot> targetList = new ArrayList<>();
+        int projIndex = -1;
+        for (int j = 0; j < join.left().children().size(); j++) {
+            PhysicalPlan child = (PhysicalPlan) join.left().child(j);
+            if (child instanceof PhysicalProject) {
+                PhysicalProject project = (PhysicalProject) child;
+                Slot leftSlot = checkTargetChild(equalTo.left());
+                if (leftSlot == null) {
+                    break;
+                }
+                for (int k = 0; projIndex < 0 && k < project.getProjects().size(); k++) {
+                    NamedExpression expr = (NamedExpression) project.getProjects().get(k);
+                    if (expr.getName().equals(leftSlot.getName())) {
+                        projIndex = k;
+                        break;
+                    }
+                }
+                Preconditions.checkState(projIndex >= 0
+                        && projIndex < project.getProjects().size());
+
+                NamedExpression targetExpr = (NamedExpression) project.getProjects().get(projIndex);
+
+                SlotReference origSlot = null;
+                if (targetExpr instanceof Alias) {
+                    origSlot = (SlotReference) targetExpr.child(0);
+                } else {
+                    origSlot = (SlotReference) targetExpr;
+                }
+                Slot olapScanSlot = aliasTransferMap.get(origSlot).second;
+                PhysicalRelation scan = aliasTransferMap.get(origSlot).first;
+                if (type == TRuntimeFilterType.IN_OR_BLOOM
+                        && ctx.getSessionVariable().enablePipelineEngine()
+                        && hasRemoteTarget(join, scan)) {
+                    type = TRuntimeFilterType.BLOOM;
+                }
+                targetList.add(olapScanSlot);
+                ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+                ctx.setTargetsOnScanNode(aliasTransferMap.get(origSlot).first.getId(), olapScanSlot);
+            }
+        }
+        if (!targetList.isEmpty()) {
+            long buildSideNdv = getBuildSideNdv(join, equalTo);
+            RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                    equalTo.right(), targetList, type, exprOrder, join, buildSideNdv);
+            for (int j = 0; j < targetList.size(); j++) {
+                ctx.setTargetExprIdToFilter(targetList.get(j).getExprId(), filter);
+            }
+        }
+    }
+
+    private void collectPushDownCTEInfos(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        Set<CTEId> cteIds = new HashSet<>();
+        PhysicalPlan leftChild = (PhysicalPlan) join.left();
+        PhysicalPlan rightChild = (PhysicalPlan) join.right();
+
+        Preconditions.checkState(leftChild != null && rightChild != null);
+
+        boolean leftHasCTE = hasCTEConsumerUnderJoin(leftChild, cteIds);
+        boolean rightHasCTE = hasCTEConsumerUnderJoin(rightChild, cteIds);
+        // only one side cte is allowed currently
+        if ((leftHasCTE && !rightHasCTE) || (!leftHasCTE && rightHasCTE)) {
+            for (CTEId id : cteIds) {
+                if (ctx.getCteToJoinsMap().get(id) == null) {
+                    Set<PhysicalHashJoin> newJoin = new HashSet<>();
+                    newJoin.add(join);
+                    ctx.getCteToJoinsMap().put(id, newJoin);
+                } else {
+                    ctx.getCteToJoinsMap().get(id).add(join);
+                }
+            }
+        }
+        if (!ctx.getCteToJoinsMap().isEmpty()) {
+            analyzeRuntimeFilterPushDownIntoCTEInfos(join, context);
+        }
+    }
+
+    private List<CTEId> getPushDownCTECandidates(RuntimeFilterContext ctx) {
+        List<CTEId> candidates = new ArrayList<>();
+        Map<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> cteRFPushDownMap = ctx.getCteRFPushDownMap();
+        for (Map.Entry<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> entry : cteRFPushDownMap.entrySet()) {
+            CTEId cteId = entry.getKey().getCteId();
+            if (ctx.getPushedDownCTE().contains(cteId)) {
+                continue;
+            }
+            candidates.add(cteId);
+        }
+        return candidates;
+    }
+
+    private boolean hasCTEConsumerUnderJoin(PhysicalPlan root, Set<CTEId> cteIds) {
+        if (root instanceof PhysicalCTEConsumer) {
+            cteIds.add(((PhysicalCTEConsumer) root).getCteId());
+            return true;
+        } else if (root.children().size() != 1) {
+            // only collect cte in one side
+            return false;
+        } else if (root instanceof PhysicalDistribute
+                || root instanceof PhysicalFilter
+                || root instanceof PhysicalProject) {

Review Comment:
   Why are only these node types processed?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -294,8 +221,466 @@ public PhysicalRelation visitPhysicalScan(PhysicalRelation scan, CascadesContext
         return scan;
     }
 
+    private long getBuildSideNdv(PhysicalHashJoin<? extends Plan, ? extends Plan> join, EqualTo equalTo) {
+        AbstractPlan right = (AbstractPlan) join.right();
+        //make ut test friendly
+        if (right.getStats() == null) {
+            return -1L;
+        }
+        ExpressionEstimation estimator = new ExpressionEstimation();
+        ColumnStatistic buildColStats = equalTo.right().accept(estimator, right.getStats());
+        return buildColStats.isUnKnown ? -1 : Math.max(1, (long) buildColStats.ndv);
+    }
+
     private static Slot checkTargetChild(Expression leftChild) {
         Expression expression = ExpressionUtils.getExpressionCoveredByCast(leftChild);
         return expression instanceof Slot ? ((Slot) expression) : null;
     }
+
+    private void pushDownRuntimeFilterCommon(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        List<TRuntimeFilterType> legalTypes = Arrays.stream(TRuntimeFilterType.values())
+                .filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0)
+                .collect(Collectors.toList());
+        // TODO: some complex situation cannot be handled now, see testPushDownThroughJoin.
+        //   we will support it in later version.
+        for (int i = 0; i < join.getHashJoinConjuncts().size(); i++) {
+            EqualTo equalTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(
+                    (EqualTo) join.getHashJoinConjuncts().get(i), join.left().getOutputSet()));
+            for (TRuntimeFilterType type : legalTypes) {
+                //bitmap rf is generated by nested loop join.
+                if (type == TRuntimeFilterType.BITMAP) {
+                    continue;
+                }
+                if (join.left() instanceof PhysicalUnion
+                        || join.left() instanceof PhysicalIntersect
+                        || join.left() instanceof PhysicalExcept) {
+                    doPushDownIntoSetOperation(join, ctx, equalTo, type, i);
+                } else {
+                    doPushDownBasic(join, context, ctx, equalTo, type, i);
+                }
+            }
+        }
+    }
+
+    private void doPushDownBasic(PhysicalHashJoin<? extends Plan, ? extends Plan> join, CascadesContext context,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
+        // currently, we can ensure children in the two side are corresponding to the equal_to's.
+        // so right maybe an expression and left is a slot
+        Slot unwrappedSlot = checkTargetChild(equalTo.left());
+        // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join
+        // contains join with denied join type. for example: a left join b on a.id = b.id
+        if (unwrappedSlot == null || !aliasTransferMap.containsKey(unwrappedSlot)) {
+            return;
+        }
+        Slot olapScanSlot = aliasTransferMap.get(unwrappedSlot).second;
+        PhysicalRelation scan = aliasTransferMap.get(unwrappedSlot).first;
+
+        Preconditions.checkState(olapScanSlot != null && scan != null);
+
+        if (scan instanceof PhysicalCTEConsumer) {
+            Set<CTEId> processedCTE = context.getRuntimeFilterContext().getProcessedCTE();
+            CTEId cteId = ((PhysicalCTEConsumer) scan).getCteId();
+            if (!processedCTE.contains(cteId)) {
+                PhysicalCTEProducer cteProducer = context.getRuntimeFilterContext()
+                        .getCteProduceMap().get(cteId);
+                PhysicalPlan inputPlanNode = (PhysicalPlan) cteProducer.child(0);
+                // process cte producer self recursively
+                inputPlanNode.accept(this, context);
+                processedCTE.add(cteId);
+            }
+        } else {
+            // in-filter is not friendly to pipeline
+            if (type == TRuntimeFilterType.IN_OR_BLOOM
+                    && ctx.getSessionVariable().enablePipelineEngine()
+                    && hasRemoteTarget(join, scan)) {
+                type = TRuntimeFilterType.BLOOM;
+            }
+            long buildSideNdv = getBuildSideNdv(join, equalTo);
+            RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                    equalTo.right(), ImmutableList.of(olapScanSlot), type, exprOrder, join, buildSideNdv);
+            ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+            ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
+            ctx.setTargetsOnScanNode(aliasTransferMap.get(unwrappedSlot).first.getId(), olapScanSlot);
+        }
+    }
+
+    private void doPushDownIntoSetOperation(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
+        List<Slot> targetList = new ArrayList<>();
+        int projIndex = -1;
+        for (int j = 0; j < join.left().children().size(); j++) {
+            PhysicalPlan child = (PhysicalPlan) join.left().child(j);
+            if (child instanceof PhysicalProject) {
+                PhysicalProject project = (PhysicalProject) child;
+                Slot leftSlot = checkTargetChild(equalTo.left());
+                if (leftSlot == null) {
+                    break;
+                }
+                for (int k = 0; projIndex < 0 && k < project.getProjects().size(); k++) {
+                    NamedExpression expr = (NamedExpression) project.getProjects().get(k);
+                    if (expr.getName().equals(leftSlot.getName())) {
+                        projIndex = k;
+                        break;
+                    }
+                }
+                Preconditions.checkState(projIndex >= 0
+                        && projIndex < project.getProjects().size());
+
+                NamedExpression targetExpr = (NamedExpression) project.getProjects().get(projIndex);
+
+                SlotReference origSlot = null;
+                if (targetExpr instanceof Alias) {
+                    origSlot = (SlotReference) targetExpr.child(0);
+                } else {
+                    origSlot = (SlotReference) targetExpr;
+                }
+                Slot olapScanSlot = aliasTransferMap.get(origSlot).second;
+                PhysicalRelation scan = aliasTransferMap.get(origSlot).first;
+                if (type == TRuntimeFilterType.IN_OR_BLOOM
+                        && ctx.getSessionVariable().enablePipelineEngine()
+                        && hasRemoteTarget(join, scan)) {
+                    type = TRuntimeFilterType.BLOOM;
+                }
+                targetList.add(olapScanSlot);
+                ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+                ctx.setTargetsOnScanNode(aliasTransferMap.get(origSlot).first.getId(), olapScanSlot);
+            }
+        }
+        if (!targetList.isEmpty()) {
+            long buildSideNdv = getBuildSideNdv(join, equalTo);
+            RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                    equalTo.right(), targetList, type, exprOrder, join, buildSideNdv);
+            for (int j = 0; j < targetList.size(); j++) {
+                ctx.setTargetExprIdToFilter(targetList.get(j).getExprId(), filter);
+            }
+        }
+    }
+
+    private void collectPushDownCTEInfos(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        Set<CTEId> cteIds = new HashSet<>();
+        PhysicalPlan leftChild = (PhysicalPlan) join.left();
+        PhysicalPlan rightChild = (PhysicalPlan) join.right();
+
+        Preconditions.checkState(leftChild != null && rightChild != null);
+
+        boolean leftHasCTE = hasCTEConsumerUnderJoin(leftChild, cteIds);
+        boolean rightHasCTE = hasCTEConsumerUnderJoin(rightChild, cteIds);
+        // only one side cte is allowed currently
+        if ((leftHasCTE && !rightHasCTE) || (!leftHasCTE && rightHasCTE)) {
+            for (CTEId id : cteIds) {
+                if (ctx.getCteToJoinsMap().get(id) == null) {
+                    Set<PhysicalHashJoin> newJoin = new HashSet<>();
+                    newJoin.add(join);
+                    ctx.getCteToJoinsMap().put(id, newJoin);
+                } else {
+                    ctx.getCteToJoinsMap().get(id).add(join);
+                }
+            }
+        }
+        if (!ctx.getCteToJoinsMap().isEmpty()) {
+            analyzeRuntimeFilterPushDownIntoCTEInfos(join, context);
+        }
+    }
+
+    private List<CTEId> getPushDownCTECandidates(RuntimeFilterContext ctx) {
+        List<CTEId> candidates = new ArrayList<>();
+        Map<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> cteRFPushDownMap = ctx.getCteRFPushDownMap();
+        for (Map.Entry<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> entry : cteRFPushDownMap.entrySet()) {
+            CTEId cteId = entry.getKey().getCteId();
+            if (ctx.getPushedDownCTE().contains(cteId)) {
+                continue;
+            }
+            candidates.add(cteId);
+        }
+        return candidates;
+    }
+
+    private boolean hasCTEConsumerUnderJoin(PhysicalPlan root, Set<CTEId> cteIds) {
+        if (root instanceof PhysicalCTEConsumer) {
+            cteIds.add(((PhysicalCTEConsumer) root).getCteId());
+            return true;
+        } else if (root.children().size() != 1) {
+            // only collect cte in one side
+            return false;
+        } else if (root instanceof PhysicalDistribute
+                || root instanceof PhysicalFilter
+                || root instanceof PhysicalProject) {
+            return hasCTEConsumerUnderJoin((PhysicalPlan) root.child(0), cteIds);
+        } else {
+            return false;
+        }
+    }
+
+    private void analyzeRuntimeFilterPushDownIntoCTEInfos(PhysicalHashJoin<? extends Plan, ? extends Plan> curJoin,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        Map<CTEId, Set<PhysicalHashJoin>> cteToJoinsMap = ctx.getCteToJoinsMap();
+        for (Map.Entry<CTEId, Set<PhysicalHashJoin>> entry : cteToJoinsMap.entrySet()) {
+            CTEId cteId = entry.getKey();
+            Set<PhysicalHashJoin> joinSet = entry.getValue();
+            if (joinSet.contains(curJoin)) {
+                // skip current join
+                continue;
+            }
+            Set<LogicalCTEConsumer> cteSet = context.getCteIdToConsumers().get(cteId);
+            Preconditions.checkState(!cteSet.isEmpty());
+            String cteName = cteSet.iterator().next().getName();
+            // preconditions for rf pushing into cte producer:
+            // multiple joins whose join condition is on the same cte's column of the same cte
+            // the other side of these join conditions are the same column of the same table, or
+            // they in the same equal sets, such as under an equal join condition
+            // case 1: two joins with t1.c1 = cte1_consumer1.c1 and t1.c1 = cte1_consumer2.c1 conditions
+            //         rf of t1.c1 can be pushed down into cte1 producer
+            // case 2: two joins with t1.c1 = cte2_consumer1.c1 and t2.c2 = cte2_consumer2.c1 conditions
+            //         and another equal join condition with t1.c1 = t2.c2
+            //         rf of t1.c1 and t2.c2 can be pushed down into cte2 producer
+            if (joinSet.size() != cteSet.size()) {
+                continue;
+            }
+            List<EqualTo> equalTos = new ArrayList<>();
+            Map<EqualTo, PhysicalHashJoin> equalCondToJoinMap = new LinkedHashMap<>();
+            for (PhysicalHashJoin join : joinSet) {
+                // precondition:
+                // 1. no non-equal join condition
+                // 2. only equalTo and slotReference both sides
+                // 3. only support one join condition (will be refined further)
+                if (join.getOtherJoinConjuncts().size() > 1
+                        || join.getHashJoinConjuncts().size() != 1
+                        || !(join.getHashJoinConjuncts().get(0) instanceof EqualTo)
+                        || !(((EqualTo) join.getHashJoinConjuncts().get(0)).child(0) instanceof SlotReference)
+                        || !(((EqualTo) join.getHashJoinConjuncts().get(0)).child(1) instanceof SlotReference)) {

Review Comment:
   Currently, both sides of a join's EqualTo condition are always SlotReference, so if the intent is to reject SQL such as `on a = b + 1`, this check will not handle it correctly.
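
   One possible reading of this concern, sketched with toy types rather than the Nereids classes: assume an expression such as `b + 1` has already been computed by a project below the join, so the join condition only sees a slot that refers to it. Under that assumption, an `instanceof` test on the join condition alone accepts the slot, and the check would have to trace the slot back to its origin. `SlotCheckSketch`, `Slot`, `Add`, `isPlainColumn` and the alias map are all hypothetical names for illustration.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   class SlotCheckSketch {
       // Toy expression types, NOT the Nereids classes.
       interface Expr {}

       static final class Slot implements Expr {
           final String name;
           Slot(String name) { this.name = name; }
       }

       static final class Add implements Expr {
           final Expr left;
           final int literal;
           Add(Expr left, int literal) { this.left = left; this.literal = literal; }
       }

       // Returns true only if the join side is a slot that does not stand for a
       // computed expression in the (assumed) project below the join.
       static boolean isPlainColumn(Expr side, Map<Slot, Expr> projectAliases) {
           if (!(side instanceof Slot)) {
               return false;
           }
           Expr origin = projectAliases.getOrDefault(side, side);
           return origin instanceof Slot;
       }

       public static void main(String[] args) {
           Slot b = new Slot("b");
           Slot alias = new Slot("b_plus_1");
           Map<Slot, Expr> aliases = new HashMap<>();
           aliases.put(alias, new Add(b, 1));   // alias stands for b + 1

           Expr joinSide = alias;
           // The instanceof test alone accepts the aliased expression.
           System.out.println(joinSide instanceof Slot);
           // Tracing the alias back to its origin rejects it.
           System.out.println(isPlainColumn(joinSide, aliases));
       }
   }
   ```

   Whether the real plan carries enough information at this point to do such a lookup is for the PR author to confirm.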



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -294,8 +221,466 @@ public PhysicalRelation visitPhysicalScan(PhysicalRelation scan, CascadesContext
         return scan;
     }
 
+    private long getBuildSideNdv(PhysicalHashJoin<? extends Plan, ? extends Plan> join, EqualTo equalTo) {
+        AbstractPlan right = (AbstractPlan) join.right();
+        //make ut test friendly
+        if (right.getStats() == null) {
+            return -1L;
+        }
+        ExpressionEstimation estimator = new ExpressionEstimation();
+        ColumnStatistic buildColStats = equalTo.right().accept(estimator, right.getStats());
+        return buildColStats.isUnKnown ? -1 : Math.max(1, (long) buildColStats.ndv);
+    }
+
     private static Slot checkTargetChild(Expression leftChild) {
         Expression expression = ExpressionUtils.getExpressionCoveredByCast(leftChild);
         return expression instanceof Slot ? ((Slot) expression) : null;
     }
+
+    private void pushDownRuntimeFilterCommon(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        List<TRuntimeFilterType> legalTypes = Arrays.stream(TRuntimeFilterType.values())
+                .filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0)
+                .collect(Collectors.toList());
+        // TODO: some complex situation cannot be handled now, see testPushDownThroughJoin.
+        //   we will support it in later version.
+        for (int i = 0; i < join.getHashJoinConjuncts().size(); i++) {
+            EqualTo equalTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(
+                    (EqualTo) join.getHashJoinConjuncts().get(i), join.left().getOutputSet()));
+            for (TRuntimeFilterType type : legalTypes) {
+                //bitmap rf is generated by nested loop join.
+                if (type == TRuntimeFilterType.BITMAP) {
+                    continue;
+                }
+                if (join.left() instanceof PhysicalUnion
+                        || join.left() instanceof PhysicalIntersect
+                        || join.left() instanceof PhysicalExcept) {
+                    doPushDownIntoSetOperation(join, ctx, equalTo, type, i);
+                } else {
+                    doPushDownBasic(join, context, ctx, equalTo, type, i);
+                }
+            }
+        }
+    }
+
+    private void doPushDownBasic(PhysicalHashJoin<? extends Plan, ? extends Plan> join, CascadesContext context,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
+        // currently, we can ensure children in the two side are corresponding to the equal_to's.
+        // so right maybe an expression and left is a slot
+        Slot unwrappedSlot = checkTargetChild(equalTo.left());
+        // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join
+        // contains join with denied join type. for example: a left join b on a.id = b.id
+        if (unwrappedSlot == null || !aliasTransferMap.containsKey(unwrappedSlot)) {
+            return;
+        }
+        Slot olapScanSlot = aliasTransferMap.get(unwrappedSlot).second;
+        PhysicalRelation scan = aliasTransferMap.get(unwrappedSlot).first;
+
+        Preconditions.checkState(olapScanSlot != null && scan != null);
+
+        if (scan instanceof PhysicalCTEConsumer) {
+            Set<CTEId> processedCTE = context.getRuntimeFilterContext().getProcessedCTE();
+            CTEId cteId = ((PhysicalCTEConsumer) scan).getCteId();
+            if (!processedCTE.contains(cteId)) {
+                PhysicalCTEProducer cteProducer = context.getRuntimeFilterContext()
+                        .getCteProduceMap().get(cteId);
+                PhysicalPlan inputPlanNode = (PhysicalPlan) cteProducer.child(0);
+                // process cte producer self recursively
+                inputPlanNode.accept(this, context);
+                processedCTE.add(cteId);
+            }
+        } else {
+            // in-filter is not friendly to pipeline
+            if (type == TRuntimeFilterType.IN_OR_BLOOM
+                    && ctx.getSessionVariable().enablePipelineEngine()
+                    && hasRemoteTarget(join, scan)) {
+                type = TRuntimeFilterType.BLOOM;
+            }
+            long buildSideNdv = getBuildSideNdv(join, equalTo);
+            RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                    equalTo.right(), ImmutableList.of(olapScanSlot), type, exprOrder, join, buildSideNdv);
+            ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+            ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
+            ctx.setTargetsOnScanNode(aliasTransferMap.get(unwrappedSlot).first.getId(), olapScanSlot);
+        }
+    }
+
+    private void doPushDownIntoSetOperation(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
+        List<Slot> targetList = new ArrayList<>();
+        int projIndex = -1;
+        for (int j = 0; j < join.left().children().size(); j++) {
+            PhysicalPlan child = (PhysicalPlan) join.left().child(j);
+            if (child instanceof PhysicalProject) {
+                PhysicalProject project = (PhysicalProject) child;
+                Slot leftSlot = checkTargetChild(equalTo.left());
+                if (leftSlot == null) {
+                    break;
+                }
+                for (int k = 0; projIndex < 0 && k < project.getProjects().size(); k++) {
+                    NamedExpression expr = (NamedExpression) project.getProjects().get(k);
+                    if (expr.getName().equals(leftSlot.getName())) {
+                        projIndex = k;
+                        break;
+                    }
+                }
+                Preconditions.checkState(projIndex >= 0
+                        && projIndex < project.getProjects().size());
+
+                NamedExpression targetExpr = (NamedExpression) project.getProjects().get(projIndex);
+
+                SlotReference origSlot = null;
+                if (targetExpr instanceof Alias) {
+                    origSlot = (SlotReference) targetExpr.child(0);
+                } else {
+                    origSlot = (SlotReference) targetExpr;
+                }
+                Slot olapScanSlot = aliasTransferMap.get(origSlot).second;
+                PhysicalRelation scan = aliasTransferMap.get(origSlot).first;
+                if (type == TRuntimeFilterType.IN_OR_BLOOM
+                        && ctx.getSessionVariable().enablePipelineEngine()
+                        && hasRemoteTarget(join, scan)) {
+                    type = TRuntimeFilterType.BLOOM;
+                }
+                targetList.add(olapScanSlot);
+                ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+                ctx.setTargetsOnScanNode(aliasTransferMap.get(origSlot).first.getId(), olapScanSlot);
+            }
+        }
+        if (!targetList.isEmpty()) {
+            long buildSideNdv = getBuildSideNdv(join, equalTo);
+            RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                    equalTo.right(), targetList, type, exprOrder, join, buildSideNdv);
+            for (int j = 0; j < targetList.size(); j++) {
+                ctx.setTargetExprIdToFilter(targetList.get(j).getExprId(), filter);
+            }
+        }
+    }
+
+    private void collectPushDownCTEInfos(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        Set<CTEId> cteIds = new HashSet<>();
+        PhysicalPlan leftChild = (PhysicalPlan) join.left();
+        PhysicalPlan rightChild = (PhysicalPlan) join.right();
+
+        Preconditions.checkState(leftChild != null && rightChild != null);
+
+        boolean leftHasCTE = hasCTEConsumerUnderJoin(leftChild, cteIds);
+        boolean rightHasCTE = hasCTEConsumerUnderJoin(rightChild, cteIds);
+        // only one side cte is allowed currently
+        if ((leftHasCTE && !rightHasCTE) || (!leftHasCTE && rightHasCTE)) {
+            for (CTEId id : cteIds) {
+                if (ctx.getCteToJoinsMap().get(id) == null) {
+                    Set<PhysicalHashJoin> newJoin = new HashSet<>();
+                    newJoin.add(join);
+                    ctx.getCteToJoinsMap().put(id, newJoin);
+                } else {
+                    ctx.getCteToJoinsMap().get(id).add(join);
+                }
+            }
+        }
+        if (!ctx.getCteToJoinsMap().isEmpty()) {
+            analyzeRuntimeFilterPushDownIntoCTEInfos(join, context);
+        }
+    }
+
+    private List<CTEId> getPushDownCTECandidates(RuntimeFilterContext ctx) {
+        List<CTEId> candidates = new ArrayList<>();
+        Map<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> cteRFPushDownMap = ctx.getCteRFPushDownMap();
+        for (Map.Entry<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> entry : cteRFPushDownMap.entrySet()) {
+            CTEId cteId = entry.getKey().getCteId();
+            if (ctx.getPushedDownCTE().contains(cteId)) {
+                continue;
+            }
+            candidates.add(cteId);
+        }
+        return candidates;
+    }
+
+    private boolean hasCTEConsumerUnderJoin(PhysicalPlan root, Set<CTEId> cteIds) {
+        if (root instanceof PhysicalCTEConsumer) {
+            cteIds.add(((PhysicalCTEConsumer) root).getCteId());
+            return true;
+        } else if (root.children().size() != 1) {
+            // only collect cte in one side
+            return false;
+        } else if (root instanceof PhysicalDistribute
+                || root instanceof PhysicalFilter
+                || root instanceof PhysicalProject) {
+            return hasCTEConsumerUnderJoin((PhysicalPlan) root.child(0), cteIds);
+        } else {
+            return false;
+        }
+    }
+
+    private void analyzeRuntimeFilterPushDownIntoCTEInfos(PhysicalHashJoin<? extends Plan, ? extends Plan> curJoin,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        Map<CTEId, Set<PhysicalHashJoin>> cteToJoinsMap = ctx.getCteToJoinsMap();
+        for (Map.Entry<CTEId, Set<PhysicalHashJoin>> entry : cteToJoinsMap.entrySet()) {
+            CTEId cteId = entry.getKey();
+            Set<PhysicalHashJoin> joinSet = entry.getValue();
+            if (joinSet.contains(curJoin)) {
+                // skip current join
+                continue;
+            }
+            Set<LogicalCTEConsumer> cteSet = context.getCteIdToConsumers().get(cteId);
+            Preconditions.checkState(!cteSet.isEmpty());
+            String cteName = cteSet.iterator().next().getName();
+            // preconditions for rf pushing into cte producer:
+            // multiple joins whose join condition is on the same cte's column of the same cte
+            // the other side of these join conditions are the same column of the same table, or
+            // they in the same equal sets, such as under an equal join condition
+            // case 1: two joins with t1.c1 = cte1_consumer1.c1 and t1.c1 = cte1_consumer2.c1 conditions
+            //         rf of t1.c1 can be pushed down into cte1 producer
+            // case 2: two joins with t1.c1 = cte2_consumer1.c1 and t2.c2 = cte2_consumer2.c1 conditions
+            //         and another equal join condition with t1.c1 = t2.c2
+            //         rf of t1.c1 and t2.c2 can be pushed down into cte2 producer
+            if (joinSet.size() != cteSet.size()) {
+                continue;
+            }
+            List<EqualTo> equalTos = new ArrayList<>();
+            Map<EqualTo, PhysicalHashJoin> equalCondToJoinMap = new LinkedHashMap<>();
+            for (PhysicalHashJoin join : joinSet) {
+                // precondition:
+                // 1. no non-equal join condition
+                // 2. only equalTo and slotReference both sides
+                // 3. only support one join condition (will be refined further)
+                if (join.getOtherJoinConjuncts().size() > 1
+                        || join.getHashJoinConjuncts().size() != 1
+                        || !(join.getHashJoinConjuncts().get(0) instanceof EqualTo)
+                        || !(((EqualTo) join.getHashJoinConjuncts().get(0)).child(0) instanceof SlotReference)
+                        || !(((EqualTo) join.getHashJoinConjuncts().get(0)).child(1) instanceof SlotReference)) {
+                    break;
+                } else {
+                    EqualTo equalTo = (EqualTo) join.getHashJoinConjuncts().get(0);
+                    equalTos.add(equalTo);
+                    equalCondToJoinMap.put(equalTo, join);
+                }
+            }
+            if (joinSet.size() == equalTos.size()) {
+                int matchNum = 0;
+                Set<String> cteNameSet = new HashSet<>();
+                Set<SlotReference> anotherSideSlotSet = new HashSet<>();
+                for (EqualTo equalTo : equalTos) {
+                    // has been checked above, SlotReference for both sides
+                    SlotReference left = (SlotReference) equalTo.left();
+                    SlotReference right = (SlotReference) equalTo.right();
+                    if (left.getQualifier().size() == 1 && left.getQualifier().get(0).equals(cteName)) {
+                        matchNum += 1;
+                        anotherSideSlotSet.add(right);
+                        cteNameSet.add(left.getQualifiedName());
+                    } else if (right.getQualifier().size() == 1 && right.getQualifier().get(0).equals(cteName)) {
+                        matchNum += 1;
+                        anotherSideSlotSet.add(left);
+                        cteNameSet.add(right.getQualifiedName());
+                    }
+                }
+                if (matchNum == equalTos.size() && cteNameSet.size() == 1) {
+                    // means all join condition points to the same cte on the same cte column.
+                    // collect the other side columns besides cte column side.
+                    Preconditions.checkState(equalTos.size() == equalCondToJoinMap.size());

Review Comment:
   Please add a message to this `Preconditions.checkState` call so a failure is easier to diagnose.
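   For example, a minimal sketch of a check that carries a message. The `checkState` helper
   below is a hypothetical stand-in for Guava's `Preconditions.checkState(boolean, String, Object...)`
   so the snippet compiles on its own, and the message wording is only a suggestion:

   ```java
   class CheckStateMessageSketch {
       // Hypothetical stand-in for Guava's Preconditions.checkState(boolean, String, Object...).
       static void checkState(boolean expression, String template, Object... args) {
           if (!expression) {
               throw new IllegalStateException(String.format(template, args));
           }
       }

       // Mirrors the size comparison in the diff, using plain sizes instead of plan objects.
       static void checkSizes(int equalToCount, int mapSize) {
           checkState(equalToCount == mapSize,
                   "equalTos size (%s) does not match equalCondToJoinMap size (%s)",
                   equalToCount, mapSize);
       }

       public static void main(String[] args) {
           checkSizes(2, 2); // passes silently
           try {
               checkSizes(2, 1);
           } catch (IllegalStateException e) {
               // The message now says which sizes disagreed.
               System.out.println(e.getMessage());
           }
       }
   }
   ```

   With a message like this, a failing check reports both sizes instead of a bare IllegalStateException.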



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -294,8 +221,466 @@ public PhysicalRelation visitPhysicalScan(PhysicalRelation scan, CascadesContext
         return scan;
     }
 
+    private long getBuildSideNdv(PhysicalHashJoin<? extends Plan, ? extends Plan> join, EqualTo equalTo) {
+        AbstractPlan right = (AbstractPlan) join.right();
+        //make ut test friendly
+        if (right.getStats() == null) {
+            return -1L;
+        }
+        ExpressionEstimation estimator = new ExpressionEstimation();
+        ColumnStatistic buildColStats = equalTo.right().accept(estimator, right.getStats());
+        return buildColStats.isUnKnown ? -1 : Math.max(1, (long) buildColStats.ndv);
+    }
+
     private static Slot checkTargetChild(Expression leftChild) {
         Expression expression = ExpressionUtils.getExpressionCoveredByCast(leftChild);
         return expression instanceof Slot ? ((Slot) expression) : null;
     }
+
+    private void pushDownRuntimeFilterCommon(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        List<TRuntimeFilterType> legalTypes = Arrays.stream(TRuntimeFilterType.values())
+                .filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0)
+                .collect(Collectors.toList());
+        // TODO: some complex situation cannot be handled now, see testPushDownThroughJoin.
+        //   we will support it in later version.
+        for (int i = 0; i < join.getHashJoinConjuncts().size(); i++) {
+            EqualTo equalTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(
+                    (EqualTo) join.getHashJoinConjuncts().get(i), join.left().getOutputSet()));
+            for (TRuntimeFilterType type : legalTypes) {
+                //bitmap rf is generated by nested loop join.
+                if (type == TRuntimeFilterType.BITMAP) {
+                    continue;
+                }
+                if (join.left() instanceof PhysicalUnion
+                        || join.left() instanceof PhysicalIntersect
+                        || join.left() instanceof PhysicalExcept) {
+                    doPushDownIntoSetOperation(join, ctx, equalTo, type, i);
+                } else {
+                    doPushDownBasic(join, context, ctx, equalTo, type, i);
+                }
+            }
+        }
+    }
+
+    private void doPushDownBasic(PhysicalHashJoin<? extends Plan, ? extends Plan> join, CascadesContext context,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
+        // currently, we can ensure children in the two side are corresponding to the equal_to's.
+        // so right maybe an expression and left is a slot
+        Slot unwrappedSlot = checkTargetChild(equalTo.left());
+        // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join
+        // contains join with denied join type. for example: a left join b on a.id = b.id
+        if (unwrappedSlot == null || !aliasTransferMap.containsKey(unwrappedSlot)) {
+            return;
+        }
+        Slot olapScanSlot = aliasTransferMap.get(unwrappedSlot).second;
+        PhysicalRelation scan = aliasTransferMap.get(unwrappedSlot).first;
+
+        Preconditions.checkState(olapScanSlot != null && scan != null);
+
+        if (scan instanceof PhysicalCTEConsumer) {
+            Set<CTEId> processedCTE = context.getRuntimeFilterContext().getProcessedCTE();
+            CTEId cteId = ((PhysicalCTEConsumer) scan).getCteId();
+            if (!processedCTE.contains(cteId)) {
+                PhysicalCTEProducer cteProducer = context.getRuntimeFilterContext()
+                        .getCteProduceMap().get(cteId);
+                PhysicalPlan inputPlanNode = (PhysicalPlan) cteProducer.child(0);
+                // process cte producer self recursively
+                inputPlanNode.accept(this, context);
+                processedCTE.add(cteId);
+            }
+        } else {
+            // in-filter is not friendly to pipeline
+            if (type == TRuntimeFilterType.IN_OR_BLOOM
+                    && ctx.getSessionVariable().enablePipelineEngine()
+                    && hasRemoteTarget(join, scan)) {
+                type = TRuntimeFilterType.BLOOM;
+            }
+            long buildSideNdv = getBuildSideNdv(join, equalTo);
+            RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                    equalTo.right(), ImmutableList.of(olapScanSlot), type, exprOrder, join, buildSideNdv);
+            ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+            ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
+            ctx.setTargetsOnScanNode(aliasTransferMap.get(unwrappedSlot).first.getId(), olapScanSlot);
+        }
+    }
+
+    private void doPushDownIntoSetOperation(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
+        List<Slot> targetList = new ArrayList<>();
+        int projIndex = -1;
+        for (int j = 0; j < join.left().children().size(); j++) {
+            PhysicalPlan child = (PhysicalPlan) join.left().child(j);
+            if (child instanceof PhysicalProject) {
+                PhysicalProject project = (PhysicalProject) child;
+                Slot leftSlot = checkTargetChild(equalTo.left());
+                if (leftSlot == null) {
+                    break;
+                }
+                for (int k = 0; projIndex < 0 && k < project.getProjects().size(); k++) {
+                    NamedExpression expr = (NamedExpression) project.getProjects().get(k);
+                    if (expr.getName().equals(leftSlot.getName())) {
+                        projIndex = k;
+                        break;
+                    }
+                }
+                Preconditions.checkState(projIndex >= 0
+                        && projIndex < project.getProjects().size());
+
+                NamedExpression targetExpr = (NamedExpression) project.getProjects().get(projIndex);
+
+                SlotReference origSlot = null;
+                if (targetExpr instanceof Alias) {
+                    origSlot = (SlotReference) targetExpr.child(0);
+                } else {
+                    origSlot = (SlotReference) targetExpr;
+                }
+                Slot olapScanSlot = aliasTransferMap.get(origSlot).second;
+                PhysicalRelation scan = aliasTransferMap.get(origSlot).first;
+                if (type == TRuntimeFilterType.IN_OR_BLOOM
+                        && ctx.getSessionVariable().enablePipelineEngine()
+                        && hasRemoteTarget(join, scan)) {
+                    type = TRuntimeFilterType.BLOOM;
+                }
+                targetList.add(olapScanSlot);
+                ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+                ctx.setTargetsOnScanNode(aliasTransferMap.get(origSlot).first.getId(), olapScanSlot);
+            }
+        }
+        if (!targetList.isEmpty()) {
+            long buildSideNdv = getBuildSideNdv(join, equalTo);
+            RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                    equalTo.right(), targetList, type, exprOrder, join, buildSideNdv);
+            for (int j = 0; j < targetList.size(); j++) {
+                ctx.setTargetExprIdToFilter(targetList.get(j).getExprId(), filter);
+            }
+        }
+    }
+
+    private void collectPushDownCTEInfos(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        Set<CTEId> cteIds = new HashSet<>();
+        PhysicalPlan leftChild = (PhysicalPlan) join.left();
+        PhysicalPlan rightChild = (PhysicalPlan) join.right();
+
+        Preconditions.checkState(leftChild != null && rightChild != null);
+
+        boolean leftHasCTE = hasCTEConsumerUnderJoin(leftChild, cteIds);
+        boolean rightHasCTE = hasCTEConsumerUnderJoin(rightChild, cteIds);
+        // only one side cte is allowed currently
+        if ((leftHasCTE && !rightHasCTE) || (!leftHasCTE && rightHasCTE)) {
+            for (CTEId id : cteIds) {
+                if (ctx.getCteToJoinsMap().get(id) == null) {
+                    Set<PhysicalHashJoin> newJoin = new HashSet<>();
+                    newJoin.add(join);
+                    ctx.getCteToJoinsMap().put(id, newJoin);
+                } else {
+                    ctx.getCteToJoinsMap().get(id).add(join);
+                }
+            }
+        }
+        if (!ctx.getCteToJoinsMap().isEmpty()) {
+            analyzeRuntimeFilterPushDownIntoCTEInfos(join, context);
+        }
+    }
+
+    private List<CTEId> getPushDownCTECandidates(RuntimeFilterContext ctx) {
+        List<CTEId> candidates = new ArrayList<>();
+        Map<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> cteRFPushDownMap = ctx.getCteRFPushDownMap();
+        for (Map.Entry<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> entry : cteRFPushDownMap.entrySet()) {
+            CTEId cteId = entry.getKey().getCteId();
+            if (ctx.getPushedDownCTE().contains(cteId)) {
+                continue;
+            }
+            candidates.add(cteId);
+        }
+        return candidates;
+    }
+
+    private boolean hasCTEConsumerUnderJoin(PhysicalPlan root, Set<CTEId> cteIds) {
+        if (root instanceof PhysicalCTEConsumer) {
+            cteIds.add(((PhysicalCTEConsumer) root).getCteId());
+            return true;
+        } else if (root.children().size() != 1) {
+            // only collect cte in one side
+            return false;
+        } else if (root instanceof PhysicalDistribute
+                || root instanceof PhysicalFilter
+                || root instanceof PhysicalProject) {
+            return hasCTEConsumerUnderJoin((PhysicalPlan) root.child(0), cteIds);
+        } else {
+            return false;
+        }
+    }
+
+    private void analyzeRuntimeFilterPushDownIntoCTEInfos(PhysicalHashJoin<? extends Plan, ? extends Plan> curJoin,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        Map<CTEId, Set<PhysicalHashJoin>> cteToJoinsMap = ctx.getCteToJoinsMap();
+        for (Map.Entry<CTEId, Set<PhysicalHashJoin>> entry : cteToJoinsMap.entrySet()) {
+            CTEId cteId = entry.getKey();
+            Set<PhysicalHashJoin> joinSet = entry.getValue();
+            if (joinSet.contains(curJoin)) {
+                // skip current join
+                continue;
+            }
+            Set<LogicalCTEConsumer> cteSet = context.getCteIdToConsumers().get(cteId);
+            Preconditions.checkState(!cteSet.isEmpty());
+            String cteName = cteSet.iterator().next().getName();
+            // preconditions for rf pushing into cte producer:
+            // multiple joins whose join condition is on the same cte's column of the same cte
+            // the other side of these join conditions are the same column of the same table, or
+            // they in the same equal sets, such as under an equal join condition
+            // case 1: two joins with t1.c1 = cte1_consumer1.c1 and t1.c1 = cte1_consumer2.c1 conditions
+            //         rf of t1.c1 can be pushed down into cte1 producer
+            // case 2: two joins with t1.c1 = cte2_consumer1.c1 and t2.c2 = cte2_consumer2.c1 conditions
+            //         and another equal join condition with t1.c1 = t2.c2
+            //         rf of t1.c1 and t2.c2 can be pushed down into cte2 producer
+            if (joinSet.size() != cteSet.size()) {
+                continue;
+            }
+            List<EqualTo> equalTos = new ArrayList<>();
+            Map<EqualTo, PhysicalHashJoin> equalCondToJoinMap = new LinkedHashMap<>();
+            for (PhysicalHashJoin join : joinSet) {
+                // precondition:
+                // 1. no non-equal join condition
+                // 2. only equalTo and slotReference both sides
+                // 3. only support one join condition (will be refined further)
+                if (join.getOtherJoinConjuncts().size() > 1
+                        || join.getHashJoinConjuncts().size() != 1
+                        || !(join.getHashJoinConjuncts().get(0) instanceof EqualTo)
+                        || !(((EqualTo) join.getHashJoinConjuncts().get(0)).child(0) instanceof SlotReference)
+                        || !(((EqualTo) join.getHashJoinConjuncts().get(0)).child(1) instanceof SlotReference)) {
+                    break;
+                } else {
+                    EqualTo equalTo = (EqualTo) join.getHashJoinConjuncts().get(0);
+                    equalTos.add(equalTo);
+                    equalCondToJoinMap.put(equalTo, join);
+                }
+            }
+            if (joinSet.size() == equalTos.size()) {
+                int matchNum = 0;
+                Set<String> cteNameSet = new HashSet<>();
+                Set<SlotReference> anotherSideSlotSet = new HashSet<>();
+                for (EqualTo equalTo : equalTos) {
+                    // has been checked above, SlotReference for both sides
+                    SlotReference left = (SlotReference) equalTo.left();
+                    SlotReference right = (SlotReference) equalTo.right();
+                    if (left.getQualifier().size() == 1 && left.getQualifier().get(0).equals(cteName)) {
+                        matchNum += 1;
+                        anotherSideSlotSet.add(right);
+                        cteNameSet.add(left.getQualifiedName());
+                    } else if (right.getQualifier().size() == 1 && right.getQualifier().get(0).equals(cteName)) {
+                        matchNum += 1;
+                        anotherSideSlotSet.add(left);
+                        cteNameSet.add(right.getQualifiedName());
+                    }
+                }
+                if (matchNum == equalTos.size() && cteNameSet.size() == 1) {
+                    // means all join condition points to the same cte on the same cte column.
+                    // collect the other side columns besides cte column side.
+                    Preconditions.checkState(equalTos.size() == equalCondToJoinMap.size());
+
+                    PhysicalCTEProducer cteProducer = context.getRuntimeFilterContext().getCteProduceMap().get(cteId);
+                    if (anotherSideSlotSet.size() == 1) {
+                        // meet requirement for pushing down into cte producer
+                        ctx.getCteRFPushDownMap().put(cteProducer, equalCondToJoinMap);
+                    } else {
+                        // check further whether the join upper side can bring equal set, which
+                        // indicating actually the same runtime filter build side
+                        List<Expression> conditions = curJoin.getHashJoinConjuncts();
+                        boolean inSameEqualSet = false;
+                        for (Expression e : conditions) {
+                            if (e instanceof EqualTo) {
+                                SlotReference oneSide = (SlotReference) ((EqualTo) e).left();
+                                SlotReference anotherSide = (SlotReference) ((EqualTo) e).right();
+                                if (anotherSideSlotSet.contains(oneSide) && anotherSideSlotSet.contains(anotherSide)) {
+                                    inSameEqualSet = true;
+                                    break;
+                                }
+                            }
+                        }

Review Comment:
   This part is hard to follow. Could you add an example in a code comment?
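   As I read it, this matches case 2 in the comment block earlier in this method: two joins with
   t1.c1 = cte2_consumer1.c1 and t2.c2 = cte2_consumer2.c1, plus an upper join on t1.c1 = t2.c2.
   A rough sketch of that reading, with plain strings standing in for `SlotReference` (the class
   and method names are hypothetical, not part of the PR):

   ```java
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;

   class EqualSetSketch {
       // Returns true when some equality condition of the upper join relates two slots
       // that are both in anotherSideSlotSet. Each condition is a two-element array
       // {left, right}; slot names are plain strings for illustration only.
       static boolean inSameEqualSet(Set<String> anotherSideSlotSet, List<String[]> upperJoinConditions) {
           for (String[] cond : upperJoinConditions) {
               if (anotherSideSlotSet.contains(cond[0]) && anotherSideSlotSet.contains(cond[1])) {
                   return true;
               }
           }
           return false;
       }

       public static void main(String[] args) {
           // Non-cte sides collected from t1.c1 = cte2_consumer1.c1 and t2.c2 = cte2_consumer2.c1.
           Set<String> anotherSide = new HashSet<>(Arrays.asList("t1.c1", "t2.c2"));
           // The upper join carries t1.c1 = t2.c2, so both slots belong to one equal set.
           List<String[]> upper = Collections.singletonList(new String[] {"t1.c1", "t2.c2"});
           System.out.println(inSameEqualSet(anotherSide, upper));
       }
   }
   ```

   If that reading is right, a short comment with these three conditions next to the loop would be enough.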



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

