You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/06/29 13:41:19 UTC

[doris] branch runtimefilter_multi_send updated (95f0208ad5 -> f94e358886)

This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a change to branch runtimefilter_multi_send
in repository https://gitbox.apache.org/repos/asf/doris.git


 discard 95f0208ad5 [opt](Nereids) support set cte shuffle type for each consumer
     new f94e358886 [opt](Nereids) support set cte shuffle type for each consumer

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (95f0208ad5)
            \
             N -- N -- N   refs/heads/runtimefilter_multi_send (f94e358886)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../main/java/org/apache/doris/planner/DataStreamSink.java | 14 ++++++++++++++
 1 file changed, 14 insertions(+)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[doris] 01/01: [opt](Nereids) support set cte shuffle type for each consumer

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch runtimefilter_multi_send
in repository https://gitbox.apache.org/repos/asf/doris.git

commit f94e35888621a93e396655e2694c91a66bd72229
Author: morrySnow <mo...@126.com>
AuthorDate: Thu Jun 29 21:30:50 2023 +0800

    [opt](Nereids) support set cte shuffle type for each consumer
---
 .../glue/translator/PhysicalPlanTranslator.java    | 133 ++++++++++-----------
 .../properties/ChildOutputPropertyDeriver.java     |   2 +-
 .../properties/ChildrenPropertiesRegulator.java    |  34 ++++++
 .../properties/DistributionSpecMustShuffle.java    |  35 ++++++
 .../nereids/properties/PhysicalProperties.java     |   3 +
 .../org/apache/doris/planner/DataStreamSink.java   |  86 ++++++++++++-
 .../org/apache/doris/planner/ExchangeNode.java     |  10 +-
 .../org/apache/doris/planner/PlanFragment.java     |   2 +-
 .../java/org/apache/doris/planner/PlanNode.java    |   4 +-
 9 files changed, 225 insertions(+), 84 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 8210c1a082..71a9088d4f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -247,27 +247,21 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
     @Override
     public PlanFragment visitPhysicalDistribute(PhysicalDistribute<? extends Plan> distribute,
             PlanTranslatorContext context) {
-        PlanFragment childFragment = distribute.child().accept(this, context);
+        PlanFragment inputFragment = distribute.child().accept(this, context);
         // TODO: why need set streaming here? should remove this.
-        if (childFragment.getPlanRoot() instanceof AggregationNode
+        if (inputFragment.getPlanRoot() instanceof AggregationNode
                 && distribute.child() instanceof PhysicalHashAggregate
-                && context.getFirstAggregateInFragment(childFragment) == distribute.child()) {
+                && context.getFirstAggregateInFragment(inputFragment) == distribute.child()) {
             PhysicalHashAggregate<?> hashAggregate = (PhysicalHashAggregate<?>) distribute.child();
             if (hashAggregate.getAggPhase() == AggPhase.LOCAL
                     && hashAggregate.getAggMode() == AggMode.INPUT_TO_BUFFER) {
-                AggregationNode aggregationNode = (AggregationNode) childFragment.getPlanRoot();
+                AggregationNode aggregationNode = (AggregationNode) inputFragment.getPlanRoot();
                 aggregationNode.setUseStreamingPreagg(hashAggregate.isMaybeUsingStream());
             }
         }
 
-        ExchangeNode exchangeNode = new ExchangeNode(context.nextPlanNodeId(), childFragment.getPlanRoot());
+        ExchangeNode exchangeNode = new ExchangeNode(context.nextPlanNodeId(), inputFragment.getPlanRoot());
         updateLegacyPlanIdToPhysicalPlan(exchangeNode, distribute);
-        exchangeNode.setNumInstances(childFragment.getPlanRoot().getNumInstances());
-        if (distribute.getDistributionSpec() instanceof DistributionSpecGather) {
-            // gather to one instance
-            exchangeNode.setNumInstances(1);
-        }
-
         List<ExprId> validOutputIds = distribute.getOutputExprIds();
         if (distribute.child() instanceof PhysicalHashAggregate) {
             // we must add group by keys to output list,
@@ -282,8 +276,27 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
         }
         DataPartition dataPartition = toDataPartition(distribute.getDistributionSpec(), validOutputIds, context);
         PlanFragment parentFragment = new PlanFragment(context.nextFragmentId(), exchangeNode, dataPartition);
-        childFragment.setDestination(exchangeNode);
-        childFragment.setOutputPartition(dataPartition);
+        exchangeNode.setNumInstances(inputFragment.getPlanRoot().getNumInstances());
+        if (distribute.getDistributionSpec() instanceof DistributionSpecGather) {
+            // gather to one instance
+            exchangeNode.setNumInstances(1);
+        }
+
+        // process multicast sink
+        if (inputFragment instanceof MultiCastPlanFragment) {
+            MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink();
+            DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get(
+                    multiCastDataSink.getDataStreamSinks().size() - 1);
+            TupleDescriptor tupleDescriptor = generateTupleDesc(distribute.getOutput(), null, context);
+            exchangeNode.updateTupleIds(tupleDescriptor);
+            dataStreamSink.setExchNodeId(exchangeNode.getId());
+            dataStreamSink.setOutputPartition(dataPartition);
+            parentFragment.addChild(inputFragment);
+        } else {
+            inputFragment.setDestination(exchangeNode);
+            inputFragment.setOutputPartition(dataPartition);
+        }
+
         context.addPlanFragment(parentFragment);
         return parentFragment;
     }
@@ -763,68 +776,21 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
         PhysicalCTEProducer cteProducer = context.getCteProduceMap().get(cteId);
         Preconditions.checkState(cteProducer != null, "invalid cteProducer");
 
-        ExchangeNode exchangeNode = new ExchangeNode(context.nextPlanNodeId(), multiCastFragment.getPlanRoot());
-
-        DataStreamSink streamSink = new DataStreamSink(exchangeNode.getId());
-        streamSink.setPartition(DataPartition.RANDOM);
+        // set datasink to multicast data sink but do not set target now
+        // target will be set when translate distribute
+        DataStreamSink streamSink = new DataStreamSink();
         streamSink.setFragment(multiCastFragment);
-
         multiCastDataSink.getDataStreamSinks().add(streamSink);
         multiCastDataSink.getDestinations().add(Lists.newArrayList());
 
-        exchangeNode.setNumInstances(multiCastFragment.getPlanRoot().getNumInstances());
-
-        PlanFragment consumeFragment = new PlanFragment(context.nextFragmentId(), exchangeNode,
-                multiCastFragment.getDataPartition());
-
-        Map<Slot, Slot> projectMap = Maps.newHashMap();
-        projectMap.putAll(cteConsumer.getProducerToConsumerSlotMap());
-
-        List<NamedExpression> execList = new ArrayList<>();
-        PlanNode inputPlanNode = consumeFragment.getPlanRoot();
-        List<Slot> cteProjects = cteProducer.getProjects();
-        for (Slot slot : cteProjects) {
-            if (projectMap.containsKey(slot)) {
-                execList.add(projectMap.get(slot));
-            } else {
-                throw new RuntimeException("could not find slot in cte producer consumer projectMap");
-            }
+        // update expr to slot mapping
+        for (int i = 0; i < cteConsumer.getOutput().size(); i++) {
+            Slot producerSlot = cteProducer.getOutput().get(i);
+            Slot consumerSlot = cteConsumer.getOutput().get(i);
+            SlotRef slotRef = context.findSlotRef(producerSlot.getExprId());
+            context.addExprIdSlotRefPair(consumerSlot.getExprId(), slotRef);
         }
-
-        List<Slot> slotList = execList
-                .stream()
-                .map(NamedExpression::toSlot)
-                .collect(Collectors.toList());
-
-        TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, null, context);
-
-        // update tuple list and tblTupleList
-        inputPlanNode.getTupleIds().clear();
-        inputPlanNode.getTupleIds().add(tupleDescriptor.getId());
-        inputPlanNode.getTblRefIds().clear();
-        inputPlanNode.getTblRefIds().add(tupleDescriptor.getId());
-        inputPlanNode.getNullableTupleIds().clear();
-        inputPlanNode.getNullableTupleIds().add(tupleDescriptor.getId());
-
-        List<Expr> execExprList = execList
-                .stream()
-                .map(e -> ExpressionTranslator.translate(e, context))
-                .collect(Collectors.toList());
-
-        inputPlanNode.setProjectList(execExprList);
-        inputPlanNode.setOutputTupleDesc(tupleDescriptor);
-
-        // update data partition
-        consumeFragment.setDataPartition(DataPartition.RANDOM);
-
-        SelectNode projectNode = new SelectNode(context.nextPlanNodeId(), inputPlanNode);
-        consumeFragment.setPlanRoot(projectNode);
-
-        multiCastFragment.getDestNodeList().add(exchangeNode);
-        consumeFragment.addChild(multiCastFragment);
-        context.getPlanFragments().add(consumeFragment);
-
-        return consumeFragment;
+        return multiCastFragment;
     }
 
     @Override
