You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/07/06 02:44:02 UTC

[iotdb] branch rel/0.12 updated: Primitive Array Manager v3 (#3508)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new 8eee30e  Primitive Array Manager v3 (#3508)
8eee30e is described below

commit 8eee30ecb11f57cf5f8df1ca514b869c8ad34371
Author: Steve Yurong Su (宇荣) <ro...@apache.org>
AuthorDate: Mon Jul 5 21:43:41 2021 -0500

    Primitive Array Manager v3 (#3508)
---
 .../org/apache/iotdb/db/metadata/MManager.java     |  23 --
 .../iotdb/db/rescon/PrimitiveArrayManager.java     | 343 ++++++++++-----------
 .../iotdb/db/utils/datastructure/TVList.java       |   2 +-
 3 files changed, 163 insertions(+), 205 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index d34088c..43b4593 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -57,7 +57,6 @@ import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
 import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
 import org.apache.iotdb.db.rescon.MemTableManager;
-import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
 import org.apache.iotdb.db.utils.RandomDeleteCache;
 import org.apache.iotdb.db.utils.SchemaUtils;
 import org.apache.iotdb.db.utils.TestOnly;
@@ -421,7 +420,6 @@ public class MManager {
         logger.warn("Current series number {} is too large...", totalSeriesNumber);
         allowToCreateNewSeries = false;
       }
-      updateSchemaDataTypeNumMap(type, 1);
 
       // write log
       if (!isRecovering) {
@@ -569,9 +567,6 @@ public class MManager {
     removeFromTagInvertedIndex(pair.right);
     PartialPath storageGroupPath = pair.left;
 
-    // update statistics in schemaDataTypeNumMap
-    updateSchemaDataTypeNumMap(pair.right.getSchema().getType(), -1);
-
     // TODO: delete the path node and all its ancestors
     mNodeCache.clear();
     totalSeriesNumber.addAndGet(-1);
@@ -623,8 +618,6 @@ public class MManager {
         List<MeasurementMNode> leafMNodes = mtree.deleteStorageGroup(storageGroup);
         for (MeasurementMNode leafMNode : leafMNodes) {
           removeFromTagInvertedIndex(leafMNode);
-          // update statistics in schemaDataTypeNumMap
-          updateSchemaDataTypeNumMap(leafMNode.getSchema().getType(), -1);
         }
 
         if (!config.isEnableMemControl()) {
@@ -642,22 +635,6 @@ public class MManager {
   }
 
   /**
-   * update statistics in schemaDataTypeNumMap
-   *
-   * @param type data type
-   * @param num 1 for creating timeseries and -1 for deleting timeseries
-   */
-  private synchronized void updateSchemaDataTypeNumMap(TSDataType type, int num) {
-    // add an array of the series type
-    schemaDataTypeNumMap.put(type, schemaDataTypeNumMap.getOrDefault(type, 0) + num);
-    // add an array of time
-    schemaDataTypeNumMap.put(
-        TSDataType.INT64, schemaDataTypeNumMap.getOrDefault(TSDataType.INT64, 0) + num);
-
-    PrimitiveArrayManager.updateSchemaDataTypeNum(schemaDataTypeNumMap, totalSeriesNumber.get());
-  }
-
-  /**
    * Check if the given path is storage group or not.
    *
    * @param path Format: root.node.(node)*
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java b/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java
index e1b3fdd..e08773a 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java
@@ -29,78 +29,156 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayDeque;
 import java.util.Arrays;
-import java.util.EnumMap;
-import java.util.Map;
-import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicLong;
 
-/** Manage all primitive data list in memory, including get and release operation. */
+/** Manage all primitive data lists in memory, including get and release operations. */
 public class PrimitiveArrayManager {
 
-  /** data type -> ArrayDeque<Array> */
-  private static final Map<TSDataType, ArrayDeque<Object>> bufferedArraysMap =
-      new EnumMap<>(TSDataType.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(PrimitiveArrayManager.class);
 
-  /** data type -> ratio of data type in schema, which could be seen as recommended ratio */
-  private static final Map<TSDataType, Double> bufferedArraysNumRatio =
-      new EnumMap<>(TSDataType.class);
+  private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
 
-  private static final Logger logger = LoggerFactory.getLogger(PrimitiveArrayManager.class);
+  public static final int ARRAY_SIZE = CONFIG.getPrimitiveArraySize();
 
-  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  /** threshold total size of arrays for all data types */
+  private static final double POOLED_ARRAYS_MEMORY_THRESHOLD =
+      CONFIG.getAllocateMemoryForWrite() * CONFIG.getBufferedArraysMemoryProportion();
 
-  public static final int ARRAY_SIZE = config.getPrimitiveArraySize();
+  /** TSDataType#serialize() -> ArrayDeque<Array> */
+  private static final ArrayDeque[] POOLED_ARRAYS = new ArrayDeque[TSDataType.values().length];
 
-  /** threshold total size of arrays for all data types */
-  private static final double BUFFERED_ARRAY_SIZE_THRESHOLD =
-      config.getAllocateMemoryForWrite() * config.getBufferedArraysMemoryProportion();
+  /** TSDataType#serialize() -> max size of ArrayDeque<Array> */
+  private static final int[] LIMITS = new int[TSDataType.values().length];
+
+  /** LIMITS should be updated if (TOTAL_ALLOCATION_REQUEST_COUNT.get() > limitUpdateThreshold) */
+  private static volatile long limitUpdateThreshold; // read without the lock in allocate()
 
-  /** total size of buffered arrays */
-  private static final AtomicLong bufferedArraysRamSize = new AtomicLong();
+  /** TSDataType#serialize() -> count of allocation requests */
+  private static final AtomicLong[] ALLOCATION_REQUEST_COUNTS =
+      new AtomicLong[] {
+        new AtomicLong(0),
+        new AtomicLong(0),
+        new AtomicLong(0),
+        new AtomicLong(0),
+        new AtomicLong(0),
+        new AtomicLong(0)
+      };
 
-  /** total size of out of buffer arrays */
-  private static final AtomicLong outOfBufferArraysRamSize = new AtomicLong();
+  private static final AtomicLong TOTAL_ALLOCATION_REQUEST_COUNT = new AtomicLong(0);
 
   static {
-    bufferedArraysMap.put(TSDataType.BOOLEAN, new ArrayDeque<>());
-    bufferedArraysMap.put(TSDataType.INT32, new ArrayDeque<>());
-    bufferedArraysMap.put(TSDataType.INT64, new ArrayDeque<>());
-    bufferedArraysMap.put(TSDataType.FLOAT, new ArrayDeque<>());
-    bufferedArraysMap.put(TSDataType.DOUBLE, new ArrayDeque<>());
-    bufferedArraysMap.put(TSDataType.TEXT, new ArrayDeque<>());
+    init();
   }
 
-  private PrimitiveArrayManager() {
-    logger.info("BufferedArraySizeThreshold is {}", BUFFERED_ARRAY_SIZE_THRESHOLD);
+  private static void init() { // (re)sets pools, per-type limits and request counters
+    LOGGER.info("BufferedArraySizeThreshold is {}", POOLED_ARRAYS_MEMORY_THRESHOLD);
+
+    // POOLED_ARRAYS_MEMORY_THRESHOLD = ∑(datatype[i].getDataTypeSize() * ARRAY_SIZE * LIMITS[i])
+    // we init all LIMITS[i] with the same value, so we have
+    // => LIMITS[i] = POOLED_ARRAYS_MEMORY_THRESHOLD / ARRAY_SIZE / ∑(datatype[i].getDataTypeSize())
+    int totalDataTypeSize = 0;
+    for (TSDataType dataType : TSDataType.values()) {
+      totalDataTypeSize += dataType.getDataTypeSize();
+    }
+    @SuppressWarnings("squid:S3518") // totalDataTypeSize can not be zero
+    double limit = POOLED_ARRAYS_MEMORY_THRESHOLD / ARRAY_SIZE / totalDataTypeSize;
+    Arrays.fill(LIMITS, (int) limit); // same initial cap for every data type
+
+    // limitUpdateThreshold = ∑(LIMITS[i])
+    limitUpdateThreshold = (long) (TSDataType.values().length * limit);
+
+    for (int i = 0; i < POOLED_ARRAYS.length; ++i) {
+      POOLED_ARRAYS[i] = new ArrayDeque<>((int) limit); // fresh, empty deque per data type
+    }
+
+    for (AtomicLong allocationRequestCount : ALLOCATION_REQUEST_COUNTS) {
+      allocationRequestCount.set(0);
+    }
+
+    TOTAL_ALLOCATION_REQUEST_COUNT.set(0);
   }
 
+  private PrimitiveArrayManager() {}
+
   /**
-   * Get primitive data lists according to type
+   * Get or allocate primitive data lists according to type
    *
-   * @param dataType data type
    * @return an array
    */
-  public static Object getPrimitiveArraysByType(TSDataType dataType) {
-    long delta = (long) ARRAY_SIZE * dataType.getDataTypeSize();
-
-    // check memory of buffered array, if already full, generate OOB
-    if (bufferedArraysRamSize.get() + delta > BUFFERED_ARRAY_SIZE_THRESHOLD) {
-      // return an out of buffer array
-      outOfBufferArraysRamSize.addAndGet(delta);
-      return createPrimitiveArray(dataType);
+  public static Object allocate(TSDataType dataType) {
+    if (TOTAL_ALLOCATION_REQUEST_COUNT.get() > limitUpdateThreshold) { // cheap unlocked check
+      synchronized (TOTAL_ALLOCATION_REQUEST_COUNT) {
+        if (TOTAL_ALLOCATION_REQUEST_COUNT.get() > limitUpdateThreshold) { // re-check under lock
+          updateLimits();
+        }
+      }
+    }
+
+    int order = dataType.serialize(); // index into the per-type arrays
+
+    ALLOCATION_REQUEST_COUNTS[order].incrementAndGet();
+    TOTAL_ALLOCATION_REQUEST_COUNT.incrementAndGet();
+
+    Object array;
+    synchronized (POOLED_ARRAYS[order]) {
+      array = POOLED_ARRAYS[order].poll();
+    }
+    if (array == null) { // nothing pooled for this type: allocate a new array
+      array = createPrimitiveArray(dataType);
+    }
+    return array;
+  }
+
+  private static void updateLimits() {
+    // we want to update LIMITS[i] according to ratios[i]
+    double[] ratios = new double[ALLOCATION_REQUEST_COUNTS.length];
+    for (int i = 0; i < ALLOCATION_REQUEST_COUNTS.length; ++i) {
+      ratios[i] =
+          ALLOCATION_REQUEST_COUNTS[i].get() / (double) TOTAL_ALLOCATION_REQUEST_COUNT.get();
     }
 
-    synchronized (bufferedArraysMap.get(dataType)) {
-      // try to get a buffered array
-      Object dataArray = bufferedArraysMap.get(dataType).poll();
-      if (dataArray != null) {
-        return dataArray;
+    // initially we have:
+    //   POOLED_ARRAYS_MEMORY_THRESHOLD = ∑(datatype[i].getDataTypeSize() * LIMITS[i]) * ARRAY_SIZE
+    // we can find a number called limitBase which satisfies:
+    //   LIMITS[i] = limitBase * ratios[i]
+
+    // => POOLED_ARRAYS_MEMORY_THRESHOLD =
+    //     limitBase * ∑(datatype[i].getDataTypeSize() * ratios[i]) * ARRAY_SIZE
+    // => limitBase = POOLED_ARRAYS_MEMORY_THRESHOLD / ARRAY_SIZE
+    //     / ∑(datatype[i].getDataTypeSize() * ratios[i])
+    double weightedSumOfRatios = 0; // not long: += on a long truncates the fractional terms
+    for (TSDataType dataType : TSDataType.values()) {
+      weightedSumOfRatios += dataType.getDataTypeSize() * ratios[dataType.serialize()];
+    }
+    @SuppressWarnings("squid:S3518") // weightedSumOfRatios can not be zero
+    double limitBase = POOLED_ARRAYS_MEMORY_THRESHOLD / ARRAY_SIZE / weightedSumOfRatios;
+
+    // LIMITS[i] = limitBase * ratios[i]
+    for (int i = 0; i < LIMITS.length; ++i) {
+      int oldLimit = LIMITS[i];
+      int newLimit = (int) (limitBase * ratios[i]);
+      LIMITS[i] = newLimit;
+
+      if (LOGGER.isInfoEnabled()) {
+        LOGGER.info(
+            "limit of {} array deque size updated: {} -> {}",
+            TSDataType.deserialize((byte) i).name(),
+            oldLimit,
+            newLimit);
       }
     }
 
-    // no buffered array, create one
-    bufferedArraysRamSize.addAndGet(delta);
-    return createPrimitiveArray(dataType);
+    // limitUpdateThreshold = ∑(LIMITS[i])
+    limitUpdateThreshold = 0;
+    for (int limit : LIMITS) {
+      limitUpdateThreshold += limit;
+    }
+
+    for (AtomicLong allocationRequestCount : ALLOCATION_REQUEST_COUNTS) {
+      allocationRequestCount.set(0);
+    }
+
+    TOTAL_ALLOCATION_REQUEST_COUNT.set(0);
   }
 
   private static Object createPrimitiveArray(TSDataType dataType) {
@@ -125,13 +203,49 @@ public class PrimitiveArrayManager {
         dataArray = new Binary[ARRAY_SIZE];
         break;
       default:
-        throw new UnSupportedDataTypeException(dataType.toString());
+        throw new UnSupportedDataTypeException(dataType.name());
     }
 
     return dataArray;
   }
 
   /**
+   * This method is called when bringing back data array
+   *
+   * @param array data array to be released
+   */
+  public static void release(Object array) {
+    int order;
+    if (array instanceof boolean[]) {
+      order = TSDataType.BOOLEAN.serialize();
+    } else if (array instanceof int[]) {
+      order = TSDataType.INT32.serialize();
+    } else if (array instanceof long[]) {
+      order = TSDataType.INT64.serialize();
+    } else if (array instanceof float[]) {
+      order = TSDataType.FLOAT.serialize();
+    } else if (array instanceof double[]) {
+      order = TSDataType.DOUBLE.serialize();
+    } else if (array instanceof Binary[]) {
+      Arrays.fill((Binary[]) array, null); // clear Binary references before pooling
+      order = TSDataType.TEXT.serialize();
+    } else {
+      throw new UnSupportedDataTypeException(array.getClass().toString());
+    }
+
+    synchronized (POOLED_ARRAYS[order]) {
+      ArrayDeque<Object> arrays = POOLED_ARRAYS[order];
+      if (arrays.size() < LIMITS[order]) { // at the cap, the array is simply not retained
+        arrays.add(array);
+      }
+    }
+  }
+
+  public static void close() {
+    init(); // drops pooled arrays and resets limits and request counters
+  }
+
+  /**
    * Get primitive data lists according to data type and size, only for TVList's sorting
    *
    * @param dataType data type
@@ -178,140 +292,7 @@ public class PrimitiveArrayManager {
         }
         return binaries;
       default:
-        return null;
-    }
-  }
-
-  /**
-   * This method is called when bringing back data array
-   *
-   * @param releasingArray data array to be released
-   */
-  public static void release(Object releasingArray) {
-    TSDataType releasingType;
-    if (releasingArray instanceof boolean[]) {
-      releasingType = TSDataType.BOOLEAN;
-    } else if (releasingArray instanceof int[]) {
-      releasingType = TSDataType.INT32;
-    } else if (releasingArray instanceof long[]) {
-      releasingType = TSDataType.INT64;
-    } else if (releasingArray instanceof float[]) {
-      releasingType = TSDataType.FLOAT;
-    } else if (releasingArray instanceof double[]) {
-      releasingType = TSDataType.DOUBLE;
-    } else if (releasingArray instanceof Binary[]) {
-      Arrays.fill((Binary[]) releasingArray, null);
-      releasingType = TSDataType.TEXT;
-    } else {
-      throw new UnSupportedDataTypeException("Unknown data array type");
-    }
-
-    if (outOfBufferArraysRamSize.get() <= 0) {
-      // if there is no out of buffer array, bring back as buffered array directly
-      putBackBufferedArray(releasingType, releasingArray);
-    } else {
-      // if the system has out of buffer array, we need to release some memory
-      if (!isCurrentDataTypeExceeded(releasingType)) {
-        // if the buffered array of the releasingType is less than expected
-        // choose an array of redundantDataType to release and try to buffer the array of
-        // releasingType
-        for (Entry<TSDataType, ArrayDeque<Object>> entry : bufferedArraysMap.entrySet()) {
-          TSDataType dataType = entry.getKey();
-          if (isCurrentDataTypeExceeded(dataType)) {
-            // if we find a replaced array, bring back the original array as a buffered array
-            if (logger.isDebugEnabled()) {
-              logger.debug(
-                  "The ratio of {} in buffered array has not reached the schema ratio. discard a redundant array of {}",
-                  releasingType,
-                  dataType);
-            }
-            // bring back the replaced array as OOB array
-            replaceBufferedArray(releasingType, releasingArray, dataType);
-            break;
-          }
-        }
-      }
-
-      releaseOutOfBuffer(releasingType);
+        throw new UnSupportedDataTypeException(dataType.name());
     }
   }
-
-  /**
-   * Bring back a buffered array
-   *
-   * @param dataType data type
-   * @param dataArray data array
-   */
-  private static void putBackBufferedArray(TSDataType dataType, Object dataArray) {
-    synchronized (bufferedArraysMap.get(dataType)) {
-      bufferedArraysMap.get(dataType).add(dataArray);
-    }
-  }
-
-  private static void replaceBufferedArray(
-      TSDataType releasingType, Object releasingArray, TSDataType redundantType) {
-    synchronized (bufferedArraysMap.get(redundantType)) {
-      if (bufferedArraysMap.get(redundantType).poll() != null) {
-        bufferedArraysRamSize.addAndGet((long) -ARRAY_SIZE * redundantType.getDataTypeSize());
-      }
-    }
-
-    if (bufferedArraysRamSize.get() + (long) ARRAY_SIZE * releasingType.getDataTypeSize()
-        < BUFFERED_ARRAY_SIZE_THRESHOLD) {
-      ArrayDeque<Object> releasingArrays = bufferedArraysMap.get(releasingType);
-      synchronized (releasingArrays) {
-        releasingArrays.add(releasingArray);
-      }
-      bufferedArraysRamSize.addAndGet((long) ARRAY_SIZE * releasingType.getDataTypeSize());
-    }
-  }
-
-  private static void releaseOutOfBuffer(TSDataType dataType) {
-    outOfBufferArraysRamSize.getAndUpdate(
-        l -> Math.max(0, l - (long) ARRAY_SIZE * dataType.getDataTypeSize()));
-  }
-
-  /**
-   * @param schemaDataTypeNumMap schema DataType Num Map (for each series, increase a long and a
-   *     specific type)
-   * @param totalSeries total time series number
-   */
-  public static void updateSchemaDataTypeNum(
-      Map<TSDataType, Integer> schemaDataTypeNumMap, long totalSeries) {
-    for (Map.Entry<TSDataType, Integer> entry : schemaDataTypeNumMap.entrySet()) {
-      TSDataType dataType = entry.getKey();
-      // one time series has 2 columns (time column + value column)
-      bufferedArraysNumRatio.put(
-          dataType, (double) schemaDataTypeNumMap.get(dataType) / (totalSeries * 2));
-    }
-  }
-
-  /**
-   * check whether the ratio of buffered array of specific data type reaches the ratio in schema (as
-   * recommended ratio)
-   *
-   * @param dataType data type
-   * @return true if the buffered array ratio exceeds the recommend ratio
-   */
-  private static boolean isCurrentDataTypeExceeded(TSDataType dataType) {
-    long total = 0;
-    for (ArrayDeque<Object> value : bufferedArraysMap.values()) {
-      total += value.size();
-    }
-    long arrayNumInBuffer =
-        bufferedArraysMap.containsKey(dataType) ? bufferedArraysMap.get(dataType).size() : 0;
-    return total != 0
-        && ((double) arrayNumInBuffer / total > bufferedArraysNumRatio.getOrDefault(dataType, 0.0));
-  }
-
-  public static void close() {
-    for (ArrayDeque<Object> dataListQueue : bufferedArraysMap.values()) {
-      dataListQueue.clear();
-    }
-
-    bufferedArraysNumRatio.clear();
-
-    bufferedArraysRamSize.set(0);
-    outOfBufferArraysRamSize.set(0);
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index d24beae..8ce6295 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -294,7 +294,7 @@ public abstract class TVList {
   }
 
   protected Object getPrimitiveArraysByType(TSDataType dataType) {
-    return PrimitiveArrayManager.getPrimitiveArraysByType(dataType);
+    return PrimitiveArrayManager.allocate(dataType);
   }
 
   protected long[] cloneTime(long[] array) {