You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/07/03 08:49:13 UTC

[iotdb] branch rel/0.12 updated: [To rel/0.12] Primitive Array Manager v2 (#3484)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new 0d2872e  [To rel/0.12] Primitive Array Manager v2 (#3484)
0d2872e is described below

commit 0d2872ed05ff52e92584feb6e688b1793ab44036
Author: Haonan <hh...@outlook.com>
AuthorDate: Sat Jul 3 16:48:49 2021 +0800

    [To rel/0.12] Primitive Array Manager v2 (#3484)
---
 .../org/apache/iotdb/db/metadata/MManager.java     |  17 +--
 .../iotdb/db/rescon/PrimitiveArrayManager.java     | 161 +++++++++++----------
 2 files changed, 84 insertions(+), 94 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 8224995..d34088c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -118,8 +118,6 @@ public class MManager {
   private static final String PREVIOUS_CONDITION =
       "before deleting it, tag key is %s, tag value is %s, tlog offset is %d, contains key %b";
 
-  private static final int UPDATE_SCHEMA_MAP_IN_ARRAYPOOL_THRESHOLD = 5000;
-
   private static final Logger logger = LoggerFactory.getLogger(MManager.class);
 
   /** A thread will check whether the MTree is modified lately each such interval. Unit: second */
@@ -142,8 +140,6 @@ public class MManager {
 
   // data type -> number
   private Map<TSDataType, Integer> schemaDataTypeNumMap = new ConcurrentHashMap<>();
-  // reported total series number
-  private long reportedDataTypeTotalNum;
   private AtomicLong totalSeriesNumber = new AtomicLong();
   private boolean initialized;
   protected static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
@@ -239,7 +235,6 @@ public class MManager {
       logger.error(
           "Cannot recover all MTree from file, we try to recover as possible as we can", e);
     }
-    reportedDataTypeTotalNum = 0L;
     initialized = true;
   }
 
@@ -315,7 +310,6 @@ public class MManager {
         tagLogFile = null;
       }
       this.schemaDataTypeNumMap.clear();
-      this.reportedDataTypeTotalNum = 0L;
       initialized = false;
       if (config.isEnableMTreeSnapshot() && timedCreateMTreeSnapshotThread != null) {
         timedCreateMTreeSnapshotThread.shutdownNow();
@@ -660,16 +654,7 @@ public class MManager {
     schemaDataTypeNumMap.put(
         TSDataType.INT64, schemaDataTypeNumMap.getOrDefault(TSDataType.INT64, 0) + num);
 
-    // total current DataType Total Num (twice of number of time series)
-    // used in primitive array manager
-    long currentDataTypeTotalNum = totalSeriesNumber.get() * 2;
-
-    if (num > 0
-        && currentDataTypeTotalNum - reportedDataTypeTotalNum
-            >= UPDATE_SCHEMA_MAP_IN_ARRAYPOOL_THRESHOLD) {
-      PrimitiveArrayManager.updateSchemaDataTypeNum(schemaDataTypeNumMap, currentDataTypeTotalNum);
-      reportedDataTypeTotalNum = currentDataTypeTotalNum;
-    }
+    PrimitiveArrayManager.updateSchemaDataTypeNum(schemaDataTypeNumMap, totalSeriesNumber.get());
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java b/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java
index aa6c264..e1b3fdd 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java
@@ -31,6 +31,7 @@ import java.util.ArrayDeque;
 import java.util.Arrays;
 import java.util.EnumMap;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicLong;
 
 /** Manage all primitive data list in memory, including get and release operation. */
@@ -40,10 +41,6 @@ public class PrimitiveArrayManager {
   private static final Map<TSDataType, ArrayDeque<Object>> bufferedArraysMap =
       new EnumMap<>(TSDataType.class);
 
-  /** data type -> current number of buffered arrays */
-  private static final Map<TSDataType, Integer> bufferedArraysNumMap =
-      new EnumMap<>(TSDataType.class);
-
   /** data type -> ratio of data type in schema, which could be seen as recommended ratio */
   private static final Map<TSDataType, Double> bufferedArraysNumRatio =
       new EnumMap<>(TSDataType.class);
@@ -59,10 +56,10 @@ public class PrimitiveArrayManager {
       config.getAllocateMemoryForWrite() * config.getBufferedArraysMemoryProportion();
 
   /** total size of buffered arrays */
-  private static AtomicLong bufferedArraysRamSize = new AtomicLong();
+  private static final AtomicLong bufferedArraysRamSize = new AtomicLong();
 
   /** total size of out of buffer arrays */
-  private static AtomicLong outOfBufferArraysRamSize = new AtomicLong();
+  private static final AtomicLong outOfBufferArraysRamSize = new AtomicLong();
 
   static {
     bufferedArraysMap.put(TSDataType.BOOLEAN, new ArrayDeque<>());
@@ -84,11 +81,12 @@ public class PrimitiveArrayManager {
    * @return an array
    */
   public static Object getPrimitiveArraysByType(TSDataType dataType) {
+    long delta = (long) ARRAY_SIZE * dataType.getDataTypeSize();
+
     // check memory of buffered array, if already full, generate OOB
-    if (bufferedArraysRamSize.get() + ARRAY_SIZE * dataType.getDataTypeSize()
-        > BUFFERED_ARRAY_SIZE_THRESHOLD) {
+    if (bufferedArraysRamSize.get() + delta > BUFFERED_ARRAY_SIZE_THRESHOLD) {
       // return an out of buffer array
-      outOfBufferArraysRamSize.addAndGet((long) ARRAY_SIZE * dataType.getDataTypeSize());
+      outOfBufferArraysRamSize.addAndGet(delta);
       return createPrimitiveArray(dataType);
     }
 
@@ -98,11 +96,10 @@ public class PrimitiveArrayManager {
       if (dataArray != null) {
         return dataArray;
       }
-      // no buffered array, create one
-      bufferedArraysNumMap.put(dataType, bufferedArraysNumMap.getOrDefault(dataType, 0) + 1);
-      bufferedArraysRamSize.addAndGet((long) ARRAY_SIZE * dataType.getDataTypeSize());
     }
 
+    // no buffered array, create one
+    bufferedArraysRamSize.addAndGet(delta);
     return createPrimitiveArray(dataType);
   }
 
@@ -188,59 +185,54 @@ public class PrimitiveArrayManager {
   /**
    * This method is called when bringing back data array
    *
-   * @param dataArray data array
+   * @param releasingArray data array to be released
    */
-  public static void release(Object dataArray) {
-    TSDataType dataType;
-    if (dataArray instanceof boolean[]) {
-      dataType = TSDataType.BOOLEAN;
-    } else if (dataArray instanceof int[]) {
-      dataType = TSDataType.INT32;
-    } else if (dataArray instanceof long[]) {
-      dataType = TSDataType.INT64;
-    } else if (dataArray instanceof float[]) {
-      dataType = TSDataType.FLOAT;
-    } else if (dataArray instanceof double[]) {
-      dataType = TSDataType.DOUBLE;
-    } else if (dataArray instanceof Binary[]) {
-      Arrays.fill((Binary[]) dataArray, null);
-      dataType = TSDataType.TEXT;
+  public static void release(Object releasingArray) {
+    TSDataType releasingType;
+    if (releasingArray instanceof boolean[]) {
+      releasingType = TSDataType.BOOLEAN;
+    } else if (releasingArray instanceof int[]) {
+      releasingType = TSDataType.INT32;
+    } else if (releasingArray instanceof long[]) {
+      releasingType = TSDataType.INT64;
+    } else if (releasingArray instanceof float[]) {
+      releasingType = TSDataType.FLOAT;
+    } else if (releasingArray instanceof double[]) {
+      releasingType = TSDataType.DOUBLE;
+    } else if (releasingArray instanceof Binary[]) {
+      Arrays.fill((Binary[]) releasingArray, null);
+      releasingType = TSDataType.TEXT;
     } else {
       throw new UnSupportedDataTypeException("Unknown data array type");
     }
 
-    // Check out of buffer array num
-    if (outOfBufferArraysRamSize.get() > 0 && isCurrentDataTypeExceeded(dataType)) {
-      // release an out of buffer array
-      bringBackOOBArray(dataType, ARRAY_SIZE);
-    } else if (outOfBufferArraysRamSize.get() > 0 && !isCurrentDataTypeExceeded(dataType)) {
-      // if the ratio of buffered arrays of this data type does not exceed the schema ratio,
-      // choose one replaced array who has larger ratio than schema recommended ratio
-      TSDataType replacedDataType = null;
-      for (Map.Entry<TSDataType, Integer> entry : bufferedArraysNumMap.entrySet()) {
-        if (isCurrentDataTypeExceeded(entry.getKey())) {
-          replacedDataType = entry.getKey();
-          // bring back the replaced array as OOB array
-          bringBackOOBArray(replacedDataType, ARRAY_SIZE);
-          break;
-        }
-      }
-      if (replacedDataType != null) {
-        // if we find a replaced array, bring back the original array as a buffered array
-        if (logger.isDebugEnabled()) {
-          logger.debug(
-              "The ratio of {} in buffered array has not reached the schema ratio. Replaced by {}",
-              dataType,
-              replacedDataType);
+    if (outOfBufferArraysRamSize.get() <= 0) {
+      // if there is no out of buffer array, bring back as buffered array directly
+      putBackBufferedArray(releasingType, releasingArray);
+    } else {
+      // if the system has out of buffer array, we need to release some memory
+      if (!isCurrentDataTypeExceeded(releasingType)) {
+        // if the buffered array of the releasingType is less than expected
+        // choose an array of redundantDataType to release and try to buffer the array of
+        // releasingType
+        for (Entry<TSDataType, ArrayDeque<Object>> entry : bufferedArraysMap.entrySet()) {
+          TSDataType dataType = entry.getKey();
+          if (isCurrentDataTypeExceeded(dataType)) {
+            // if we find a replaced array, bring back the original array as a buffered array
+            if (logger.isDebugEnabled()) {
+              logger.debug(
+                  "The ratio of {} in buffered array has not reached the schema ratio. discard a redundant array of {}",
+                  releasingType,
+                  dataType);
+            }
+            // bring back the replaced array as OOB array
+            replaceBufferedArray(releasingType, releasingArray, dataType);
+            break;
+          }
         }
-        bringBackBufferedArray(dataType, dataArray);
-      } else {
-        // or else bring back the original array as OOB array
-        bringBackOOBArray(dataType, ARRAY_SIZE);
       }
-    } else {
-      // if there is no out of buffer array, bring back as buffered array directly
-      bringBackBufferedArray(dataType, dataArray);
+
+      releaseOutOfBuffer(releasingType);
     }
   }
 
@@ -250,34 +242,47 @@ public class PrimitiveArrayManager {
    * @param dataType data type
    * @param dataArray data array
    */
-  private static void bringBackBufferedArray(TSDataType dataType, Object dataArray) {
+  private static void putBackBufferedArray(TSDataType dataType, Object dataArray) {
     synchronized (bufferedArraysMap.get(dataType)) {
       bufferedArraysMap.get(dataType).add(dataArray);
-      bufferedArraysNumMap.put(dataType, bufferedArraysNumMap.getOrDefault(dataType, 0) + 1);
     }
-    bufferedArraysRamSize.addAndGet((long) -ARRAY_SIZE * dataType.getDataTypeSize());
   }
 
-  /**
-   * Bring back out of buffered array
-   *
-   * @param dataType data type
-   * @param size capacity
-   */
-  private static void bringBackOOBArray(TSDataType dataType, int size) {
-    outOfBufferArraysRamSize.addAndGet((long) -size * dataType.getDataTypeSize());
+  private static void replaceBufferedArray(
+      TSDataType releasingType, Object releasingArray, TSDataType redundantType) {
+    synchronized (bufferedArraysMap.get(redundantType)) {
+      if (bufferedArraysMap.get(redundantType).poll() != null) {
+        bufferedArraysRamSize.addAndGet((long) -ARRAY_SIZE * redundantType.getDataTypeSize());
+      }
+    }
+
+    if (bufferedArraysRamSize.get() + (long) ARRAY_SIZE * releasingType.getDataTypeSize()
+        < BUFFERED_ARRAY_SIZE_THRESHOLD) {
+      ArrayDeque<Object> releasingArrays = bufferedArraysMap.get(releasingType);
+      synchronized (releasingArrays) {
+        releasingArrays.add(releasingArray);
+      }
+      bufferedArraysRamSize.addAndGet((long) ARRAY_SIZE * releasingType.getDataTypeSize());
+    }
+  }
+
+  private static void releaseOutOfBuffer(TSDataType dataType) {
+    outOfBufferArraysRamSize.getAndUpdate(
+        l -> Math.max(0, l - (long) ARRAY_SIZE * dataType.getDataTypeSize()));
   }
 
   /**
    * @param schemaDataTypeNumMap schema DataType Num Map (for each series, increase a long and a
    *     specific type)
-   * @param total current DataType Total Num (twice of number of time series)
+   * @param totalSeries total time series number
    */
   public static void updateSchemaDataTypeNum(
-      Map<TSDataType, Integer> schemaDataTypeNumMap, long total) {
+      Map<TSDataType, Integer> schemaDataTypeNumMap, long totalSeries) {
     for (Map.Entry<TSDataType, Integer> entry : schemaDataTypeNumMap.entrySet()) {
       TSDataType dataType = entry.getKey();
-      bufferedArraysNumRatio.put(dataType, (double) schemaDataTypeNumMap.get(dataType) / total);
+      // one time series has 2 columns (time column + value column)
+      bufferedArraysNumRatio.put(
+          dataType, (double) schemaDataTypeNumMap.get(dataType) / (totalSeries * 2));
     }
   }
 
@@ -289,13 +294,14 @@ public class PrimitiveArrayManager {
    * @return true if the buffered array ratio exceeds the recommend ratio
    */
   private static boolean isCurrentDataTypeExceeded(TSDataType dataType) {
-    int total = 0;
-    for (int num : bufferedArraysNumMap.values()) {
-      total += num;
+    long total = 0;
+    for (ArrayDeque<Object> value : bufferedArraysMap.values()) {
+      total += value.size();
     }
+    long arrayNumInBuffer =
+        bufferedArraysMap.containsKey(dataType) ? bufferedArraysMap.get(dataType).size() : 0;
     return total != 0
-        && ((double) bufferedArraysNumMap.getOrDefault(dataType, 0) / total
-            > bufferedArraysNumRatio.getOrDefault(dataType, 0.0));
+        && ((double) arrayNumInBuffer / total > bufferedArraysNumRatio.getOrDefault(dataType, 0.0));
   }
 
   public static void close() {
@@ -303,7 +309,6 @@ public class PrimitiveArrayManager {
       dataListQueue.clear();
     }
 
-    bufferedArraysNumMap.clear();
     bufferedArraysNumRatio.clear();
 
     bufferedArraysRamSize.set(0);