@@ -859,6 +825,17 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
         }
         PlanFragment inputFragment = filter.child(0).accept(this, context);
 
+        // process multicast sink
+        if (inputFragment instanceof MultiCastPlanFragment) {
+            MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink();
+            DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get(
+                    multiCastDataSink.getDataStreamSinks().size() - 1);
+            filter.getConjuncts().stream()
+                    .map(e -> ExpressionTranslator.translate(e, context))
+                    .forEach(dataStreamSink::addConjunct);
+            return inputFragment;
+        }
+
         PlanNode planNode = inputFragment.getPlanRoot();
         if (planNode instanceof ExchangeNode || planNode instanceof SortNode || planNode instanceof UnionNode) {
             // the three nodes don't support conjuncts, need create a SelectNode to filter data
@@ -1397,19 +1374,31 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
                 ((AbstractPhysicalJoin<?, ?>) project.child(0).child(0)).setShouldTranslateOutput(false);
             }
         }
+
         PlanFragment inputFragment = project.child(0).accept(this, context);
+
         List<Expr> execExprList = project.getProjects()
                 .stream()
                 .map(e -> ExpressionTranslator.translate(e, context))
                 .collect(Collectors.toList());
         // TODO: fix the project alias of an aliased relation.
-
-        PlanNode inputPlanNode = inputFragment.getPlanRoot();
         List<Slot> slotList = project.getProjects()
                 .stream()
                 .map(NamedExpression::toSlot)
                 .collect(Collectors.toList());
 
