You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this copy.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/12/01 02:01:29 UTC

[iotdb] 09/09: fix memory calculation

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/FixIntoOperator1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 64da044e25a7fd42e527d24b7f49f1195d0d5d8d
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Nov 30 20:53:47 2022 +0800

    fix memory calculation
---
 .../operator/process/AbstractIntoOperator.java     | 16 ++++--
 .../operator/process/DeviceViewIntoOperator.java   | 13 ++++-
 .../execution/operator/process/IntoOperator.java   |  8 ++-
 .../db/mpp/plan/planner/OperatorTreeGenerator.java | 66 +++++++++++++++++++++-
 4 files changed, 93 insertions(+), 10 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index dd117e6d97..6b17fdc625 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -68,17 +68,25 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
   private final ExecutorService writeOperationExecutor;
   private ListenableFuture<TSStatus> writeOperationFuture;
 
+  private final long maxRetainedSize;
+  private final long maxReturnSize;
+
   public AbstractIntoOperator(
       OperatorContext operatorContext,
       Operator child,
       List<InsertTabletStatementGenerator> insertTabletStatementGenerators,
       Map<String, InputLocation> sourceColumnToInputLocationMap,
-      ExecutorService intoOperationExecutor) {
+      ExecutorService intoOperationExecutor,
+      long maxStatementSize,
+      long maxReturnSize) {
     this.operatorContext = operatorContext;
     this.child = child;
     this.insertTabletStatementGenerators = insertTabletStatementGenerators;
     this.sourceColumnToInputLocationMap = sourceColumnToInputLocationMap;
     this.writeOperationExecutor = intoOperationExecutor;
+
+    this.maxRetainedSize = child.calculateMaxReturnSize() + maxStatementSize;
+    this.maxReturnSize = maxReturnSize;
   }
 
   protected static List<InsertTabletStatementGenerator> constructInsertTabletStatementGenerators(
@@ -260,17 +268,17 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
 
   @Override
   public long calculateMaxPeekMemory() {
-    return child.calculateMaxPeekMemory();
+    return maxReturnSize + maxRetainedSize + child.calculateMaxPeekMemory();
   }
 
   @Override
   public long calculateMaxReturnSize() {
-    return child.calculateMaxReturnSize();
+    return maxReturnSize;
   }
 
   @Override
   public long calculateRetainedSizeAfterCallingNext() {
-    return child.calculateRetainedSizeAfterCallingNext();
+    return maxRetainedSize + child.calculateRetainedSizeAfterCallingNext();
   }
 
   public static class InsertTabletStatementGenerator {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
index 07cd3c0a0a..183d7d6140 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
@@ -62,8 +62,17 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
       Map<String, Boolean> targetDeviceToAlignedMap,
       Map<String, List<Pair<String, PartialPath>>> deviceToSourceTargetPathPairListMap,
       Map<String, InputLocation> sourceColumnToInputLocationMap,
-      ExecutorService intoOperationExecutor) {
-    super(operatorContext, child, null, sourceColumnToInputLocationMap, intoOperationExecutor);
+      ExecutorService intoOperationExecutor,
+      long maxStatementSize,
+      long maxReturnSize) {
+    super(
+        operatorContext,
+        child,
+        null,
+        sourceColumnToInputLocationMap,
+        intoOperationExecutor,
+        maxStatementSize,
+        maxReturnSize);
     this.deviceToTargetPathSourceInputLocationMap = deviceToTargetPathSourceInputLocationMap;
     this.deviceToTargetPathDataTypeMap = deviceToTargetPathDataTypeMap;
     this.targetDeviceToAlignedMap = targetDeviceToAlignedMap;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
index 5aba97e22a..89953edc69 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -50,14 +50,18 @@ public class IntoOperator extends AbstractIntoOperator {
       Map<String, Boolean> targetDeviceToAlignedMap,
       List<Pair<String, PartialPath>> sourceTargetPathPairList,
       Map<String, InputLocation> sourceColumnToInputLocationMap,
-      ExecutorService intoOperationExecutor) {
+      ExecutorService intoOperationExecutor,
+      long maxStatementSize,
+      long maxReturnSize) {
     super(
         operatorContext,
         child,
         constructInsertTabletStatementGenerators(
             targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap),
         sourceColumnToInputLocationMap,
-        intoOperationExecutor);
+        intoOperationExecutor,
+        maxStatementSize,
+        maxReturnSize);
     this.sourceTargetPathPairList = sourceTargetPathPairList;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index a6c2ebd261..a858f56833 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.path.AlignedPath;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
 import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
@@ -170,6 +171,7 @@ import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
 import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
 import org.apache.iotdb.db.mpp.plan.statement.literal.Literal;
+import org.apache.iotdb.db.mpp.statistics.StatisticsManager;
 import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.LeafColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFContext;
@@ -177,6 +179,8 @@ import org.apache.iotdb.db.utils.datastructure.TimeSelector;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.operator.Gt;
 import org.apache.iotdb.tsfile.read.filter.operator.GtEq;
@@ -1375,7 +1379,17 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
         sourceColumnToInputLocationMap,
         context.getTypeProvider());
 
+    int rowLimit =
+        IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit();
+    long maxStatementSize = calculateStatementSizePerLine(targetPathToDataTypeMap) * rowLimit;
+    long maxReturnSize =
+        node.getChild().getOutputColumnNames().size()
+            * (LongColumn.SIZE_IN_BYTES_PER_POSITION
+                + IntColumn.SIZE_IN_BYTES_PER_POSITION
+                + 256 * Byte.BYTES);
+
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+
     return new IntoOperator(
         operatorContext,
         child,
@@ -1384,7 +1398,9 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
         intoPathDescriptor.getTargetDeviceToAlignedMap(),
         intoPathDescriptor.getSourceTargetPathPairList(),
         sourceColumnToInputLocationMap,
-        context.getIntoOperationExecutor());
+        context.getIntoOperationExecutor(),
+        maxStatementSize,
+        maxReturnSize);
   }
 
   @Override
