You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2021/03/17 15:34:37 UTC

[iotdb] branch encoding_parallel updated: fix bugs and open setting for concurrent encoding thread number

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch encoding_parallel
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/encoding_parallel by this push:
     new c944b4b  fix bugs and open setting for concurrent encoding thread number
c944b4b is described below

commit c944b4b8ee450e501ed210b75ebd4447d8d8caa4
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Wed Mar 17 23:34:06 2021 +0800

    fix bugs and open setting for concurrent encoding thread number
---
 server/src/assembly/resources/conf/iotdb-engine.properties    |  2 ++
 .../src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java   | 11 +++++++++++
 .../main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java   |  6 ++++++
 .../iotdb/db/engine/flush/MultiThreadMemTableFlushTask.java   |  8 ++++++--
 .../apache/iotdb/db/engine/storagegroup/TsFileProcessor.java  | 10 +++++++---
 .../iotdb/db/writelog/recover/TsFileRecoverPerformer.java     |  7 ++++---
 .../iotdb/db/engine/memtable/MemTableFlushTaskTest.java       |  3 ++-
 server/src/test/resources/logback.xml                         |  3 ---
 8 files changed, 38 insertions(+), 12 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 8c4c6fb..e833bcf 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -288,6 +288,8 @@ estimated_series_size=300
 # size of ioTaskQueue. The default value is 10
 io_task_queue_size_for_flushing=10
 
+# maximal number of concurrent threads for encoding in one memtable flushing task. default is 2.
+concurrent_encoding_task_size_in_one_memtable_flushing=2
 ####################
 ### Upgrade Configurations
 ####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 0a6ddbc..863009b 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -640,6 +640,9 @@ public class IoTDBConfig {
   /** the size of ioTaskQueue */
   private int ioTaskQueueSizeForFlushing = 10;
 
+  /** the maximal concurrent encoding tasks in a memtable flushing task */
+  private int concurrentEncodingTasksInOneMemtable = 2;
+
   /** the number of virtual storage groups per user-defined storage group */
   private int virtualStorageGroupNum = 1;
 
@@ -2081,4 +2084,12 @@ public class IoTDBConfig {
   public void setIoTaskQueueSizeForFlushing(int ioTaskQueueSizeForFlushing) {
     this.ioTaskQueueSizeForFlushing = ioTaskQueueSizeForFlushing;
   }
+
+  public int getConcurrentEncodingTasksInOneMemtable() {
+    return concurrentEncodingTasksInOneMemtable;
+  }
+
+  public void setConcurrentEncodingTasksInOneMemtable(int concurrentEncodingTasksInOneMemtable) {
+    this.concurrentEncodingTasksInOneMemtable = concurrentEncodingTasksInOneMemtable;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 2ef9347..b955f44 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -299,6 +299,12 @@ public class IoTDBDescriptor {
                   "io_task_queue_size_for_flushing",
                   Integer.toString(conf.getIoTaskQueueSizeForFlushing()))));
 
