You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/04/25 02:18:58 UTC

[iotdb] branch native_raft updated: fix DispatcherGroup initialization; fix HeartbeatThread exit

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/native_raft by this push:
     new 242539ab42 fix DispatcherGroup initialization; fix HeartbeatThread exit
242539ab42 is described below

commit 242539ab4263b8b25e632721318751883c8c81e0
Author: Tian Jiang <jt...@163.com>
AuthorDate: Tue Apr 25 10:21:25 2023 +0800

    fix DispatcherGroup initialization
    fix HeartbeatThread exit
---
 .../protocol/heartbeat/HeartbeatThread.java        |  2 +-
 .../protocol/log/applier/AsyncLogApplier.java      |  2 +-
 .../protocol/log/dispatch/DispatcherGroup.java     | 33 +++++++++++-----------
 .../protocol/log/dispatch/DispatcherThread.java    |  2 +-
 .../concurrent/dynamic/DynamicThreadGroup.java     |  5 ++++
 5 files changed, 25 insertions(+), 19 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java
index 4f2bdc07b5..b73b812449 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java
@@ -240,7 +240,7 @@ public class HeartbeatThread implements Runnable {
     }
 
     // the election goes on until this node becomes a follower or a leader
-    while (localMember.getRole() == RaftRole.CANDIDATE) {
+    while (localMember.getRole() == RaftRole.CANDIDATE && !Thread.interrupted()) {
       startElection();
       if (localMember.getRole() == RaftRole.CANDIDATE) {
         // sleep random time to reduce election conflicts
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/AsyncLogApplier.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/AsyncLogApplier.java
index 4378e5b5f4..91f8b6e1e9 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/AsyncLogApplier.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/AsyncLogApplier.java
@@ -175,7 +175,7 @@ public class AsyncLogApplier implements LogApplier {
           return;
         }
       }
-      logger.info("DataLogConsumer exits");
+      logger.debug("DataLogConsumer exits");
     }
 
     @Override
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
index 1fef54f95d..86e4f2cf43 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
@@ -19,18 +19,20 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log.dispatch;
 
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.dynamic.DynamicThreadGroup;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
+
 import org.apache.ratis.thirdparty.com.google.common.util.concurrent.RateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
 public class DispatcherGroup {
 
   private static final Logger logger = LoggerFactory.getLogger(DispatcherGroup.class);
@@ -50,22 +52,21 @@ public class DispatcherGroup {
     this.nodeEnabled = true;
     this.rateLimiter = RateLimiter.create(Double.MAX_VALUE);
     this.dispatcherThreadPool = createPool(peer, logDispatcher.getMember().getName());
-    this.dynamicThreadGroup = new DynamicThreadGroup(logDispatcher.member.getName() + "-" + peer,
-        dispatcherThreadPool::submit, () -> newDispatcherThread(peer, entryQueue, rateLimiter), 1,
-        maxBindingThreadNum);
+    this.dynamicThreadGroup =
+        new DynamicThreadGroup(
+            logDispatcher.member.getName() + "-" + peer,
+            dispatcherThreadPool::submit,
+            () -> newDispatcherThread(peer, entryQueue, rateLimiter),
+            maxBindingThreadNum / 4,
+            maxBindingThreadNum);
+    this.dynamicThreadGroup.init();
   }
 
   public void close() {
-    dispatcherThreadPool.shutdownNow();
-    boolean closeSucceeded = false;
     try {
-      closeSucceeded = dispatcherThreadPool.awaitTermination(10, TimeUnit.SECONDS);
-    } catch (InterruptedException e) {
-      // ignore
-    }
-    if (!closeSucceeded) {
-      logger.warn(
-          "Cannot shut down dispatcher pool of {}-{}", logDispatcher.member.getName(), peer);
+      dynamicThreadGroup.join();
+    } catch (ExecutionException | InterruptedException e) {
+      logger.error("Failed to stop threads in {}", dynamicThreadGroup);
     }
   }
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
index 31f71707cc..a8cfc3d7e6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
@@ -98,7 +98,7 @@ class DispatcherThread extends DynamicThread {
             continue;
           }
         }
-       idleToRunning();
+        idleToRunning();
         if (logger.isDebugEnabled()) {
           logger.debug("Sending {} logs to {}", currBatch.size(), receiver);
         }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
index bbbd62a7e4..3350e132b6 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
@@ -107,4 +107,9 @@ public class DynamicThreadGroup {
       future.get();
     }
   }
+
+  @Override
+  public String toString() {
+    return name;
+  }
 }