You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/07/21 10:10:30 UTC
[iotdb] 01/03: change flush thread pool from cached to single
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch clear_wal
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0dc433a9660777fa84c0497bac938f3fe8ffb669
Author: qiaojialin <64...@qq.com>
AuthorDate: Wed Jul 21 16:13:43 2021 +0800
change flush thread pool from cached to single
---
.../db/writelog/node/ExclusiveWriteLogNode.java | 21 ++++++++++++++++-----
1 file changed, 16 insertions(+), 5 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
index aa793ea..152d7ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.db.writelog.io.ILogWriter;
import org.apache.iotdb.db.writelog.io.LogWriter;
import org.apache.iotdb.db.writelog.io.MultiFileLogReader;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,9 +68,8 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
private final Object switchBufferCondition = new Object();
private final ReentrantLock lock = new ReentrantLock();
- private static final ExecutorService FLUSH_BUFFER_THREAD_POOL =
- Executors.newCachedThreadPool(
- new ThreadFactoryBuilder().setNameFormat("Flush-WAL-Thread-%d").setDaemon(true).build());
+ private final ExecutorService FLUSH_BUFFER_THREAD_POOL =
+ Executors.newSingleThreadExecutor(r -> new Thread(r, "Flush-WAL-Thread"));
private long fileId = 0;
private long lastFlushedId = 0;
@@ -261,6 +259,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
}
switchBufferWorkingToFlushing();
ILogWriter currWriter = getCurrentFileWriter();
+ logger.warn("[wal] {} sync submit a flush thread", this.hashCode());
FLUSH_BUFFER_THREAD_POOL.submit(() -> flushBuffer(currWriter));
bufferedLogNum = 0;
logger.debug("Log node {} ends sync.", identifier);
@@ -275,6 +274,8 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
}
private void flushBuffer(ILogWriter writer) {
+ logger.warn("[wal] {} flushBuffer start", this.hashCode());
+ long start1 = System.currentTimeMillis();
try {
writer.write(logBufferFlushing);
} catch (ClosedChannelException e) {
@@ -284,13 +285,23 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
return;
}
+ long elapse1 = System.currentTimeMillis() - start1;
+ if (elapse1 > 3000) {
+ logger.error("[wal] {} switch Working -> Flushing cost: {}ms", this.hashCode(), elapse1);
+ }
// switch buffer flushing to idle and notify the sync thread
+ long start2 = System.currentTimeMillis();
synchronized (switchBufferCondition) {
logBufferIdle = logBufferFlushing;
logBufferFlushing = null;
switchBufferCondition.notifyAll();
}
+ long elapse2 = System.currentTimeMillis() - start2;
+ if (elapse2 > 3000) {
+ logger.error("[wal] {} switch Working -> Flushing cost: {}ms", this.hashCode(), elapse2);
+ }
+ logger.warn("[wal] {} flushBuffer end, notify all", this.hashCode());
}
private void switchBufferWorkingToFlushing() throws InterruptedException {
@@ -306,7 +317,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
}
long elapse = System.currentTimeMillis() - start;
if (elapse > 3000) {
- logger.warn("[wal] {} switch Working -> Flushing cost: {}ms", this.hashCode(), elapse);
+ logger.error("[wal] {} switch Working -> Flushing cost: {}ms", this.hashCode(), elapse);
}
}