You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/12/22 10:47:31 UTC
[iotdb] branch master updated: [IOTDB-5117] Fix some issues in MemoryDistributionCalculator (#8580)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 15c0eabcb6 [IOTDB-5117] Fix some issues in MemoryDistributionCalculator (#8580)
15c0eabcb6 is described below
commit 15c0eabcb6733f0f7aa43f3f63805dd9fe9b5991
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Thu Dec 22 18:47:25 2022 +0800
[IOTDB-5117] Fix some issues in MemoryDistributionCalculator (#8580)
---
.../db/mpp/plan/planner/LocalExecutionPlanner.java | 2 +-
.../plan/planner/MemoryDistributionCalculator.java | 55 ++++++----------------
2 files changed, 16 insertions(+), 41 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 5b6d2abc86..36d53f6279 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -126,7 +126,7 @@ public class LocalExecutionPlanner {
private void setMemoryLimitForHandle(TFragmentInstanceId fragmentInstanceId, PlanNode plan) {
MemoryDistributionCalculator visitor = new MemoryDistributionCalculator();
plan.accept(visitor, null);
- long totalSplit = visitor.calculateTotalSplit();
+ int totalSplit = visitor.calculateTotalSplit();
if (totalSplit == 0) {
return;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/MemoryDistributionCalculator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/MemoryDistributionCalculator.java
index fb117804a7..fe87cb9972 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/MemoryDistributionCalculator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/MemoryDistributionCalculator.java
@@ -70,28 +70,15 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
import static org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints.isSameNode;
public class MemoryDistributionCalculator
extends PlanVisitor<Void, MemoryDistributionCalculator.MemoryDistributionContext> {
/** This map is used to calculate the total split of memory */
- private final Map<PlanNodeId, List<PlanNodeId>> exchangeMap;
-
- public MemoryDistributionCalculator() {
- this.exchangeMap = new HashMap<>();
- }
+ private int exchangeNum;
- public long calculateTotalSplit() {
- long res = 0;
- for (List<PlanNodeId> l : exchangeMap.values()) {
- res += l.size();
- }
- return res;
+ public int calculateTotalSplit() {
+ return exchangeNum;
}
@Override
@@ -127,29 +114,17 @@ public class MemoryDistributionCalculator
});
}
+ /** We do not distinguish LocalSourceHandle/SourceHandle by not letting LocalSinkHandle update exchangeNum */
@Override
public Void visitExchange(ExchangeNode node, MemoryDistributionContext context) {
- // we do not distinguish LocalSourceHandle/SourceHandle by not letting LocalSinkHandle update
- // the map
- if (context == null) {
- // context == null means this ExchangeNode has no father
- exchangeMap
- .computeIfAbsent(node.getPlanNodeId(), x -> new ArrayList<>())
- .add(node.getPlanNodeId());
- } else {
- if (context.memoryDistributionType.equals(
- MemoryDistributionType.CONSUME_ALL_CHILDREN_AT_THE_SAME_TIME)) {
- exchangeMap
- .computeIfAbsent(context.planNodeId, x -> new ArrayList<>())
- .add(node.getPlanNodeId());
- } else if (context.memoryDistributionType.equals(
- MemoryDistributionType.CONSUME_CHILDREN_ONE_BY_ONE)
- && !exchangeMap.containsKey(context.planNodeId)) {
- // All children share one split, thus only one node needs to be put into the map
- exchangeMap
- .computeIfAbsent(context.planNodeId, x -> new ArrayList<>())
- .add(node.getPlanNodeId());
- }
+ // context == null means this ExchangeNode doesn't have a father
+ if (context == null
+ || context.memoryDistributionType.equals(
+ MemoryDistributionType.CONSUME_ALL_CHILDREN_AT_THE_SAME_TIME)) {
+ exchangeNum++;
+ } else if (!context.exchangeAdded) {
+ context.exchangeAdded = true;
+ exchangeNum++;
}
return null;
}
@@ -159,10 +134,9 @@ public class MemoryDistributionCalculator
// LocalSinkHandle and LocalSourceHandle are one-to-one mapped and only LocalSourceHandle does the
// update
if (!isSameNode(node.getDownStreamEndpoint())) {
- exchangeMap
- .computeIfAbsent(node.getDownStreamPlanNodeId(), x -> new ArrayList<>())
- .add(node.getDownStreamPlanNodeId());
+ exchangeNum++;
}
+ node.getChild().accept(this, context);
return null;
}
@@ -479,6 +453,7 @@ public class MemoryDistributionCalculator
static class MemoryDistributionContext {
final PlanNodeId planNodeId;
+ boolean exchangeAdded = false;
final MemoryDistributionType memoryDistributionType;
MemoryDistributionContext(