You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/01/02 06:00:29 UTC
[iotdb] branch rel/0.12 updated: [To rel/0.12][IOTDB-1297] Refactor the memory control when enabling time partitions (#4659)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new d4cab76 [To rel/0.12][IOTDB-1297] Refactor the memory control when enabling time partitions (#4659)
d4cab76 is described below
commit d4cab763e07df3717705e6f22006c93ad8649b2f
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Sun Jan 2 13:59:54 2022 +0800
[To rel/0.12][IOTDB-1297] Refactor the memory control when enabling time partitions (#4659)
---
.../resources/conf/iotdb-engine.properties | 8 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 23 ++++-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 20 ++++-
.../engine/storagegroup/StorageGroupProcessor.java | 25 ++++--
.../writelog/manager/MultiFileLogNodeManager.java | 38 ++++++++-
.../db/qp/physical/InsertTabletMultiPlanTest.java | 99 ++++++++++++++++++++++
6 files changed, 202 insertions(+), 11 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 8e63e07..28a4f65 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -490,6 +490,12 @@ timestamp_precision=ms
# it should be an even number
# max_wal_bytebuffer_num_for_each_partition=6
+# If allocating a WAL byte buffer fails with OOM, the system sleeps this many milliseconds and then retries.
+# register_buffer_sleep_interval_in_ms=200
+
+# If the total retry sleep time reaches this many milliseconds, the system rejects the write.
+# register_buffer_reject_threshold_in_ms=10000
+
####################
### External sort Configuration
####################
@@ -727,7 +733,7 @@ timestamp_precision=ms
# time range for partitioning data inside each storage group, the unit is second
# partition_interval=604800
-# concurrent_writing_time_partition=500
+# concurrent_writing_time_partition=1
# admin username, default is root
# Datatype: string
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 756b88d..b880ae5 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -172,6 +172,11 @@ public class IoTDBConfig {
private long walPoolTrimIntervalInMS = 10_000;
+ /** if OOM occurs when registering bytebuffer, system will sleep awhile and then try again. */
+ private long registerBufferSleepIntervalInMs = 200;
+ /** if total sleep time exceeds this, system will reject this write. */
+ private long registerBufferRejectThresholdInMs = 10_000;
+
private int estimatedSeriesSize = 300;
/**
@@ -589,7 +594,7 @@ public class IoTDBConfig {
private String kerberosPrincipal = "your principal";
/** the num of memtable in each storage group */
- private int concurrentWritingTimePartition = 500;
+ private int concurrentWritingTimePartition = 1;
/** the default fill interval in LinearFill and PreviousFill, -1 means infinite past time */
private int defaultFillInterval = -1;
@@ -1248,6 +1253,22 @@ public class IoTDBConfig {
this.walPoolTrimIntervalInMS = walPoolTrimIntervalInMS;
}
+ public long getRegisterBufferSleepIntervalInMs() { // pause (ms) between WAL buffer allocation retries
+ return registerBufferSleepIntervalInMs;
+ }
+
+ public void setRegisterBufferSleepIntervalInMs(long registerBufferSleepIntervalInMs) { // IoTDBDescriptor only passes values > 0
+ this.registerBufferSleepIntervalInMs = registerBufferSleepIntervalInMs;
+ }
+
+ public long getRegisterBufferRejectThresholdInMs() { // total retry sleep (ms) after which the write is rejected
+ return registerBufferRejectThresholdInMs;
+ }
+
+ public void setRegisterBufferRejectThresholdInMs(long registerBufferRejectThresholdInMs) { // IoTDBDescriptor only passes values > 0
+ this.registerBufferRejectThresholdInMs = registerBufferRejectThresholdInMs;
+ }
+
public int getEstimatedSeriesSize() {
return estimatedSeriesSize;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 8df773d..a2c6d3d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -833,12 +833,30 @@ public class IoTDBDescriptor {
}
long poolTrimIntervalInMS =
- Integer.parseInt(
+ Long.parseLong(
properties.getProperty(
"wal_pool_trim_interval_ms", Long.toString(conf.getWalPoolTrimIntervalInMS())));
if (poolTrimIntervalInMS > 0) {
conf.setWalPoolTrimIntervalInMS(poolTrimIntervalInMS);
}
+
+ long registerBufferSleepIntervalInMs =
+ Long.parseLong(
+ properties.getProperty(
+ "register_buffer_sleep_interval_in_ms",
+ Long.toString(conf.getRegisterBufferSleepIntervalInMs())));
+ if (registerBufferSleepIntervalInMs > 0) {
+ conf.setRegisterBufferSleepIntervalInMs(registerBufferSleepIntervalInMs);
+ }
+
+ long registerBufferRejectThresholdInMs =
+ Long.parseLong(
+ properties.getProperty(
+ "register_buffer_reject_threshold_in_ms",
+ Long.toString(conf.getRegisterBufferRejectThresholdInMs())));
+ if (registerBufferRejectThresholdInMs > 0) {
+ conf.setRegisterBufferRejectThresholdInMs(registerBufferRejectThresholdInMs);
+ }
}
private void loadAutoCreateSchemaProps(Properties properties) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 42d20ed..e44ce60 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -268,7 +268,10 @@ public class StorageGroupProcessor {
private volatile boolean compacting = false;
- /** get the direct byte buffer from pool, each fetch contains two ByteBuffer */
+ /**
+ * get the direct byte buffer from pool, each fetch contains two ByteBuffer, return null if fetch
+ * fails
+ */
public ByteBuffer[] getWalDirectByteBuffer() {
ByteBuffer[] res = new ByteBuffer[2];
synchronized (walByteBufferPool) {
@@ -298,9 +301,21 @@ public class StorageGroupProcessor {
} else {
// if the queue is empty and current size is less than MAX_BYTEBUFFER_NUM
// we can construct another two more new byte buffer
- currentWalPoolSize += 2;
- res[0] = ByteBuffer.allocateDirect(WAL_BUFFER_SIZE);
- res[1] = ByteBuffer.allocateDirect(WAL_BUFFER_SIZE);
+ try {
+ res[0] = ByteBuffer.allocateDirect(WAL_BUFFER_SIZE);
+ res[1] = ByteBuffer.allocateDirect(WAL_BUFFER_SIZE);
+ // count the two buffers only after both allocations have succeeded
+ currentWalPoolSize += 2;
+ } catch (OutOfMemoryError e) {
+ // the pool size was not bumped yet, so only free what was already allocated
+ if (res[0] != null) {
+ MmapUtil.clean((MappedByteBuffer) res[0]);
+ }
+ if (res[1] != null) {
+ MmapUtil.clean((MappedByteBuffer) res[1]);
+ }
+ return null;
+ }
}
// if the pool is empty, set the time back to MAX_VALUE
if (walByteBufferPool.isEmpty()) {
@@ -1306,7 +1321,7 @@ public class StorageGroupProcessor {
synchronized (walByteBufferPool) {
while (!walByteBufferPool.isEmpty()) {
MmapUtil.clean((MappedByteBuffer) walByteBufferPool.removeFirst());
- currentWalPoolSize--;
+ currentWalPoolSize -= 1;
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java b/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
index a9283b1..ddaf132 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
@@ -46,10 +46,17 @@ import java.util.function.Supplier;
public class MultiFileLogNodeManager implements WriteLogNodeManager, IService {
private static final Logger logger = LoggerFactory.getLogger(MultiFileLogNodeManager.class);
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ // if OOM occurs when registering bytebuffer, getNode method will sleep awhile and then try again
+ private static final long REGISTER_BUFFER_SLEEP_INTERVAL_IN_MS =
+ config.getRegisterBufferSleepIntervalInMs();
+ // if total sleep time exceeds this, getNode method will reject this write
+ private static final long REGISTER_BUFFER_REJECT_THRESHOLD_IN_MS =
+ config.getRegisterBufferRejectThresholdInMs();
+
private final Map<String, WriteLogNode> nodeMap;
private ScheduledExecutorService executorService;
- private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private boolean firstReadOnly = true;
private void forceTask() {
@@ -90,9 +97,34 @@ public class MultiFileLogNodeManager implements WriteLogNodeManager, IService {
node = new ExclusiveWriteLogNode(identifier);
WriteLogNode oldNode = nodeMap.putIfAbsent(identifier, node);
if (oldNode != null) {
- return oldNode;
+ node = oldNode;
} else {
- node.initBuffer(supplier.get());
+ ByteBuffer[] buffers = supplier.get();
+ long sleepTimeInMs = 0;
+ while (buffers == null) {
+ if (sleepTimeInMs == 0) { // log error only on the first failed attempt
+ logger.error(
+ "Cannot allocate bytebuffer for wal, please reduce wal_buffer_size or storage groups number");
+ }
+ // sleep awhile and then try again; an interrupt aborts the wait instead of being swallowed
+ try {
+ Thread.sleep(REGISTER_BUFFER_SLEEP_INTERVAL_IN_MS);
+ sleepTimeInMs += REGISTER_BUFFER_SLEEP_INTERVAL_IN_MS;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ sleepTimeInMs = REGISTER_BUFFER_REJECT_THRESHOLD_IN_MS;
+ }
+ // slept too long or interrupted: unregister our uninitialized node and reject this write
+ if (sleepTimeInMs >= REGISTER_BUFFER_REJECT_THRESHOLD_IN_MS) {
+ nodeMap.remove(identifier, node);
+ throw new RuntimeException(
+ "Cannot allocate bytebuffer for wal, please reduce wal_buffer_size or storage groups number");
+ }
+ // try to get bytebuffer repeatedly
+ buffers = supplier.get();
+ }
+ // initialize node with bytebuffers
+ node.initBuffer(buffers);
}
}
return node;
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java
index aaa9c0f..d782a77 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java
@@ -100,4 +100,103 @@ public class InsertTabletMultiPlanTest extends InsertTabletPlanTest {
Assert.assertEquals(60, record.getFields().size());
}
}
+
+ @Test
+ public void testHugeInsertMultiTabletPlan()
+ throws QueryProcessException, MetadataException, StorageEngineException, IOException,
+ InterruptedException, QueryFilterOptimizationException {
+ // 1000 devices under root.multi0..root.multi49, 16 series x 10000 rows each; per the author this could throw write_process_npe
+ long[] times = new long[10000];
+ for (int i = 0; i < times.length; i++) {
+ times[i] = i;
+ }
+ List<Integer> dataTypes = new ArrayList<>();
+ dataTypes.add(TSDataType.DOUBLE.ordinal());
+ dataTypes.add(TSDataType.FLOAT.ordinal());
+ dataTypes.add(TSDataType.INT64.ordinal());
+ dataTypes.add(TSDataType.INT32.ordinal());
+ dataTypes.add(TSDataType.BOOLEAN.ordinal());
+ dataTypes.add(TSDataType.TEXT.ordinal());
+ dataTypes.add(TSDataType.TEXT.ordinal());
+ dataTypes.add(TSDataType.TEXT.ordinal());
+ dataTypes.add(TSDataType.TEXT.ordinal());
+ dataTypes.add(TSDataType.TEXT.ordinal());
+ dataTypes.add(TSDataType.TEXT.ordinal());
+ dataTypes.add(TSDataType.TEXT.ordinal());
+ dataTypes.add(TSDataType.TEXT.ordinal());
+ dataTypes.add(TSDataType.TEXT.ordinal());
+ dataTypes.add(TSDataType.TEXT.ordinal());
+ dataTypes.add(TSDataType.TEXT.ordinal());
+
+ Object[] columns = new Object[16];
+ int size = (times).length;
+ columns[0] = new double[size];
+ columns[1] = new float[size];
+ columns[2] = new long[size];
+ columns[3] = new int[size];
+ columns[4] = new boolean[size];
+ columns[5] = new Binary[size];
+ columns[6] = new Binary[size];
+ columns[7] = new Binary[size];
+ columns[8] = new Binary[size];
+ columns[9] = new Binary[size];
+ columns[10] = new Binary[size];
+ columns[11] = new Binary[size];
+ columns[12] = new Binary[size];
+ columns[13] = new Binary[size];
+ columns[14] = new Binary[size];
+ columns[15] = new Binary[size];
+
+ for (int r = 0; r < size; r++) {
+ ((double[]) columns[0])[r] = 1.0;
+ ((float[]) columns[1])[r] = 2;
+ ((long[]) columns[2])[r] = 10000;
+ ((int[]) columns[3])[r] = 100;
+ ((boolean[]) columns[4])[r] = false;
+ ((Binary[]) columns[5])[r] = new Binary("hh" + r);
+ ((Binary[]) columns[6])[r] = new Binary("hh" + r);
+ ((Binary[]) columns[7])[r] = new Binary("hh" + r);
+ ((Binary[]) columns[8])[r] = new Binary("hh" + r);
+ ((Binary[]) columns[9])[r] = new Binary("hh" + r);
+ ((Binary[]) columns[10])[r] = new Binary("hh" + r);
+ ((Binary[]) columns[11])[r] = new Binary("hh" + r);
+ ((Binary[]) columns[12])[r] = new Binary("hh" + r);
+ ((Binary[]) columns[13])[r] = new Binary("hh" + r);
+ ((Binary[]) columns[14])[r] = new Binary("hh" + r);
+ ((Binary[]) columns[15])[r] = new Binary("hh" + r);
+ }
+
+ List<InsertTabletPlan> insertTabletPlanList = new ArrayList<>();
+ for (int i = 0; i < 1000; i++) {
+ InsertTabletPlan tabletPlan =
+ new InsertTabletPlan(
+ new PartialPath("root.multi" + i / 20 + ".d" + i),
+ new String[] {
+ "s1", "s2", "s3", "s4", "s5", "s6", "s7", "s8", "s9", "s10", "s11", "s12", "s13",
+ "s14", "s15", "s16"
+ },
+ dataTypes);
+ tabletPlan.setTimes(times);
+ tabletPlan.setColumns(columns);
+ tabletPlan.setRowCount(times.length);
+ insertTabletPlanList.add(tabletPlan);
+ }
+ PlanExecutor executor = new PlanExecutor();
+
+ InsertMultiTabletPlan insertMultiTabletPlan = new InsertMultiTabletPlan(insertTabletPlanList);
+
+ executor.insertTablet(insertMultiTabletPlan);
+
+ for (int i = 0; i < 1000; i++) {
+ QueryPlan queryPlan =
+ (QueryPlan)
+ processor.parseSQLToPhysicalPlan("select * from root.multi" + i / 20 + ".d" + i);
+ QueryDataSet dataSet = executor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
+ Assert.assertEquals(16, dataSet.getPaths().size());
+ while (dataSet.hasNext()) {
+ RowRecord record = dataSet.next();
+ Assert.assertEquals(16, record.getFields().size());
+ }
+ }
+ }
}