You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/04/13 09:48:58 UTC

[iotdb] branch native_raft updated: add dynamic dispatcher number

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/native_raft by this push:
     new 1d8b3c0b11 add dynamic dispatcher number
1d8b3c0b11 is described below

commit 1d8b3c0b1104b818dde6100f553d94181bfc35fe
Author: jt2594838 <jt...@163.com>
AuthorDate: Thu Apr 13 17:48:44 2023 +0800

    add dynamic dispatcher number
---
 .../protocol/log/dispatch/DispatcherGroup.java     | 20 +++++++++---
 .../protocol/log/dispatch/DispatcherThread.java    | 37 +++++++++++++++++++++-
 2 files changed, 51 insertions(+), 6 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
index 0104695c23..acc4f2bd2c 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
@@ -40,15 +40,17 @@ public class DispatcherGroup {
   private final ExecutorService dispatcherThreadPool;
   private final LogDispatcher logDispatcher;
   private final AtomicInteger groupThreadNum = new AtomicInteger();
+  private int maxBindingThreadNum;
 
-  public DispatcherGroup(Peer peer, LogDispatcher logDispatcher, int bindingThreadNum) {
+  public DispatcherGroup(Peer peer, LogDispatcher logDispatcher, int maxBindingThreadNum) {
     this.logDispatcher = logDispatcher;
     this.peer = peer;
     this.entryQueue = new ArrayBlockingQueue<>(logDispatcher.getConfig().getMaxNumOfLogsInMem());
     this.nodeEnabled = true;
     this.rateLimiter = RateLimiter.create(Double.MAX_VALUE);
+    this.maxBindingThreadNum = maxBindingThreadNum;
     this.dispatcherThreadPool = createPool(peer, logDispatcher.getMember().getName());
-    for (int i = 0; i < bindingThreadNum; i++) {
+    for (int i = 0; i < maxBindingThreadNum; i++) {
       addThread();
     }
   }
@@ -67,9 +69,13 @@ public class DispatcherGroup {
     }
   }
   public void addThread() {
-    dispatcherThreadPool
-        .submit(newDispatcherThread(peer, entryQueue, rateLimiter));
-    groupThreadNum.incrementAndGet();
+    int threadNum = groupThreadNum.incrementAndGet();
+    if (threadNum <= maxBindingThreadNum) {
+      dispatcherThreadPool
+          .submit(newDispatcherThread(peer, entryQueue, rateLimiter));
+    } else {
+      groupThreadNum.decrementAndGet();
+    }
   }
 
   DispatcherThread newDispatcherThread(Peer node, BlockingQueue<VotingEntry> logBlockingQueue,
@@ -108,4 +114,8 @@ public class DispatcherGroup {
   public AtomicInteger getGroupThreadNum() {
     return groupThreadNum;
   }
+
+  public int getMaxBindingThreadNum() {
+    return maxBindingThreadNum;
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
index 8808479b45..298b87eda3 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
@@ -50,6 +50,8 @@ class DispatcherThread implements Runnable {
   private final String baseName;
   private final RateLimiter rateLimiter;
   private final DispatcherGroup group;
+  private long idleTimeSum;
+  private long runningTimeSum;
 
   protected DispatcherThread(LogDispatcher logDispatcher, Peer receiver,
       BlockingQueue<VotingEntry> logBlockingDeque, RateLimiter rateLimiter,
@@ -68,6 +70,8 @@ class DispatcherThread implements Runnable {
       Thread.currentThread().setName(baseName);
     }
     try {
+      long idleStart = System.nanoTime();
+      long runningStart = 0;
       while (!Thread.interrupted()) {
         synchronized (logBlockingDeque) {
           VotingEntry poll = logBlockingDeque.poll();
@@ -79,25 +83,56 @@ class DispatcherThread implements Runnable {
             continue;
           }
         }
+        long currTime = System.nanoTime();
+        idleTimeSum = currTime - idleStart;
+        runningStart = currTime;
         if (logger.isDebugEnabled()) {
           logger.debug("Sending {} logs to {}", currBatch.size(), receiver);
         }
+
         serializeEntries();
         if (!logDispatcher.queueOrdered) {
           currBatch.sort(Comparator.comparingLong(s -> s.getEntry().getCurrLogIndex()));
         }
         sendLogs(currBatch);
         currBatch.clear();
+
+        currTime = System.nanoTime();
+        runningTimeSum = currTime - runningStart;
+        idleStart = currTime;
+
+        // thread too idle
+        if (idleTimeSum * 1.0 / (idleTimeSum + runningTimeSum) < 0.5 &&
+        runningTimeSum > 10_000_000_000L) {
+          int remaining = group.getGroupThreadNum().decrementAndGet();
+          if (remaining > 1) {
+            logger.info("Dispatcher thread too idle");
+            break;
+          } else {
+            group.getGroupThreadNum().incrementAndGet();
+          }
+          // thread too busy
+        } else if (idleTimeSum * 1.0 / (idleTimeSum + runningTimeSum) > 0.5 &&
+            runningTimeSum > 10_000_000_000L) {
+          int groupThreadNum = group.getGroupThreadNum().get();
+          if (groupThreadNum < group.getMaxBindingThreadNum()) {
+            group.addThread();
+          }
+          // avoid frequent change
+          runningTimeSum = 0;
+          idleTimeSum = 0;
+        }
       }
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
     } catch (Exception e) {
       logger.error("Unexpected error in log dispatcher", e);
     }
-    logger.info("Dispatcher exits");
+    logger.info("Dispatcher exits, idle ratio: {}", idleTimeSum * 1.0 / (idleTimeSum + runningTimeSum));
     group.getGroupThreadNum().decrementAndGet();
   }
 
+
   protected void serializeEntries() throws InterruptedException {
     for (VotingEntry request : currBatch) {