Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2020/11/03 13:36:51 UTC

[GitHub] [iotdb] jixuan1989 commented on a change in pull request #1524: [IOTDB-776] Control the memory usage of flushing the memtable

jixuan1989 commented on a change in pull request #1524:
URL: https://github.com/apache/iotdb/pull/1524#discussion_r515757243



##########
File path: server/src/assembly/resources/conf/iotdb-engine.properties
##########
@@ -179,7 +179,7 @@ tsfile_size_threshold=536870912
 # When a memTable's size (in byte) exceeds this, the memtable is flushed to disk. The default threshold is 1 GB.
 memtable_size_threshold=1073741824
 
-avg_series_point_number_threshold=100000
+avg_series_point_number_threshold=10000

Review comment:
       Add a comment explaining this parameter.
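
       For reference, a possible wording (my reading of the parameter, not the author's text), following the style of the memtable_size_threshold comment above it:

           # When the average point number per series in a memtable exceeds this, the memtable is flushed to disk.
           # The default threshold is 10000 points.
           avg_series_point_number_threshold=10000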

##########
File path: server/src/assembly/resources/conf/iotdb-engine.properties
##########
@@ -218,28 +218,35 @@ mtree_snapshot_threshold_time=3600
 ### Memory Control Configuration
 ####################
 
-# This adapter could adjust the system memory to avoid OOM.
-# It will refuse to create time series or add storage groups under high system load.
-#
-# Attention!!!
-# If disable this parameter, you need to set some parameters according to your system load:
-# Normal scenario: 1-50 storage groups. <100k devices, <10M time series
-# memtable_size_threshold = tsfile_size_threshold = IoTDB memory allocation in byte / 2 / num of storage group / 4
-# Make sure the total num of time series in system * primitive_array_size * 16 <= IoTDB memory allocation in byte / 2 / 4
-# you could reduce the primitive_array_size in very high workload.
-enable_parameter_adapter=true
-
-# Memory Allocation Ratio: Write, Read, and Free Memory.
-# The parameter form is a:b:c, where a, b and c are integers. for example: 1:1:1 , 6:3:1
-write_read_free_memory_proportion=6:3:1
+# Whether to enable memory control
+enable_mem_control=true
+
+# Memory Allocation Ratio: Write, Read, Schema and Free Memory.
+# The parameter form is a:b:c:d, where a, b, c and d are integers. for example: 1:1:1:1 , 6:2:1:1
+write_read_schema_free_memory_proportion=4:3:1:2
 
 # primitive array size (length of each array) in array pool
 primitive_array_size=128
 
+# Ratio of memory allocated for buffered arrays, 0.8 by default
+buffered_arrays_memory_proportion=0.8

Review comment:
       Is this 0.8 of total memory, or of the write memory?
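
       (Judging from the PrimitiveArrayManager hunk later in this diff, the ratio appears to be applied to the write memory allocation rather than total memory; a minimal sketch of that derivation, using the config getters from the diff:)

           // sketch based on PrimitiveArrayManager below, not new behavior
           IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
           double bufferedArraySizeThreshold =
               config.getAllocateMemoryForWrite() * config.getBufferedArraysMemoryProportion();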

##########
File path: server/src/assembly/resources/conf/iotdb-engine.properties
##########
@@ -218,28 +218,35 @@ mtree_snapshot_threshold_time=3600
 ### Memory Control Configuration
 ####################
 
-# This adapter could adjust the system memory to avoid OOM.
-# It will refuse to create time series or add storage groups under high system load.
-#
-# Attention!!!
-# If disable this parameter, you need to set some parameters according to your system load:
-# Normal scenario: 1-50 storage groups. <100k devices, <10M time series
-# memtable_size_threshold = tsfile_size_threshold = IoTDB memory allocation in byte / 2 / num of storage group / 4
-# Make sure the total num of time series in system * primitive_array_size * 16 <= IoTDB memory allocation in byte / 2 / 4
-# you could reduce the primitive_array_size in very high workload.
-enable_parameter_adapter=true
-
-# Memory Allocation Ratio: Write, Read, and Free Memory.
-# The parameter form is a:b:c, where a, b and c are integers. for example: 1:1:1 , 6:3:1
-write_read_free_memory_proportion=6:3:1
+# Whether to enable memory control
+enable_mem_control=true
+
+# Memory Allocation Ratio: Write, Read, Schema and Free Memory.
+# The parameter form is a:b:c:d, where a, b, c and d are integers. for example: 1:1:1:1 , 6:2:1:1
+write_read_schema_free_memory_proportion=4:3:1:2
 
 # primitive array size (length of each array) in array pool
 primitive_array_size=128
 
+# Ratio of memory allocated for buffered arrays, 0.8 by default
+buffered_arrays_memory_proportion=0.8
+
+# Flush proportion for system, 0.3 by default
+flush_proportion=0.3
+
+# Reject proportion for system, 0.8 by default
+reject_proportion=0.8
+
+# If storage group increased more than this threshold, report to system. The default value is 16MB
+storage_group_report_threshold=16777216
+
 # allowed max numbers of deduplicated path in one query
 # it's just an advised value, the real limitation will be the smaller one between this and the one we calculated
 max_deduplicated_path_num=1000
 
+# estimated metadata size of one timeseries in Mtree
+estimated_series_size=300

Review comment:
       Please state the unit in the comment.
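
       (Presumably bytes; if so, the comment could spell it out, e.g.:)

           # estimated metadata size (in bytes) of one timeseries in MTree
           estimated_series_size=300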

##########
File path: server/src/assembly/resources/conf/iotdb-engine.properties
##########
@@ -218,28 +218,35 @@ mtree_snapshot_threshold_time=3600
 ### Memory Control Configuration
 ####################
 
-# This adapter could adjust the system memory to avoid OOM.
-# It will refuse to create time series or add storage groups under high system load.
-#
-# Attention!!!
-# If disable this parameter, you need to set some parameters according to your system load:
-# Normal scenario: 1-50 storage groups. <100k devices, <10M time series
-# memtable_size_threshold = tsfile_size_threshold = IoTDB memory allocation in byte / 2 / num of storage group / 4
-# Make sure the total num of time series in system * primitive_array_size * 16 <= IoTDB memory allocation in byte / 2 / 4
-# you could reduce the primitive_array_size in very high workload.
-enable_parameter_adapter=true
-
-# Memory Allocation Ratio: Write, Read, and Free Memory.
-# The parameter form is a:b:c, where a, b and c are integers. for example: 1:1:1 , 6:3:1
-write_read_free_memory_proportion=6:3:1
+# Whether to enable memory control
+enable_mem_control=true
+
+# Memory Allocation Ratio: Write, Read, Schema and Free Memory.
+# The parameter form is a:b:c:d, where a, b, c and d are integers. for example: 1:1:1:1 , 6:2:1:1
+write_read_schema_free_memory_proportion=4:3:1:2
 
 # primitive array size (length of each array) in array pool
 primitive_array_size=128
 
+# Ratio of memory allocated for buffered arrays, 0.8 by default
+buffered_arrays_memory_proportion=0.8
+
+# Flush proportion for system, 0.3 by default
+flush_proportion=0.3
+
+# Reject proportion for system, 0.8 by default
+reject_proportion=0.8
+
+# If storage group increased more than this threshold, report to system. The default value is 16MB

Review comment:
       This comment is not clear; please reword it.
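
       A possible rewording (my reading of the mechanism, to be confirmed by the author):

           # When the memory occupied by a storage group's memtables grows by more than this many bytes
           # since the last report, the storage group reports its memory usage to the system-level
           # memory manager. The default is 16MB (16777216 bytes).
           storage_group_report_threshold=16777216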

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
##########
@@ -121,6 +132,21 @@ ReadOnlyMemChunk query(String deviceId, String measurement, TSDataType dataType,
 
   void release();
 
+  /**
+   * only used when mem control enabled
+   */
+  boolean checkIfNeedStartNewChunk(String deviceId, String measurement);
+
+  /**
+   * only used when mem control enabled
+   */
+  int getCurrentTVListSize(String deviceId, String measurement);

Review comment:
       Add javadoc: the caller must guarantee that the device already exists in the working memtable.
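
       A possible wording for that javadoc (sketch only):

           /**
            * Get the current size of the working TVList for the given series.
            * Only used when memory control is enabled.
            *
            * <p>The caller must guarantee that the device already exists in the working memtable.
            */
           int getCurrentTVListSize(String deviceId, String measurement);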

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
##########
@@ -56,6 +56,17 @@
 
   private long memSize = 0;
 
+  /**
+   * The initial value is false because we should calculate the text data size when recover
+   * memTable!!
+   */
+  protected boolean enableMemControl = false;

Review comment:
       If you only ever use `!enableMemControl`, why not name it `disableMemControl`?
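
       (i.e. something along these lines, with the default flipped accordingly:)

           /**
            * Disabled during recovery so that the text data size is still calculated there.
            */
           protected boolean disableMemControl = true;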

##########
File path: server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
##########
@@ -589,39 +580,58 @@ public void setStorageGroup(PartialPath storageGroup) throws MetadataException {
   public void deleteStorageGroups(List<PartialPath> storageGroups) throws MetadataException {
     try {
       for (PartialPath storageGroup : storageGroups) {
-
+        totalSeriesNumber -= mtree.getAllTimeseriesCount(storageGroup);
         // clear cached MNode
+        if (!allowToCreateNewSeries && 
+            totalSeriesNumber * ESTIMATED_SERIES_SIZE < MTREE_SIZE_THRESHOLD) {
+          logger.info("Current series number {} come back to normal level", totalSeriesNumber);
+          allowToCreateNewSeries = true;
+        }
         mNodeCache.clear();
 
         // try to delete storage group
         List<MeasurementMNode> leafMNodes = mtree.deleteStorageGroup(storageGroup);
         for (MeasurementMNode leafMNode : leafMNodes) {
           removeFromTagInvertedIndex(leafMNode);
+          // update statistics in schemaDataTypeNumMap
+          updateSchemaDataTypeNumMap(leafMNode.getSchema().getType(), -1);
         }
 
-        if (config.isEnableParameterAdapter()) {
-          IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(-1);
-          int size = seriesNumberInStorageGroups.get(storageGroup.getFullPath());
-          IoTDBConfigDynamicAdapter.getInstance().addOrDeleteTimeSeries(size * -1);
-          ActiveTimeSeriesCounter.getInstance().delete(storageGroup.getFullPath());
-          seriesNumberInStorageGroups.remove(storageGroup.getFullPath());
-          if (size == maxSeriesNumberAmongStorageGroup) {
-            maxSeriesNumberAmongStorageGroup =
-                seriesNumberInStorageGroups.values().stream().max(Integer::compareTo).orElse(0);
-          }
-        }
         // if success
         if (!isRecovering) {
           logWriter.deleteStorageGroup(storageGroup.getFullPath());
         }
       }
-    } catch (ConfigAdjusterException e) {
-      throw new MetadataException(e);
     } catch (IOException e) {
       throw new MetadataException(e.getMessage());
     }
   }
 
+  /**
+   * update statistics in schemaDataTypeNumMap
+   *
+   * @param type data type
+   * @param num  1 for creating timeseries and -1 for deleting timeseries
+   */
+  private synchronized void updateSchemaDataTypeNumMap(TSDataType type, int num) {
+    // add an array of the series type
+    schemaDataTypeNumMap.put(type, schemaDataTypeNumMap.getOrDefault(type, 0) + num);
+    // add an array of time
+    schemaDataTypeNumMap.put(TSDataType.INT64,
+        schemaDataTypeNumMap.getOrDefault(TSDataType.INT64, 0) + num);
+
+    int currentDataTypeTotalNum = 0;
+    for (int typeSize : schemaDataTypeNumMap.values()) {

Review comment:
       How about making `currentDataTypeTotalNum` a field?
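
       A rough sketch of that suggestion (names taken from the diff, not a full patch):

           // keep a running total instead of re-summing schemaDataTypeNumMap on every update
           private int currentDataTypeTotalNum = 0;

           private synchronized void updateSchemaDataTypeNumMap(TSDataType type, int num) {
             // one array for the series values and one for the timestamps (INT64)
             schemaDataTypeNumMap.merge(type, num, Integer::sum);
             schemaDataTypeNumMap.merge(TSDataType.INT64, num, Integer::sum);
             currentDataTypeTotalNum += 2 * num;
             // ... use currentDataTypeTotalNum directly where the loop used to be ...
           }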

##########
File path: server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.rescon;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manage all primitive data list in memory, including get and release operation.
+ */
+public class PrimitiveArrayManager {
+
+  /**
+   * data type -> ArrayDeque<Array>
+   */
+  private static final Map<TSDataType, ArrayDeque<Object>> bufferedArraysMap = new EnumMap<>(
+      TSDataType.class);
+
+  /**
+   * data type -> current number of buffered arrays
+   */
+  private static final Map<TSDataType, Integer> bufferedArraysNumMap = new EnumMap<>(
+      TSDataType.class);
+
+  /**
+   * data type -> ratio of data type in schema, which could be seen as recommended ratio
+   */
+  private static final Map<TSDataType, Double> bufferedArraysNumRatio = new EnumMap<>(
+      TSDataType.class);
+  private static int currentDataTypeTotalNum = 0;
+
+  private static final Logger logger = LoggerFactory.getLogger(PrimitiveArrayManager.class);
+
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  public static final int ARRAY_SIZE = config.getPrimitiveArraySize();
+
+  /**
+   * threshold total size of arrays for all data types
+   */
+  private static final double BUFFERED_ARRAY_SIZE_THRESHOLD =
+      config.getAllocateMemoryForWrite() * config.getBufferedArraysMemoryProportion();
+
+  /**
+   * total size of buffered arrays
+   */
+  private static AtomicLong bufferedArraysRamSize = new AtomicLong();
+
+  /**
+   * total size of out of buffer arrays
+   */
+  private static AtomicLong outOfBufferArraysRamSize = new AtomicLong();
+
+  static {
+    bufferedArraysMap.put(TSDataType.BOOLEAN, new ArrayDeque<>());
+    bufferedArraysMap.put(TSDataType.INT32, new ArrayDeque<>());
+    bufferedArraysMap.put(TSDataType.INT64, new ArrayDeque<>());
+    bufferedArraysMap.put(TSDataType.FLOAT, new ArrayDeque<>());
+    bufferedArraysMap.put(TSDataType.DOUBLE, new ArrayDeque<>());
+    bufferedArraysMap.put(TSDataType.TEXT, new ArrayDeque<>());
+  }
+
+  private PrimitiveArrayManager() {
+    logger.info("BufferedArraySizeThreshold is {}", BUFFERED_ARRAY_SIZE_THRESHOLD);
+  }
+
+  /**
+   * Get primitive data lists according to type
+   *
+   * @param dataType data type
+   * @return an array
+   */
+  public static Object getPrimitiveArraysByType(TSDataType dataType) {
+    // check memory of buffered array, if already full, generate OOB
+    if (bufferedArraysRamSize.get() + ARRAY_SIZE * dataType.getDataTypeSize()
+        > BUFFERED_ARRAY_SIZE_THRESHOLD) {
+      // return an out of buffer array
+      outOfBufferArraysRamSize.addAndGet(ARRAY_SIZE * dataType.getDataTypeSize());
+      return createPrimitiveArray(dataType);
+    }
+
+    synchronized (bufferedArraysMap.get(dataType)) {
+      // try to get a buffered array
+      Object dataArray = bufferedArraysMap.get(dataType).poll();
+      if (dataArray != null) {
+        return dataArray;
+      }
+      // no buffered array, create one
+      bufferedArraysNumMap.put(dataType, bufferedArraysNumMap.getOrDefault(dataType, 0) + 1);
+      bufferedArraysRamSize.addAndGet(ARRAY_SIZE * dataType.getDataTypeSize());
+    }
+
+    return createPrimitiveArray(dataType);
+  }
+
+  private static Object createPrimitiveArray(TSDataType dataType) {
+    Object dataArray;
+    switch (dataType) {
+      case BOOLEAN:
+        dataArray = new boolean[ARRAY_SIZE];
+        break;
+      case INT32:
+        dataArray = new int[ARRAY_SIZE];
+        break;
+      case INT64:
+        dataArray = new long[ARRAY_SIZE];
+        break;
+      case FLOAT:
+        dataArray = new float[ARRAY_SIZE];
+        break;
+      case DOUBLE:
+        dataArray = new double[ARRAY_SIZE];
+        break;
+      case TEXT:
+        dataArray = new Binary[ARRAY_SIZE];
+        break;
+      default:
+        throw new UnSupportedDataTypeException(dataType.toString());
+    }
+
+    return dataArray;
+  }
+
+  /**
+   * Get primitive data lists according to data type and size, only for TVList's sorting
+   *
+   * @param dataType data type
+   * @param size     needed capacity
+   * @return an array of primitive data arrays
+   */
+  public static synchronized Object createDataListsByType(TSDataType dataType, int size) {

Review comment:
       Why is this method `synchronized`?
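
       For context, a hypothetical call site for the pooled-array API above (the real call sites presumably live in the TVList classes, which are not part of this hunk):

           // sketch: obtain a value array from the pool; the fallback to an
           // "out of buffer" allocation is handled inside getPrimitiveArraysByType
           long[] timestamps = (long[]) PrimitiveArrayManager.getPrimitiveArraysByType(TSDataType.INT64);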

##########
File path: server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
##########
@@ -120,9 +119,6 @@ void clearValue() {
   @Override
   void clearSortedValue() {
     if (sortedValues != null) {
-      for (Binary[] dataArray : sortedValues) {

Review comment:
       Do the sorted timestamps now use newly allocated memory? Is there no OOM danger?

##########
File path: server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
##########
@@ -394,14 +411,13 @@ public void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws Meta
         }
       }
 
-      // update statistics
-      if (config.isEnableParameterAdapter()) {
-        int size = seriesNumberInStorageGroups.get(storageGroupPath.getFullPath());
-        seriesNumberInStorageGroups.put(storageGroupPath.getFullPath(), size + 1);
-        if (size + 1 > maxSeriesNumberAmongStorageGroup) {
-          maxSeriesNumberAmongStorageGroup = size + 1L;
-        }
+      // update statistics and schemaDataTypeNumMap
+      totalSeriesNumber++;

Review comment:
       Is there a concurrency problem here? `totalSeriesNumber++` is not an atomic operation.
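
       (One common way to address this, sketched here and not part of the PR, using java.util.concurrent.atomic.AtomicLong:)

           // sketch: declare the counter as an atomic field in MManager
           private final AtomicLong totalSeriesNumber = new AtomicLong();

           // and increment it where the diff currently does totalSeriesNumber++
           totalSeriesNumber.incrementAndGet();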




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org