You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text version.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/02/28 07:31:58 UTC

[iotdb] branch master updated: [IOTDB-5593] Improve efficiency of DistributionPlanner by recording map instead of recursive search

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4fd995957f [IOTDB-5593] Improve efficiency of DistributionPlanner by recording map instead of recursive search
4fd995957f is described below

commit 4fd995957fabb030e35d7caf115a99f2f4edb260
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Tue Feb 28 15:31:52 2023 +0800

    [IOTDB-5593] Improve efficiency of DistributionPlanner by recording map instead of recursive search
---
 .../mpp/execution/exchange/sink/SinkChannel.java   |  6 +-
 .../execution/exchange/source/SourceHandle.java    |  7 ++-
 .../planner/distribution/DistributionPlanner.java  | 73 +++++++++-------------
 .../planner/distribution/ExchangeNodeAdder.java    | 15 +++--
 .../planner/distribution/NodeGroupContext.java     |  7 +--
 .../SimpleFragmentParallelPlanner.java             | 16 ++---
 .../db/mpp/plan/planner/plan/PlanFragment.java     | 18 ------
 .../plan/node/sink/MultiChildrenSinkNode.java      |  4 ++
 8 files changed, 63 insertions(+), 83 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
index 9ad82768c6..fbcdaf2e0c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
@@ -419,7 +419,11 @@ public class SinkChannel implements ISinkChannel {
     public void run() {
       try (SetThreadName sinkChannelName = new SetThreadName(threadName)) {
         LOGGER.debug(
-            "[NotifyNewTsBlock] [{}, {})", startSequenceId, startSequenceId + blockSizes.size());
+            "[NotifyNewTsBlock] [{}, {}) to {}.{}",
+            startSequenceId,
+            startSequenceId + blockSizes.size(),
+            remoteFragmentInstanceId,
+            remotePlanNodeId);
         int attempt = 0;
         TNewDataBlockEvent newDataBlockEvent =
             new TNewDataBlockEvent(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
index 10821ca1ac..db462721d0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
@@ -470,7 +470,12 @@ public class SourceHandle implements ISourceHandle {
     @Override
     public void run() {
       try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
-        LOGGER.debug("[StartPullTsBlocksFromRemote] [{}, {}) ", startSequenceId, endSequenceId);
+        LOGGER.debug(
+            "[StartPullTsBlocksFromRemote] {}-{} [{}, {}) ",
+            remoteFragmentInstanceId,
+            indexOfUpstreamSinkHandle,
+            startSequenceId,
+            endSequenceId);
         TGetDataBlockRequest req =
             new TGetDataBlockRequest(
                 remoteFragmentInstanceId,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
index 73fe5c5259..6b58ae1fe7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
@@ -45,7 +45,6 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
 
 import org.apache.commons.lang3.Validate;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -89,7 +88,7 @@ public class DistributionPlanner {
                 && (((QueryStatement) analysis.getStatement()).isAlignByDevice()),
             root);
     PlanNode newRoot = adder.visit(root, nodeGroupContext);
-    adjustUpStream(nodeGroupContext);
+    adjustUpStream(newRoot, nodeGroupContext);
     return newRoot;
   }
 
@@ -99,8 +98,8 @@ public class DistributionPlanner {
    * org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.ShuffleSinkNode} for the children of
    * ExchangeNodes with Same DataRegion.
    */
-  private void adjustUpStream(NodeGroupContext context) {
-    if (context.exchangeNodes.isEmpty()) {
+  private void adjustUpStream(PlanNode root, NodeGroupContext context) {
+    if (!context.hasExchangeNode) {
       return;
     }
 
@@ -108,44 +107,34 @@ public class DistributionPlanner {
         analysis.getStatement() instanceof QueryStatement
             && needShuffleSinkNode((QueryStatement) analysis.getStatement(), context);
 
-    // step1: group children of ExchangeNodes
-    Map<TRegionReplicaSet, List<PlanNode>> nodeGroups = new HashMap<>();
-    context.exchangeNodes.forEach(
-        exchangeNode ->
-            nodeGroups
-                .computeIfAbsent(
-                    context.getNodeDistribution(exchangeNode.getChild().getPlanNodeId()).region,
-                    exchangeNodes -> new ArrayList<>())
-                .add(exchangeNode.getChild()));
-
-    // step2: add IdentitySinkNode/ShuffleSinkNode as parent for nodes of each group
-    nodeGroups
-        .values()
-        .forEach(
-            planNodeList -> {
-              MultiChildrenSinkNode parent =
-                  needShuffleSinkNode
-                      ? new ShuffleSinkNode(context.queryContext.getQueryId().genPlanNodeId())
-                      : new IdentitySinkNode(context.queryContext.getQueryId().genPlanNodeId());
-              parent.addChildren(planNodeList);
-              // we put the parent in list to get it quickly by dataRegion of one ExchangeNode
-              planNodeList.add(parent);
-            });
-
-    // step3: add child for each ExchangeNode,
-    // the child is IdentitySinkNode/ShuffleSinkNode we generated in the last step
+    adjustUpStreamHelper(root, new HashMap<>(), needShuffleSinkNode, context);
+  }
 
-    // count the visited time of each SinkNode
-    Map<TRegionReplicaSet, Integer> visitedCount = new HashMap<>();
-    context.exchangeNodes.forEach(
-        exchangeNode -> {
-          TRegionReplicaSet regionOfChild =
-              context.getNodeDistribution(exchangeNode.getChild().getPlanNodeId()).region;
-          visitedCount.compute(regionOfChild, (region, count) -> (count == null) ? 0 : count + 1);
-          List<PlanNode> planNodeList = nodeGroups.get(regionOfChild);
-          exchangeNode.setChild(planNodeList.get(planNodeList.size() - 1));
-          exchangeNode.setIndexOfUpstreamSinkHandle(visitedCount.get(regionOfChild));
-        });
+  private void adjustUpStreamHelper(
+      PlanNode root,
+      Map<TRegionReplicaSet, MultiChildrenSinkNode> memo,
+      boolean needShuffleSinkNode,
+      NodeGroupContext context) {
+    for (PlanNode child : root.getChildren()) {
+      adjustUpStreamHelper(child, memo, needShuffleSinkNode, context);
+      if (child instanceof ExchangeNode) {
+        ExchangeNode exchangeNode = (ExchangeNode) child;
+        TRegionReplicaSet regionOfChild =
+            context.getNodeDistribution(exchangeNode.getChild().getPlanNodeId()).region;
+        MultiChildrenSinkNode newChild =
+            memo.computeIfAbsent(
+                regionOfChild,
+                tRegionReplicaSet ->
+                    needShuffleSinkNode
+                        ? new ShuffleSinkNode(context.queryContext.getQueryId().genPlanNodeId())
+                        : new IdentitySinkNode(context.queryContext.getQueryId().genPlanNodeId()));
+        newChild.addChild(exchangeNode.getChild());
+        newChild.addDownStreamChannelLocation(
+            new DownStreamChannelLocation(exchangeNode.getPlanNodeId().toString()));
+        exchangeNode.setChild(newChild);
+        exchangeNode.setIndexOfUpstreamSinkHandle(newChild.getCurrentLastIndex());
+      }
+    }
   }
 
   /** Return true if we need to use ShuffleSinkNode instead of IdentitySinkNode. */
@@ -265,8 +254,6 @@ public class DistributionPlanner {
             exchangeNode.getChild() instanceof MultiChildrenSinkNode,
             "child of ExchangeNode must be MultiChildrenSinkNode");
         MultiChildrenSinkNode sinkNode = (MultiChildrenSinkNode) (exchangeNode.getChild());
-        sinkNode.addDownStreamChannelLocation(
-            new DownStreamChannelLocation(exchangeNode.getPlanNodeId().toString()));
 
         // We cut off the subtree to make the ExchangeNode as the leaf node of current PlanFragment
         exchangeNode.cleanChildren();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
index 14a5ec1c00..56dfda1cd7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
@@ -124,7 +124,7 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
                     new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
                 exchangeNode.setChild(child);
                 exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
-                context.exchangeNodes.add(exchangeNode);
+                context.hasExchangeNode = true;
                 newNode.addChild(exchangeNode);
               } else {
                 newNode.addChild(child);
@@ -288,13 +288,18 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
       // For align by device,
       // if dataRegions of children are the same, we set child's dataRegion to this node,
       // else we set the selected mostlyUsedDataRegion to this node
+      boolean inSame = nodeDistributionIsSame(visitedChildren, context);
       dataRegion =
-          nodeDistributionIsSame(visitedChildren, context)
+          inSame
               ? context.getNodeDistribution(visitedChildren.get(0).getPlanNodeId()).region
               : context.getMostlyUsedDataRegion();
       context.putNodeDistribution(
           newNode.getPlanNodeId(),
-          new NodeDistribution(NodeDistributionType.SAME_WITH_ALL_CHILDREN, dataRegion));
+          new NodeDistribution(
+              inSame
+                  ? NodeDistributionType.SAME_WITH_ALL_CHILDREN
+                  : NodeDistributionType.SAME_WITH_SOME_CHILD,
+              dataRegion));
     } else {
       // TODO For align by time, we keep old logic for now
       dataRegion = calculateDataRegionByChildren(visitedChildren, context);
@@ -324,7 +329,7 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
                 new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
             exchangeNode.setChild(child);
             exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
-            context.exchangeNodes.add(exchangeNode);
+            context.hasExchangeNode = true;
             newNode.addChild(exchangeNode);
           } else {
             newNode.addChild(child);
@@ -345,7 +350,7 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
           new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
       exchangeNode.setChild(child);
       exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
-      context.exchangeNodes.add(exchangeNode);
+      context.hasExchangeNode = true;
       newNode.addChild(exchangeNode);
     }
     return newNode;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/NodeGroupContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/NodeGroupContext.java
index 389e30f0eb..6dda89e41f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/NodeGroupContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/NodeGroupContext.java
@@ -24,13 +24,10 @@ import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
@@ -39,14 +36,14 @@ public class NodeGroupContext {
   private final Map<PlanNodeId, NodeDistribution> nodeDistributionMap;
   private final boolean isAlignByDevice;
   private final TRegionReplicaSet mostlyUsedDataRegion;
-  protected final List<ExchangeNode> exchangeNodes;
+  protected boolean hasExchangeNode;
 
   public NodeGroupContext(MPPQueryContext queryContext, boolean isAlignByDevice, PlanNode root) {
     this.queryContext = queryContext;
     this.nodeDistributionMap = new HashMap<>();
     this.isAlignByDevice = isAlignByDevice;
     this.mostlyUsedDataRegion = isAlignByDevice ? getMostlyUsedDataRegion(root) : null;
-    this.exchangeNodes = new ArrayList<>();
+    this.hasExchangeNode = false;
   }
 
   private TRegionReplicaSet getMostlyUsedDataRegion(PlanNode root) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index d86c96d452..6361a91f98 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -34,12 +34,12 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.PlanFragment;
 import org.apache.iotdb.db.mpp.plan.planner.plan.SubPlan;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.MultiChildrenSinkNode;
 import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.ShowQueriesStatement;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,7 +64,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
   // Record all the FragmentInstances belonged to same PlanFragment
   Map<PlanFragmentId, FragmentInstance> instanceMap;
   // Record which PlanFragment the PlanNode belongs
-  Map<PlanNodeId, PlanFragmentId> planNodeMap;
+  Map<PlanNodeId, Pair<PlanFragmentId, PlanNode>> planNodeMap;
   List<FragmentInstance> fragmentInstanceList;
 
   public SimpleFragmentParallelPlanner(
@@ -93,13 +93,10 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
   }
 
   private void produceFragmentInstance(PlanFragment fragment) {
-    // If one PlanFragment will produce several FragmentInstance, the instanceIdx will be increased
-    // one by one
-    PlanNode rootCopy = PlanNodeUtil.deepCopy(fragment.getPlanNodeTree());
     Filter timeFilter = analysis.getGlobalTimeFilter();
     FragmentInstance fragmentInstance =
         new FragmentInstance(
-            new PlanFragment(fragment.getId(), rootCopy),
+            fragment,
             fragment.getId().genFragmentInstanceId(),
             timeFilter,
             queryContext.getQueryType(),
@@ -217,8 +214,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
                       downStreamInstance.getId().toThrift());
 
                   // Set upstream info for corresponding ExchangeNode in downstream FragmentInstance
-                  PlanNode downStreamExchangeNode =
-                      downStreamInstance.getFragment().getPlanNodeById(downStreamNodeId);
+                  PlanNode downStreamExchangeNode = planNodeMap.get(downStreamNodeId).right;
                   ((ExchangeNode) downStreamExchangeNode)
                       .setUpstream(
                           instance.getHostDataNode().getMPPDataExchangeEndPoint(),
@@ -230,11 +226,11 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
   }
 
   private FragmentInstance findDownStreamInstance(PlanNodeId exchangeNodeId) {
-    return instanceMap.get(planNodeMap.get(exchangeNodeId));
+    return instanceMap.get(planNodeMap.get(exchangeNodeId).left);
   }
 
   private void recordPlanNodeRelation(PlanNode root, PlanFragmentId planFragmentId) {
-    planNodeMap.put(root.getPlanNodeId(), planFragmentId);
+    planNodeMap.put(root.getPlanNodeId(), new Pair<>(planFragmentId, root));
     for (PlanNode child : root.getChildren()) {
       recordPlanNodeRelation(child, planFragmentId);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
index aac15e7940..5fd8b79a7a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.mpp.plan.planner.SubPlanTypeExtractor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.IPartitionRelatedNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.VirtualSourceNode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -133,23 +132,6 @@ public class PlanFragment {
     return null;
   }
 
-  public PlanNode getPlanNodeById(PlanNodeId nodeId) {
-    return getPlanNodeById(planNodeTree, nodeId);
-  }
-
-  private PlanNode getPlanNodeById(PlanNode root, PlanNodeId nodeId) {
-    if (root.getPlanNodeId().equals(nodeId)) {
-      return root;
-    }
-    for (PlanNode child : root.getChildren()) {
-      PlanNode node = getPlanNodeById(child, nodeId);
-      if (node != null) {
-        return node;
-      }
-    }
-    return null;
-  }
-
   public void serialize(ByteBuffer byteBuffer) {
     id.serialize(byteBuffer);
     planNodeTree.serialize(byteBuffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/MultiChildrenSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/MultiChildrenSinkNode.java
index 686dae93e9..1e1ad0f428 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/MultiChildrenSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/MultiChildrenSinkNode.java
@@ -114,4 +114,8 @@ public abstract class MultiChildrenSinkNode extends SinkNode {
   public void addDownStreamChannelLocation(DownStreamChannelLocation downStreamChannelLocation) {
     downStreamChannelLocationList.add(downStreamChannelLocation);
   }
+
+  public int getCurrentLastIndex() {
+    return children.size() - 1;
+  }
 }