You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/07/21 10:10:30 UTC

[iotdb] 01/03: change flush thread pool from cached to single

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch clear_wal
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0dc433a9660777fa84c0497bac938f3fe8ffb669
Author: qiaojialin <64...@qq.com>
AuthorDate: Wed Jul 21 16:13:43 2021 +0800

    change flush thread pool from cached to single
---
 .../db/writelog/node/ExclusiveWriteLogNode.java     | 21 ++++++++++++++++-----
 1 file changed, 16 insertions(+), 5 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
index aa793ea..152d7ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.db.writelog.io.ILogWriter;
 import org.apache.iotdb.db.writelog.io.LogWriter;
 import org.apache.iotdb.db.writelog.io.MultiFileLogReader;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,9 +68,8 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
 
   private final Object switchBufferCondition = new Object();
   private final ReentrantLock lock = new ReentrantLock();
-  private static final ExecutorService FLUSH_BUFFER_THREAD_POOL =
-      Executors.newCachedThreadPool(
-          new ThreadFactoryBuilder().setNameFormat("Flush-WAL-Thread-%d").setDaemon(true).build());
+  private final ExecutorService FLUSH_BUFFER_THREAD_POOL =
+      Executors.newSingleThreadExecutor(r -> new Thread(r, "Flush-WAL-Thread"));
 
   private long fileId = 0;
   private long lastFlushedId = 0;
@@ -261,6 +259,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
       }
       switchBufferWorkingToFlushing();
       ILogWriter currWriter = getCurrentFileWriter();
+      logger.warn("[wal] {} sync submit a flush thread", this.hashCode());
       FLUSH_BUFFER_THREAD_POOL.submit(() -> flushBuffer(currWriter));
       bufferedLogNum = 0;
       logger.debug("Log node {} ends sync.", identifier);
@@ -275,6 +274,8 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
   }
 
   private void flushBuffer(ILogWriter writer) {
+    logger.warn("[wal] {} flushBuffer start", this.hashCode());
+    long start1 = System.currentTimeMillis();
     try {
       writer.write(logBufferFlushing);
     } catch (ClosedChannelException e) {
@@ -284,13 +285,23 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
       IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
       return;
     }
+    long elapse1 = System.currentTimeMillis() - start1;
+    if (elapse1 > 3000) {
+      logger.error("[wal] {} switch Working -> Flushing cost: {}ms", this.hashCode(), elapse1);
+    }
 
     // switch buffer flushing to idle and notify the sync thread
+    long start2 = System.currentTimeMillis();
     synchronized (switchBufferCondition) {
       logBufferIdle = logBufferFlushing;
       logBufferFlushing = null;
       switchBufferCondition.notifyAll();
     }
+    long elapse2 = System.currentTimeMillis() - start2;
+    if (elapse2 > 3000) {
+      logger.error("[wal] {} switch Working -> Flushing cost: {}ms", this.hashCode(), elapse2);
+    }
+    logger.warn("[wal] {} flushBuffer end, notify all", this.hashCode());
   }
 
   private void switchBufferWorkingToFlushing() throws InterruptedException {
@@ -306,7 +317,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
     }
     long elapse = System.currentTimeMillis() - start;
     if (elapse > 3000) {
-      logger.warn("[wal] {} switch Working -> Flushing cost: {}ms", this.hashCode(), elapse);
+      logger.error("[wal] {} switch Working -> Flushing cost: {}ms", this.hashCode(), elapse);
     }
   }