You are viewing a plain-text version of this content; the link to its canonical version is not preserved in this text rendering.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/02/28 03:53:22 UTC

[iotdb] 01/02: try to fix

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch geely_car_0206
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 503fe57b5f6996c34bf5e4842445d4ad75546a10
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue Feb 28 09:54:49 2023 +0800

    try to fix
---
 .../execution/exchange/MPPDataExchangeManager.java | 18 +++--
 .../mpp/execution/exchange/sink/SinkChannel.java   | 41 ++++++++++-
 .../execution/exchange/source/SourceHandle.java    |  7 +-
 .../planner/distribution/DistributionPlanner.java  | 81 ++++++++++------------
 .../planner/distribution/ExchangeNodeAdder.java    | 15 ++--
 .../planner/distribution/NodeGroupContext.java     |  7 +-
 .../plan/node/sink/MultiChildrenSinkNode.java      |  4 ++
 7 files changed, 113 insertions(+), 60 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index 39cf36a5c0..926e622b1b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -60,6 +60,7 @@ import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
@@ -505,7 +506,8 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
       String localPlanNodeId,
       // TODO: replace with callbacks to decouple MPPDataExchangeManager from
       // FragmentInstanceContext
-      FragmentInstanceContext instanceContext) {
+      FragmentInstanceContext instanceContext,
+      int index) {
 
     LOGGER.debug(
         "Create sink handle to plan node {} of {} for {}",
@@ -523,7 +525,8 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
         executorService,
         tsBlockSerdeFactory.get(),
         new SinkListenerImpl(instanceContext, instanceContext::failed),
-        mppDataExchangeServiceClientManager);
+        mppDataExchangeServiceClientManager,
+        index);
   }
 
   @Override
@@ -541,6 +544,8 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
           "ShuffleSinkHandle for " + localFragmentInstanceId + " exists.");
     }
 
+    AtomicInteger index = new AtomicInteger(0);
+
     List<ISinkChannel> downStreamChannelList =
         downStreamChannelLocationList.stream()
             .map(
@@ -549,7 +554,8 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
                         localFragmentInstanceId,
                         localPlanNodeId,
                         downStreamChannelLocation,
-                        instanceContext))
+                        instanceContext,
+                        index.getAndIncrement()))
             .collect(Collectors.toList());
 
     ShuffleSinkHandle shuffleSinkHandle =
@@ -568,7 +574,8 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
       TFragmentInstanceId localFragmentInstanceId,
       String localPlanNodeId,
       DownStreamChannelLocation downStreamChannelLocation,
