You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/06/30 16:08:19 UTC

[doris] 02/04: [opt](Nereids) support set cte shuffle type for each consumer

This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch runtimefilter_multi_send
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 90a89f8f56f5ec1db36ed0dd1fe039381e042f04
Author: morrySnow <mo...@126.com>
AuthorDate: Thu Jun 29 21:30:50 2023 +0800

    [opt](Nereids) support set cte shuffle type for each consumer
---
 .../glue/translator/PhysicalPlanTranslator.java    | 134 ++++++++++-----------
 .../properties/ChildOutputPropertyDeriver.java     |   2 +-
 .../properties/ChildrenPropertiesRegulator.java    |  34 ++++++
 .../properties/DistributionSpecMustShuffle.java    |  35 ++++++
 .../nereids/properties/PhysicalProperties.java     |   3 +
 .../org/apache/doris/planner/DataStreamSink.java   |  86 ++++++++++++-
 .../org/apache/doris/planner/ExchangeNode.java     |  10 +-
 .../doris/planner/MultiCastPlanFragment.java       |   5 +-
 .../org/apache/doris/planner/PlanFragment.java     |   2 +-
 .../java/org/apache/doris/planner/PlanNode.java    |   4 +-
 10 files changed, 229 insertions(+), 86 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 6aff68e6b4..5c4863a5b5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -247,27 +247,21 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
     @Override
     public PlanFragment visitPhysicalDistribute(PhysicalDistribute<? extends Plan> distribute,
             PlanTranslatorContext context) {
-        PlanFragment childFragment = distribute.child().accept(this, context);
+        PlanFragment inputFragment = distribute.child().accept(this, context);
         // TODO: why need set streaming here? should remove this.
-        if (childFragment.getPlanRoot() instanceof AggregationNode
+        if (inputFragment.getPlanRoot() instanceof AggregationNode
                 && distribute.child() instanceof PhysicalHashAggregate
-                && context.getFirstAggregateInFragment(childFragment) == distribute.child()) {
+                && context.getFirstAggregateInFragment(inputFragment) == distribute.child()) {
             PhysicalHashAggregate<?> hashAggregate = (PhysicalHashAggregate<?>) distribute.child();
             if (hashAggregate.getAggPhase() == AggPhase.LOCAL
                     && hashAggregate.getAggMode() == AggMode.INPUT_TO_BUFFER) {
-                AggregationNode aggregationNode = (AggregationNode) childFragment.getPlanRoot();
+                AggregationNode aggregationNode = (AggregationNode) inputFragment.getPlanRoot();
                 aggregationNode.setUseStreamingPreagg(hashAggregate.isMaybeUsingStream());
             }
         }
 
-        ExchangeNode exchangeNode = new ExchangeNode(context.nextPlanNodeId(), childFragment.getPlanRoot());
+        ExchangeNode exchangeNode = new ExchangeNode(context.nextPlanNodeId(), inputFragment.getPlanRoot());
         updateLegacyPlanIdToPhysicalPlan(exchangeNode, distribute);
-        exchangeNode.setNumInstances(childFragment.getPlanRoot().getNumInstances());
-        if (distribute.getDistributionSpec() instanceof DistributionSpecGather) {
-            // gather to one instance
-            exchangeNode.setNumInstances(1);
-        }
-
         List<ExprId> validOutputIds = distribute.getOutputExprIds();
         if (distribute.child() instanceof PhysicalHashAggregate) {
             // we must add group by keys to output list,
@@ -282,8 +276,28 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
         }
         DataPartition dataPartition = toDataPartition(distribute.getDistributionSpec(), validOutputIds, context);
         PlanFragment parentFragment = new PlanFragment(context.nextFragmentId(), exchangeNode, dataPartition);
-        childFragment.setDestination(exchangeNode);
-        childFragment.setOutputPartition(dataPartition);
+        exchangeNode.setNumInstances(inputFragment.getPlanRoot().getNumInstances());
+        if (distribute.getDistributionSpec() instanceof DistributionSpecGather) {
+            // gather to one instance
+            exchangeNode.setNumInstances(1);
+        }
+
+        // process multicast sink
+        if (inputFragment instanceof MultiCastPlanFragment) {
+            MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink();
+            DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get(
+                    multiCastDataSink.getDataStreamSinks().size() - 1);
+            TupleDescriptor tupleDescriptor = generateTupleDesc(distribute.getOutput(), null, context);
+            exchangeNode.updateTupleIds(tupleDescriptor);
+            dataStreamSink.setExchNodeId(exchangeNode.getId());
+            dataStreamSink.setOutputPartition(dataPartition);
+            parentFragment.addChild(inputFragment);
+            ((MultiCastPlanFragment) inputFragment).addToDest(exchangeNode);
+        } else {
+            inputFragment.setDestination(exchangeNode);
+            inputFragment.setOutputPartition(dataPartition);
+        }
+
         context.addPlanFragment(parentFragment);
         return parentFragment;
     }
@@ -763,68 +777,21 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
         PhysicalCTEProducer cteProducer = context.getCteProduceMap().get(cteId);
         Preconditions.checkState(cteProducer != null, "invalid cteProducer");
 
-        ExchangeNode exchangeNode = new ExchangeNode(context.nextPlanNodeId(), multiCastFragment.getPlanRoot());
-
-        DataStreamSink streamSink = new DataStreamSink(exchangeNode.getId());
-        streamSink.setPartition(DataPartition.RANDOM);
+        // add a data stream sink to the multicast data sink, but do not set its target yet;
+        // the target will be set when the distribute node is translated
+        DataStreamSink streamSink = new DataStreamSink();
         streamSink.setFragment(multiCastFragment);
-
         multiCastDataSink.getDataStreamSinks().add(streamSink);
         multiCastDataSink.getDestinations().add(Lists.newArrayList());
 
-        exchangeNode.setNumInstances(multiCastFragment.getPlanRoot().getNumInstances());
-
-        PlanFragment consumeFragment = new PlanFragment(context.nextFragmentId(), exchangeNode,
-                multiCastFragment.getDataPartition());
-
-        Map<Slot, Slot> projectMap = Maps.newHashMap();
-        projectMap.putAll(cteConsumer.getProducerToConsumerSlotMap());
-
-        List<NamedExpression> execList = new ArrayList<>();
-        PlanNode inputPlanNode = consumeFragment.getPlanRoot();
-        List<Slot> cteProjects = cteProducer.getProjects();
-        for (Slot slot : cteProjects) {
-            if (projectMap.containsKey(slot)) {
-                execList.add(projectMap.get(slot));
-            } else {
-                throw new RuntimeException("could not find slot in cte producer consumer projectMap");
-            }
+        // update expr to slot mapping
+        for (int i = 0; i < cteConsumer.getOutput().size(); i++) {
+            Slot producerSlot = cteProducer.getOutput().get(i);
+            Slot consumerSlot = cteConsumer.getOutput().get(i);
+            SlotRef slotRef = context.findSlotRef(producerSlot.getExprId());
+            context.addExprIdSlotRefPair(consumerSlot.getExprId(), slotRef);
         }
-
-        List<Slot> slotList = execList
-                .stream()
-                .map(NamedExpression::toSlot)
-                .collect(Collectors.toList());
-
-        TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, null, context);
-
-        // update tuple list and tblTupleList
-        inputPlanNode.getTupleIds().clear();
-        inputPlanNode.getTupleIds().add(tupleDescriptor.getId());
-        inputPlanNode.getTblRefIds().clear();
-        inputPlanNode.getTblRefIds().add(tupleDescriptor.getId());
-        inputPlanNode.getNullableTupleIds().clear();
-        inputPlanNode.getNullableTupleIds().add(tupleDescriptor.getId());
-
-        List<Expr> execExprList = execList
-                .stream()
-                .map(e -> ExpressionTranslator.translate(e, context))
-                .collect(Collectors.toList());
-
-        inputPlanNode.setProjectList(execExprList);
-        inputPlanNode.setOutputTupleDesc(tupleDescriptor);
-
-        // update data partition
-        consumeFragment.setDataPartition(DataPartition.RANDOM);
-
-        SelectNode projectNode = new SelectNode(context.nextPlanNodeId(), inputPlanNode);
-        consumeFragment.setPlanRoot(projectNode);
-
-        multiCastFragment.getDestNodeList().add(exchangeNode);
-        consumeFragment.addChild(multiCastFragment);
-        context.getPlanFragments().add(consumeFragment);
-
-        return consumeFragment;
+        return multiCastFragment;
     }
 
     @Override
