You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text rendering.)
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/07/17 03:54:56 UTC
[iotdb] branch master updated: [IOTDB-6064] Pipe: Fix deadlock in rolling back procedures concurrently (#10537)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c8c545a8f0c [IOTDB-6064] Pipe: Fix deadlock in rolling back procedures concurrently (#10537)
c8c545a8f0c is described below
commit c8c545a8f0c146f707d0c2a0f5995f32df8693ed
Author: 马子坤 <55...@users.noreply.github.com>
AuthorDate: Mon Jul 17 11:54:51 2023 +0800
[IOTDB-6064] Pipe: Fix deadlock in rolling back procedures concurrently (#10537)
---
.../manager/pipe/task/PipeTaskCoordinator.java | 13 +++++-
.../confignode/procedure/ProcedureExecutor.java | 5 ++
.../impl/pipe/AbstractOperatePipeProcedureV2.java | 53 +++++++++++++++++++---
3 files changed, 62 insertions(+), 9 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
index b084185ac2b..5f0d8f68d17 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
@@ -73,14 +73,23 @@ public class PipeTaskCoordinator {
/**
* Unlock the pipe task coordinator. Calling this method will clear the pipe task info holder,
* which means that the holder will be null after calling this method.
+ *
+ * @return true if successfully unlocked, false if current thread is not holding the lock.
*/
- public void unlock() {
+ public boolean unlock() {
if (pipeTaskInfoHolder != null) {
pipeTaskInfoHolder.set(null);
pipeTaskInfoHolder = null;
}
- pipeTaskCoordinatorLock.unlock();
+ try {
+ pipeTaskCoordinatorLock.unlock();
+ return true;
+ } catch (IllegalMonitorStateException ignored) {
+ // This is thrown if unlock() is called without lock() called first.
+ LOGGER.warn("This thread is not holding the lock.");
+ return false;
+ }
}
/* Caller should ensure that the method is called in the lock {@link #lock()}. */
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
index 4ea13fe88eb..d9175797a4e 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
@@ -347,6 +347,10 @@ public class ProcedureExecutor<Env> {
switch (executeRootStackRollback(rootProcId, rootProcStack)) {
case LOCK_ACQUIRED:
break;
+ case LOCK_EVENT_WAIT:
+ LOG.info("LOCK_EVENT_WAIT rollback " + proc);
+ rootProcStack.unsetRollback();
+ break;
case LOCK_YIELD_WAIT:
rootProcStack.unsetRollback();
scheduler.yield(proc);
@@ -361,6 +365,7 @@ public class ProcedureExecutor<Env> {
break;
case LOCK_EVENT_WAIT:
LOG.info("LOCK_EVENT_WAIT can't rollback child running for {}", proc);
+ break;
case LOCK_YIELD_WAIT:
scheduler.yield(proc);
break;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index 57a8baa6503..02f62993fbf 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
+import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
import org.apache.iotdb.confignode.procedure.state.pipe.task.OperatePipeTaskState;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -68,6 +69,50 @@ public abstract class AbstractOperatePipeProcedureV2
// putting it here is just for convenience
protected AtomicReference<PipeTaskInfo> pipeTaskInfo;
+ @Override
+ protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
+ configNodeProcedureEnv.getSchedulerLock().lock();
+ try {
+ if (configNodeProcedureEnv.getNodeLock().tryLock(this)) {
+ pipeTaskInfo =
+ configNodeProcedureEnv
+ .getConfigManager()
+ .getPipeManager()
+ .getPipeTaskCoordinator()
+ .lock();
+ LOGGER.info("procedureId {} acquire lock.", getProcId());
+ return ProcedureLockState.LOCK_ACQUIRED;
+ }
+ configNodeProcedureEnv.getNodeLock().waitProcedure(this);
+ LOGGER.info("procedureId {} wait for lock.", getProcId());
+ return ProcedureLockState.LOCK_EVENT_WAIT;
+ } finally {
+ configNodeProcedureEnv.getSchedulerLock().unlock();
+ }
+ }
+
+ @Override
+ protected void releaseLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
+ configNodeProcedureEnv.getSchedulerLock().lock();
+ try {
+ LOGGER.info("procedureId {} release lock.", getProcId());
+ if (pipeTaskInfo != null) {
+ configNodeProcedureEnv
+ .getConfigManager()
+ .getPipeManager()
+ .getPipeTaskCoordinator()
+ .unlock();
+ }
+ if (configNodeProcedureEnv.getNodeLock().releaseLock(this)) {
+ configNodeProcedureEnv
+ .getNodeLock()
+ .wakeWaitingProcedures(configNodeProcedureEnv.getScheduler());
+ }
+ } finally {
+ configNodeProcedureEnv.getSchedulerLock().unlock();
+ }
+ }
+
protected abstract PipeTaskOperation getOperation();
/**
@@ -103,7 +148,6 @@ public abstract class AbstractOperatePipeProcedureV2
try {
switch (state) {
case VALIDATE_TASK:
- pipeTaskInfo = env.getConfigManager().getPipeManager().getPipeTaskCoordinator().lock();
executeFromValidateTask(env);
setNextState(OperatePipeTaskState.CALCULATE_INFO_FOR_TASK);
break;
@@ -117,7 +161,6 @@ public abstract class AbstractOperatePipeProcedureV2
break;
case OPERATE_ON_DATA_NODES:
executeFromOperateOnDataNodes(env);
- env.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
return Flow.NO_MORE_STATE;
default:
throw new UnsupportedOperationException(
@@ -160,11 +203,7 @@ public abstract class AbstractOperatePipeProcedureV2
throws IOException, InterruptedException, ProcedureException {
switch (state) {
case VALIDATE_TASK:
- try {
- rollbackFromValidateTask(env);
- } finally {
- env.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
- }
+ rollbackFromValidateTask(env);
break;
case CALCULATE_INFO_FOR_TASK:
rollbackFromCalculateInfoForTask(env);