You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/26 12:44:50 UTC
[iotdb] 01/01: fix a deadlock
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch IOTDB-4619
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 14bf452eed34ce5439dae728b568c2b0d7a6deb4
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Wed Oct 26 20:44:38 2022 +0800
fix a deadlock
---
.../consensus/statemachine/PartitionRegionStateMachine.java | 13 ++++++++++++-
1 file changed, 12 insertions(+), 1 deletion(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index 21dec5d497..1822cd5389 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.consensus.statemachine;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.auth.AuthException;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
@@ -38,12 +39,16 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.util.concurrent.ExecutorService;
/** StateMachine for PartitionRegion */
public class PartitionRegionStateMachine
implements IStateMachine, IStateMachine.EventApi, IStateMachine.RetryPolicy {
private static final Logger LOGGER = LoggerFactory.getLogger(PartitionRegionStateMachine.class);
+
+ private static final ExecutorService threadPool =
+ IoTDBThreadPoolFactory.newCachedThreadPool("CQ-recovery");
private final ConfigPlanExecutor executor;
private ConfigManager configManager;
private final TEndPoint currentNodeTEndPoint;
@@ -161,7 +166,13 @@ public class PartitionRegionStateMachine
configManager.getNodeManager().startHeartbeatService();
configManager.getNodeManager().startUnknownDataNodeDetector();
configManager.getPartitionManager().startRegionCleaner();
- configManager.getCQManager().startCQScheduler();
+
+ // We do CQ recovery asynchronously for two reasons:
+ // 1. For performance: CQ recovery may be time-consuming, so we run it in another thread
+ // so that notifyLeaderChanged is not blocked by it.
+ // 2. For correctness: CQ recovery uses ConsensusManager, which may not be initialized
+ // until after notifyLeaderChanged has finished.
+ threadPool.submit(() -> configManager.getCQManager().startCQScheduler());
} else {
LOGGER.info(
"Current node [nodeId:{}, ip:port: {}] is not longer the leader, the new leader is [nodeId:{}]",