You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2020/12/28 08:12:59 UTC
[iotdb] 01/01: fix flush OOM
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch flush_OOM_11
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c38974b1ba877c9cfc742ff17c3eae949cfecf10
Author: HTHou <hh...@outlook.com>
AuthorDate: Mon Dec 28 16:12:23 2020 +0800
fix flush OOM
---
.../resources/conf/iotdb-engine.properties | 3 ++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 13 ++++++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 4 ++
.../iotdb/db/engine/flush/MemTableFlushTask.java | 49 +++++++++++-----------
4 files changed, 44 insertions(+), 25 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 3cf1302..7c08de1 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -267,6 +267,9 @@ max_waiting_time_when_insert_blocked=10000
# estimated metadata size (in byte) of one timeseries in Mtree
estimated_series_size=300
+# size of ioTaskQueue and encodingTaskQueue. The default value is 2147483647
+queue_size_for_flushing=2147483647
+
####################
### Upgrade Configurations
####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 5582a96..ac90660 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -759,6 +759,11 @@ public class IoTDBConfig {
*/
private boolean debugState = false;
+ /**
+ * the size of ioTaskQueue and encodingTaskQueue
+ */
+ private int queueSizeForFlushing = Integer.MAX_VALUE;
+
public IoTDBConfig() {
// empty constructor
}
@@ -2045,4 +2050,12 @@ public class IoTDBConfig {
public void setDebugState(boolean debugState) {
this.debugState = debugState;
}
+
+ public int getQueueSizeForFlushing() {
+ return queueSizeForFlushing;
+ }
+
+ public void setQueueSizeForFlushing(int queueSizeForFlushing) {
+ this.queueSizeForFlushing = queueSizeForFlushing;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index dc95ef1..15520ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -295,6 +295,10 @@ public class IoTDBDescriptor {
.getProperty("estimated_series_size",
Integer.toString(conf.getEstimatedSeriesSize()))));
+ conf.setQueueSizeForFlushing(Integer.parseInt(properties
+ .getProperty("queue_size_for_flushing",
+ Integer.toString(conf.getQueueSizeForFlushing()))));
+
conf.setMergeChunkPointNumberThreshold(Integer.parseInt(properties
.getProperty("merge_chunk_point_number",
Integer.toString(conf.getMergeChunkPointNumberThreshold()))));
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index e9f4c92..550c35f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -19,10 +19,12 @@
package org.apache.iotdb.db.engine.flush;
import java.io.IOException;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
@@ -42,12 +44,15 @@ public class MemTableFlushTask {
private static final Logger logger = LoggerFactory.getLogger(MemTableFlushTask.class);
private static final FlushSubTaskPoolManager subTaskPoolManager = FlushSubTaskPoolManager
.getInstance();
+ private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private final Future<?> encodingTaskFuture;
private final Future<?> ioTaskFuture;
private RestorableTsFileIOWriter writer;
- private final ConcurrentLinkedQueue<Object> ioTaskQueue = new ConcurrentLinkedQueue<>();
- private final ConcurrentLinkedQueue<Object> encodingTaskQueue = new ConcurrentLinkedQueue<>();
+ private final LinkedBlockingQueue<Object> ioTaskQueue =
+ new LinkedBlockingQueue<>(config.getQueueSizeForFlushing());
+ private final LinkedBlockingQueue<Object> encodingTaskQueue =
+ new LinkedBlockingQueue<>(config.getQueueSizeForFlushing());
private String storageGroup;
private IMemTable memTable;
@@ -84,16 +89,16 @@ public class MemTableFlushTask {
long sortTime = 0;
for (String deviceId : memTable.getMemTableMap().keySet()) {
- encodingTaskQueue.add(new StartFlushGroupIOTask(deviceId));
+ encodingTaskQueue.put(new StartFlushGroupIOTask(deviceId));
for (String measurementId : memTable.getMemTableMap().get(deviceId).keySet()) {
long startTime = System.currentTimeMillis();
IWritableMemChunk series = memTable.getMemTableMap().get(deviceId).get(measurementId);
MeasurementSchema desc = series.getSchema();
TVList tvList = series.getSortedTVList();
sortTime += System.currentTimeMillis() - startTime;
- encodingTaskQueue.add(new Pair<>(tvList, desc));
+ encodingTaskQueue.put(new Pair<>(tvList, desc));
}
- encodingTaskQueue.add(new EndChunkGroupIoTask());
+ encodingTaskQueue.put(new EndChunkGroupIoTask());
}
noMoreEncodingTask = true;
logger.debug(
@@ -176,23 +181,25 @@ public class MemTableFlushTask {
if (noMoreMessages) {
break;
}
- try {
- TimeUnit.MILLISECONDS.sleep(10);
- } catch (@SuppressWarnings("squid:S2142") InterruptedException e) {
- logger.error("Storage group {} memtable {}, encoding task is interrupted.",
- storageGroup, memTable.getVersion(), e);
- // generally it is because the thread pool is shutdown so the task should be aborted
- break;
- }
} else {
if (task instanceof StartFlushGroupIOTask || task instanceof EndChunkGroupIoTask) {
- ioTaskQueue.add(task);
+ try {
+ ioTaskQueue.put(task);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
} else {
long starTime = System.currentTimeMillis();
Pair<TVList, MeasurementSchema> encodingMessage = (Pair<TVList, MeasurementSchema>) task;
IChunkWriter seriesWriter = new ChunkWriterImpl(encodingMessage.right);
writeOneSeries(encodingMessage.left, seriesWriter, encodingMessage.right.getType());
- ioTaskQueue.add(seriesWriter);
+ try {
+ ioTaskQueue.put(seriesWriter);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
memSerializeTime += System.currentTimeMillis() - starTime;
}
}
@@ -218,14 +225,6 @@ public class MemTableFlushTask {
if (returnWhenNoTask) {
break;
}
- try {
- TimeUnit.MILLISECONDS.sleep(10);
- } catch (@SuppressWarnings("squid:S2142") InterruptedException e) {
- logger.error("Storage group {} memtable {}, io task is interrupted.", storageGroup
- , memTable.getVersion(), e);
- // generally it is because the thread pool is shutdown so the task should be aborted
- break;
- }
} else {
long starTime = System.currentTimeMillis();
try {