You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/04/25 02:18:58 UTC
[iotdb] branch native_raft updated: fix DispatcherGroup initialization fix HeartbeatThread exit
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/native_raft by this push:
new 242539ab42 fix DispatcherGroup initialization fix HeartbeatThread exit
242539ab42 is described below
commit 242539ab4263b8b25e632721318751883c8c81e0
Author: Tian Jiang <jt...@163.com>
AuthorDate: Tue Apr 25 10:21:25 2023 +0800
fix DispatcherGroup initialization
fix HeartbeatThread exit
---
.../protocol/heartbeat/HeartbeatThread.java | 2 +-
.../protocol/log/applier/AsyncLogApplier.java | 2 +-
.../protocol/log/dispatch/DispatcherGroup.java | 33 +++++++++++-----------
.../protocol/log/dispatch/DispatcherThread.java | 2 +-
.../concurrent/dynamic/DynamicThreadGroup.java | 5 ++++
5 files changed, 25 insertions(+), 19 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java
index 4f2bdc07b5..b73b812449 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java
@@ -240,7 +240,7 @@ public class HeartbeatThread implements Runnable {
}
// the election goes on until this node becomes a follower or a leader
- while (localMember.getRole() == RaftRole.CANDIDATE) {
+ while (localMember.getRole() == RaftRole.CANDIDATE && !Thread.interrupted()) {
startElection();
if (localMember.getRole() == RaftRole.CANDIDATE) {
// sleep random time to reduce election conflicts
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/AsyncLogApplier.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/AsyncLogApplier.java
index 4378e5b5f4..91f8b6e1e9 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/AsyncLogApplier.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/AsyncLogApplier.java
@@ -175,7 +175,7 @@ public class AsyncLogApplier implements LogApplier {
return;
}
}
- logger.info("DataLogConsumer exits");
+ logger.debug("DataLogConsumer exits");
}
@Override
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
index 1fef54f95d..86e4f2cf43 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
@@ -19,18 +19,20 @@
package org.apache.iotdb.consensus.natraft.protocol.log.dispatch;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.dynamic.DynamicThreadGroup;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
+
import org.apache.ratis.thirdparty.com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
public class DispatcherGroup {
private static final Logger logger = LoggerFactory.getLogger(DispatcherGroup.class);
@@ -50,22 +52,21 @@ public class DispatcherGroup {
this.nodeEnabled = true;
this.rateLimiter = RateLimiter.create(Double.MAX_VALUE);
this.dispatcherThreadPool = createPool(peer, logDispatcher.getMember().getName());
- this.dynamicThreadGroup = new DynamicThreadGroup(logDispatcher.member.getName() + "-" + peer,
- dispatcherThreadPool::submit, () -> newDispatcherThread(peer, entryQueue, rateLimiter), 1,
- maxBindingThreadNum);
+ this.dynamicThreadGroup =
+ new DynamicThreadGroup(
+ logDispatcher.member.getName() + "-" + peer,
+ dispatcherThreadPool::submit,
+ () -> newDispatcherThread(peer, entryQueue, rateLimiter),
+ maxBindingThreadNum / 4,
+ maxBindingThreadNum);
+ this.dynamicThreadGroup.init();
}
public void close() {
- dispatcherThreadPool.shutdownNow();
- boolean closeSucceeded = false;
try {
- closeSucceeded = dispatcherThreadPool.awaitTermination(10, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- // ignore
- }
- if (!closeSucceeded) {
- logger.warn(
- "Cannot shut down dispatcher pool of {}-{}", logDispatcher.member.getName(), peer);
+ dynamicThreadGroup.join();
+ } catch (ExecutionException | InterruptedException e) {
+ logger.error("Failed to stop threads in {}", dynamicThreadGroup);
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
index 31f71707cc..a8cfc3d7e6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
@@ -98,7 +98,7 @@ class DispatcherThread extends DynamicThread {
continue;
}
}
- idleToRunning();
+ idleToRunning();
if (logger.isDebugEnabled()) {
logger.debug("Sending {} logs to {}", currBatch.size(), receiver);
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
index bbbd62a7e4..3350e132b6 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
@@ -107,4 +107,9 @@ public class DynamicThreadGroup {
future.get();
}
}
+
+ @Override
+ public String toString() {
+ return name;
+ }
}