You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/12/22 10:47:31 UTC

[iotdb] branch master updated: [IOTDB-5117] Fix some issues in MemoryDistributionCalculator (#8580)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 15c0eabcb6 [IOTDB-5117] Fix some issues in MemoryDistributionCalculator (#8580)
15c0eabcb6 is described below

commit 15c0eabcb6733f0f7aa43f3f63805dd9fe9b5991
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Thu Dec 22 18:47:25 2022 +0800

    [IOTDB-5117] Fix some issues in MemoryDistributionCalculator (#8580)
---
 .../db/mpp/plan/planner/LocalExecutionPlanner.java |  2 +-
 .../plan/planner/MemoryDistributionCalculator.java | 55 ++++++----------------
 2 files changed, 16 insertions(+), 41 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 5b6d2abc86..36d53f6279 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -126,7 +126,7 @@ public class LocalExecutionPlanner {
   private void setMemoryLimitForHandle(TFragmentInstanceId fragmentInstanceId, PlanNode plan) {
     MemoryDistributionCalculator visitor = new MemoryDistributionCalculator();
     plan.accept(visitor, null);
-    long totalSplit = visitor.calculateTotalSplit();
+    int totalSplit = visitor.calculateTotalSplit();
     if (totalSplit == 0) {
       return;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/MemoryDistributionCalculator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/MemoryDistributionCalculator.java
index fb117804a7..fe87cb9972 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/MemoryDistributionCalculator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/MemoryDistributionCalculator.java
@@ -70,28 +70,15 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import static org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints.isSameNode;
 
 public class MemoryDistributionCalculator
     extends PlanVisitor<Void, MemoryDistributionCalculator.MemoryDistributionContext> {
   /** This map is used to calculate the total split of memory */
-  private final Map<PlanNodeId, List<PlanNodeId>> exchangeMap;
-
-  public MemoryDistributionCalculator() {
-    this.exchangeMap = new HashMap<>();
-  }
+  private int exchangeNum;
 
-  public long calculateTotalSplit() {
-    long res = 0;
-    for (List<PlanNodeId> l : exchangeMap.values()) {
-      res += l.size();
-    }
-    return res;
+  public int calculateTotalSplit() {
+    return exchangeNum;
   }
 
   @Override
@@ -127,29 +114,17 @@ public class MemoryDistributionCalculator
             });
   }
 
+  /** LocalSourceHandle/SourceHandle are counted alike; LocalSinkHandle never updates exchangeNum */
   @Override
   public Void visitExchange(ExchangeNode node, MemoryDistributionContext context) {
-    // we do not distinguish LocalSourceHandle/SourceHandle by not letting LocalSinkHandle update
-    // the map
-    if (context == null) {
-      // context == null means this ExchangeNode has no father
-      exchangeMap
-          .computeIfAbsent(node.getPlanNodeId(), x -> new ArrayList<>())
-          .add(node.getPlanNodeId());
-    } else {
-      if (context.memoryDistributionType.equals(
-          MemoryDistributionType.CONSUME_ALL_CHILDREN_AT_THE_SAME_TIME)) {
-        exchangeMap
-            .computeIfAbsent(context.planNodeId, x -> new ArrayList<>())
-            .add(node.getPlanNodeId());
-      } else if (context.memoryDistributionType.equals(
-              MemoryDistributionType.CONSUME_CHILDREN_ONE_BY_ONE)
-          && !exchangeMap.containsKey(context.planNodeId)) {
-        // All children share one split, thus only one node needs to be put into the map
-        exchangeMap
-            .computeIfAbsent(context.planNodeId, x -> new ArrayList<>())
-            .add(node.getPlanNodeId());
-      }
+    // context == null means this ExchangeNode has no parent node
+    if (context == null
+        || context.memoryDistributionType.equals(
+            MemoryDistributionType.CONSUME_ALL_CHILDREN_AT_THE_SAME_TIME)) {
+      exchangeNum++;
+    } else if (!context.exchangeAdded) {
+      context.exchangeAdded = true;
+      exchangeNum++;
     }
     return null;
   }
@@ -159,10 +134,9 @@ public class MemoryDistributionCalculator
     // LocalSinkHandle and LocalSourceHandle are one-to-one mapped and only LocalSourceHandle do the
     // update
     if (!isSameNode(node.getDownStreamEndpoint())) {
-      exchangeMap
-          .computeIfAbsent(node.getDownStreamPlanNodeId(), x -> new ArrayList<>())
-          .add(node.getDownStreamPlanNodeId());
+      exchangeNum++;
     }
+    node.getChild().accept(this, context);
     return null;
   }
 
@@ -479,6 +453,7 @@ public class MemoryDistributionCalculator
 
   static class MemoryDistributionContext {
     final PlanNodeId planNodeId;
+    boolean exchangeAdded = false;
     final MemoryDistributionType memoryDistributionType;
 
     MemoryDistributionContext(