You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/04/24 04:46:50 UTC
[iotdb] branch native_raft updated: use DynamicThread
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/native_raft by this push:
new e348f39cda use DynamicThread
e348f39cda is described below
commit e348f39cdac7b20771dd587e826f2b0def0e34ce
Author: Tian Jiang <jt...@163.com>
AuthorDate: Mon Apr 24 12:49:16 2023 +0800
use DynamicThread
---
.../protocol/log/dispatch/DispatcherGroup.java | 44 ++++-------
.../protocol/log/dispatch/DispatcherThread.java | 52 +++----------
.../manager/serialization/StableEntryManager.java | 1 +
.../serialization/SyncLogDequeSerializer.java | 90 ++++++++--------------
4 files changed, 54 insertions(+), 133 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
index e953d1bc0a..1fef54f95d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
@@ -19,21 +19,20 @@
package org.apache.iotdb.consensus.natraft.protocol.log.dispatch;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.dynamic.DynamicThreadGroup;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
-
import org.apache.ratis.thirdparty.com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
public class DispatcherGroup {
+
private static final Logger logger = LoggerFactory.getLogger(DispatcherGroup.class);
private final Peer peer;
private final BlockingQueue<VotingEntry> entryQueue;
@@ -41,9 +40,8 @@ public class DispatcherGroup {
private final RateLimiter rateLimiter;
private final ExecutorService dispatcherThreadPool;
private final LogDispatcher logDispatcher;
- private final AtomicInteger groupThreadNum = new AtomicInteger();
- private int maxBindingThreadNum;
private boolean delayed;
+ private DynamicThreadGroup dynamicThreadGroup;
public DispatcherGroup(Peer peer, LogDispatcher logDispatcher, int maxBindingThreadNum) {
this.logDispatcher = logDispatcher;
@@ -51,11 +49,10 @@ public class DispatcherGroup {
this.entryQueue = new ArrayBlockingQueue<>(logDispatcher.getConfig().getMaxNumOfLogsInMem());
this.nodeEnabled = true;
this.rateLimiter = RateLimiter.create(Double.MAX_VALUE);
- this.maxBindingThreadNum = maxBindingThreadNum;
this.dispatcherThreadPool = createPool(peer, logDispatcher.getMember().getName());
- for (int i = 0; i < maxBindingThreadNum; i++) {
- addThread();
- }
+ this.dynamicThreadGroup = new DynamicThreadGroup(logDispatcher.member.getName() + "-" + peer,
+ dispatcherThreadPool::submit, () -> newDispatcherThread(peer, entryQueue, rateLimiter), 1,
+ maxBindingThreadNum);
}
public void close() {
@@ -72,15 +69,6 @@ public class DispatcherGroup {
}
}
- public void addThread() {
- int threadNum = groupThreadNum.incrementAndGet();
- if (threadNum <= maxBindingThreadNum) {
- dispatcherThreadPool.submit(newDispatcherThread(peer, entryQueue, rateLimiter));
- } else {
- groupThreadNum.decrementAndGet();
- }
- }
-
DispatcherThread newDispatcherThread(
Peer node, BlockingQueue<VotingEntry> logBlockingQueue, RateLimiter rateLimiter) {
return new DispatcherThread(logDispatcher, node, logBlockingQueue, rateLimiter, this);
@@ -115,14 +103,6 @@ public class DispatcherGroup {
return entryQueue;
}
- public AtomicInteger getGroupThreadNum() {
- return groupThreadNum;
- }
-
- public int getMaxBindingThreadNum() {
- return maxBindingThreadNum;
- }
-
public boolean isDelayed() {
return delayed;
}
@@ -130,4 +110,8 @@ public class DispatcherGroup {
public void setDelayed(boolean delayed) {
this.delayed = delayed;
}
+
+ public DynamicThreadGroup getDynamicThreadGroup() {
+ return dynamicThreadGroup;
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
index e571d57cb4..31f71707cc 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.consensus.natraft.protocol.log.dispatch;
+import org.apache.iotdb.commons.concurrent.dynamic.DynamicThread;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.client.AsyncRaftServiceClient;
@@ -42,7 +43,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
-class DispatcherThread implements Runnable {
+class DispatcherThread extends DynamicThread {
private static final Logger logger = LoggerFactory.getLogger(DispatcherThread.class);
@@ -53,8 +54,6 @@ class DispatcherThread implements Runnable {
private final String baseName;
private final RateLimiter rateLimiter;
private final DispatcherGroup group;
- private long idleTimeSum;
- private long runningTimeSum;
private long lastDispatchTime;
protected DispatcherThread(
@@ -63,6 +62,7 @@ class DispatcherThread implements Runnable {
BlockingQueue<VotingEntry> logBlockingDeque,
RateLimiter rateLimiter,
DispatcherGroup group) {
+ super(group.getDynamicThreadGroup());
this.logDispatcher = logDispatcher;
this.receiver = receiver;
this.logBlockingDeque = logBlockingDeque;
@@ -72,13 +72,11 @@ class DispatcherThread implements Runnable {
}
@Override
- public void run() {
+ public void runInternal() {
if (logger.isDebugEnabled()) {
Thread.currentThread().setName(baseName);
}
try {
- long idleStart = System.nanoTime();
- long runningStart = 0;
while (!Thread.interrupted()) {
if (group.isDelayed()) {
if (logBlockingDeque.size() < logDispatcher.maxBatchSize
@@ -100,9 +98,7 @@ class DispatcherThread implements Runnable {
continue;
}
}
- long currTime = System.nanoTime();
- idleTimeSum += currTime - idleStart;
- runningStart = currTime;
+ idleToRunning();
if (logger.isDebugEnabled()) {
logger.debug("Sending {} logs to {}", currBatch.size(), receiver);
}
@@ -113,33 +109,11 @@ class DispatcherThread implements Runnable {
}
sendLogs(currBatch);
currBatch.clear();
+ lastDispatchTime = System.nanoTime();
+ runningToIdle();
- currTime = System.nanoTime();
- lastDispatchTime = currTime;
- runningTimeSum += currTime - runningStart;
- idleStart = currTime;
-
- // thread too idle
- if (idleTimeSum * 1.0 / (idleTimeSum + runningTimeSum) > 0.5
- && runningTimeSum > 10_000_000_000L) {
- int remaining = group.getGroupThreadNum().decrementAndGet();
- if (remaining > 1) {
- logger.info("Dispatcher thread too idle");
- group.getGroupThreadNum().incrementAndGet();
- break;
- } else {
- group.getGroupThreadNum().incrementAndGet();
- }
- // thread too busy
- } else if (idleTimeSum * 1.0 / (idleTimeSum + runningTimeSum) < 0.1
- && runningTimeSum > 10_000_000_000L) {
- int groupThreadNum = group.getGroupThreadNum().get();
- if (groupThreadNum < group.getMaxBindingThreadNum()) {
- group.addThread();
- }
- // avoid frequent change
- runningTimeSum = 0;
- idleTimeSum = 0;
+ if (shouldExit()) {
+ break;
}
}
} catch (InterruptedException e) {
@@ -147,14 +121,6 @@ class DispatcherThread implements Runnable {
} catch (Exception e) {
logger.error("Unexpected error in log dispatcher", e);
}
- if (runningTimeSum > 0) {
- logger.info(
- "Dispatcher exits, idle ratio: {}, running time: {}ms, idle time: {}ms, remaining threads: {}",
- idleTimeSum * 1.0 / (idleTimeSum + runningTimeSum),
- runningTimeSum / 1_000_000L,
- idleTimeSum / 1_000_000L,
- group.getGroupThreadNum().decrementAndGet());
- }
}
protected void serializeEntries() throws InterruptedException {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java
index 92e7603036..a57a8a0774 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java
@@ -45,6 +45,7 @@ public interface StableEntryManager {
HardState getHardState();
void updateMeta(long commitIndex, long applyIndex);
+
LogManagerMeta getMeta();
/**
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
index 90ef700867..751e0ed510 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
@@ -71,14 +71,10 @@ public class SyncLogDequeSerializer implements StableEntryManager {
private static final String LOG_DATA_FILE_SUFFIX = "data";
private static final String LOG_INDEX_FILE_SUFFIX = "idx";
- /**
- * the log data files
- */
+ /** the log data files */
private List<File> logDataFileList;
- /**
- * the log index files
- */
+ /** the log index files */
private List<IndexFileDescriptor> logIndexFileList;
private LogParser parser = LogParser.getINSTANCE();
@@ -88,14 +84,10 @@ public class SyncLogDequeSerializer implements StableEntryManager {
private LogManagerMeta meta;
private HardState state;
- /**
- * min version of available log
- */
+ /** min version of available log */
private long minAvailableVersion = 0;
- /**
- * max version of available log
- */
+ /** max version of available log */
private long maxAvailableVersion = Long.MAX_VALUE;
private String logDir;
@@ -150,9 +142,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
private static final int LOG_DELETE_CHECK_INTERVAL_SECOND = 5;
- /**
- * the lock uses when change the log data files or log index files
- */
+ /** the lock uses when change the log data files or log index files */
private final Lock lock = new ReentrantLock();
private volatile boolean isClosed = false;
@@ -211,9 +201,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
return systemDir + File.separator + groupId + File.separator + "raftLog" + File.separator;
}
- /**
- * for log tools
- */
+ /** for log tools */
@Override
public LogManagerMeta getMeta() {
return meta;
@@ -225,9 +213,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
meta.setLastAppliedIndex(applyIndex);
}
- /**
- * Recover all the logs in disk. This function will be called once this instance is created.
- */
+ /** Recover all the logs in disk. This function will be called once this instance is created. */
@Override
public List<Entry> getAllEntriesAfterAppliedIndex() {
logger.debug(
@@ -241,8 +227,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
* is uncommitted for persistent LogManagerMeta(meta's info is stale).We need to recover these
* already persistent logs.
*
- * <p>For example,commitIndex is 5 in persistent LogManagerMeta,But the log file has actually
- * been
+ * <p>For example,commitIndex is 5 in persistent LogManagerMeta,But the log file has actually been
* flushed to 7,when we restart cluster,we need to recover 6 and 7.
*
* <p>Maybe,we can extract getAllEntriesAfterAppliedIndex and getAllEntriesAfterCommittedIndex
@@ -393,11 +378,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
try {
checkStream();
// 1. write to the log data file
- byte[] compressed =
- compressor.compress(
- logDataBuffer.array(),
- 0,
- logDataBuffer.position());
+ byte[] compressed = compressor.compress(logDataBuffer.array(), 0, logDataBuffer.position());
ReadWriteIOUtils.write(compressed.length, currentLogDataOutputStream);
currentLogDataOutputStream.write(compressed);
logIndexOffsetList.add(new Pair<>(lastLogIndex, offsetOfTheCurrentLogDataOutputStream));
@@ -445,9 +426,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
}
}
- /**
- * flush the log buffer and check if the file needs to be closed
- */
+ /** flush the log buffer and check if the file needs to be closed */
@Override
public void forceFlushLogBuffer() {
lock.lock();
@@ -491,9 +470,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
}
}
- /**
- * The file name rules are as follows: ${startLogIndex}-${endLogIndex}-${version}.data
- */
+ /** The file name rules are as follows: ${startLogIndex}-${endLogIndex}-${version}.data */
private void recoverLogFiles() {
// 1. first we should recover the log index file
recoverLogFiles(LOG_INDEX_FILE_SUFFIX);
@@ -540,9 +517,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
/**
* Check that the file is legal or not
*
- * @param file file needs to be check
- * @param fileType {@link SyncLogDequeSerializer#LOG_DATA_FILE_SUFFIX} or
- * {@link SyncLogDequeSerializer#LOG_INDEX_FILE_SUFFIX}
+ * @param file file needs to be check
+ * @param fileType {@link SyncLogDequeSerializer#LOG_DATA_FILE_SUFFIX} or {@link
+ * SyncLogDequeSerializer#LOG_INDEX_FILE_SUFFIX}
* @return true if the file legal otherwise false
*/
private boolean checkLogFile(File file, String fileType) {
@@ -770,9 +747,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
}
}
- /**
- * for unclosed file, the file name is ${startIndex}-${Long.MAX_VALUE}-{version}
- */
+ /** for unclosed file, the file name is ${startIndex}-${Long.MAX_VALUE}-{version} */
private void createNewLogFile(String dirName, long startLogIndex) throws IOException {
lock.lock();
try {
@@ -1117,7 +1092,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
/**
* @param startIndex the log start index
- * @param endIndex the log end index
+ * @param endIndex the log end index
* @return the raft log which index between [startIndex, endIndex] or empty if not found
*/
@Override
@@ -1222,9 +1197,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
/**
* @param startIndex the log start index
- * @param endIndex the log end index
+ * @param endIndex the log end index
* @return first value-> the log data file, second value-> the left value is the start offset of
- * the file, the right is the end offset of the file
+ * the file, the right is the end offset of the file
*/
private List<Pair<File, Pair<Long, Long>>> getLogDataFileAndOffset(
long startIndex, long endIndex) {
@@ -1272,8 +1247,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
if (logDataFileWithStartAndEndLogIndex != null) {
// this means the endIndex's offset can not be found in the file
// logDataFileWithStartAndEndLogIndex.left
- long endOffset = getOffsetAccordingToLogIndex(
- Math.min(endIndex, logDataFileWithStartAndEndLogIndex.right.right));
+ long endOffset =
+ getOffsetAccordingToLogIndex(
+ Math.min(endIndex, logDataFileWithStartAndEndLogIndex.right.right));
fileNameWithStartAndEndOffset.add(
new Pair<>(logDataFileWithStartAndEndLogIndex.left, new Pair<>(startOffset, endOffset)));
logger.debug(
@@ -1290,8 +1266,8 @@ public class SyncLogDequeSerializer implements StableEntryManager {
/**
* @param startIndex the start log index
* @return the first value of the pair is the log index file which contains the start index; the
- * second pair's first value is the file's start log index. the second pair's second value is the
- * file's end log index. null if not found
+ * second pair's first value is the file's start log index. the second pair's second value is
+ * the file's end log index. null if not found
*/
public IndexFileDescriptor getLogIndexFile(long startIndex) {
for (IndexFileDescriptor descriptor : logIndexFileList) {
@@ -1306,8 +1282,8 @@ public class SyncLogDequeSerializer implements StableEntryManager {
/**
* @param startIndex the start log index
* @return the first value of the pair is the log data file which contains the start index; the
- * second pair's first value is the file's start log index. the second pair's second value is the
- * file's end log index. null if not found
+ * second pair's first value is the file's start log index. the second pair's second value is
+ * the file's end log index. null if not found
*/
public Pair<File, Pair<Long, Long>> getLogDataFile(long startIndex) {
for (File file : logDataFileList) {
@@ -1326,9 +1302,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
}
/**
- * @param file the log data file
+ * @param file the log data file
* @param startAndEndOffset the left value is the start offset of the file, the right is the end
- * offset of the file
+ * offset of the file
* @return the logs between start offset and end offset
*/
private List<Entry> getLogsFromOneLogDataFile(File file, Pair<Long, Long> startAndEndOffset) {
@@ -1421,9 +1397,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
* automatically increased by saveInterval to avoid conflicts.
*/
private static long saveInterval = 100;
- /**
- * time partition id to dividing time series into different storage group
- */
+ /** time partition id to dividing time series into different storage group */
private long timePartitionId;
private long prevVersion;
@@ -1437,9 +1411,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
restore();
}
- /**
- * only used for upgrading
- */
+ /** only used for upgrading */
public SimpleFileVersionController(String directoryPath) throws IOException {
this.directoryPath = directoryPath + File.separator + UPGRADE_DIR;
restore();
@@ -1502,9 +1474,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
prevVersion = currVersion;
}
- /**
- * recovery from disk
- */
+ /** recovery from disk */
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
private void restore() throws IOException {
File directory = SystemFileFactory.INSTANCE.getFile(directoryPath);