You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/12/22 08:39:15 UTC
[iotdb] 01/01: Fix memory distribution bug
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch memoryDistribution
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f7f2ac2affa58e9f9332798d7db9a9867265f249
Author: Alima777 <wx...@gmail.com>
AuthorDate: Thu Dec 22 16:39:00 2022 +0800
Fix memory distribution bug
---
.../plan/planner/MemoryDistributionCalculator.java | 48 +++++++---------------
1 file changed, 14 insertions(+), 34 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/MemoryDistributionCalculator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/MemoryDistributionCalculator.java
index fb117804a7..5120e73647 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/MemoryDistributionCalculator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/MemoryDistributionCalculator.java
@@ -80,18 +80,10 @@ import static org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints.isSameNode
public class MemoryDistributionCalculator
extends PlanVisitor<Void, MemoryDistributionCalculator.MemoryDistributionContext> {
/** This map is used to calculate the total split of memory */
- private final Map<PlanNodeId, List<PlanNodeId>> exchangeMap;
-
- public MemoryDistributionCalculator() {
- this.exchangeMap = new HashMap<>();
- }
+ private int exchangeNum;
public long calculateTotalSplit() {
- long res = 0;
- for (List<PlanNodeId> l : exchangeMap.values()) {
- res += l.size();
- }
- return res;
+ return exchangeNum;
}
@Override
@@ -127,29 +119,17 @@ public class MemoryDistributionCalculator
});
}
+ /**
+ * We do not distinguish LocalSourceHandle/SourceHandle by not letting LocalSinkHandle update
+ */
@Override
public Void visitExchange(ExchangeNode node, MemoryDistributionContext context) {
- // we do not distinguish LocalSourceHandle/SourceHandle by not letting LocalSinkHandle update
- // the map
- if (context == null) {
- // context == null means this ExchangeNode has no father
- exchangeMap
- .computeIfAbsent(node.getPlanNodeId(), x -> new ArrayList<>())
- .add(node.getPlanNodeId());
- } else {
- if (context.memoryDistributionType.equals(
- MemoryDistributionType.CONSUME_ALL_CHILDREN_AT_THE_SAME_TIME)) {
- exchangeMap
- .computeIfAbsent(context.planNodeId, x -> new ArrayList<>())
- .add(node.getPlanNodeId());
- } else if (context.memoryDistributionType.equals(
- MemoryDistributionType.CONSUME_CHILDREN_ONE_BY_ONE)
- && !exchangeMap.containsKey(context.planNodeId)) {
- // All children share one split, thus only one node needs to be put into the map
- exchangeMap
- .computeIfAbsent(context.planNodeId, x -> new ArrayList<>())
- .add(node.getPlanNodeId());
- }
+ // context == null means this ExchangeNode doesn't have a father
+ if (context == null || context.memoryDistributionType.equals(MemoryDistributionType.CONSUME_ALL_CHILDREN_AT_THE_SAME_TIME)) {
+ exchangeNum++;
+ } else if (!context.exchangeAdded){
+ context.exchangeAdded = true;
+ exchangeNum++;
}
return null;
}
@@ -159,10 +139,9 @@ public class MemoryDistributionCalculator
// LocalSinkHandle and LocalSourceHandle are one-to-one mapped and only LocalSourceHandle do the
// update
if (!isSameNode(node.getDownStreamEndpoint())) {
- exchangeMap
- .computeIfAbsent(node.getDownStreamPlanNodeId(), x -> new ArrayList<>())
- .add(node.getDownStreamPlanNodeId());
+ exchangeNum++;
}
+ node.getChild().accept(this, context);
return null;
}
@@ -479,6 +458,7 @@ public class MemoryDistributionCalculator
static class MemoryDistributionContext {
final PlanNodeId planNodeId;
+ boolean exchangeAdded = false;
final MemoryDistributionType memoryDistributionType;
MemoryDistributionContext(