+      conf.setConcurrentEncodingTasksInOneMemtable(
+          Integer.parseInt(
+              properties.getProperty(
+                  "concurrent_encoding_task_size_in_one_memtable_flushing",
+                  Integer.toString(conf.getConcurrentEncodingTasksInOneMemtable()))));
+
       conf.setMergeChunkPointNumberThreshold(
           Integer.parseInt(
               properties.getProperty(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MultiThreadMemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MultiThreadMemTableFlushTask.java
index 46a04df..526c4f6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MultiThreadMemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MultiThreadMemTableFlushTask.java
@@ -54,7 +54,8 @@ public class MultiThreadMemTableFlushTask implements IMemTableFlushTask {
   private final Future<?> ioTaskFuture;
   private RestorableTsFileIOWriter writer;
 
-  int threadSize = 10;
+  int threadSize =
+      IoTDBDescriptor.getInstance().getConfig().getConcurrentEncodingTasksInOneMemtable();
 
   private final LinkedBlockingQueue<Object>[] encodingTaskQueues;
   private LinkedBlockingQueue<Object>[] ioTaskQueues;
@@ -153,6 +154,9 @@ public class MultiThreadMemTableFlushTask implements IMemTableFlushTask {
         for (LinkedBlockingQueue encodingTaskQueue : encodingTaskQueues) {
           encodingTaskQueue.clear();
         }
+        for (Future future : encodingTaskFutures) {
+          future.cancel(true);
+        }
         ioTaskFuture.cancel(true);
         throw e;
       }
@@ -343,7 +347,7 @@ public class MultiThreadMemTableFlushTask implements IMemTableFlushTask {
                   // means the queue is done
                   continue;
                 }
-                ioMessage = ioTaskQueues[i].poll(10, TimeUnit.MILLISECONDS);
+                ioMessage = ioTaskQueues[i].poll(5, TimeUnit.MILLISECONDS);
               }
             } else {
               ioMessage = ioTaskQueues[i].take();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 1e705ab..345e367 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -26,7 +26,8 @@ import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.flush.CloseFileListener;
 import org.apache.iotdb.db.engine.flush.FlushListener;
 import org.apache.iotdb.db.engine.flush.FlushManager;
-import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
+import org.apache.iotdb.db.engine.flush.IMemTableFlushTask;
+import org.apache.iotdb.db.engine.flush.MultiThreadMemTableFlushTask;
 import org.apache.iotdb.db.engine.flush.NotifyFlushMemTable;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
@@ -774,8 +775,11 @@ public class TsFileProcessor {
     if (!memTableToFlush.isSignalMemTable()) {
       try {
         writer.mark();
-        MemTableFlushTask flushTask =
-            new MemTableFlushTask(memTableToFlush, writer, storageGroupName);
+        IMemTableFlushTask flushTask =
+            // config.getConcurrentEncodingTasksInOneMemtable() == 1
+            //  ? new MemTableFlushTask(memTableToFlush, writer, storageGroupName)
+            //        :
+            new MultiThreadMemTableFlushTask(memTableToFlush, writer, storageGroupName);
         flushTask.syncFlushMemTable();
       } catch (Exception e) {
         if (writer == null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index cea833d..d45add5 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -20,7 +20,8 @@
 package org.apache.iotdb.db.writelog.recover;
 
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
-import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
+import org.apache.iotdb.db.engine.flush.IMemTableFlushTask;
+import org.apache.iotdb.db.engine.flush.MultiThreadMemTableFlushTask;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -217,8 +218,8 @@ public class TsFileRecoverPerformer {
     try {
       if (!recoverMemTable.isEmpty()) {
         // flush logs
-        MemTableFlushTask tableFlushTask =
-            new MemTableFlushTask(
+        IMemTableFlushTask tableFlushTask =
+            new MultiThreadMemTableFlushTask(
                 recoverMemTable,
                 restorableTsFileIOWriter,
                 tsFileResource.getTsFile().getParentFile().getParentFile().getName());
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java
index a1b5d55..bdb2609 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.engine.memtable;
 
 import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.engine.MetadataManagerHelper;
+import org.apache.iotdb.db.engine.flush.IMemTableFlushTask;
 import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
@@ -69,7 +70,7 @@ public class MemTableFlushTaskTest {
         MemTableTestUtils.deviceId0,
         MemTableTestUtils.measurementId0,
         MemTableTestUtils.dataType0);
-    MemTableFlushTask memTableFlushTask = new MemTableFlushTask(memTable, writer, storageGroup);
+    IMemTableFlushTask memTableFlushTask = new MemTableFlushTask(memTable, writer, storageGroup);
     assertTrue(
         writer
             .getVisibleMetadataList(
diff --git a/server/src/test/resources/logback.xml b/server/src/test/resources/logback.xml
index 636914e..fe81269 100644
--- a/server/src/test/resources/logback.xml
+++ b/server/src/test/resources/logback.xml
@@ -40,9 +40,6 @@
     <!-- enable me if you want to monitor when files are opened and closed.
     <logger name="FileMonitor" level="info"/>
     -->
-    <logger name="org.apache.iotdb.db.engine.merge" level="DEBUG"/>
-    <logger name="org.apache.iotdb.db.service.thrift.ThriftServiceThread" level="DEBUG"/>
-    <logger name="org.eclipse.jetty.util.thread.QueuedThreadPool" level="DEBUG"/>
     <logger name="org.apache.iotdb.db.service.MetricsService" level="INFO"/>
     <logger name="org.apache.iotdb.db.engine.flush.FlushManager" level="INFO"/>
     <logger name="org.apache.iotdb.db.integration.IoTDBMergeIT" level="INFO"/>