You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/26 12:44:50 UTC

[iotdb] 01/01: fix a deadlock

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch IOTDB-4619
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 14bf452eed34ce5439dae728b568c2b0d7a6deb4
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Wed Oct 26 20:44:38 2022 +0800

    fix a deadlock
---
 .../consensus/statemachine/PartitionRegionStateMachine.java | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index 21dec5d497..1822cd5389 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.consensus.statemachine;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.auth.AuthException;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
@@ -38,12 +39,16 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.util.concurrent.ExecutorService;
 
 /** StateMachine for PartitionRegion */
 public class PartitionRegionStateMachine
     implements IStateMachine, IStateMachine.EventApi, IStateMachine.RetryPolicy {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PartitionRegionStateMachine.class);
+
+  private static final ExecutorService threadPool =
+      IoTDBThreadPoolFactory.newCachedThreadPool("CQ-recovery");
   private final ConfigPlanExecutor executor;
   private ConfigManager configManager;
   private final TEndPoint currentNodeTEndPoint;
@@ -161,7 +166,13 @@ public class PartitionRegionStateMachine
       configManager.getNodeManager().startHeartbeatService();
       configManager.getNodeManager().startUnknownDataNodeDetector();
       configManager.getPartitionManager().startRegionCleaner();
-      configManager.getCQManager().startCQScheduler();
+
+      // We do CQ recovery asynchronously for two reasons:
+      // 1. Performance: CQ recovery may be time-consuming, so we run it on another thread to
+      // avoid blocking notifyLeaderChanged.
+      // 2. Correctness: CQ recovery uses the ConsensusManager, which may not be initialized
+      // until after notifyLeaderChanged has finished.
+      threadPool.submit(() -> configManager.getCQManager().startCQScheduler());
     } else {
       LOGGER.info(
           "Current node [nodeId:{}, ip:port: {}] is not longer the leader, the new leader is [nodeId:{}]",