-      FragmentInstanceContext instanceContext) {
+      FragmentInstanceContext instanceContext,
+      int index) {
     if (isSameNode(downStreamChannelLocation.getRemoteEndpoint())) {
       return createLocalSinkChannel(
           localFragmentInstanceId,
@@ -583,7 +590,8 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
           downStreamChannelLocation.getRemoteFragmentInstanceId(),
           downStreamChannelLocation.getRemotePlanNodeId(),
           localPlanNodeId,
-          instanceContext);
+          instanceContext,
+          index);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
index 9ad82768c6..36836e2f81 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
@@ -108,6 +108,41 @@ public class SinkChannel implements ISinkChannel {
 
   private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
 
+  public SinkChannel( // Same inputs as the next constructor plus `index`, used only in threadName.
+      TEndPoint remoteEndpoint,
+      TFragmentInstanceId remoteFragmentInstanceId,
+      String remotePlanNodeId,
+      String localPlanNodeId,
+      TFragmentInstanceId localFragmentInstanceId,
+      LocalMemoryManager localMemoryManager,
+      ExecutorService executorService,
+      TsBlockSerde serde,
+      SinkListener sinkListener,
+      IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
+          mppDataExchangeServiceClientManager,
+      int index) { // NOTE(review): body seems to mirror the next constructor; consider sharing it
+    this.remoteEndpoint = Validate.notNull(remoteEndpoint);
+    this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
+    this.remotePlanNodeId = Validate.notNull(remotePlanNodeId);
+    this.localPlanNodeId = Validate.notNull(localPlanNodeId);
+    this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
+    this.fullFragmentInstanceId =
+        FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId);
+    this.localMemoryManager = Validate.notNull(localMemoryManager);
+    this.executorService = Validate.notNull(executorService);
+    this.serde = Validate.notNull(serde);
+    this.sinkListener = Validate.notNull(sinkListener);
+    this.mppDataExchangeServiceClientManager = mppDataExchangeServiceClientManager; // NOTE(review): not null-checked, unlike the other arguments
+    this.retryIntervalInMs = DEFAULT_RETRY_INTERVAL_IN_MS;
+    this.threadName =
+        createFullId(
+            localFragmentInstanceId.queryId,
+            localFragmentInstanceId.fragmentId,
+            localFragmentInstanceId.instanceId + "-" + index); // "-<index>" gives channels of one instance distinct thread names
+    this.bufferRetainedSizeInBytes = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+    this.currentTsBlockSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
+
   public SinkChannel(
       TEndPoint remoteEndpoint,
       TFragmentInstanceId remoteFragmentInstanceId,
@@ -419,7 +454,11 @@ public class SinkChannel implements ISinkChannel {
     public void run() {
       try (SetThreadName sinkChannelName = new SetThreadName(threadName)) {
         LOGGER.debug(
-            "[NotifyNewTsBlock] [{}, {})", startSequenceId, startSequenceId + blockSizes.size());
+            "[NotifyNewTsBlock] [{}, {}) to {}.{}",
+            startSequenceId,
+            startSequenceId + blockSizes.size(),
+            remoteFragmentInstanceId,
+            remotePlanNodeId);
         int attempt = 0;
         TNewDataBlockEvent newDataBlockEvent =
             new TNewDataBlockEvent(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
index 10821ca1ac..db462721d0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
@@ -470,7 +470,12 @@ public class SourceHandle implements ISourceHandle {
     @Override
     public void run() {
       try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
-        LOGGER.debug("[StartPullTsBlocksFromRemote] [{}, {}) ", startSequenceId, endSequenceId);
+        LOGGER.debug(
+            "[StartPullTsBlocksFromRemote] {}-{} [{}, {}) ",
+            remoteFragmentInstanceId,
+            indexOfUpstreamSinkHandle,
+            startSequenceId,
+            endSequenceId);
         TGetDataBlockRequest req =
             new TGetDataBlockRequest(
                 remoteFragmentInstanceId,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
index 73fe5c5259..4733371321 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
@@ -44,8 +44,9 @@ import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
 import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
 
 import org.apache.commons.lang3.Validate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -54,6 +55,9 @@ import java.util.Map;
 import java.util.Set;
 
 public class DistributionPlanner {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(DistributionPlanner.class);
+
   private Analysis analysis;
   private MPPQueryContext context;
   private LogicalQueryPlan logicalPlan;
@@ -89,7 +93,7 @@ public class DistributionPlanner {
                 && (((QueryStatement) analysis.getStatement()).isAlignByDevice()),
             root);
     PlanNode newRoot = adder.visit(root, nodeGroupContext);
-    adjustUpStream(nodeGroupContext);
+    adjustUpStream(root, nodeGroupContext);
     return newRoot;
   }
 
@@ -99,8 +103,8 @@ public class DistributionPlanner {
    * org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.ShuffleSinkNode} for the children of
    * ExchangeNodes with Same DataRegion.
    */
-  private void adjustUpStream(NodeGroupContext context) {
-    if (context.exchangeNodes.isEmpty()) {
+  private void adjustUpStream(PlanNode root, NodeGroupContext context) {
+    if (!context.hasExchangeNode) {
       return;
     }
 
@@ -108,44 +112,37 @@ public class DistributionPlanner {
         analysis.getStatement() instanceof QueryStatement
             && needShuffleSinkNode((QueryStatement) analysis.getStatement(), context);
 
-    // step1: group children of ExchangeNodes
-    Map<TRegionReplicaSet, List<PlanNode>> nodeGroups = new HashMap<>();
-    context.exchangeNodes.forEach(
-        exchangeNode ->
-            nodeGroups
-                .computeIfAbsent(
-                    context.getNodeDistribution(exchangeNode.getChild().getPlanNodeId()).region,
-                    exchangeNodes -> new ArrayList<>())
-                .add(exchangeNode.getChild()));
-
-    // step2: add IdentitySinkNode/ShuffleSinkNode as parent for nodes of each group
-    nodeGroups
-        .values()
-        .forEach(
-            planNodeList -> {
-              MultiChildrenSinkNode parent =
-                  needShuffleSinkNode
-                      ? new ShuffleSinkNode(context.queryContext.getQueryId().genPlanNodeId())
-                      : new IdentitySinkNode(context.queryContext.getQueryId().genPlanNodeId());
-              parent.addChildren(planNodeList);
-              // we put the parent in list to get it quickly by dataRegion of one ExchangeNode
-              planNodeList.add(parent);
-            });
-
-    // step3: add child for each ExchangeNode,
-    // the child is IdentitySinkNode/ShuffleSinkNode we generated in the last step
+    adjustUpStreamHelper(root, new HashMap<>(), needShuffleSinkNode, context);
+  }
 
-    // count the visited time of each SinkNode
-    Map<TRegionReplicaSet, Integer> visitedCount = new HashMap<>();
-    context.exchangeNodes.forEach(
-        exchangeNode -> {
-          TRegionReplicaSet regionOfChild =
-              context.getNodeDistribution(exchangeNode.getChild().getPlanNodeId()).region;
-          visitedCount.compute(regionOfChild, (region, count) -> (count == null) ? 0 : count + 1);
-          List<PlanNode> planNodeList = nodeGroups.get(regionOfChild);
-          exchangeNode.setChild(planNodeList.get(planNodeList.size() - 1));
-          exchangeNode.setIndexOfUpstreamSinkHandle(visitedCount.get(regionOfChild));
-        });
+  private void adjustUpStreamHelper(
+      PlanNode root,
+      Map<TRegionReplicaSet, MultiChildrenSinkNode> memo,
+      boolean needShuffleSinkNode,
+      NodeGroupContext context) {
+    for (PlanNode child : root.getChildren()) {
+      adjustUpStreamHelper(child, memo, needShuffleSinkNode, context);
+      if (child instanceof ExchangeNode) {
+        ExchangeNode exchangeNode = (ExchangeNode) child;
+        TRegionReplicaSet regionOfChild =
+            context.getNodeDistribution(exchangeNode.getChild().getPlanNodeId()).region;
+        // All ExchangeNodes whose child lives in the same region share one sink node.
+        MultiChildrenSinkNode newChild =
+            memo.computeIfAbsent(
+                regionOfChild,
+                tRegionReplicaSet ->
+                    needShuffleSinkNode
+                        ? new ShuffleSinkNode(context.queryContext.getQueryId().genPlanNodeId())
+                        : new IdentitySinkNode(context.queryContext.getQueryId().genPlanNodeId()));
+        // Attach this exchange's child and channel on every visit (not only on creation) so
+        // each exchange keeps its subtree and gets its own upstream channel index.
+        newChild.addChild(exchangeNode.getChild());
+        newChild.addDownStreamChannelLocation(
+            new DownStreamChannelLocation(exchangeNode.getPlanNodeId().toString()));
+        exchangeNode.setChild(newChild);
+        exchangeNode.setIndexOfUpstreamSinkHandle(newChild.getCurrentLastIndex());
+      }
+    }
   }
 
   /** Return true if we need to use ShuffleSinkNode instead of IdentitySinkNode. */
@@ -265,8 +262,6 @@ public class DistributionPlanner {
             exchangeNode.getChild() instanceof MultiChildrenSinkNode,
             "child of ExchangeNode must be MultiChildrenSinkNode");
         MultiChildrenSinkNode sinkNode = (MultiChildrenSinkNode) (exchangeNode.getChild());
-        sinkNode.addDownStreamChannelLocation(
-            new DownStreamChannelLocation(exchangeNode.getPlanNodeId().toString()));
 
         // We cut off the subtree to make the ExchangeNode as the leaf node of current PlanFragment
         exchangeNode.cleanChildren();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
index 14a5ec1c00..56dfda1cd7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
@@ -124,7 +124,7 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
                     new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
                 exchangeNode.setChild(child);
                 exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
-                context.exchangeNodes.add(exchangeNode);
+                context.hasExchangeNode = true;
                 newNode.addChild(exchangeNode);
               } else {
                 newNode.addChild(child);
@@ -288,13 +288,18 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
       // For align by device,
       // if dataRegions of children are the same, we set child's dataRegion to this node,
       // else we set the selected mostlyUsedDataRegion to this node
+      boolean inSame = nodeDistributionIsSame(visitedChildren, context);
       dataRegion =
-          nodeDistributionIsSame(visitedChildren, context)
+          inSame
               ? context.getNodeDistribution(visitedChildren.get(0).getPlanNodeId()).region
               : context.getMostlyUsedDataRegion();
       context.putNodeDistribution(
           newNode.getPlanNodeId(),
-          new NodeDistribution(NodeDistributionType.SAME_WITH_ALL_CHILDREN, dataRegion));
+          new NodeDistribution(
+              inSame
+                  ? NodeDistributionType.SAME_WITH_ALL_CHILDREN
+                  : NodeDistributionType.SAME_WITH_SOME_CHILD,
+              dataRegion));
     } else {
       // TODO For align by time, we keep old logic for now
       dataRegion = calculateDataRegionByChildren(visitedChildren, context);
@@ -324,7 +329,7 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
                 new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
             exchangeNode.setChild(child);
             exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
-            context.exchangeNodes.add(exchangeNode);
+            context.hasExchangeNode = true;
             newNode.addChild(exchangeNode);
           } else {
             newNode.addChild(child);
@@ -345,7 +350,7 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
           new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
       exchangeNode.setChild(child);
       exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
-      context.exchangeNodes.add(exchangeNode);
+      context.hasExchangeNode = true;
       newNode.addChild(exchangeNode);
     }
     return newNode;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/NodeGroupContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/NodeGroupContext.java
index 389e30f0eb..6dda89e41f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/NodeGroupContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/NodeGroupContext.java
@@ -24,13 +24,10 @@ import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
@@ -39,14 +36,14 @@ public class NodeGroupContext {
   private final Map<PlanNodeId, NodeDistribution> nodeDistributionMap;
   private final boolean isAlignByDevice;
   private final TRegionReplicaSet mostlyUsedDataRegion;
-  protected final List<ExchangeNode> exchangeNodes;
+  protected boolean hasExchangeNode;
 
   public NodeGroupContext(MPPQueryContext queryContext, boolean isAlignByDevice, PlanNode root) {
     this.queryContext = queryContext;
     this.nodeDistributionMap = new HashMap<>();
     this.isAlignByDevice = isAlignByDevice;
     this.mostlyUsedDataRegion = isAlignByDevice ? getMostlyUsedDataRegion(root) : null;
-    this.exchangeNodes = new ArrayList<>();
+    this.hasExchangeNode = false;
   }
 
   private TRegionReplicaSet getMostlyUsedDataRegion(PlanNode root) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/MultiChildrenSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/MultiChildrenSinkNode.java
index 686dae93e9..1e1ad0f428 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/MultiChildrenSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/MultiChildrenSinkNode.java
@@ -114,4 +114,8 @@ public abstract class MultiChildrenSinkNode extends SinkNode {
   public void addDownStreamChannelLocation(DownStreamChannelLocation downStreamChannelLocation) {
     downStreamChannelLocationList.add(downStreamChannelLocation);
   }
+
+  public int getCurrentLastIndex() { // Index of the last element of `children`.
+    return children.size() - 1; // -1 when there are no children yet
+  }
 }