You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/12/01 02:01:29 UTC
[iotdb] 09/09: fix memory calculation
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/FixIntoOperator1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 64da044e25a7fd42e527d24b7f49f1195d0d5d8d
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Nov 30 20:53:47 2022 +0800
fix memory calculation
---
.../operator/process/AbstractIntoOperator.java | 16 ++++--
.../operator/process/DeviceViewIntoOperator.java | 13 ++++-
.../execution/operator/process/IntoOperator.java | 8 ++-
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 66 +++++++++++++++++++++-
4 files changed, 93 insertions(+), 10 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index dd117e6d97..6b17fdc625 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -68,17 +68,25 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
private final ExecutorService writeOperationExecutor;
private ListenableFuture<TSStatus> writeOperationFuture;
+ private final long maxRetainedSize;
+ private final long maxReturnSize;
+
public AbstractIntoOperator(
OperatorContext operatorContext,
Operator child,
List<InsertTabletStatementGenerator> insertTabletStatementGenerators,
Map<String, InputLocation> sourceColumnToInputLocationMap,
- ExecutorService intoOperationExecutor) {
+ ExecutorService intoOperationExecutor,
+ long maxStatementSize,
+ long maxReturnSize) {
this.operatorContext = operatorContext;
this.child = child;
this.insertTabletStatementGenerators = insertTabletStatementGenerators;
this.sourceColumnToInputLocationMap = sourceColumnToInputLocationMap;
this.writeOperationExecutor = intoOperationExecutor;
+
+ this.maxRetainedSize = child.calculateMaxReturnSize() + maxStatementSize;
+ this.maxReturnSize = maxReturnSize;
}
protected static List<InsertTabletStatementGenerator> constructInsertTabletStatementGenerators(
@@ -260,17 +268,17 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
@Override
public long calculateMaxPeekMemory() {
- return child.calculateMaxPeekMemory();
+ return maxReturnSize + maxRetainedSize + child.calculateMaxPeekMemory();
}
@Override
public long calculateMaxReturnSize() {
- return child.calculateMaxReturnSize();
+ return maxReturnSize;
}
@Override
public long calculateRetainedSizeAfterCallingNext() {
- return child.calculateRetainedSizeAfterCallingNext();
+ return maxRetainedSize + child.calculateRetainedSizeAfterCallingNext();
}
public static class InsertTabletStatementGenerator {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
index 07cd3c0a0a..183d7d6140 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
@@ -62,8 +62,17 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
Map<String, Boolean> targetDeviceToAlignedMap,
Map<String, List<Pair<String, PartialPath>>> deviceToSourceTargetPathPairListMap,
Map<String, InputLocation> sourceColumnToInputLocationMap,
- ExecutorService intoOperationExecutor) {
- super(operatorContext, child, null, sourceColumnToInputLocationMap, intoOperationExecutor);
+ ExecutorService intoOperationExecutor,
+ long maxStatementSize,
+ long maxReturnSize) {
+ super(
+ operatorContext,
+ child,
+ null,
+ sourceColumnToInputLocationMap,
+ intoOperationExecutor,
+ maxStatementSize,
+ maxReturnSize);
this.deviceToTargetPathSourceInputLocationMap = deviceToTargetPathSourceInputLocationMap;
this.deviceToTargetPathDataTypeMap = deviceToTargetPathDataTypeMap;
this.targetDeviceToAlignedMap = targetDeviceToAlignedMap;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
index 5aba97e22a..89953edc69 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -50,14 +50,18 @@ public class IntoOperator extends AbstractIntoOperator {
Map<String, Boolean> targetDeviceToAlignedMap,
List<Pair<String, PartialPath>> sourceTargetPathPairList,
Map<String, InputLocation> sourceColumnToInputLocationMap,
- ExecutorService intoOperationExecutor) {
+ ExecutorService intoOperationExecutor,
+ long maxStatementSize,
+ long maxReturnSize) {
super(
operatorContext,
child,
constructInsertTabletStatementGenerators(
targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap),
sourceColumnToInputLocationMap,
- intoOperationExecutor);
+ intoOperationExecutor,
+ maxStatementSize,
+ maxReturnSize);
this.sourceTargetPathPairList = sourceTargetPathPairList;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index a6c2ebd261..a858f56833 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
@@ -170,6 +171,7 @@ import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
import org.apache.iotdb.db.mpp.plan.statement.literal.Literal;
+import org.apache.iotdb.db.mpp.statistics.StatisticsManager;
import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.LeafColumnTransformer;
import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFContext;
@@ -177,6 +179,8 @@ import org.apache.iotdb.db.utils.datastructure.TimeSelector;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.operator.Gt;
import org.apache.iotdb.tsfile.read.filter.operator.GtEq;
@@ -1375,7 +1379,17 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
sourceColumnToInputLocationMap,
context.getTypeProvider());
+ int rowLimit =
+ IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit();
+ long maxStatementSize = calculateStatementSizePerLine(targetPathToDataTypeMap) * rowLimit;
+ long maxReturnSize =
+ node.getChild().getOutputColumnNames().size()
+ * (LongColumn.SIZE_IN_BYTES_PER_POSITION
+ + IntColumn.SIZE_IN_BYTES_PER_POSITION
+ + 256 * Byte.BYTES);
+
context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+
return new IntoOperator(
operatorContext,
child,
@@ -1384,7 +1398,9 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
intoPathDescriptor.getTargetDeviceToAlignedMap(),
intoPathDescriptor.getSourceTargetPathPairList(),
sourceColumnToInputLocationMap,
- context.getIntoOperationExecutor());
+ context.getIntoOperationExecutor(),
+ maxStatementSize,
+ maxReturnSize);
}
@Override
@@ -1409,6 +1425,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
new HashMap<>();
Map<String, Map<PartialPath, Map<String, String>>> sourceDeviceToTargetPathMap =
deviceViewIntoPathDescriptor.getSourceDeviceToTargetPathMap();
+ long statementSizePerLine = 0L;
for (Map.Entry<String, Map<PartialPath, Map<String, String>>> deviceEntry :
sourceDeviceToTargetPathMap.entrySet()) {
String sourceDevice = deviceEntry.getKey();
@@ -1424,8 +1441,19 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
deviceToTargetPathSourceInputLocationMap.put(
sourceDevice, targetPathToSourceInputLocationMap);
deviceToTargetPathDataTypeMap.put(sourceDevice, targetPathToDataTypeMap);
+ statementSizePerLine += calculateStatementSizePerLine(targetPathToDataTypeMap);
}
+ int rowLimit =
+ IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit();
+ long maxStatementSize = statementSizePerLine * rowLimit;
+ long maxReturnSize =
+ deviceToTargetPathDataTypeMap.size()
+ * (node.getChild().getOutputColumnNames().size() - 1)
+ * (LongColumn.SIZE_IN_BYTES_PER_POSITION
+ + IntColumn.SIZE_IN_BYTES_PER_POSITION
+ + 512 * Byte.BYTES);
+
context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
return new DeviceViewIntoOperator(
operatorContext,
@@ -1435,7 +1463,9 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
deviceViewIntoPathDescriptor.getTargetDeviceToAlignedMap(),
deviceViewIntoPathDescriptor.getDeviceToSourceTargetPathPairListMap(),
sourceColumnToInputLocationMap,
- context.getIntoOperationExecutor());
+ context.getIntoOperationExecutor(),
+ maxStatementSize,
+ maxReturnSize);
}
private Map<String, InputLocation> constructSourceColumnToInputLocationMap(PlanNode node) {
@@ -1469,6 +1499,38 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
}
}
+ private long calculateStatementSizePerLine(
+ Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap) {
+ long maxStatementSize = Long.BYTES;
+ List<TSDataType> dataTypes =
+ targetPathToDataTypeMap.values().stream()
+ .flatMap(stringTSDataTypeMap -> stringTSDataTypeMap.values().stream())
+ .collect(Collectors.toList());
+ for (TSDataType dataType : dataTypes) {
+ maxStatementSize += getValueSizePerLine(dataType);
+ }
+ return maxStatementSize;
+ }
+
+ private static long getValueSizePerLine(TSDataType tsDataType) {
+ switch (tsDataType) {
+ case INT32:
+ return Integer.BYTES;
+ case INT64:
+ return Long.BYTES;
+ case FLOAT:
+ return Float.BYTES;
+ case DOUBLE:
+ return Double.BYTES;
+ case BOOLEAN:
+ return Byte.BYTES;
+ case TEXT:
+ return StatisticsManager.getInstance().getMaxBinarySizeInBytes(new PartialPath());
+ default:
+ throw new UnsupportedOperationException("Unknown data type " + tsDataType);
+ }
+ }
+
@Override
public Operator visitTimeJoin(TimeJoinNode node, LocalExecutionPlanContext context) {
List<Operator> children =