+        // process multicast sink
+        if (inputFragment instanceof MultiCastPlanFragment) {
+            MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink();
+            DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get(
+                    multiCastDataSink.getDataStreamSinks().size() - 1);
+            TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, null, context);
+            dataStreamSink.setProjections(execExprList);
+            dataStreamSink.setOutputTupleDesc(tupleDescriptor);
+            return inputFragment;
+        }
+
+        PlanNode inputPlanNode = inputFragment.getPlanRoot();
         List<Expr> predicateList = inputPlanNode.getConjuncts();
         Set<SlotId> requiredSlotIdSet = Sets.newHashSet();
         for (Expr expr : execExprList) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
index 2695b67a93..158eaa67f2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
@@ -114,7 +114,7 @@ public class ChildOutputPropertyDeriver extends PlanVisitor<PhysicalProperties,
     public PhysicalProperties visitPhysicalCTEConsumer(
             PhysicalCTEConsumer cteConsumer, PlanContext context) {
         Preconditions.checkState(childrenOutputProperties.size() == 0);
-        return PhysicalProperties.ANY;
+        return PhysicalProperties.MUST_SHUFFLE;
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
index 862dbb5e2b..86a3f650d6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
@@ -27,9 +27,11 @@ import org.apache.doris.nereids.trees.expressions.ExprId;
 import org.apache.doris.nereids.trees.plans.AggMode;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
 import org.apache.doris.nereids.util.JoinUtils;
@@ -74,16 +76,33 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> {
 
     @Override
     public Boolean visit(Plan plan, Void context) {
+        // process must shuffle
+        for (int i = 0; i < children.size(); i++) {
+            DistributionSpec distributionSpec = childrenProperties.get(i).getDistributionSpec();
+            if (distributionSpec instanceof DistributionSpecMustShuffle) {
+                updateChildEnforceAndCost(i, PhysicalProperties.EXECUTION_ANY);
+            }
+        }
         return true;
     }
 
     @Override
     public Boolean visitPhysicalHashAggregate(PhysicalHashAggregate<? extends Plan> agg, Void context) {
+        // forbid one phase agg on distribute
         if (agg.getAggMode() == AggMode.INPUT_TO_RESULT
                 && children.get(0).getPlan() instanceof PhysicalDistribute) {
             // this means one stage gather agg, usually bad pattern
             return false;
         }
+        // process must shuffle
+        visit(agg, context);
+        // process agg
+        return true;
+    }
+
+    @Override
+    public Boolean visitPhysicalFilter(PhysicalFilter<? extends Plan> filter, Void context) {
+        // do not process must shuffle
         return true;
     }
 
@@ -93,6 +112,9 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> {
         Preconditions.checkArgument(children.size() == 2, String.format("children.size() is %d", children.size()));
         Preconditions.checkArgument(childrenProperties.size() == 2);
         Preconditions.checkArgument(requiredProperties.size() == 2);
+        // process must shuffle
+        visit(hashJoin, context);
+        // process hash join
         DistributionSpec leftDistributionSpec = childrenProperties.get(0).getDistributionSpec();
         DistributionSpec rightDistributionSpec = childrenProperties.get(1).getDistributionSpec();
 
@@ -229,6 +251,9 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> {
         Preconditions.checkArgument(children.size() == 2, String.format("children.size() is %d", children.size()));
         Preconditions.checkArgument(childrenProperties.size() == 2);
         Preconditions.checkArgument(requiredProperties.size() == 2);
+        // process must shuffle
+        visit(nestedLoopJoin, context);
+        // process nlj
         DistributionSpec rightDistributionSpec = childrenProperties.get(1).getDistributionSpec();
         if (rightDistributionSpec instanceof DistributionSpecStorageGather) {
             updateChildEnforceAndCost(1, PhysicalProperties.GATHER);
@@ -236,8 +261,17 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> {
         return true;
     }
 
+    @Override
+    public Boolean visitPhysicalProject(PhysicalProject<? extends Plan> project, Void context) {
+        // do not process must shuffle
+        return true;
+    }
+
     @Override
     public Boolean visitPhysicalSetOperation(PhysicalSetOperation setOperation, Void context) {
+        // process must shuffle
+        visit(setOperation, context);
+        // process set operation
         if (children.isEmpty()) {
             return true;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecMustShuffle.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecMustShuffle.java
new file mode 100644
index 0000000000..8f718fbb9d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecMustShuffle.java
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.properties;
+
+/**
+ * present data must use after shuffle
+ */
+public class DistributionSpecMustShuffle extends DistributionSpec {
+
+    public static final DistributionSpecMustShuffle INSTANCE = new DistributionSpecMustShuffle();
+
+    public DistributionSpecMustShuffle() {
+        super();
+    }
+
+    @Override
+    public boolean satisfy(DistributionSpec other) {
+        return other instanceof DistributionSpecAny;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
index 28bf347977..9596043f67 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
@@ -44,6 +44,9 @@ public class PhysicalProperties {
 
     public static PhysicalProperties STORAGE_GATHER = new PhysicalProperties(DistributionSpecStorageGather.INSTANCE);
 
+    public static PhysicalProperties MUST_SHUFFLE = new PhysicalProperties(DistributionSpecMustShuffle.INSTANCE);
+
+
     private final OrderSpec orderSpec;
 
     private final DistributionSpec distributionSpec;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java
index 0f903d69d1..274cdd86f4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java
@@ -20,19 +20,38 @@
 
 package org.apache.doris.planner;
 
+import org.apache.doris.analysis.BitmapFilterPredicate;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.thrift.TDataSink;
 import org.apache.doris.thrift.TDataSinkType;
 import org.apache.doris.thrift.TDataStreamSink;
 import org.apache.doris.thrift.TExplainLevel;
 
+import com.google.common.collect.Lists;
+import org.springframework.util.CollectionUtils;
+
+import java.util.List;
+
 /**
  * Data sink that forwards data to an exchange node.
  */
 public class DataStreamSink extends DataSink {
-    private final PlanNodeId exchNodeId;
+
+    private PlanNodeId exchNodeId;
 
     private DataPartition outputPartition;
 
+    protected TupleDescriptor outputTupleDesc;
+
+    protected List<Expr> projections;
+
+    protected List<Expr> conjuncts = Lists.newArrayList();
+
+    public DataStreamSink() {
+
+    }
+
     public DataStreamSink(PlanNodeId exchNodeId) {
         this.exchNodeId = exchNodeId;
     }
@@ -42,23 +61,65 @@ public class DataStreamSink extends DataSink {
         return exchNodeId;
     }
 
+    public void setExchNodeId(PlanNodeId exchNodeId) {
+        this.exchNodeId = exchNodeId;
+    }
+
     @Override
     public DataPartition getOutputPartition() {
         return outputPartition;
     }
 
-    public void setPartition(DataPartition partition) {
-        outputPartition = partition;
+    public void setOutputPartition(DataPartition outputPartition) {
+        this.outputPartition = outputPartition;
+    }
+
+    public TupleDescriptor getOutputTupleDesc() {
+        return outputTupleDesc;
+    }
+
+    public void setOutputTupleDesc(TupleDescriptor outputTupleDesc) {
+        this.outputTupleDesc = outputTupleDesc;
+    }
+
+    public List<Expr> getProjections() {
+        return projections;
+    }
+
+    public void setProjections(List<Expr> projections) {
+        this.projections = projections;
+    }
+
+    public List<Expr> getConjuncts() {
+        return conjuncts;
+    }
+
+    public void setConjuncts(List<Expr> conjuncts) {
+        this.conjuncts = conjuncts;
+    }
+
+    public void addConjunct(Expr conjunct) {
+        this.conjuncts.add(conjunct);
     }
 
     @Override
     public String getExplainString(String prefix, TExplainLevel explainLevel) {
         StringBuilder strBuilder = new StringBuilder();
-        strBuilder.append(prefix + "STREAM DATA SINK\n");
-        strBuilder.append(prefix + "  EXCHANGE ID: " + exchNodeId + "\n");
+        strBuilder.append(prefix).append("STREAM DATA SINK\n");
+        strBuilder.append(prefix).append("  EXCHANGE ID: ").append(exchNodeId);
         if (outputPartition != null) {
-            strBuilder.append(prefix + "  " + outputPartition.getExplainString(explainLevel));
+            strBuilder.append("\n").append(prefix).append("  ").append(outputPartition.getExplainString(explainLevel));
+        }
+        if (!conjuncts.isEmpty()) {
+            Expr expr = PlanNode.convertConjunctsToAndCompoundPredicate(conjuncts);
+            strBuilder.append(prefix).append("  CONJUNCTS: ").append(expr.toSql());
+        }
+        if (!CollectionUtils.isEmpty(projections)) {
+            strBuilder.append("\n").append(prefix).append("  PROJECTIONS: ")
+                    .append(PlanNode.getExplainString(projections)).append("\n");
+            strBuilder.append(prefix).append("  PROJECTION TUPLE: ").append(outputTupleDesc.getId());
         }
+        strBuilder.append("\n");
         return strBuilder.toString();
     }
 
@@ -67,6 +128,19 @@ public class DataStreamSink extends DataSink {
         TDataSink result = new TDataSink(TDataSinkType.DATA_STREAM_SINK);
         TDataStreamSink tStreamSink =
                 new TDataStreamSink(exchNodeId.asInt(), outputPartition.toThrift());
+        for (Expr e : conjuncts) {
+            if  (!(e instanceof BitmapFilterPredicate)) {
+                tStreamSink.addToConjuncts(e.treeToThrift());
+            }
+        }
+        if (projections != null) {
+            for (Expr expr : projections) {
+                tStreamSink.addToOutputExprs(expr.treeToThrift());
+            }
+        }
+        if (outputTupleDesc != null) {
+            tStreamSink.setOutputTupleId(outputTupleDesc.getId().asInt());
+        }
         result.setStreamSink(tStreamSink);
         return result;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
index 30bf9eb45d..6694a05219 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
@@ -108,15 +108,21 @@ public class ExchangeNode extends PlanNode {
     public final void computeTupleIds() {
         PlanNode inputNode = getChild(0);
         TupleDescriptor outputTupleDesc = inputNode.getOutputTupleDesc();
+        updateTupleIds(outputTupleDesc);
+    }
+
+    public void updateTupleIds(TupleDescriptor outputTupleDesc) {
         if (outputTupleDesc != null) {
             tupleIds.clear();
             tupleIds.add(outputTupleDesc.getId());
+            tblRefIds.add(outputTupleDesc.getId());
+            nullableTupleIds.add(outputTupleDesc.getId());
         } else {
             clearTupleIds();
             tupleIds.addAll(getChild(0).getTupleIds());
+            tblRefIds.addAll(getChild(0).getTblRefIds());
+            nullableTupleIds.addAll(getChild(0).getNullableTupleIds());
         }
-        tblRefIds.addAll(getChild(0).getTblRefIds());
-        nullableTupleIds.addAll(getChild(0).getNullableTupleIds());
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
index 54903beae5..64ac4c3051 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
@@ -256,7 +256,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
             Preconditions.checkState(sink == null);
             // we're streaming to an exchange node
             DataStreamSink streamSink = new DataStreamSink(destNode.getId());
-            streamSink.setPartition(outputPartition);
+            streamSink.setOutputPartition(outputPartition);
             streamSink.setFragment(this);
             sink = streamSink;
         } else {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index ab17c0acec..cf5d14cb5e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -415,7 +415,7 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats {
         }
     }
 
-    protected Expr convertConjunctsToAndCompoundPredicate(List<Expr> conjuncts) {
+    public static Expr convertConjunctsToAndCompoundPredicate(List<Expr> conjuncts) {
         List<Expr> targetConjuncts = Lists.newArrayList(conjuncts);
         while (targetConjuncts.size() > 1) {
             List<Expr> newTargetConjuncts = Lists.newArrayList();
@@ -824,7 +824,7 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats {
         return output.toString();
     }
 
-    protected String getExplainString(List<? extends Expr> exprs) {
+    public static String getExplainString(List<? extends Expr> exprs) {
         if (exprs == null) {
             return "";
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org