You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/04/13 10:02:25 UTC
[iotdb] branch native_raft updated: add delayed dispatcher group
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/native_raft by this push:
new 0ec0351658 add delayed dispatcher group
0ec0351658 is described below
commit 0ec035165862ae2afbc1c9cc8b6d4e873600e076
Author: jt2594838 <jt...@163.com>
AuthorDate: Thu Apr 13 18:02:11 2023 +0800
add delayed dispatcher group
---
.../natraft/protocol/log/dispatch/DispatcherGroup.java | 10 ++++++++++
.../natraft/protocol/log/dispatch/DispatcherThread.java | 12 ++++++++++++
.../natraft/protocol/log/dispatch/LogDispatcher.java | 4 +++-
.../protocol/log/dispatch/flowcontrol/FlowBalancer.java | 4 ++--
4 files changed, 27 insertions(+), 3 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
index acc4f2bd2c..28ab02c7c4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
@@ -41,6 +41,7 @@ public class DispatcherGroup {
private final LogDispatcher logDispatcher;
private final AtomicInteger groupThreadNum = new AtomicInteger();
private int maxBindingThreadNum;
+ private boolean delayed;
public DispatcherGroup(Peer peer, LogDispatcher logDispatcher, int maxBindingThreadNum) {
this.logDispatcher = logDispatcher;
@@ -85,6 +86,7 @@ public class DispatcherGroup {
public void updateRate(double rate) {
rateLimiter.setRate(rate);
+ delayed = rate != Double.MAX_VALUE;
}
ExecutorService createPool(Peer node, String name) {
@@ -118,4 +120,12 @@ public class DispatcherGroup {
public int getMaxBindingThreadNum() {
return maxBindingThreadNum;
}
+
+ public boolean isDelayed() {
+ return delayed;
+ }
+
+ public void setDelayed(boolean delayed) {
+ this.delayed = delayed;
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
index 298b87eda3..7173249696 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
@@ -52,6 +52,7 @@ class DispatcherThread implements Runnable {
private final DispatcherGroup group;
private long idleTimeSum;
private long runningTimeSum;
+ private long lastDispatchTime;
protected DispatcherThread(LogDispatcher logDispatcher, Peer receiver,
BlockingQueue<VotingEntry> logBlockingDeque, RateLimiter rateLimiter,
@@ -73,6 +74,16 @@ class DispatcherThread implements Runnable {
long idleStart = System.nanoTime();
long runningStart = 0;
while (!Thread.interrupted()) {
+ if (group.isDelayed()) {
+ if (logBlockingDeque.size() < logDispatcher.maxBatchSize &&
+ System.nanoTime() - lastDispatchTime < 1_000_000_000L) {
+ // the follower is being delayed; if there are not enough requests and it has
+ // dispatched recently, wait for a while to get a larger batch
+ Thread.sleep(100);
+ continue;
+ }
+ }
+
synchronized (logBlockingDeque) {
VotingEntry poll = logBlockingDeque.poll();
if (poll != null) {
@@ -98,6 +109,7 @@ class DispatcherThread implements Runnable {
currBatch.clear();
currTime = System.nanoTime();
+ lastDispatchTime = currTime;
runningTimeSum = currTime - runningStart;
idleStart = currTime;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
index 5ab499da20..e3a6459104 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
@@ -75,7 +75,9 @@ public class LogDispatcher {
public void updateRateLimiter() {
logger.info("TEndPoint rates: {}", nodesRate);
for (Entry<Peer, Double> nodeDoubleEntry : nodesRate.entrySet()) {
- dispatcherGroupMap.get(nodeDoubleEntry.getKey()).updateRate(nodeDoubleEntry.getValue());
+ Peer peer = nodeDoubleEntry.getKey();
+ Double rate = nodeDoubleEntry.getValue();
+ dispatcherGroupMap.get(peer).updateRate(rate);
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
index 27ffe6b86d..36b692b76a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
@@ -109,7 +109,7 @@ public class FlowBalancer {
int i = 0;
for (; i < quorumFollowerNum; i++) {
Peer node = followers.get(i);
- nodesRate.put(node, maxFlow);
+ nodesRate.put(node, Double.MAX_VALUE);
remainingFlow -= flowToQuorum;
}
double flowToRemaining = remainingFlow / (followerNum - quorumFollowerNum);
@@ -126,7 +126,7 @@ public class FlowBalancer {
// lift flow limits
for (int i = 0; i < followerNum; i++) {
Peer node = followers.get(i);
- nodesRate.put(node, maxFlow);
+ nodesRate.put(node, Double.MAX_VALUE);
}
}
}