You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/07/03 08:49:13 UTC
[iotdb] branch rel/0.12 updated: [To rel/0.12] Primitive Array
Manager v2 (#3484)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new 0d2872e [To rel/0.12] Primitive Array Manager v2 (#3484)
0d2872e is described below
commit 0d2872ed05ff52e92584feb6e688b1793ab44036
Author: Haonan <hh...@outlook.com>
AuthorDate: Sat Jul 3 16:48:49 2021 +0800
[To rel/0.12] Primitive Array Manager v2 (#3484)
---
.../org/apache/iotdb/db/metadata/MManager.java | 17 +--
.../iotdb/db/rescon/PrimitiveArrayManager.java | 161 +++++++++++----------
2 files changed, 84 insertions(+), 94 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 8224995..d34088c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -118,8 +118,6 @@ public class MManager {
private static final String PREVIOUS_CONDITION =
"before deleting it, tag key is %s, tag value is %s, tlog offset is %d, contains key %b";
- private static final int UPDATE_SCHEMA_MAP_IN_ARRAYPOOL_THRESHOLD = 5000;
-
private static final Logger logger = LoggerFactory.getLogger(MManager.class);
/** A thread will check whether the MTree is modified lately each such interval. Unit: second */
@@ -142,8 +140,6 @@ public class MManager {
// data type -> number
private Map<TSDataType, Integer> schemaDataTypeNumMap = new ConcurrentHashMap<>();
- // reported total series number
- private long reportedDataTypeTotalNum;
private AtomicLong totalSeriesNumber = new AtomicLong();
private boolean initialized;
protected static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
@@ -239,7 +235,6 @@ public class MManager {
logger.error(
"Cannot recover all MTree from file, we try to recover as possible as we can", e);
}
- reportedDataTypeTotalNum = 0L;
initialized = true;
}
@@ -315,7 +310,6 @@ public class MManager {
tagLogFile = null;
}
this.schemaDataTypeNumMap.clear();
- this.reportedDataTypeTotalNum = 0L;
initialized = false;
if (config.isEnableMTreeSnapshot() && timedCreateMTreeSnapshotThread != null) {
timedCreateMTreeSnapshotThread.shutdownNow();
@@ -660,16 +654,7 @@ public class MManager {
schemaDataTypeNumMap.put(
TSDataType.INT64, schemaDataTypeNumMap.getOrDefault(TSDataType.INT64, 0) + num);
- // total current DataType Total Num (twice of number of time series)
- // used in primitive array manager
- long currentDataTypeTotalNum = totalSeriesNumber.get() * 2;
-
- if (num > 0
- && currentDataTypeTotalNum - reportedDataTypeTotalNum
- >= UPDATE_SCHEMA_MAP_IN_ARRAYPOOL_THRESHOLD) {
- PrimitiveArrayManager.updateSchemaDataTypeNum(schemaDataTypeNumMap, currentDataTypeTotalNum);
- reportedDataTypeTotalNum = currentDataTypeTotalNum;
- }
+ PrimitiveArrayManager.updateSchemaDataTypeNum(schemaDataTypeNumMap, totalSeriesNumber.get());
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java b/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java
index aa6c264..e1b3fdd 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java
@@ -31,6 +31,7 @@ import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicLong;
/** Manage all primitive data list in memory, including get and release operation. */
@@ -40,10 +41,6 @@ public class PrimitiveArrayManager {
private static final Map<TSDataType, ArrayDeque<Object>> bufferedArraysMap =
new EnumMap<>(TSDataType.class);
- /** data type -> current number of buffered arrays */
- private static final Map<TSDataType, Integer> bufferedArraysNumMap =
- new EnumMap<>(TSDataType.class);
-
/** data type -> ratio of data type in schema, which could be seen as recommended ratio */
private static final Map<TSDataType, Double> bufferedArraysNumRatio =
new EnumMap<>(TSDataType.class);
@@ -59,10 +56,10 @@ public class PrimitiveArrayManager {
config.getAllocateMemoryForWrite() * config.getBufferedArraysMemoryProportion();
/** total size of buffered arrays */
- private static AtomicLong bufferedArraysRamSize = new AtomicLong();
+ private static final AtomicLong bufferedArraysRamSize = new AtomicLong();
/** total size of out of buffer arrays */
- private static AtomicLong outOfBufferArraysRamSize = new AtomicLong();
+ private static final AtomicLong outOfBufferArraysRamSize = new AtomicLong();
static {
bufferedArraysMap.put(TSDataType.BOOLEAN, new ArrayDeque<>());
@@ -84,11 +81,12 @@ public class PrimitiveArrayManager {
* @return an array
*/
public static Object getPrimitiveArraysByType(TSDataType dataType) {
+ long delta = (long) ARRAY_SIZE * dataType.getDataTypeSize();
+
// check memory of buffered array, if already full, generate OOB
- if (bufferedArraysRamSize.get() + ARRAY_SIZE * dataType.getDataTypeSize()
- > BUFFERED_ARRAY_SIZE_THRESHOLD) {
+ if (bufferedArraysRamSize.get() + delta > BUFFERED_ARRAY_SIZE_THRESHOLD) {
// return an out of buffer array
- outOfBufferArraysRamSize.addAndGet((long) ARRAY_SIZE * dataType.getDataTypeSize());
+ outOfBufferArraysRamSize.addAndGet(delta);
return createPrimitiveArray(dataType);
}
@@ -98,11 +96,10 @@ public class PrimitiveArrayManager {
if (dataArray != null) {
return dataArray;
}
- // no buffered array, create one
- bufferedArraysNumMap.put(dataType, bufferedArraysNumMap.getOrDefault(dataType, 0) + 1);
- bufferedArraysRamSize.addAndGet((long) ARRAY_SIZE * dataType.getDataTypeSize());
}
+ // no buffered array, create one
+ bufferedArraysRamSize.addAndGet(delta);
return createPrimitiveArray(dataType);
}
@@ -188,59 +185,54 @@ public class PrimitiveArrayManager {
/**
* This method is called when bringing back data array
*
- * @param dataArray data array
+ * @param releasingArray data array to be released
*/
- public static void release(Object dataArray) {
- TSDataType dataType;
- if (dataArray instanceof boolean[]) {
- dataType = TSDataType.BOOLEAN;
- } else if (dataArray instanceof int[]) {
- dataType = TSDataType.INT32;
- } else if (dataArray instanceof long[]) {
- dataType = TSDataType.INT64;
- } else if (dataArray instanceof float[]) {
- dataType = TSDataType.FLOAT;
- } else if (dataArray instanceof double[]) {
- dataType = TSDataType.DOUBLE;
- } else if (dataArray instanceof Binary[]) {
- Arrays.fill((Binary[]) dataArray, null);
- dataType = TSDataType.TEXT;
+ public static void release(Object releasingArray) {
+ TSDataType releasingType;
+ if (releasingArray instanceof boolean[]) {
+ releasingType = TSDataType.BOOLEAN;
+ } else if (releasingArray instanceof int[]) {
+ releasingType = TSDataType.INT32;
+ } else if (releasingArray instanceof long[]) {
+ releasingType = TSDataType.INT64;
+ } else if (releasingArray instanceof float[]) {
+ releasingType = TSDataType.FLOAT;
+ } else if (releasingArray instanceof double[]) {
+ releasingType = TSDataType.DOUBLE;
+ } else if (releasingArray instanceof Binary[]) {
+ Arrays.fill((Binary[]) releasingArray, null);
+ releasingType = TSDataType.TEXT;
} else {
throw new UnSupportedDataTypeException("Unknown data array type");
}
- // Check out of buffer array num
- if (outOfBufferArraysRamSize.get() > 0 && isCurrentDataTypeExceeded(dataType)) {
- // release an out of buffer array
- bringBackOOBArray(dataType, ARRAY_SIZE);
- } else if (outOfBufferArraysRamSize.get() > 0 && !isCurrentDataTypeExceeded(dataType)) {
- // if the ratio of buffered arrays of this data type does not exceed the schema ratio,
- // choose one replaced array who has larger ratio than schema recommended ratio
- TSDataType replacedDataType = null;
- for (Map.Entry<TSDataType, Integer> entry : bufferedArraysNumMap.entrySet()) {
- if (isCurrentDataTypeExceeded(entry.getKey())) {
- replacedDataType = entry.getKey();
- // bring back the replaced array as OOB array
- bringBackOOBArray(replacedDataType, ARRAY_SIZE);
- break;
- }
- }
- if (replacedDataType != null) {
- // if we find a replaced array, bring back the original array as a buffered array
- if (logger.isDebugEnabled()) {
- logger.debug(
- "The ratio of {} in buffered array has not reached the schema ratio. Replaced by {}",
- dataType,
- replacedDataType);
+ if (outOfBufferArraysRamSize.get() <= 0) {
+ // if there is no out of buffer array, bring back as buffered array directly
+ putBackBufferedArray(releasingType, releasingArray);
+ } else {
+ // if the system has out of buffer array, we need to release some memory
+ if (!isCurrentDataTypeExceeded(releasingType)) {
+ // if the buffered array of the releasingType is less than expected
+ // choose an array of redundantDataType to release and try to buffer the array of
+ // releasingType
+ for (Entry<TSDataType, ArrayDeque<Object>> entry : bufferedArraysMap.entrySet()) {
+ TSDataType dataType = entry.getKey();
+ if (isCurrentDataTypeExceeded(dataType)) {
+ // if we find a replaced array, bring back the original array as a buffered array
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "The ratio of {} in buffered array has not reached the schema ratio. discard a redundant array of {}",
+ releasingType,
+ dataType);
+ }
+ // bring back the replaced array as OOB array
+ replaceBufferedArray(releasingType, releasingArray, dataType);
+ break;
+ }
}
- bringBackBufferedArray(dataType, dataArray);
- } else {
- // or else bring back the original array as OOB array
- bringBackOOBArray(dataType, ARRAY_SIZE);
}
- } else {
- // if there is no out of buffer array, bring back as buffered array directly
- bringBackBufferedArray(dataType, dataArray);
+
+ releaseOutOfBuffer(releasingType);
}
}
@@ -250,34 +242,47 @@ public class PrimitiveArrayManager {
* @param dataType data type
* @param dataArray data array
*/
- private static void bringBackBufferedArray(TSDataType dataType, Object dataArray) {
+ private static void putBackBufferedArray(TSDataType dataType, Object dataArray) {
synchronized (bufferedArraysMap.get(dataType)) {
bufferedArraysMap.get(dataType).add(dataArray);
- bufferedArraysNumMap.put(dataType, bufferedArraysNumMap.getOrDefault(dataType, 0) + 1);
}
- bufferedArraysRamSize.addAndGet((long) -ARRAY_SIZE * dataType.getDataTypeSize());
}
- /**
- * Bring back out of buffered array
- *
- * @param dataType data type
- * @param size capacity
- */
- private static void bringBackOOBArray(TSDataType dataType, int size) {
- outOfBufferArraysRamSize.addAndGet((long) -size * dataType.getDataTypeSize());
+ private static void replaceBufferedArray(
+ TSDataType releasingType, Object releasingArray, TSDataType redundantType) {
+ synchronized (bufferedArraysMap.get(redundantType)) {
+ if (bufferedArraysMap.get(redundantType).poll() != null) {
+ bufferedArraysRamSize.addAndGet((long) -ARRAY_SIZE * redundantType.getDataTypeSize());
+ }
+ }
+
+ if (bufferedArraysRamSize.get() + (long) ARRAY_SIZE * releasingType.getDataTypeSize()
+ < BUFFERED_ARRAY_SIZE_THRESHOLD) {
+ ArrayDeque<Object> releasingArrays = bufferedArraysMap.get(releasingType);
+ synchronized (releasingArrays) {
+ releasingArrays.add(releasingArray);
+ }
+ bufferedArraysRamSize.addAndGet((long) ARRAY_SIZE * releasingType.getDataTypeSize());
+ }
+ }
+
+ private static void releaseOutOfBuffer(TSDataType dataType) {
+ outOfBufferArraysRamSize.getAndUpdate(
+ l -> Math.max(0, l - (long) ARRAY_SIZE * dataType.getDataTypeSize()));
}
/**
* @param schemaDataTypeNumMap schema DataType Num Map (for each series, increase a long and a
* specific type)
- * @param total current DataType Total Num (twice of number of time series)
+ * @param totalSeries total time series number
*/
public static void updateSchemaDataTypeNum(
- Map<TSDataType, Integer> schemaDataTypeNumMap, long total) {
+ Map<TSDataType, Integer> schemaDataTypeNumMap, long totalSeries) {
for (Map.Entry<TSDataType, Integer> entry : schemaDataTypeNumMap.entrySet()) {
TSDataType dataType = entry.getKey();
- bufferedArraysNumRatio.put(dataType, (double) schemaDataTypeNumMap.get(dataType) / total);
+ // one time series has 2 columns (time column + value column)
+ bufferedArraysNumRatio.put(
+ dataType, (double) schemaDataTypeNumMap.get(dataType) / (totalSeries * 2));
}
}
@@ -289,13 +294,14 @@ public class PrimitiveArrayManager {
* @return true if the buffered array ratio exceeds the recommend ratio
*/
private static boolean isCurrentDataTypeExceeded(TSDataType dataType) {
- int total = 0;
- for (int num : bufferedArraysNumMap.values()) {
- total += num;
+ long total = 0;
+ for (ArrayDeque<Object> value : bufferedArraysMap.values()) {
+ total += value.size();
}
+ long arrayNumInBuffer =
+ bufferedArraysMap.containsKey(dataType) ? bufferedArraysMap.get(dataType).size() : 0;
return total != 0
- && ((double) bufferedArraysNumMap.getOrDefault(dataType, 0) / total
- > bufferedArraysNumRatio.getOrDefault(dataType, 0.0));
+ && ((double) arrayNumInBuffer / total > bufferedArraysNumRatio.getOrDefault(dataType, 0.0));
}
public static void close() {
@@ -303,7 +309,6 @@ public class PrimitiveArrayManager {
dataListQueue.clear();
}
- bufferedArraysNumMap.clear();
bufferedArraysNumRatio.clear();
bufferedArraysRamSize.set(0);