You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/11/08 07:15:55 UTC
[iotdb] branch master updated: fix the NPE when addPeer to a MultiLeader Group with 1 replica (#7927)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6abecabfad fix the NPE when addPeer to a MultiLeader Group with 1 replica (#7927)
6abecabfad is described below
commit 6abecabfad51257c44b70609e29f80f436a8eb57
Author: Zhang.Jinrui <xi...@gmail.com>
AuthorDate: Tue Nov 8 15:15:49 2022 +0800
fix the NPE when addPeer to a MultiLeader Group with 1 replica (#7927)
---
.../multileader/logdispatcher/LogDispatcher.java | 24 ++++++++++++++--------
1 file changed, 16 insertions(+), 8 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 427f2d3945..606731e6a7 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -81,16 +81,20 @@ public class LogDispatcher {
.map(x -> new LogDispatcherThread(x, impl.getConfig(), DEFAULT_INITIAL_SYNC_INDEX))
.collect(Collectors.toList());
if (!threads.isEmpty()) {
- // We use cached thread pool here because each LogDispatcherThread will occupy one thread.
- // And every LogDispatcherThread won't release its thread in this pool because it won't stop
- // unless LogDispatcher stop.
- // Thus, the size of this threadPool will be the same as the count of LogDispatcherThread.
- this.executorService =
- IoTDBThreadPoolFactory.newCachedThreadPool(
- "LogDispatcher-" + impl.getThisNode().getGroupId());
+ initLogSyncThreadPool();
}
}
+ private void initLogSyncThreadPool() {
+ // We use a cached thread pool here because each LogDispatcherThread occupies one thread.
+ // A LogDispatcherThread never releases its thread in this pool because it won't stop
+ // until the LogDispatcher itself stops.
+ // Thus, the size of this thread pool equals the number of LogDispatcherThreads.
+ this.executorService =
+ IoTDBThreadPoolFactory.newCachedThreadPool(
+ "LogDispatcher-" + impl.getThisNode().getGroupId());
+ }
+
public synchronized void start() {
if (!threads.isEmpty()) {
threads.forEach(executorService::submit);
@@ -118,9 +122,13 @@ public class LogDispatcher {
if (stopped) {
return;
}
- //
LogDispatcherThread thread = new LogDispatcherThread(peer, impl.getConfig(), initialSyncIndex);
threads.add(thread);
+ // If the initial replica count is 1, the executorService won't have been initialized, so it
+ // must be initialized here before the new dispatcher thread is submitted.
+ if (this.executorService == null) {
+ initLogSyncThreadPool();
+ }
executorService.submit(thread);
}