You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/07/03 08:54:35 UTC

[iotdb] branch arraysize11 created (now d162c26)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a change to branch arraysize11
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at d162c26  [To rel/0.11] Primitive Array Manager v2

This branch includes the following new commit:

     new d162c26  [To rel/0.11] Primitive Array Manager v2

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: [To rel/0.11] Primitive Array Manager v2

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch arraysize11
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d162c2669cd822902f9c274152d005f459009909
Author: Haonan <hh...@outlook.com>
AuthorDate: Sat Jul 3 16:48:49 2021 +0800

    [To rel/0.11] Primitive Array Manager v2
---
 .../org/apache/iotdb/db/metadata/MManager.java     |  16 +-
 .../iotdb/db/rescon/PrimitiveArrayManager.java     | 219 ++++++++++-----------
 2 files changed, 108 insertions(+), 127 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 325a2e4..68f78fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -103,8 +103,6 @@ public class MManager {
   private static final String DEBUG_MSG_1 = "%s: TimeSeries %s's tag info has been removed from tag inverted index ";
   private static final String PREVIOUS_CONDITION = "before deleting it, tag key is %s, tag value is %s, tlog offset is %d, contains key %b";
 
-  private static final int UPDATE_SCHEMA_MAP_IN_ARRAYPOOL_THRESHOLD = 5000;
-
   private static final Logger logger = LoggerFactory.getLogger(MManager.class);
 
   /**
@@ -128,8 +126,6 @@ public class MManager {
 
   // data type -> number
   private Map<TSDataType, Integer> schemaDataTypeNumMap = new ConcurrentHashMap<>();
-  // reported total series number
-  private long reportedDataTypeTotalNum;
   private AtomicLong totalSeriesNumber = new AtomicLong();
   private boolean initialized;
   protected static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
@@ -224,7 +220,6 @@ public class MManager {
       mtree = new MTree();
       logger.error("Cannot read MTree from file, using an empty new one", e);
     }
-    reportedDataTypeTotalNum = 0L;
     initialized = true;
   }
 
@@ -293,7 +288,6 @@ public class MManager {
         tagLogFile = null;
       }
       this.schemaDataTypeNumMap.clear();
-      this.reportedDataTypeTotalNum = 0L;
       initialized = false;
       if (config.isEnableMTreeSnapshot() && timedCreateMTreeSnapshotThread != null) {
         timedCreateMTreeSnapshotThread.shutdownNow();
@@ -646,15 +640,7 @@ public class MManager {
     schemaDataTypeNumMap.put(TSDataType.INT64,
         schemaDataTypeNumMap.getOrDefault(TSDataType.INT64, 0) + num);
 
-    // total current DataType Total Num (twice of number of time series)
-    // used in primitive array manager
-    long currentDataTypeTotalNum = totalSeriesNumber.get() * 2;
-
-    if (num > 0 && currentDataTypeTotalNum - reportedDataTypeTotalNum
-        >= UPDATE_SCHEMA_MAP_IN_ARRAYPOOL_THRESHOLD) {
-      PrimitiveArrayManager.updateSchemaDataTypeNum(schemaDataTypeNumMap, currentDataTypeTotalNum);
-      reportedDataTypeTotalNum = currentDataTypeTotalNum;
-    }
+    PrimitiveArrayManager.updateSchemaDataTypeNum(schemaDataTypeNumMap, totalSeriesNumber.get());
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java b/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java
index 20063fd..78e849e 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java
@@ -18,41 +18,32 @@
  */
 package org.apache.iotdb.db.rescon;
 
-import java.util.ArrayDeque;
-import java.util.Arrays;
-import java.util.EnumMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- * Manage all primitive data list in memory, including get and release operation.
- */
-public class PrimitiveArrayManager {
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicLong;
 
-  /**
-   * data type -> ArrayDeque<Array>
-   */
-  private static final Map<TSDataType, ArrayDeque<Object>> bufferedArraysMap = new EnumMap<>(
-      TSDataType.class);
+/** Manage all primitive data list in memory, including get and release operation. */
+public class PrimitiveArrayManager {
 
-  /**
-   * data type -> current number of buffered arrays
-   */
-  private static final Map<TSDataType, Integer> bufferedArraysNumMap = new EnumMap<>(
-      TSDataType.class);
+  /** data type -> ArrayDeque<Array> */
+  private static final Map<TSDataType, ArrayDeque<Object>> bufferedArraysMap =
+      new EnumMap<>(TSDataType.class);
 
-  /**
-   * data type -> ratio of data type in schema, which could be seen as recommended ratio
-   */
-  private static final Map<TSDataType, Double> bufferedArraysNumRatio = new EnumMap<>(
-      TSDataType.class);
+  /** data type -> ratio of data type in schema, which could be seen as recommended ratio */
+  private static final Map<TSDataType, Double> bufferedArraysNumRatio =
+      new EnumMap<>(TSDataType.class);
 
   private static final Logger logger = LoggerFactory.getLogger(PrimitiveArrayManager.class);
 
@@ -60,21 +51,15 @@ public class PrimitiveArrayManager {
 
   public static final int ARRAY_SIZE = config.getPrimitiveArraySize();
 
-  /**
-   * threshold total size of arrays for all data types
-   */
+  /** threshold total size of arrays for all data types */
   private static final double BUFFERED_ARRAY_SIZE_THRESHOLD =
       config.getAllocateMemoryForWrite() * config.getBufferedArraysMemoryProportion();
 
-  /**
-   * total size of buffered arrays
-   */
-  private static AtomicLong bufferedArraysRamSize = new AtomicLong();
+  /** total size of buffered arrays */
+  private static final AtomicLong bufferedArraysRamSize = new AtomicLong();
 
-  /**
-   * total size of out of buffer arrays
-   */
-  private static AtomicLong outOfBufferArraysRamSize = new AtomicLong();
+  /** total size of out of buffer arrays */
+  private static final AtomicLong outOfBufferArraysRamSize = new AtomicLong();
 
   static {
     bufferedArraysMap.put(TSDataType.BOOLEAN, new ArrayDeque<>());
@@ -96,11 +81,12 @@ public class PrimitiveArrayManager {
    * @return an array
    */
   public static Object getPrimitiveArraysByType(TSDataType dataType) {
+    long delta = (long) ARRAY_SIZE * dataType.getDataTypeSize();
+
     // check memory of buffered array, if already full, generate OOB
-    if (bufferedArraysRamSize.get() + ARRAY_SIZE * dataType.getDataTypeSize()
-        > BUFFERED_ARRAY_SIZE_THRESHOLD) {
+    if (bufferedArraysRamSize.get() + delta > BUFFERED_ARRAY_SIZE_THRESHOLD) {
       // return an out of buffer array
-      outOfBufferArraysRamSize.addAndGet((long) ARRAY_SIZE * dataType.getDataTypeSize());
+      outOfBufferArraysRamSize.addAndGet(delta);
       return createPrimitiveArray(dataType);
     }
 
@@ -110,11 +96,10 @@ public class PrimitiveArrayManager {
       if (dataArray != null) {
         return dataArray;
       }
-      // no buffered array, create one
-      bufferedArraysNumMap.put(dataType, bufferedArraysNumMap.getOrDefault(dataType, 0) + 1);
-      bufferedArraysRamSize.addAndGet((long) ARRAY_SIZE * dataType.getDataTypeSize());
     }
 
+    // no buffered array, create one
+    bufferedArraysRamSize.addAndGet(delta);
     return createPrimitiveArray(dataType);
   }
 
@@ -150,10 +135,10 @@ public class PrimitiveArrayManager {
    * Get primitive data lists according to data type and size, only for TVList's sorting
    *
    * @param dataType data type
-   * @param size     needed capacity
+   * @param size needed capacity
    * @return an array of primitive data arrays
    */
-  public static synchronized Object createDataListsByType(TSDataType dataType, int size) {
+  public static Object createDataListsByType(TSDataType dataType, int size) {
     int arrayNumber = (int) Math.ceil((float) size / (float) ARRAY_SIZE);
     switch (dataType) {
       case BOOLEAN:
@@ -200,94 +185,104 @@ public class PrimitiveArrayManager {
   /**
    * This method is called when bringing back data array
    *
-   * @param dataArray data array
+   * @param releasingArray data array to be released
    */
-  public static void release(Object dataArray) {
-    TSDataType dataType;
-    if (dataArray instanceof boolean[]) {
-      dataType = TSDataType.BOOLEAN;
-    } else if (dataArray instanceof int[]) {
-      dataType = TSDataType.INT32;
-    } else if (dataArray instanceof long[]) {
-      dataType = TSDataType.INT64;
-    } else if (dataArray instanceof float[]) {
-      dataType = TSDataType.FLOAT;
-    } else if (dataArray instanceof double[]) {
-      dataType = TSDataType.DOUBLE;
-    } else if (dataArray instanceof Binary[]) {
-      Arrays.fill((Binary[]) dataArray, null);
-      dataType = TSDataType.TEXT;
+  public static void release(Object releasingArray) {
+    TSDataType releasingType;
+    if (releasingArray instanceof boolean[]) {
+      releasingType = TSDataType.BOOLEAN;
+    } else if (releasingArray instanceof int[]) {
+      releasingType = TSDataType.INT32;
+    } else if (releasingArray instanceof long[]) {
+      releasingType = TSDataType.INT64;
+    } else if (releasingArray instanceof float[]) {
+      releasingType = TSDataType.FLOAT;
+    } else if (releasingArray instanceof double[]) {
+      releasingType = TSDataType.DOUBLE;
+    } else if (releasingArray instanceof Binary[]) {
+      Arrays.fill((Binary[]) releasingArray, null);
+      releasingType = TSDataType.TEXT;
     } else {
       throw new UnSupportedDataTypeException("Unknown data array type");
     }
 
-    // Check out of buffer array num
-    if (outOfBufferArraysRamSize.get() > 0 && isCurrentDataTypeExceeded(dataType)) {
-      // release an out of buffer array
-      bringBackOOBArray(dataType, ARRAY_SIZE);
-    } else if (outOfBufferArraysRamSize.get() > 0 && !isCurrentDataTypeExceeded(dataType)) {
-      // if the ratio of buffered arrays of this data type does not exceed the schema ratio,
-      // choose one replaced array who has larger ratio than schema recommended ratio
-      TSDataType replacedDataType = null;
-      for (Map.Entry<TSDataType, Integer> entry : bufferedArraysNumMap.entrySet()) {
-        if (isCurrentDataTypeExceeded(entry.getKey())) {
-          replacedDataType = entry.getKey();
-          // bring back the replaced array as OOB array
-          bringBackOOBArray(replacedDataType, ARRAY_SIZE);
-          break;
-        }
-      }
-      if (replacedDataType != null) {
-        // if we find a replaced array, bring back the original array as a buffered array
-        if (logger.isDebugEnabled()) {
-          logger.debug(
-              "The ratio of {} in buffered array has not reached the schema ratio. Replaced by {}",
-              dataType, replacedDataType);
+    if (outOfBufferArraysRamSize.get() <= 0) {
+      // if there is no out of buffer array, bring back as buffered array directly
+      putBackBufferedArray(releasingType, releasingArray);
+    } else {
+      // if the system has out of buffer array, we need to release some memory
+      if (!isCurrentDataTypeExceeded(releasingType)) {
+        // if the buffered array of the releasingType is less than expected
+        // choose an array of redundantDataType to release and try to buffer the array of
+        // releasingType
+        for (Entry<TSDataType, ArrayDeque<Object>> entry : bufferedArraysMap.entrySet()) {
+          TSDataType dataType = entry.getKey();
+          if (isCurrentDataTypeExceeded(dataType)) {
+            // if we find a replaced array, bring back the original array as a buffered array
+            if (logger.isDebugEnabled()) {
+              logger.debug(
+                  "The ratio of {} in buffered array has not reached the schema ratio. discard a redundant array of {}",
+                  releasingType,
+                  dataType);
+            }
+            // bring back the replaced array as OOB array
+            replaceBufferedArray(releasingType, releasingArray, dataType);
+            break;
+          }
         }
-        bringBackBufferedArray(dataType, dataArray);
-      } else {
-        // or else bring back the original array as OOB array
-        bringBackOOBArray(dataType, ARRAY_SIZE);
       }
-    } else {
-      // if there is no out of buffer array, bring back as buffered array directly
-      bringBackBufferedArray(dataType, dataArray);
+
+      releaseOutOfBuffer(releasingType);
     }
   }
 
   /**
    * Bring back a buffered array
    *
-   * @param dataType  data type
+   * @param dataType data type
    * @param dataArray data array
    */
-  private static void bringBackBufferedArray(TSDataType dataType, Object dataArray) {
+  private static void putBackBufferedArray(TSDataType dataType, Object dataArray) {
     synchronized (bufferedArraysMap.get(dataType)) {
       bufferedArraysMap.get(dataType).add(dataArray);
-      bufferedArraysNumMap.put(dataType, bufferedArraysNumMap.getOrDefault(dataType, 0) + 1);
     }
-    bufferedArraysRamSize.addAndGet((long) -ARRAY_SIZE * dataType.getDataTypeSize());
   }
 
-  /**
-   * Bring back out of buffered array
-   *
-   * @param dataType data type
-   * @param size     capacity
-   */
-  private static void bringBackOOBArray(TSDataType dataType, int size) {
-    outOfBufferArraysRamSize.addAndGet((long) -size * dataType.getDataTypeSize());
+  private static void replaceBufferedArray(
+      TSDataType releasingType, Object releasingArray, TSDataType redundantType) {
+    synchronized (bufferedArraysMap.get(redundantType)) {
+      if (bufferedArraysMap.get(redundantType).poll() != null) {
+        bufferedArraysRamSize.addAndGet((long) -ARRAY_SIZE * redundantType.getDataTypeSize());
+      }
+    }
+
+    if (bufferedArraysRamSize.get() + (long) ARRAY_SIZE * releasingType.getDataTypeSize()
+        < BUFFERED_ARRAY_SIZE_THRESHOLD) {
+      ArrayDeque<Object> releasingArrays = bufferedArraysMap.get(releasingType);
+      synchronized (releasingArrays) {
+        releasingArrays.add(releasingArray);
+      }
+      bufferedArraysRamSize.addAndGet((long) ARRAY_SIZE * releasingType.getDataTypeSize());
+    }
+  }
+
+  private static void releaseOutOfBuffer(TSDataType dataType) {
+    outOfBufferArraysRamSize.getAndUpdate(
+        l -> Math.max(0, l - (long) ARRAY_SIZE * dataType.getDataTypeSize()));
   }
 
   /**
-   * @param schemaDataTypeNumMap schema DataType Num Map (for each series, increase a long and a specific type)
-   * @param total current DataType Total Num (twice of number of time series)
+   * @param schemaDataTypeNumMap schema DataType Num Map (for each series, increase a long and a
+   *     specific type)
+   * @param totalSeries total time series number
    */
-  public static void updateSchemaDataTypeNum(Map<TSDataType, Integer> schemaDataTypeNumMap,
-      long total) {
+  public static void updateSchemaDataTypeNum(
+      Map<TSDataType, Integer> schemaDataTypeNumMap, long totalSeries) {
     for (Map.Entry<TSDataType, Integer> entry : schemaDataTypeNumMap.entrySet()) {
       TSDataType dataType = entry.getKey();
-      bufferedArraysNumRatio.put(dataType, (double) schemaDataTypeNumMap.get(dataType) / total);
+      // one time series has 2 columns (time column + value column)
+      bufferedArraysNumRatio.put(
+          dataType, (double) schemaDataTypeNumMap.get(dataType) / (totalSeries * 2));
     }
   }
 
@@ -299,13 +294,14 @@ public class PrimitiveArrayManager {
    * @return true if the buffered array ratio exceeds the recommend ratio
    */
   private static boolean isCurrentDataTypeExceeded(TSDataType dataType) {
-    int total = 0;
-    for (int num : bufferedArraysNumMap.values()) {
-      total += num;
+    long total = 0;
+    for (ArrayDeque<Object> value : bufferedArraysMap.values()) {
+      total += value.size();
     }
-    return total != 0 &&
-        ((double) bufferedArraysNumMap.getOrDefault(dataType, 0) / total >
-            bufferedArraysNumRatio.getOrDefault(dataType, 0.0));
+    long arrayNumInBuffer =
+        bufferedArraysMap.containsKey(dataType) ? bufferedArraysMap.get(dataType).size() : 0;
+    return total != 0
+        && ((double) arrayNumInBuffer / total > bufferedArraysNumRatio.getOrDefault(dataType, 0.0));
   }
 
   public static void close() {
@@ -313,7 +309,6 @@ public class PrimitiveArrayManager {
       dataListQueue.clear();
     }
 
-    bufferedArraysNumMap.clear();
     bufferedArraysNumRatio.clear();
 
     bufferedArraysRamSize.set(0);