You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/12/22 08:39:15 UTC

[iotdb] 01/01: Fix memory distribution bug

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch memoryDistribution
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f7f2ac2affa58e9f9332798d7db9a9867265f249
Author: Alima777 <wx...@gmail.com>
AuthorDate: Thu Dec 22 16:39:00 2022 +0800

    Fix memory distribution bug
---
 .../plan/planner/MemoryDistributionCalculator.java | 48 +++++++---------------
 1 file changed, 14 insertions(+), 34 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/MemoryDistributionCalculator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/MemoryDistributionCalculator.java
index fb117804a7..5120e73647 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/MemoryDistributionCalculator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/MemoryDistributionCalculator.java
@@ -80,18 +80,10 @@ import static org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints.isSameNode
 public class MemoryDistributionCalculator
     extends PlanVisitor<Void, MemoryDistributionCalculator.MemoryDistributionContext> {
   /** This map is used to calculate the total split of memory */
-  private final Map<PlanNodeId, List<PlanNodeId>> exchangeMap;
-
-  public MemoryDistributionCalculator() {
-    this.exchangeMap = new HashMap<>();
-  }
+  private int exchangeNum;
 
   public long calculateTotalSplit() {
-    long res = 0;
-    for (List<PlanNodeId> l : exchangeMap.values()) {
-      res += l.size();
-    }
-    return res;
+    return exchangeNum;
   }
 
   @Override
@@ -127,29 +119,17 @@ public class MemoryDistributionCalculator
             });
   }
 
+  /**
+   * We do not distinguish LocalSourceHandle/SourceHandle: LocalSinkHandle does not update exchangeNum.
+   */
   @Override
   public Void visitExchange(ExchangeNode node, MemoryDistributionContext context) {
-    // we do not distinguish LocalSourceHandle/SourceHandle by not letting LocalSinkHandle update
-    // the map
-    if (context == null) {
-      // context == null means this ExchangeNode has no father
-      exchangeMap
-          .computeIfAbsent(node.getPlanNodeId(), x -> new ArrayList<>())
-          .add(node.getPlanNodeId());
-    } else {
-      if (context.memoryDistributionType.equals(
-          MemoryDistributionType.CONSUME_ALL_CHILDREN_AT_THE_SAME_TIME)) {
-        exchangeMap
-            .computeIfAbsent(context.planNodeId, x -> new ArrayList<>())
-            .add(node.getPlanNodeId());
-      } else if (context.memoryDistributionType.equals(
-              MemoryDistributionType.CONSUME_CHILDREN_ONE_BY_ONE)
-          && !exchangeMap.containsKey(context.planNodeId)) {
-        // All children share one split, thus only one node needs to be put into the map
-        exchangeMap
-            .computeIfAbsent(context.planNodeId, x -> new ArrayList<>())
-            .add(node.getPlanNodeId());
-      }
+    // context == null means this ExchangeNode doesn't have a father
+    if (context == null || context.memoryDistributionType.equals(MemoryDistributionType.CONSUME_ALL_CHILDREN_AT_THE_SAME_TIME)) {
+      exchangeNum++;
+    } else if (!context.exchangeAdded){
+      context.exchangeAdded = true;
+      exchangeNum++;
     }
     return null;
   }
@@ -159,10 +139,9 @@ public class MemoryDistributionCalculator
     // LocalSinkHandle and LocalSourceHandle are one-to-one mapped and only LocalSourceHandle do the
     // update
     if (!isSameNode(node.getDownStreamEndpoint())) {
-      exchangeMap
-          .computeIfAbsent(node.getDownStreamPlanNodeId(), x -> new ArrayList<>())
-          .add(node.getDownStreamPlanNodeId());
+      exchangeNum++;
     }
+    node.getChild().accept(this, context);
     return null;
   }
 
@@ -479,6 +458,7 @@ public class MemoryDistributionCalculator
 
   static class MemoryDistributionContext {
     final PlanNodeId planNodeId;
+    boolean exchangeAdded = false;
     final MemoryDistributionType memoryDistributionType;
 
     MemoryDistributionContext(