You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/04/13 10:02:25 UTC

[iotdb] branch native_raft updated: add delayed dispatcher group

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/native_raft by this push:
     new 0ec0351658 add delayed dispatcher group
0ec0351658 is described below

commit 0ec035165862ae2afbc1c9cc8b6d4e873600e076
Author: jt2594838 <jt...@163.com>
AuthorDate: Thu Apr 13 18:02:11 2023 +0800

    add delayed dispatcher group
---
 .../natraft/protocol/log/dispatch/DispatcherGroup.java       | 10 ++++++++++
 .../natraft/protocol/log/dispatch/DispatcherThread.java      | 12 ++++++++++++
 .../natraft/protocol/log/dispatch/LogDispatcher.java         |  4 +++-
 .../protocol/log/dispatch/flowcontrol/FlowBalancer.java      |  4 ++--
 4 files changed, 27 insertions(+), 3 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
index acc4f2bd2c..28ab02c7c4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
@@ -41,6 +41,7 @@ public class DispatcherGroup {
   private final LogDispatcher logDispatcher;
   private final AtomicInteger groupThreadNum = new AtomicInteger();
   private int maxBindingThreadNum;
+  private boolean delayed;
 
   public DispatcherGroup(Peer peer, LogDispatcher logDispatcher, int maxBindingThreadNum) {
     this.logDispatcher = logDispatcher;
@@ -85,6 +86,7 @@ public class DispatcherGroup {
 
   public void updateRate(double rate) {
     rateLimiter.setRate(rate);
+    delayed = rate != Double.MAX_VALUE;
   }
 
   ExecutorService createPool(Peer node, String name) {
@@ -118,4 +120,12 @@ public class DispatcherGroup {
   public int getMaxBindingThreadNum() {
     return maxBindingThreadNum;
   }
+
+  public boolean isDelayed() {
+    return delayed;
+  }
+
+  public void setDelayed(boolean delayed) {
+    this.delayed = delayed;
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
index 298b87eda3..7173249696 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
@@ -52,6 +52,7 @@ class DispatcherThread implements Runnable {
   private final DispatcherGroup group;
   private long idleTimeSum;
   private long runningTimeSum;
+  private long lastDispatchTime;
 
   protected DispatcherThread(LogDispatcher logDispatcher, Peer receiver,
       BlockingQueue<VotingEntry> logBlockingDeque, RateLimiter rateLimiter,
@@ -73,6 +74,16 @@ class DispatcherThread implements Runnable {
       long idleStart = System.nanoTime();
       long runningStart = 0;
       while (!Thread.interrupted()) {
+        if (group.isDelayed()) {
+          if (logBlockingDeque.size() < logDispatcher.maxBatchSize &&
+          System.nanoTime() - lastDispatchTime < 1_000_000_000L) {
+            // the follower is being delayed; if there are not enough requests and it has
+            // dispatched recently, wait for a while to get a larger batch
+            Thread.sleep(100);
+            continue;
+          }
+        }
+
         synchronized (logBlockingDeque) {
           VotingEntry poll = logBlockingDeque.poll();
           if (poll != null) {
@@ -98,6 +109,7 @@ class DispatcherThread implements Runnable {
         currBatch.clear();
 
         currTime = System.nanoTime();
+        lastDispatchTime = currTime;
         runningTimeSum = currTime - runningStart;
         idleStart = currTime;
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
index 5ab499da20..e3a6459104 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
@@ -75,7 +75,9 @@ public class LogDispatcher {
   public void updateRateLimiter() {
     logger.info("TEndPoint rates: {}", nodesRate);
     for (Entry<Peer, Double> nodeDoubleEntry : nodesRate.entrySet()) {
-      dispatcherGroupMap.get(nodeDoubleEntry.getKey()).updateRate(nodeDoubleEntry.getValue());
+      Peer peer = nodeDoubleEntry.getKey();
+      Double rate = nodeDoubleEntry.getValue();
+      dispatcherGroupMap.get(peer).updateRate(rate);
     }
   }
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
index 27ffe6b86d..36b692b76a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
@@ -109,7 +109,7 @@ public class FlowBalancer {
     int i = 0;
     for (; i < quorumFollowerNum; i++) {
       Peer node = followers.get(i);
-      nodesRate.put(node, maxFlow);
+      nodesRate.put(node, Double.MAX_VALUE);
       remainingFlow -= flowToQuorum;
     }
     double flowToRemaining = remainingFlow / (followerNum - quorumFollowerNum);
@@ -126,7 +126,7 @@ public class FlowBalancer {
     // lift flow limits
     for (int i = 0; i < followerNum; i++) {
       Peer node = followers.get(i);
-      nodesRate.put(node, maxFlow);
+      nodesRate.put(node, Double.MAX_VALUE);
     }
   }
 }