@@ -1409,6 +1425,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
         new HashMap<>();
     Map<String, Map<PartialPath, Map<String, String>>> sourceDeviceToTargetPathMap =
         deviceViewIntoPathDescriptor.getSourceDeviceToTargetPathMap();
+    long statementSizePerLine = 0L;
     for (Map.Entry<String, Map<PartialPath, Map<String, String>>> deviceEntry :
         sourceDeviceToTargetPathMap.entrySet()) {
       String sourceDevice = deviceEntry.getKey();
@@ -1424,8 +1441,19 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
       deviceToTargetPathSourceInputLocationMap.put(
           sourceDevice, targetPathToSourceInputLocationMap);
       deviceToTargetPathDataTypeMap.put(sourceDevice, targetPathToDataTypeMap);
+      statementSizePerLine += calculateStatementSizePerLine(targetPathToDataTypeMap);
     }
 
+    int rowLimit =
+        IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit();
+    long maxStatementSize = statementSizePerLine * rowLimit;
+    long maxReturnSize =
+        deviceToTargetPathDataTypeMap.size()
+            * (node.getChild().getOutputColumnNames().size() - 1)
+            * (LongColumn.SIZE_IN_BYTES_PER_POSITION
+                + IntColumn.SIZE_IN_BYTES_PER_POSITION
+                + 512 * Byte.BYTES);
+
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     return new DeviceViewIntoOperator(
         operatorContext,
@@ -1435,7 +1463,9 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
         deviceViewIntoPathDescriptor.getTargetDeviceToAlignedMap(),
         deviceViewIntoPathDescriptor.getDeviceToSourceTargetPathPairListMap(),
         sourceColumnToInputLocationMap,
-        context.getIntoOperationExecutor());
+        context.getIntoOperationExecutor(),
+        maxStatementSize,
+        maxReturnSize);
   }
 
   private Map<String, InputLocation> constructSourceColumnToInputLocationMap(PlanNode node) {
@@ -1469,6 +1499,38 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     }
   }
 
+  private long calculateStatementSizePerLine( // estimated bytes per row: Long.BYTES + one value per mapped type
+      Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap) {
+    long maxStatementSize = Long.BYTES; // NOTE(review): presumably accounts for the row's timestamp - confirm
+    List<TSDataType> dataTypes = // every data type found in the nested maps, duplicates kept
+        targetPathToDataTypeMap.values().stream()
+            .flatMap(stringTSDataTypeMap -> stringTSDataTypeMap.values().stream())
+            .collect(Collectors.toList());
+    for (TSDataType dataType : dataTypes) {
+      maxStatementSize += getValueSizePerLine(dataType); // add the size of one value of this type
+    }
+    return maxStatementSize; // a per-row figure despite the name; callers scale it by the row limit
+  }
+
+  private static long getValueSizePerLine(TSDataType tsDataType) { // bytes counted for one value of this type
+    switch (tsDataType) {
+      case INT32:
+        return Integer.BYTES;
+      case INT64:
+        return Long.BYTES;
+      case FLOAT:
+        return Float.BYTES;
+      case DOUBLE:
+        return Double.BYTES;
+      case BOOLEAN: // a boolean is counted as a single byte
+        return Byte.BYTES;
+      case TEXT: // no fixed width: size is taken from StatisticsManager, queried with a default-constructed PartialPath
+        return StatisticsManager.getInstance().getMaxBinarySizeInBytes(new PartialPath());
+      default: // any other data type is rejected as unsupported
+        throw new UnsupportedOperationException("Unknown data type " + tsDataType);
+    }
+  }
+
   @Override
   public Operator visitTimeJoin(TimeJoinNode node, LocalExecutionPlanContext context) {
     List<Operator> children =