You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/07/06 02:44:02 UTC
[iotdb] branch rel/0.12 updated: Primitive Array Manager v3 (#3508)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new 8eee30e Primitive Array Manager v3 (#3508)
8eee30e is described below
commit 8eee30ecb11f57cf5f8df1ca514b869c8ad34371
Author: Steve Yurong Su (宇荣) <ro...@apache.org>
AuthorDate: Mon Jul 5 21:43:41 2021 -0500
Primitive Array Manager v3 (#3508)
---
.../org/apache/iotdb/db/metadata/MManager.java | 23 --
.../iotdb/db/rescon/PrimitiveArrayManager.java | 343 ++++++++++-----------
.../iotdb/db/utils/datastructure/TVList.java | 2 +-
3 files changed, 163 insertions(+), 205 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index d34088c..43b4593 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -57,7 +57,6 @@ import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
import org.apache.iotdb.db.rescon.MemTableManager;
-import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
import org.apache.iotdb.db.utils.RandomDeleteCache;
import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.iotdb.db.utils.TestOnly;
@@ -421,7 +420,6 @@ public class MManager {
logger.warn("Current series number {} is too large...", totalSeriesNumber);
allowToCreateNewSeries = false;
}
- updateSchemaDataTypeNumMap(type, 1);
// write log
if (!isRecovering) {
@@ -569,9 +567,6 @@ public class MManager {
removeFromTagInvertedIndex(pair.right);
PartialPath storageGroupPath = pair.left;
- // update statistics in schemaDataTypeNumMap
- updateSchemaDataTypeNumMap(pair.right.getSchema().getType(), -1);
-
// TODO: delete the path node and all its ancestors
mNodeCache.clear();
totalSeriesNumber.addAndGet(-1);
@@ -623,8 +618,6 @@ public class MManager {
List<MeasurementMNode> leafMNodes = mtree.deleteStorageGroup(storageGroup);
for (MeasurementMNode leafMNode : leafMNodes) {
removeFromTagInvertedIndex(leafMNode);
- // update statistics in schemaDataTypeNumMap
- updateSchemaDataTypeNumMap(leafMNode.getSchema().getType(), -1);
}
if (!config.isEnableMemControl()) {
@@ -642,22 +635,6 @@ public class MManager {
}
/**
- * update statistics in schemaDataTypeNumMap
- *
- * @param type data type
- * @param num 1 for creating timeseries and -1 for deleting timeseries
- */
- private synchronized void updateSchemaDataTypeNumMap(TSDataType type, int num) {
- // add an array of the series type
- schemaDataTypeNumMap.put(type, schemaDataTypeNumMap.getOrDefault(type, 0) + num);
- // add an array of time
- schemaDataTypeNumMap.put(
- TSDataType.INT64, schemaDataTypeNumMap.getOrDefault(TSDataType.INT64, 0) + num);
-
- PrimitiveArrayManager.updateSchemaDataTypeNum(schemaDataTypeNumMap, totalSeriesNumber.get());
- }
-
- /**
* Check if the given path is storage group or not.
*
* @param path Format: root.node.(node)*
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java b/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java
index e1b3fdd..e08773a 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java
@@ -29,78 +29,156 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayDeque;
import java.util.Arrays;
-import java.util.EnumMap;
-import java.util.Map;
-import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicLong;
-/** Manage all primitive data list in memory, including get and release operation. */
+/** Manage all primitive data lists in memory, including get and release operations. */
public class PrimitiveArrayManager {
- /** data type -> ArrayDeque<Array> */
- private static final Map<TSDataType, ArrayDeque<Object>> bufferedArraysMap =
- new EnumMap<>(TSDataType.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(PrimitiveArrayManager.class);
- /** data type -> ratio of data type in schema, which could be seen as recommended ratio */
- private static final Map<TSDataType, Double> bufferedArraysNumRatio =
- new EnumMap<>(TSDataType.class);
+ private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
- private static final Logger logger = LoggerFactory.getLogger(PrimitiveArrayManager.class);
+ public static final int ARRAY_SIZE = CONFIG.getPrimitiveArraySize();
- private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ /** threshold total size of arrays for all data types */
+ private static final double POOLED_ARRAYS_MEMORY_THRESHOLD =
+ CONFIG.getAllocateMemoryForWrite() * CONFIG.getBufferedArraysMemoryProportion();
- public static final int ARRAY_SIZE = config.getPrimitiveArraySize();
+ /** TSDataType#serialize() -> ArrayDeque<Array> */
+ private static final ArrayDeque[] POOLED_ARRAYS = new ArrayDeque[TSDataType.values().length];
- /** threshold total size of arrays for all data types */
- private static final double BUFFERED_ARRAY_SIZE_THRESHOLD =
- config.getAllocateMemoryForWrite() * config.getBufferedArraysMemoryProportion();
+ /** TSDataType#serialize() -> max size of ArrayDeque<Array> */
+ private static final int[] LIMITS = new int[TSDataType.values().length];
+
+ /** LIMITS should be updated if (TOTAL_ALLOCATION_REQUEST_COUNT.get() > limitUpdateThreshold) */
+ private static long limitUpdateThreshold;
- /** total size of buffered arrays */
- private static final AtomicLong bufferedArraysRamSize = new AtomicLong();
+ /** TSDataType#serialize() -> count of allocation requests */
+ private static final AtomicLong[] ALLOCATION_REQUEST_COUNTS =
+ new AtomicLong[] {
+ new AtomicLong(0),
+ new AtomicLong(0),
+ new AtomicLong(0),
+ new AtomicLong(0),
+ new AtomicLong(0),
+ new AtomicLong(0)
+ };
- /** total size of out of buffer arrays */
- private static final AtomicLong outOfBufferArraysRamSize = new AtomicLong();
+ private static final AtomicLong TOTAL_ALLOCATION_REQUEST_COUNT = new AtomicLong(0);
static {
- bufferedArraysMap.put(TSDataType.BOOLEAN, new ArrayDeque<>());
- bufferedArraysMap.put(TSDataType.INT32, new ArrayDeque<>());
- bufferedArraysMap.put(TSDataType.INT64, new ArrayDeque<>());
- bufferedArraysMap.put(TSDataType.FLOAT, new ArrayDeque<>());
- bufferedArraysMap.put(TSDataType.DOUBLE, new ArrayDeque<>());
- bufferedArraysMap.put(TSDataType.TEXT, new ArrayDeque<>());
+ init();
}
- private PrimitiveArrayManager() {
- logger.info("BufferedArraySizeThreshold is {}", BUFFERED_ARRAY_SIZE_THRESHOLD);
+ private static void init() {
+ LOGGER.info("BufferedArraySizeThreshold is {}", POOLED_ARRAYS_MEMORY_THRESHOLD);
+
+ // POOLED_ARRAYS_MEMORY_THRESHOLD = ∑(datatype[i].getDataTypeSize() * ARRAY_SIZE * LIMITS[i])
+ // we init all LIMITS[i] with the same value, so we have
+ // => LIMITS[i] = POOLED_ARRAYS_MEMORY_THRESHOLD / ARRAY_SIZE / ∑(datatype[i].getDataTypeSize())
+ int totalDataTypeSize = 0;
+ for (TSDataType dataType : TSDataType.values()) {
+ totalDataTypeSize += dataType.getDataTypeSize();
+ }
+ @SuppressWarnings("squid:S3518") // totalDataTypeSize can not be zero
+ double limit = POOLED_ARRAYS_MEMORY_THRESHOLD / ARRAY_SIZE / totalDataTypeSize;
+ Arrays.fill(LIMITS, (int) limit);
+
+ // limitUpdateThreshold = ∑(LIMITS[i])
+ limitUpdateThreshold = (long) (TSDataType.values().length * limit);
+
+ for (int i = 0; i < POOLED_ARRAYS.length; ++i) {
+ POOLED_ARRAYS[i] = new ArrayDeque<>((int) limit);
+ }
+
+ for (AtomicLong allocationRequestCount : ALLOCATION_REQUEST_COUNTS) {
+ allocationRequestCount.set(0);
+ }
+
+ TOTAL_ALLOCATION_REQUEST_COUNT.set(0);
}
+ private PrimitiveArrayManager() {}
+
/**
- * Get primitive data lists according to type
+ * Get or allocate primitive data lists according to type
*
- * @param dataType data type
* @return an array
*/
- public static Object getPrimitiveArraysByType(TSDataType dataType) {
- long delta = (long) ARRAY_SIZE * dataType.getDataTypeSize();
-
- // check memory of buffered array, if already full, generate OOB
- if (bufferedArraysRamSize.get() + delta > BUFFERED_ARRAY_SIZE_THRESHOLD) {
- // return an out of buffer array
- outOfBufferArraysRamSize.addAndGet(delta);
- return createPrimitiveArray(dataType);
+ public static Object allocate(TSDataType dataType) {
+ if (TOTAL_ALLOCATION_REQUEST_COUNT.get() > limitUpdateThreshold) {
+ synchronized (TOTAL_ALLOCATION_REQUEST_COUNT) {
+ if (TOTAL_ALLOCATION_REQUEST_COUNT.get() > limitUpdateThreshold) {
+ updateLimits();
+ }
+ }
+ }
+
+ int order = dataType.serialize();
+
+ ALLOCATION_REQUEST_COUNTS[order].incrementAndGet();
+ TOTAL_ALLOCATION_REQUEST_COUNT.incrementAndGet();
+
+ Object array;
+ synchronized (POOLED_ARRAYS[order]) {
+ array = POOLED_ARRAYS[order].poll();
+ }
+ if (array == null) {
+ array = createPrimitiveArray(dataType);
+ }
+ return array;
+ }
+
+ private static void updateLimits() {
+ // we want to update LIMITS[i] according to ratios[i]
+ double[] ratios = new double[ALLOCATION_REQUEST_COUNTS.length];
+ for (int i = 0; i < ALLOCATION_REQUEST_COUNTS.length; ++i) {
+ ratios[i] =
+ ALLOCATION_REQUEST_COUNTS[i].get() / (double) TOTAL_ALLOCATION_REQUEST_COUNT.get();
}
- synchronized (bufferedArraysMap.get(dataType)) {
- // try to get a buffered array
- Object dataArray = bufferedArraysMap.get(dataType).poll();
- if (dataArray != null) {
- return dataArray;
+ // initially we have:
+ // POOLED_ARRAYS_MEMORY_THRESHOLD = ∑(datatype[i].getDataTypeSize() * LIMITS[i]) * ARRAY_SIZE
+ // we can find a number called limitBase which satisfies:
+ // LIMITS[i] = limitBase * ratios[i]
+
+ // => POOLED_ARRAYS_MEMORY_THRESHOLD =
+ // limitBase * ∑(datatype[i].getDataTypeSize() * ratios[i]) * ARRAY_SIZE
+ // => limitBase = POOLED_ARRAYS_MEMORY_THRESHOLD / ARRAY_SIZE
+ // / ∑(datatype[i].getDataTypeSize() * ratios[i])
+ long weightedSumOfRatios = 0;
+ for (TSDataType dataType : TSDataType.values()) {
+ weightedSumOfRatios += dataType.getDataTypeSize() * ratios[dataType.serialize()];
+ }
+ @SuppressWarnings("squid:S3518") // weightedSumOfRatios can not be zero
+ double limitBase = POOLED_ARRAYS_MEMORY_THRESHOLD / ARRAY_SIZE / weightedSumOfRatios;
+
+ // LIMITS[i] = limitBase * ratios[i]
+ for (int i = 0; i < LIMITS.length; ++i) {
+ int oldLimit = LIMITS[i];
+ int newLimit = (int) (limitBase * ratios[i]);
+ LIMITS[i] = newLimit;
+
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info(
+ "limit of {} array deque size updated: {} -> {}",
+ TSDataType.deserialize((byte) i).name(),
+ oldLimit,
+ newLimit);
}
}
- // no buffered array, create one
- bufferedArraysRamSize.addAndGet(delta);
- return createPrimitiveArray(dataType);
+ // limitUpdateThreshold = ∑(LIMITS[i])
+ limitUpdateThreshold = 0;
+ for (int limit : LIMITS) {
+ limitUpdateThreshold += limit;
+ }
+
+ for (AtomicLong allocationRequestCount : ALLOCATION_REQUEST_COUNTS) {
+ allocationRequestCount.set(0);
+ }
+
+ TOTAL_ALLOCATION_REQUEST_COUNT.set(0);
}
private static Object createPrimitiveArray(TSDataType dataType) {
@@ -125,13 +203,49 @@ public class PrimitiveArrayManager {
dataArray = new Binary[ARRAY_SIZE];
break;
default:
- throw new UnSupportedDataTypeException(dataType.toString());
+ throw new UnSupportedDataTypeException(dataType.name());
}
return dataArray;
}
/**
+ * This method is called when bringing back data array
+ *
+ * @param array data array to be released
+ */
+ public static void release(Object array) {
+ int order;
+ if (array instanceof boolean[]) {
+ order = TSDataType.BOOLEAN.serialize();
+ } else if (array instanceof int[]) {
+ order = TSDataType.INT32.serialize();
+ } else if (array instanceof long[]) {
+ order = TSDataType.INT64.serialize();
+ } else if (array instanceof float[]) {
+ order = TSDataType.FLOAT.serialize();
+ } else if (array instanceof double[]) {
+ order = TSDataType.DOUBLE.serialize();
+ } else if (array instanceof Binary[]) {
+ Arrays.fill((Binary[]) array, null);
+ order = TSDataType.TEXT.serialize();
+ } else {
+ throw new UnSupportedDataTypeException(array.getClass().toString());
+ }
+
+ synchronized (POOLED_ARRAYS[order]) {
+ ArrayDeque<Object> arrays = POOLED_ARRAYS[order];
+ if (arrays.size() < LIMITS[order]) {
+ arrays.add(array);
+ }
+ }
+ }
+
+ public static void close() {
+ init();
+ }
+
+ /**
* Get primitive data lists according to data type and size, only for TVList's sorting
*
* @param dataType data type
@@ -178,140 +292,7 @@ public class PrimitiveArrayManager {
}
return binaries;
default:
- return null;
- }
- }
-
- /**
- * This method is called when bringing back data array
- *
- * @param releasingArray data array to be released
- */
- public static void release(Object releasingArray) {
- TSDataType releasingType;
- if (releasingArray instanceof boolean[]) {
- releasingType = TSDataType.BOOLEAN;
- } else if (releasingArray instanceof int[]) {
- releasingType = TSDataType.INT32;
- } else if (releasingArray instanceof long[]) {
- releasingType = TSDataType.INT64;
- } else if (releasingArray instanceof float[]) {
- releasingType = TSDataType.FLOAT;
- } else if (releasingArray instanceof double[]) {
- releasingType = TSDataType.DOUBLE;
- } else if (releasingArray instanceof Binary[]) {
- Arrays.fill((Binary[]) releasingArray, null);
- releasingType = TSDataType.TEXT;
- } else {
- throw new UnSupportedDataTypeException("Unknown data array type");
- }
-
- if (outOfBufferArraysRamSize.get() <= 0) {
- // if there is no out of buffer array, bring back as buffered array directly
- putBackBufferedArray(releasingType, releasingArray);
- } else {
- // if the system has out of buffer array, we need to release some memory
- if (!isCurrentDataTypeExceeded(releasingType)) {
- // if the buffered array of the releasingType is less than expected
- // choose an array of redundantDataType to release and try to buffer the array of
- // releasingType
- for (Entry<TSDataType, ArrayDeque<Object>> entry : bufferedArraysMap.entrySet()) {
- TSDataType dataType = entry.getKey();
- if (isCurrentDataTypeExceeded(dataType)) {
- // if we find a replaced array, bring back the original array as a buffered array
- if (logger.isDebugEnabled()) {
- logger.debug(
- "The ratio of {} in buffered array has not reached the schema ratio. discard a redundant array of {}",
- releasingType,
- dataType);
- }
- // bring back the replaced array as OOB array
- replaceBufferedArray(releasingType, releasingArray, dataType);
- break;
- }
- }
- }
-
- releaseOutOfBuffer(releasingType);
+ throw new UnSupportedDataTypeException(dataType.name());
}
}
-
- /**
- * Bring back a buffered array
- *
- * @param dataType data type
- * @param dataArray data array
- */
- private static void putBackBufferedArray(TSDataType dataType, Object dataArray) {
- synchronized (bufferedArraysMap.get(dataType)) {
- bufferedArraysMap.get(dataType).add(dataArray);
- }
- }
-
- private static void replaceBufferedArray(
- TSDataType releasingType, Object releasingArray, TSDataType redundantType) {
- synchronized (bufferedArraysMap.get(redundantType)) {
- if (bufferedArraysMap.get(redundantType).poll() != null) {
- bufferedArraysRamSize.addAndGet((long) -ARRAY_SIZE * redundantType.getDataTypeSize());
- }
- }
-
- if (bufferedArraysRamSize.get() + (long) ARRAY_SIZE * releasingType.getDataTypeSize()
- < BUFFERED_ARRAY_SIZE_THRESHOLD) {
- ArrayDeque<Object> releasingArrays = bufferedArraysMap.get(releasingType);
- synchronized (releasingArrays) {
- releasingArrays.add(releasingArray);
- }
- bufferedArraysRamSize.addAndGet((long) ARRAY_SIZE * releasingType.getDataTypeSize());
- }
- }
-
- private static void releaseOutOfBuffer(TSDataType dataType) {
- outOfBufferArraysRamSize.getAndUpdate(
- l -> Math.max(0, l - (long) ARRAY_SIZE * dataType.getDataTypeSize()));
- }
-
- /**
- * @param schemaDataTypeNumMap schema DataType Num Map (for each series, increase a long and a
- * specific type)
- * @param totalSeries total time series number
- */
- public static void updateSchemaDataTypeNum(
- Map<TSDataType, Integer> schemaDataTypeNumMap, long totalSeries) {
- for (Map.Entry<TSDataType, Integer> entry : schemaDataTypeNumMap.entrySet()) {
- TSDataType dataType = entry.getKey();
- // one time series has 2 columns (time column + value column)
- bufferedArraysNumRatio.put(
- dataType, (double) schemaDataTypeNumMap.get(dataType) / (totalSeries * 2));
- }
- }
-
- /**
- * check whether the ratio of buffered array of specific data type reaches the ratio in schema (as
- * recommended ratio)
- *
- * @param dataType data type
- * @return true if the buffered array ratio exceeds the recommend ratio
- */
- private static boolean isCurrentDataTypeExceeded(TSDataType dataType) {
- long total = 0;
- for (ArrayDeque<Object> value : bufferedArraysMap.values()) {
- total += value.size();
- }
- long arrayNumInBuffer =
- bufferedArraysMap.containsKey(dataType) ? bufferedArraysMap.get(dataType).size() : 0;
- return total != 0
- && ((double) arrayNumInBuffer / total > bufferedArraysNumRatio.getOrDefault(dataType, 0.0));
- }
-
- public static void close() {
- for (ArrayDeque<Object> dataListQueue : bufferedArraysMap.values()) {
- dataListQueue.clear();
- }
-
- bufferedArraysNumRatio.clear();
-
- bufferedArraysRamSize.set(0);
- outOfBufferArraysRamSize.set(0);
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index d24beae..8ce6295 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -294,7 +294,7 @@ public abstract class TVList {
}
protected Object getPrimitiveArraysByType(TSDataType dataType) {
- return PrimitiveArrayManager.getPrimitiveArraysByType(dataType);
+ return PrimitiveArrayManager.allocate(dataType);
}
protected long[] cloneTime(long[] array) {