You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/11/08 02:47:51 UTC

[iotdb] 01/01: fix the NPE when addPeer to a MultiLeader Group with 1 replic

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/fix_addpeer_ml_1_replic
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1326bf47b87da39b0b1b24c0b54d83716ac978bc
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Nov 7 16:50:18 2022 +0800

    fix the NPE when addPeer to a MultiLeader Group with 1 replic
---
 .../multileader/logdispatcher/LogDispatcher.java   | 24 ++++++++++++++--------
 1 file changed, 16 insertions(+), 8 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 427f2d3945..606731e6a7 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -81,16 +81,20 @@ public class LogDispatcher {
             .map(x -> new LogDispatcherThread(x, impl.getConfig(), DEFAULT_INITIAL_SYNC_INDEX))
             .collect(Collectors.toList());
     if (!threads.isEmpty()) {
-      // We use cached thread pool here because each LogDispatcherThread will occupy one thread.
-      // And every LogDispatcherThread won't release its thread in this pool because it won't stop
-      // unless LogDispatcher stop.
-      // Thus, the size of this threadPool will be the same as the count of LogDispatcherThread.
-      this.executorService =
-          IoTDBThreadPoolFactory.newCachedThreadPool(
-              "LogDispatcher-" + impl.getThisNode().getGroupId());
+      initLogSyncThreadPool();
     }
   }
 
+  private void initLogSyncThreadPool() {
+    // We use cached thread pool here because each LogDispatcherThread will occupy one thread.
+    // And every LogDispatcherThread won't release its thread in this pool because it won't stop
+    // unless LogDispatcher stop.
+    // Thus, the size of this threadPool will be the same as the count of LogDispatcherThread.
+    this.executorService =
+        IoTDBThreadPoolFactory.newCachedThreadPool(
+            "LogDispatcher-" + impl.getThisNode().getGroupId());
+  }
+
   public synchronized void start() {
     if (!threads.isEmpty()) {
       threads.forEach(executorService::submit);
@@ -118,9 +122,13 @@ public class LogDispatcher {
     if (stopped) {
       return;
     }
-    //
     LogDispatcherThread thread = new LogDispatcherThread(peer, impl.getConfig(), initialSyncIndex);
     threads.add(thread);
+    // If the initial replica is 1, the executorService won't be initialized. And when adding
+    // dispatcher thread, the executorService should be initialized manually
+    if (this.executorService == null) {
+      initLogSyncThreadPool();
+    }
     executorService.submit(thread);
   }