@@ -859,6 +826,17 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
         }
         PlanFragment inputFragment = filter.child(0).accept(this, context);
 
+        // process multicast sink
+        if (inputFragment instanceof MultiCastPlanFragment) {
+            MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink();
+            DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get(
+                    multiCastDataSink.getDataStreamSinks().size() - 1);
+            filter.getConjuncts().stream()
+                    .map(e -> ExpressionTranslator.translate(e, context))
+                    .forEach(dataStreamSink::addConjunct);
+            return inputFragment;
+        }
+
         PlanNode planNode = inputFragment.getPlanRoot();
         if (planNode instanceof ExchangeNode || planNode instanceof SortNode || planNode instanceof UnionNode) {
             // the three nodes don't support conjuncts, need create a SelectNode to filter data
@@ -1397,19 +1375,31 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
                 ((AbstractPhysicalJoin<?, ?>) project.child(0).child(0)).setShouldTranslateOutput(false);
             }
         }
+
         PlanFragment inputFragment = project.child(0).accept(this, context);
+
         List<Expr> execExprList = project.getProjects()
                 .stream()
                 .map(e -> ExpressionTranslator.translate(e, context))
                 .collect(Collectors.toList());
         // TODO: fix the project alias of an aliased relation.
-
-        PlanNode inputPlanNode = inputFragment.getPlanRoot();
         List<Slot> slotList = project.getProjects()
                 .stream()
                 .map(NamedExpression::toSlot)
                 .collect(Collectors.toList());
 
