You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/04/13 09:48:58 UTC
[iotdb] branch native_raft updated: add dynamic dispatcher number
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/native_raft by this push:
new 1d8b3c0b11 add dynamic dispatcher number
1d8b3c0b11 is described below
commit 1d8b3c0b1104b818dde6100f553d94181bfc35fe
Author: jt2594838 <jt...@163.com>
AuthorDate: Thu Apr 13 17:48:44 2023 +0800
add dynamic dispatcher number
---
.../protocol/log/dispatch/DispatcherGroup.java | 20 +++++++++---
.../protocol/log/dispatch/DispatcherThread.java | 37 +++++++++++++++++++++-
2 files changed, 51 insertions(+), 6 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
index 0104695c23..acc4f2bd2c 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
@@ -40,15 +40,17 @@ public class DispatcherGroup {
private final ExecutorService dispatcherThreadPool;
private final LogDispatcher logDispatcher;
private final AtomicInteger groupThreadNum = new AtomicInteger();
+ private int maxBindingThreadNum;
- public DispatcherGroup(Peer peer, LogDispatcher logDispatcher, int bindingThreadNum) {
+ public DispatcherGroup(Peer peer, LogDispatcher logDispatcher, int maxBindingThreadNum) {
this.logDispatcher = logDispatcher;
this.peer = peer;
this.entryQueue = new ArrayBlockingQueue<>(logDispatcher.getConfig().getMaxNumOfLogsInMem());
this.nodeEnabled = true;
this.rateLimiter = RateLimiter.create(Double.MAX_VALUE);
+ this.maxBindingThreadNum = maxBindingThreadNum;
this.dispatcherThreadPool = createPool(peer, logDispatcher.getMember().getName());
- for (int i = 0; i < bindingThreadNum; i++) {
+ for (int i = 0; i < maxBindingThreadNum; i++) {
addThread();
}
}
@@ -67,9 +69,13 @@ public class DispatcherGroup {
}
}
public void addThread() {
- dispatcherThreadPool
- .submit(newDispatcherThread(peer, entryQueue, rateLimiter));
- groupThreadNum.incrementAndGet();
+ int threadNum = groupThreadNum.incrementAndGet();
+ if (threadNum <= maxBindingThreadNum) {
+ dispatcherThreadPool
+ .submit(newDispatcherThread(peer, entryQueue, rateLimiter));
+ } else {
+ groupThreadNum.decrementAndGet();
+ }
}
DispatcherThread newDispatcherThread(Peer node, BlockingQueue<VotingEntry> logBlockingQueue,
@@ -108,4 +114,8 @@ public class DispatcherGroup {
public AtomicInteger getGroupThreadNum() {
return groupThreadNum;
}
+
+ public int getMaxBindingThreadNum() {
+ return maxBindingThreadNum;
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
index 8808479b45..298b87eda3 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
@@ -50,6 +50,8 @@ class DispatcherThread implements Runnable {
private final String baseName;
private final RateLimiter rateLimiter;
private final DispatcherGroup group;
+ private long idleTimeSum;
+ private long runningTimeSum;
protected DispatcherThread(LogDispatcher logDispatcher, Peer receiver,
BlockingQueue<VotingEntry> logBlockingDeque, RateLimiter rateLimiter,
@@ -68,6 +70,8 @@ class DispatcherThread implements Runnable {
Thread.currentThread().setName(baseName);
}
try {
+ long idleStart = System.nanoTime();
+ long runningStart = 0;
while (!Thread.interrupted()) {
synchronized (logBlockingDeque) {
VotingEntry poll = logBlockingDeque.poll();
@@ -79,25 +83,56 @@ class DispatcherThread implements Runnable {
continue;
}
}
+ long currTime = System.nanoTime();
+ idleTimeSum = currTime - idleStart;
+ runningStart = currTime;
if (logger.isDebugEnabled()) {
logger.debug("Sending {} logs to {}", currBatch.size(), receiver);
}
+
serializeEntries();
if (!logDispatcher.queueOrdered) {
currBatch.sort(Comparator.comparingLong(s -> s.getEntry().getCurrLogIndex()));
}
sendLogs(currBatch);
currBatch.clear();
+
+ currTime = System.nanoTime();
+ runningTimeSum = currTime - runningStart;
+ idleStart = currTime;
+
+ // thread too idle
+ if (idleTimeSum * 1.0 / (idleTimeSum + runningTimeSum) < 0.5 &&
+ runningTimeSum > 10_000_000_000L) {
+ int remaining = group.getGroupThreadNum().decrementAndGet();
+ if (remaining > 1) {
+ logger.info("Dispatcher thread too idle");
+ break;
+ } else {
+ group.getGroupThreadNum().incrementAndGet();
+ }
+ // thread too busy
+ } else if (idleTimeSum * 1.0 / (idleTimeSum + runningTimeSum) > 0.5 &&
+ runningTimeSum > 10_000_000_000L) {
+ int groupThreadNum = group.getGroupThreadNum().get();
+ if (groupThreadNum < group.getMaxBindingThreadNum()) {
+ group.addThread();
+ }
+ // avoid frequent change
+ runningTimeSum = 0;
+ idleTimeSum = 0;
+ }
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
logger.error("Unexpected error in log dispatcher", e);
}
- logger.info("Dispatcher exits");
+ logger.info("Dispatcher exits, idle ratio: {}", idleTimeSum * 1.0 / (idleTimeSum + runningTimeSum));
group.getGroupThreadNum().decrementAndGet();
}
+
protected void serializeEntries() throws InterruptedException {
for (VotingEntry request : currBatch) {