You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/11/08 02:47:51 UTC
[iotdb] 01/01: fix the NPE when addPeer to a MultiLeader Group with 1 replica
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/fix_addpeer_ml_1_replic
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1326bf47b87da39b0b1b24c0b54d83716ac978bc
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Nov 7 16:50:18 2022 +0800
fix the NPE when addPeer to a MultiLeader Group with 1 replica
---
.../multileader/logdispatcher/LogDispatcher.java | 24 ++++++++++++++--------
1 file changed, 16 insertions(+), 8 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 427f2d3945..606731e6a7 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -81,16 +81,20 @@ public class LogDispatcher {
.map(x -> new LogDispatcherThread(x, impl.getConfig(), DEFAULT_INITIAL_SYNC_INDEX))
.collect(Collectors.toList());
if (!threads.isEmpty()) {
- // We use cached thread pool here because each LogDispatcherThread will occupy one thread.
- // And every LogDispatcherThread won't release its thread in this pool because it won't stop
- // unless LogDispatcher stop.
- // Thus, the size of this threadPool will be the same as the count of LogDispatcherThread.
- this.executorService =
- IoTDBThreadPoolFactory.newCachedThreadPool(
- "LogDispatcher-" + impl.getThisNode().getGroupId());
+ initLogSyncThreadPool();
}
}
+ private void initLogSyncThreadPool() {
+ // We use cached thread pool here because each LogDispatcherThread will occupy one thread.
+ // And every LogDispatcherThread won't release its thread in this pool because it won't stop
+ // unless LogDispatcher stop.
+ // Thus, the size of this threadPool will be the same as the count of LogDispatcherThread.
+ this.executorService =
+ IoTDBThreadPoolFactory.newCachedThreadPool(
+ "LogDispatcher-" + impl.getThisNode().getGroupId());
+ }
+
public synchronized void start() {
if (!threads.isEmpty()) {
threads.forEach(executorService::submit);
@@ -118,9 +122,13 @@ public class LogDispatcher {
if (stopped) {
return;
}
- //
LogDispatcherThread thread = new LogDispatcherThread(peer, impl.getConfig(), initialSyncIndex);
threads.add(thread);
+ // If the group initially has only 1 replica, the executorService is not initialized.
+ // In that case it must be initialized here before the new dispatcher thread is submitted.
+ if (this.executorService == null) {
+ initLogSyncThreadPool();
+ }
executorService.submit(thread);
}