You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2021/03/17 15:34:37 UTC
[iotdb] branch encoding_parallel updated: fix bugs and open setting
for concurrent encoding thread number
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch encoding_parallel
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/encoding_parallel by this push:
new c944b4b fix bugs and open setting for concurrent encoding thread number
c944b4b is described below
commit c944b4b8ee450e501ed210b75ebd4447d8d8caa4
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Wed Mar 17 23:34:06 2021 +0800
fix bugs and open setting for concurrent encoding thread number
---
server/src/assembly/resources/conf/iotdb-engine.properties | 2 ++
.../src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 +++++++++++
.../main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java | 6 ++++++
.../iotdb/db/engine/flush/MultiThreadMemTableFlushTask.java | 8 ++++++--
.../apache/iotdb/db/engine/storagegroup/TsFileProcessor.java | 10 +++++++---
.../iotdb/db/writelog/recover/TsFileRecoverPerformer.java | 7 ++++---
.../iotdb/db/engine/memtable/MemTableFlushTaskTest.java | 3 ++-
server/src/test/resources/logback.xml | 3 ---
8 files changed, 38 insertions(+), 12 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 8c4c6fb..e833bcf 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -288,6 +288,8 @@ estimated_series_size=300
# size of ioTaskQueue. The default value is 10
io_task_queue_size_for_flushing=10
+# maximal number of concurrent threads for encoding in one memtable flushing task. default is 2.
+concurrent_encoding_task_size_in_one_memtable_flushing=2
####################
### Upgrade Configurations
####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 0a6ddbc..863009b 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -640,6 +640,9 @@ public class IoTDBConfig {
/** the size of ioTaskQueue */
private int ioTaskQueueSizeForFlushing = 10;
+ /** the maximal concurrent encoding tasks in a memtable flushing task */
+ private int concurrentEncodingTasksInOneMemtable = 2;
+
/** the number of virtual storage groups per user-defined storage group */
private int virtualStorageGroupNum = 1;
@@ -2081,4 +2084,12 @@ public class IoTDBConfig {
public void setIoTaskQueueSizeForFlushing(int ioTaskQueueSizeForFlushing) {
this.ioTaskQueueSizeForFlushing = ioTaskQueueSizeForFlushing;
}
+
+ public int getConcurrentEncodingTasksInOneMemtable() {
+ return concurrentEncodingTasksInOneMemtable;
+ }
+
+ public void setConcurrentEncodingTasksInOneMemtable(int concurrentEncodingTasksInOneMemtable) {
+ this.concurrentEncodingTasksInOneMemtable = concurrentEncodingTasksInOneMemtable;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 2ef9347..b955f44 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -299,6 +299,12 @@ public class IoTDBDescriptor {
"io_task_queue_size_for_flushing",
Integer.toString(conf.getIoTaskQueueSizeForFlushing()))));
+ conf.setConcurrentEncodingTasksInOneMemtable(
+ Integer.parseInt(
+ properties.getProperty(
+ "concurrent_encoding_task_size_in_one_memtable_flushing",
+ Integer.toString(conf.getConcurrentEncodingTasksInOneMemtable()))));
+
conf.setMergeChunkPointNumberThreshold(
Integer.parseInt(
properties.getProperty(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MultiThreadMemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MultiThreadMemTableFlushTask.java
index 46a04df..526c4f6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MultiThreadMemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MultiThreadMemTableFlushTask.java
@@ -54,7 +54,8 @@ public class MultiThreadMemTableFlushTask implements IMemTableFlushTask {
private final Future<?> ioTaskFuture;
private RestorableTsFileIOWriter writer;
- int threadSize = 10;
+ int threadSize =
+ IoTDBDescriptor.getInstance().getConfig().getConcurrentEncodingTasksInOneMemtable();
private final LinkedBlockingQueue<Object>[] encodingTaskQueues;
private LinkedBlockingQueue<Object>[] ioTaskQueues;
@@ -153,6 +154,9 @@ public class MultiThreadMemTableFlushTask implements IMemTableFlushTask {
for (LinkedBlockingQueue encodingTaskQueue : encodingTaskQueues) {
encodingTaskQueue.clear();
}
+ for (Future future : encodingTaskFutures) {
+ future.cancel(true);
+ }
ioTaskFuture.cancel(true);
throw e;
}
@@ -343,7 +347,7 @@ public class MultiThreadMemTableFlushTask implements IMemTableFlushTask {
// means the queue is done
continue;
}
- ioMessage = ioTaskQueues[i].poll(10, TimeUnit.MILLISECONDS);
+ ioMessage = ioTaskQueues[i].poll(5, TimeUnit.MILLISECONDS);
}
} else {
ioMessage = ioTaskQueues[i].take();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 1e705ab..345e367 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -26,7 +26,8 @@ import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.flush.CloseFileListener;
import org.apache.iotdb.db.engine.flush.FlushListener;
import org.apache.iotdb.db.engine.flush.FlushManager;
-import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
+import org.apache.iotdb.db.engine.flush.IMemTableFlushTask;
+import org.apache.iotdb.db.engine.flush.MultiThreadMemTableFlushTask;
import org.apache.iotdb.db.engine.flush.NotifyFlushMemTable;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
@@ -774,8 +775,11 @@ public class TsFileProcessor {
if (!memTableToFlush.isSignalMemTable()) {
try {
writer.mark();
- MemTableFlushTask flushTask =
- new MemTableFlushTask(memTableToFlush, writer, storageGroupName);
+ IMemTableFlushTask flushTask =
+ // config.getConcurrentEncodingTasksInOneMemtable() == 1
+ // ? new MemTableFlushTask(memTableToFlush, writer, storageGroupName)
+ // :
+ new MultiThreadMemTableFlushTask(memTableToFlush, writer, storageGroupName);
flushTask.syncFlushMemTable();
} catch (Exception e) {
if (writer == null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index cea833d..d45add5 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -20,7 +20,8 @@
package org.apache.iotdb.db.writelog.recover;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
-import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
+import org.apache.iotdb.db.engine.flush.IMemTableFlushTask;
+import org.apache.iotdb.db.engine.flush.MultiThreadMemTableFlushTask;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -217,8 +218,8 @@ public class TsFileRecoverPerformer {
try {
if (!recoverMemTable.isEmpty()) {
// flush logs
- MemTableFlushTask tableFlushTask =
- new MemTableFlushTask(
+ IMemTableFlushTask tableFlushTask =
+ new MultiThreadMemTableFlushTask(
recoverMemTable,
restorableTsFileIOWriter,
tsFileResource.getTsFile().getParentFile().getParentFile().getName());
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java
index a1b5d55..bdb2609 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.engine.memtable;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
+import org.apache.iotdb.db.engine.flush.IMemTableFlushTask;
import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
@@ -69,7 +70,7 @@ public class MemTableFlushTaskTest {
MemTableTestUtils.deviceId0,
MemTableTestUtils.measurementId0,
MemTableTestUtils.dataType0);
- MemTableFlushTask memTableFlushTask = new MemTableFlushTask(memTable, writer, storageGroup);
+ IMemTableFlushTask memTableFlushTask = new MemTableFlushTask(memTable, writer, storageGroup);
assertTrue(
writer
.getVisibleMetadataList(
diff --git a/server/src/test/resources/logback.xml b/server/src/test/resources/logback.xml
index 636914e..fe81269 100644
--- a/server/src/test/resources/logback.xml
+++ b/server/src/test/resources/logback.xml
@@ -40,9 +40,6 @@
<!-- enable me if you want to monitor when files are opened and closed.
<logger name="FileMonitor" level="info"/>
-->
- <logger name="org.apache.iotdb.db.engine.merge" level="DEBUG"/>
- <logger name="org.apache.iotdb.db.service.thrift.ThriftServiceThread" level="DEBUG"/>
- <logger name="org.eclipse.jetty.util.thread.QueuedThreadPool" level="DEBUG"/>
<logger name="org.apache.iotdb.db.service.MetricsService" level="INFO"/>
<logger name="org.apache.iotdb.db.engine.flush.FlushManager" level="INFO"/>
<logger name="org.apache.iotdb.db.integration.IoTDBMergeIT" level="INFO"/>