+        // process multicast sink
+        if (inputFragment instanceof MultiCastPlanFragment) {
+            MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink();
+            DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get(
+                    multiCastDataSink.getDataStreamSinks().size() - 1);
+            TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, null, context);
+            dataStreamSink.setProjections(execExprList);
+            dataStreamSink.setOutputTupleDesc(tupleDescriptor);
+            return inputFragment;
+        }
+
+        PlanNode inputPlanNode = inputFragment.getPlanRoot();
         List<Expr> predicateList = inputPlanNode.getConjuncts();
         Set<SlotId> requiredSlotIdSet = Sets.newHashSet();
         for (Expr expr : execExprList) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
index 2695b67a93..158eaa67f2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
@@ -114,7 +114,7 @@ public class ChildOutputPropertyDeriver extends PlanVisitor<PhysicalProperties,
     public PhysicalProperties visitPhysicalCTEConsumer(
             PhysicalCTEConsumer cteConsumer, PlanContext context) {
         Preconditions.checkState(childrenOutputProperties.size() == 0);
-        return PhysicalProperties.ANY;
+        return PhysicalProperties.MUST_SHUFFLE;
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
index 862dbb5e2b..86a3f650d6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
@@ -27,9 +27,11 @@ import org.apache.doris.nereids.trees.expressions.ExprId;
 import org.apache.doris.nereids.trees.plans.AggMode;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
 import org.apache.doris.nereids.util.JoinUtils;
@@ -74,16 +76,33 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> {
 
     @Override
     public Boolean visit(Plan plan, Void context) {
+        // process must shuffle
+        for (int i = 0; i < children.size(); i++) {
+            DistributionSpec distributionSpec = childrenProperties.get(i).getDistributionSpec();
+            if (distributionSpec instanceof DistributionSpecMustShuffle) {
+                updateChildEnforceAndCost(i, PhysicalProperties.EXECUTION_ANY);
+            }
+        }
         return true;
     }
 
     @Override
     public Boolean visitPhysicalHashAggregate(PhysicalHashAggregate<? extends Plan> agg, Void context) {
+        // forbid one phase agg on distribute
         if (agg.getAggMode() == AggMode.INPUT_TO_RESULT
                 && children.get(0).getPlan() instanceof PhysicalDistribute) {
             // this means one stage gather agg, usually bad pattern
             return false;
         }
+        // process must shuffle
+        visit(agg, context);
+        // process agg
+        return true;
+    }
+
+    @Override
+    public Boolean visitPhysicalFilter(PhysicalFilter<? extends Plan> filter, Void context) {
+        // do not process must shuffle
         return true;
     }
 
@@ -93,6 +112,9 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> {
         Preconditions.checkArgument(children.size() == 2, String.format("children.size() is %d", children.size()));
         Preconditions.checkArgument(childrenProperties.size() == 2);
         Preconditions.checkArgument(requiredProperties.size() == 2);
+        // process must shuffle
+        visit(hashJoin, context);
+        // process hash join
         DistributionSpec leftDistributionSpec = childrenProperties.get(0).getDistributionSpec();
         DistributionSpec rightDistributionSpec = childrenProperties.get(1).getDistributionSpec();
 
@@ -229,6 +251,9 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> {
         Preconditions.checkArgument(children.size() == 2, String.format("children.size() is %d", children.size()));
         Preconditions.checkArgument(childrenProperties.size() == 2);
         Preconditions.checkArgument(requiredProperties.size() == 2);
+        // process must shuffle
+        visit(nestedLoopJoin, context);
+        // process nlj
         DistributionSpec rightDistributionSpec = childrenProperties.get(1).getDistributionSpec();
         if (rightDistributionSpec instanceof DistributionSpecStorageGather) {
             updateChildEnforceAndCost(1, PhysicalProperties.GATHER);
@@ -236,8 +261,17 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> {
         return true;
     }
 
+    @Override
+    public Boolean visitPhysicalProject(PhysicalProject<? extends Plan> project, Void context) {
+        // do not process must shuffle
+        return true;
+    }
+
     @Override
     public Boolean visitPhysicalSetOperation(PhysicalSetOperation setOperation, Void context) {
+        // process must shuffle
+        visit(setOperation, context);
+        // process set operation
         if (children.isEmpty()) {
             return true;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecMustShuffle.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecMustShuffle.java
new file mode 100644
index 0000000000..8f718fbb9d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecMustShuffle.java
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.properties;
+
+/**
+ * Represents data that must be shuffled before it is used.
+ */
+public class DistributionSpecMustShuffle extends DistributionSpec {
+
+    public static final DistributionSpecMustShuffle INSTANCE = new DistributionSpecMustShuffle();
+
+    public DistributionSpecMustShuffle() {
+        super();
+    }
+
+    @Override
+    public boolean satisfy(DistributionSpec other) {
+        return other instanceof DistributionSpecAny;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
index 28bf347977..9596043f67 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
@@ -44,6 +44,9 @@ public class PhysicalProperties {
 
     public static PhysicalProperties STORAGE_GATHER = new PhysicalProperties(DistributionSpecStorageGather.INSTANCE);
 
+    public static PhysicalProperties MUST_SHUFFLE = new PhysicalProperties(DistributionSpecMustShuffle.INSTANCE);
+
+
     private final OrderSpec orderSpec;
 
     private final DistributionSpec distributionSpec;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java
index 0f903d69d1..274cdd86f4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java
@@ -20,19 +20,38 @@
 
 package org.apache.doris.planner;
 
+import org.apache.doris.analysis.BitmapFilterPredicate;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.thrift.TDataSink;
 import org.apache.doris.thrift.TDataSinkType;
 import org.apache.doris.thrift.TDataStreamSink;
 import org.apache.doris.thrift.TExplainLevel;
 
+import com.google.common.collect.Lists;
+import org.springframework.util.CollectionUtils;
+
+import java.util.List;
+
 /**
  * Data sink that forwards data to an exchange node.
  */
 public class DataStreamSink extends DataSink {
-    private final PlanNodeId exchNodeId;
+
+    private PlanNodeId exchNodeId;
 
     private DataPartition outputPartition;
 
+    protected TupleDescriptor outputTupleDesc;
+
+    protected List<Expr> projections;
+
+    protected List<Expr> conjuncts = Lists.newArrayList();
+
+    public DataStreamSink() {
+
+    }
+
     public DataStreamSink(PlanNodeId exchNodeId) {
         this.exchNodeId = exchNodeId;
     }
@@ -42,23 +61,65 @@ public class DataStreamSink extends DataSink {
         return exchNodeId;
     }
 
+    public void setExchNodeId(PlanNodeId exchNodeId) {
+        this.exchNodeId = exchNodeId;
+    }
+
     @Override
     public DataPartition getOutputPartition() {
         return outputPartition;
     }
 
-    public void setPartition(DataPartition partition) {
-        outputPartition = partition;
+    public void setOutputPartition(DataPartition outputPartition) {
+        this.outputPartition = outputPartition;
+    }
+
+    public TupleDescriptor getOutputTupleDesc() {
+        return outputTupleDesc;
+    }
+
+    public void setOutputTupleDesc(TupleDescriptor outputTupleDesc) {
+        this.outputTupleDesc = outputTupleDesc;
+    }
+
+    public List<Expr> getProjections() {
+        return projections;
+    }
+
+    public void setProjections(List<Expr> projections) {
+        this.projections = projections;
+    }
+
+    public List<Expr> getConjuncts() {
+        return conjuncts;
+    }
+
+    public void setConjuncts(List<Expr> conjuncts) {
+        this.conjuncts = conjuncts;
+    }
+
+    public void addConjunct(Expr conjunct) {
+        this.conjuncts.add(conjunct);
     }
 
     @Override
     public String getExplainString(String prefix, TExplainLevel explainLevel) {
         StringBuilder strBuilder = new StringBuilder();
-        strBuilder.append(prefix + "STREAM DATA SINK\n");
-        strBuilder.append(prefix + "  EXCHANGE ID: " + exchNodeId + "\n");
+        strBuilder.append(prefix).append("STREAM DATA SINK\n");
+        strBuilder.append(prefix).append("  EXCHANGE ID: ").append(exchNodeId);
         if (outputPartition != null) {
-            strBuilder.append(prefix + "  " + outputPartition.getExplainString(explainLevel));
+            strBuilder.append("\n").append(prefix).append("  ").append(outputPartition.getExplainString(explainLevel));
+        }
+        if (!conjuncts.isEmpty()) {
+            Expr expr = PlanNode.convertConjunctsToAndCompoundPredicate(conjuncts);
+            strBuilder.append(prefix).append("  CONJUNCTS: ").append(expr.toSql());
+        }
+        if (!CollectionUtils.isEmpty(projections)) {
+            strBuilder.append("\n").append(prefix).append("  PROJECTIONS: ")
+                    .append(PlanNode.getExplainString(projections)).append("\n");
+            strBuilder.append(prefix).append("  PROJECTION TUPLE: ").append(outputTupleDesc.getId());
         }
+        strBuilder.append("\n");
         return strBuilder.toString();
     }
 
@@ -67,6 +128,19 @@ public class DataStreamSink extends DataSink {
         TDataSink result = new TDataSink(TDataSinkType.DATA_STREAM_SINK);
         TDataStreamSink tStreamSink =
                 new TDataStreamSink(exchNodeId.asInt(), outputPartition.toThrift());
+        for (Expr e : conjuncts) {
+            if  (!(e instanceof BitmapFilterPredicate)) {
+                tStreamSink.addToConjuncts(e.treeToThrift());
+            }
+        }
+        if (projections != null) {
+            for (Expr expr : projections) {
+                tStreamSink.addToOutputExprs(expr.treeToThrift());
+            }
+        }
+        if (outputTupleDesc != null) {
+            tStreamSink.setOutputTupleId(outputTupleDesc.getId().asInt());
+        }
         result.setStreamSink(tStreamSink);
         return result;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
index 30bf9eb45d..6694a05219 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
@@ -108,15 +108,21 @@ public class ExchangeNode extends PlanNode {
     public final void computeTupleIds() {
         PlanNode inputNode = getChild(0);
         TupleDescriptor outputTupleDesc = inputNode.getOutputTupleDesc();
+        updateTupleIds(outputTupleDesc);
+    }
+
+    public void updateTupleIds(TupleDescriptor outputTupleDesc) {
         if (outputTupleDesc != null) {
             tupleIds.clear();
             tupleIds.add(outputTupleDesc.getId());
+            tblRefIds.add(outputTupleDesc.getId());
+            nullableTupleIds.add(outputTupleDesc.getId());
         } else {
             clearTupleIds();
             tupleIds.addAll(getChild(0).getTupleIds());
+            tblRefIds.addAll(getChild(0).getTblRefIds());
+            nullableTupleIds.addAll(getChild(0).getNullableTupleIds());
         }
-        tblRefIds.addAll(getChild(0).getTblRefIds());
-        nullableTupleIds.addAll(getChild(0).getNullableTupleIds());
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/MultiCastPlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/MultiCastPlanFragment.java
index 0d5b54b269..e52bbb0557 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/MultiCastPlanFragment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/MultiCastPlanFragment.java
@@ -35,10 +35,11 @@ public class MultiCastPlanFragment extends PlanFragment {
         this.children.addAll(planFragment.getChildren());
     }
 
-    public List<ExchangeNode> getDestNodeList() {
-        return destNodeList;
+    public void addToDest(ExchangeNode exchangeNode) {
+        destNodeList.add(exchangeNode);
     }
 
+
     public List<PlanFragment> getDestFragmentList() {
         return destNodeList.stream().map(PlanNode::getFragment).collect(Collectors.toList());
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
index 54903beae5..64ac4c3051 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
@@ -256,7 +256,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
             Preconditions.checkState(sink == null);
             // we're streaming to an exchange node
             DataStreamSink streamSink = new DataStreamSink(destNode.getId());
-            streamSink.setPartition(outputPartition);
+            streamSink.setOutputPartition(outputPartition);
             streamSink.setFragment(this);
             sink = streamSink;
         } else {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index ab17c0acec..cf5d14cb5e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -415,7 +415,7 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats {
         }
     }
 
-    protected Expr convertConjunctsToAndCompoundPredicate(List<Expr> conjuncts) {
+    public static Expr convertConjunctsToAndCompoundPredicate(List<Expr> conjuncts) {
         List<Expr> targetConjuncts = Lists.newArrayList(conjuncts);
         while (targetConjuncts.size() > 1) {
             List<Expr> newTargetConjuncts = Lists.newArrayList();
@@ -824,7 +824,7 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats {
         return output.toString();
     }
 
-    protected String getExplainString(List<? extends Expr> exprs) {
+    public static String getExplainString(List<? extends Expr> exprs) {
         if (exprs == null) {
             return "";
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org