You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/07/03 08:54:36 UTC

[iotdb] 01/01: [To rel/0.11] Primitive Array Manager v2

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch arraysize11
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d162c2669cd822902f9c274152d005f459009909
Author: Haonan <hh...@outlook.com>
AuthorDate: Sat Jul 3 16:48:49 2021 +0800

    [To rel/0.11] Primitive Array Manager v2
---
 .../org/apache/iotdb/db/metadata/MManager.java     |  16 +-
 .../iotdb/db/rescon/PrimitiveArrayManager.java     | 219 ++++++++++-----------
 2 files changed, 108 insertions(+), 127 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 325a2e4..68f78fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -103,8 +103,6 @@ public class MManager {
   private static final String DEBUG_MSG_1 = "%s: TimeSeries %s's tag info has been removed from tag inverted index ";
   private static final String PREVIOUS_CONDITION = "before deleting it, tag key is %s, tag value is %s, tlog offset is %d, contains key %b";
 
-  private static final int UPDATE_SCHEMA_MAP_IN_ARRAYPOOL_THRESHOLD = 5000;
-
   private static final Logger logger = LoggerFactory.getLogger(MManager.class);
 
   /**
@@ -128,8 +126,6 @@ public class MManager {
 
   // data type -> number
   private Map<TSDataType, Integer> schemaDataTypeNumMap = new ConcurrentHashMap<>();
-  // reported total series number
-  private long reportedDataTypeTotalNum;
   private AtomicLong totalSeriesNumber = new AtomicLong();
   private boolean initialized;
   protected static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
@@ -224,7 +220,6 @@ public class MManager {
       mtree = new MTree();
       logger.error("Cannot read MTree from file, using an empty new one", e);
     }
-    reportedDataTypeTotalNum = 0L;
     initialized = true;
   }
 
@@ -293,7 +288,6 @@ public class MManager {
         tagLogFile = null;
       }
       this.schemaDataTypeNumMap.clear();
-      this.reportedDataTypeTotalNum = 0L;
       initialized = false;
       if (config.isEnableMTreeSnapshot() && timedCreateMTreeSnapshotThread != null) {
         timedCreateMTreeSnapshotThread.shutdownNow();
@@ -646,15 +640,7 @@ public class MManager {
     schemaDataTypeNumMap.put(TSDataType.INT64,
         schemaDataTypeNumMap.getOrDefault(TSDataType.INT64, 0) + num);
 
-    // total current DataType Total Num (twice of number of time series)
-    // used in primitive array manager
-    long currentDataTypeTotalNum = totalSeriesNumber.get() * 2;
-
-    if (num > 0 && currentDataTypeTotalNum - reportedDataTypeTotalNum
-        >= UPDATE_SCHEMA_MAP_IN_ARRAYPOOL_THRESHOLD) {
-      PrimitiveArrayManager.updateSchemaDataTypeNum(schemaDataTypeNumMap, currentDataTypeTotalNum);
-      reportedDataTypeTotalNum = currentDataTypeTotalNum;
-    }
+    PrimitiveArrayManager.updateSchemaDataTypeNum(schemaDataTypeNumMap, totalSeriesNumber.get());
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java b/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java
index 20063fd..78e849e 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java
@@ -18,41 +18,32 @@
  */
 package org.apache.iotdb.db.rescon;
 
-import java.util.ArrayDeque;
-import java.util.Arrays;
-import java.util.EnumMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- * Manage all primitive data list in memory, including get and release operation.
- */
-public class PrimitiveArrayManager {
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicLong;
 
-  /**
-   * data type -> ArrayDeque<Array>
-   */
-  private static final Map<TSDataType, ArrayDeque<Object>> bufferedArraysMap = new EnumMap<>(
-      TSDataType.class);
+/** Manage all primitive data list in memory, including get and release operation. */
+public class PrimitiveArrayManager {
 
-  /**
-   * data type -> current number of buffered arrays
-   */
-  private static final Map<TSDataType, Integer> bufferedArraysNumMap = new EnumMap<>(
-      TSDataType.class);
+  /** data type -> ArrayDeque<Array> */
+  private static final Map<TSDataType, ArrayDeque<Object>> bufferedArraysMap =
+      new EnumMap<>(TSDataType.class);
 
-  /**
-   * data type -> ratio of data type in schema, which could be seen as recommended ratio
-   */
-  private static final Map<TSDataType, Double> bufferedArraysNumRatio = new EnumMap<>(
-      TSDataType.class);
+  /** data type -> ratio of data type in schema, which could be seen as recommended ratio */
+  private static final Map<TSDataType, Double> bufferedArraysNumRatio =
+      new EnumMap<>(TSDataType.class);
 
   private static final Logger logger = LoggerFactory.getLogger(PrimitiveArrayManager.class);
 
@@ -60,21 +51,15 @@ public class PrimitiveArrayManager {
 
   public static final int ARRAY_SIZE = config.getPrimitiveArraySize();
 
-  /**
-   * threshold total size of arrays for all data types
-   */
+  /** threshold total size of arrays for all data types */
   private static final double BUFFERED_ARRAY_SIZE_THRESHOLD =
       config.getAllocateMemoryForWrite() * config.getBufferedArraysMemoryProportion();
 
-  /**
-   * total size of buffered arrays
-   */
-  private static AtomicLong bufferedArraysRamSize = new AtomicLong();
+  /** total size of buffered arrays */
+  private static final AtomicLong bufferedArraysRamSize = new AtomicLong();
 
-  /**
-   * total size of out of buffer arrays
-   */
-  private static AtomicLong outOfBufferArraysRamSize = new AtomicLong();
+  /** total size of out of buffer arrays */
+  private static final AtomicLong outOfBufferArraysRamSize = new AtomicLong();
 
   static {
     bufferedArraysMap.put(TSDataType.BOOLEAN, new ArrayDeque<>());
@@ -96,11 +81,12 @@ public class PrimitiveArrayManager {
    * @return an array
    */
   public static Object getPrimitiveArraysByType(TSDataType dataType) {
+    long delta = (long) ARRAY_SIZE * dataType.getDataTypeSize();
+
     // check memory of buffered array, if already full, generate OOB
-    if (bufferedArraysRamSize.get() + ARRAY_SIZE * dataType.getDataTypeSize()
-        > BUFFERED_ARRAY_SIZE_THRESHOLD) {
+    if (bufferedArraysRamSize.get() + delta > BUFFERED_ARRAY_SIZE_THRESHOLD) {
       // return an out of buffer array
-      outOfBufferArraysRamSize.addAndGet((long) ARRAY_SIZE * dataType.getDataTypeSize());
+      outOfBufferArraysRamSize.addAndGet(delta);
       return createPrimitiveArray(dataType);
     }
 
@@ -110,11 +96,10 @@ public class PrimitiveArrayManager {
       if (dataArray != null) {
         return dataArray;
       }
-      // no buffered array, create one
-      bufferedArraysNumMap.put(dataType, bufferedArraysNumMap.getOrDefault(dataType, 0) + 1);
-      bufferedArraysRamSize.addAndGet((long) ARRAY_SIZE * dataType.getDataTypeSize());
     }
 
+    // no buffered array, create one
+    bufferedArraysRamSize.addAndGet(delta);
     return createPrimitiveArray(dataType);
   }
 
@@ -150,10 +135,10 @@ public class PrimitiveArrayManager {
    * Get primitive data lists according to data type and size, only for TVList's sorting
    *
    * @param dataType data type
-   * @param size     needed capacity
+   * @param size needed capacity
    * @return an array of primitive data arrays
    */
-  public static synchronized Object createDataListsByType(TSDataType dataType, int size) {
+  public static Object createDataListsByType(TSDataType dataType, int size) {
     int arrayNumber = (int) Math.ceil((float) size / (float) ARRAY_SIZE);
     switch (dataType) {
       case BOOLEAN:
@@ -200,94 +185,104 @@ public class PrimitiveArrayManager {
   /**
    * This method is called when bringing back data array
    *
-   * @param dataArray data array
+   * @param releasingArray data array to be released
    */
-  public static void release(Object dataArray) {
-    TSDataType dataType;
-    if (dataArray instanceof boolean[]) {
-      dataType = TSDataType.BOOLEAN;
-    } else if (dataArray instanceof int[]) {
-      dataType = TSDataType.INT32;
-    } else if (dataArray instanceof long[]) {
-      dataType = TSDataType.INT64;
-    } else if (dataArray instanceof float[]) {
-      dataType = TSDataType.FLOAT;
-    } else if (dataArray instanceof double[]) {
-      dataType = TSDataType.DOUBLE;
-    } else if (dataArray instanceof Binary[]) {
-      Arrays.fill((Binary[]) dataArray, null);
-      dataType = TSDataType.TEXT;
+  public static void release(Object releasingArray) {
+    TSDataType releasingType;
+    if (releasingArray instanceof boolean[]) {
+      releasingType = TSDataType.BOOLEAN;
+    } else if (releasingArray instanceof int[]) {
+      releasingType = TSDataType.INT32;
+    } else if (releasingArray instanceof long[]) {
+      releasingType = TSDataType.INT64;
+    } else if (releasingArray instanceof float[]) {
+      releasingType = TSDataType.FLOAT;
+    } else if (releasingArray instanceof double[]) {
+      releasingType = TSDataType.DOUBLE;
+    } else if (releasingArray instanceof Binary[]) {
+      Arrays.fill((Binary[]) releasingArray, null);
+      releasingType = TSDataType.TEXT;
     } else {
       throw new UnSupportedDataTypeException("Unknown data array type");
     }
 
-    // Check out of buffer array num
-    if (outOfBufferArraysRamSize.get() > 0 && isCurrentDataTypeExceeded(dataType)) {
-      // release an out of buffer array
-      bringBackOOBArray(dataType, ARRAY_SIZE);
-    } else if (outOfBufferArraysRamSize.get() > 0 && !isCurrentDataTypeExceeded(dataType)) {
-      // if the ratio of buffered arrays of this data type does not exceed the schema ratio,
-      // choose one replaced array who has larger ratio than schema recommended ratio
-      TSDataType replacedDataType = null;
-      for (Map.Entry<TSDataType, Integer> entry : bufferedArraysNumMap.entrySet()) {
-        if (isCurrentDataTypeExceeded(entry.getKey())) {
-          replacedDataType = entry.getKey();
-          // bring back the replaced array as OOB array
-          bringBackOOBArray(replacedDataType, ARRAY_SIZE);
-          break;
-        }
-      }
-      if (replacedDataType != null) {
-        // if we find a replaced array, bring back the original array as a buffered array
-        if (logger.isDebugEnabled()) {
-          logger.debug(
-              "The ratio of {} in buffered array has not reached the schema ratio. Replaced by {}",
-              dataType, replacedDataType);
+    if (outOfBufferArraysRamSize.get() <= 0) {
+      // if there is no out of buffer array, bring back as buffered array directly
+      putBackBufferedArray(releasingType, releasingArray);
+    } else {
+      // if the system has out of buffer array, we need to release some memory
+      if (!isCurrentDataTypeExceeded(releasingType)) {
+        // if the buffered array of the releasingType is less than expected
+        // choose an array of redundantDataType to release and try to buffer the array of
+        // releasingType
+        for (Entry<TSDataType, ArrayDeque<Object>> entry : bufferedArraysMap.entrySet()) {
+          TSDataType dataType = entry.getKey();
+          if (isCurrentDataTypeExceeded(dataType)) {
+            // if we find a replaced array, bring back the original array as a buffered array
+            if (logger.isDebugEnabled()) {
+              logger.debug(
+                  "The ratio of {} in buffered array has not reached the schema ratio. discard a redundant array of {}",
+                  releasingType,
+                  dataType);
+            }
+            // bring back the replaced array as OOB array
+            replaceBufferedArray(releasingType, releasingArray, dataType);
+            break;
+          }
         }
-        bringBackBufferedArray(dataType, dataArray);
-      } else {
-        // or else bring back the original array as OOB array
-        bringBackOOBArray(dataType, ARRAY_SIZE);
       }
-    } else {
-      // if there is no out of buffer array, bring back as buffered array directly
-      bringBackBufferedArray(dataType, dataArray);
+
+      releaseOutOfBuffer(releasingType);
     }
   }
 
   /**
    * Bring back a buffered array
    *
-   * @param dataType  data type
+   * @param dataType data type
    * @param dataArray data array
    */
-  private static void bringBackBufferedArray(TSDataType dataType, Object dataArray) {
+  private static void putBackBufferedArray(TSDataType dataType, Object dataArray) {
     synchronized (bufferedArraysMap.get(dataType)) {
       bufferedArraysMap.get(dataType).add(dataArray);
-      bufferedArraysNumMap.put(dataType, bufferedArraysNumMap.getOrDefault(dataType, 0) + 1);
     }
-    bufferedArraysRamSize.addAndGet((long) -ARRAY_SIZE * dataType.getDataTypeSize());
   }
 
-  /**
-   * Bring back out of buffered array
-   *
-   * @param dataType data type
-   * @param size     capacity
-   */
-  private static void bringBackOOBArray(TSDataType dataType, int size) {
-    outOfBufferArraysRamSize.addAndGet((long) -size * dataType.getDataTypeSize());
+  private static void replaceBufferedArray(
+      TSDataType releasingType, Object releasingArray, TSDataType redundantType) {
+    synchronized (bufferedArraysMap.get(redundantType)) {
+      if (bufferedArraysMap.get(redundantType).poll() != null) {
+        bufferedArraysRamSize.addAndGet((long) -ARRAY_SIZE * redundantType.getDataTypeSize());
+      }
+    }
+
+    if (bufferedArraysRamSize.get() + (long) ARRAY_SIZE * releasingType.getDataTypeSize()
+        < BUFFERED_ARRAY_SIZE_THRESHOLD) {
+      ArrayDeque<Object> releasingArrays = bufferedArraysMap.get(releasingType);
+      synchronized (releasingArrays) {
+        releasingArrays.add(releasingArray);
+      }
+      bufferedArraysRamSize.addAndGet((long) ARRAY_SIZE * releasingType.getDataTypeSize());
+    }
+  }
+
+  private static void releaseOutOfBuffer(TSDataType dataType) {
+    outOfBufferArraysRamSize.getAndUpdate(
+        l -> Math.max(0, l - (long) ARRAY_SIZE * dataType.getDataTypeSize()));
   }
 
   /**
-   * @param schemaDataTypeNumMap schema DataType Num Map (for each series, increase a long and a specific type)
-   * @param total current DataType Total Num (twice of number of time series)
+   * @param schemaDataTypeNumMap schema DataType Num Map (for each series, increase a long and a
+   *     specific type)
+   * @param totalSeries total time series number
    */
-  public static void updateSchemaDataTypeNum(Map<TSDataType, Integer> schemaDataTypeNumMap,
-      long total) {
+  public static void updateSchemaDataTypeNum(
+      Map<TSDataType, Integer> schemaDataTypeNumMap, long totalSeries) {
     for (Map.Entry<TSDataType, Integer> entry : schemaDataTypeNumMap.entrySet()) {
       TSDataType dataType = entry.getKey();
-      bufferedArraysNumRatio.put(dataType, (double) schemaDataTypeNumMap.get(dataType) / total);
+      // one time series has 2 columns (time column + value column)
+      bufferedArraysNumRatio.put(
+          dataType, (double) schemaDataTypeNumMap.get(dataType) / (totalSeries * 2));
     }
   }
 
@@ -299,13 +294,14 @@ public class PrimitiveArrayManager {
    * @return true if the buffered array ratio exceeds the recommend ratio
    */
   private static boolean isCurrentDataTypeExceeded(TSDataType dataType) {
-    int total = 0;
-    for (int num : bufferedArraysNumMap.values()) {
-      total += num;
+    long total = 0;
+    for (ArrayDeque<Object> value : bufferedArraysMap.values()) {
+      total += value.size();
     }
-    return total != 0 &&
-        ((double) bufferedArraysNumMap.getOrDefault(dataType, 0) / total >
-            bufferedArraysNumRatio.getOrDefault(dataType, 0.0));
+    long arrayNumInBuffer =
+        bufferedArraysMap.containsKey(dataType) ? bufferedArraysMap.get(dataType).size() : 0;
+    return total != 0
+        && ((double) arrayNumInBuffer / total > bufferedArraysNumRatio.getOrDefault(dataType, 0.0));
   }
 
   public static void close() {
@@ -313,7 +309,6 @@ public class PrimitiveArrayManager {
       dataListQueue.clear();
     }
 
-    bufferedArraysNumMap.clear();
     bufferedArraysNumRatio.clear();
 
     bufferedArraysRamSize.set(0);