You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/07/17 03:54:56 UTC

[iotdb] branch master updated: [IOTDB-6064] Pipe: Fix deadlock in rolling back procedures concurrently (#10537)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c8c545a8f0c [IOTDB-6064] Pipe: Fix deadlock in rolling back procedures concurrently (#10537)
c8c545a8f0c is described below

commit c8c545a8f0c146f707d0c2a0f5995f32df8693ed
Author: 马子坤 <55...@users.noreply.github.com>
AuthorDate: Mon Jul 17 11:54:51 2023 +0800

    [IOTDB-6064] Pipe: Fix deadlock in rolling back procedures concurrently (#10537)
---
 .../manager/pipe/task/PipeTaskCoordinator.java     | 13 +++++-
 .../confignode/procedure/ProcedureExecutor.java    |  5 ++
 .../impl/pipe/AbstractOperatePipeProcedureV2.java  | 53 +++++++++++++++++++---
 3 files changed, 62 insertions(+), 9 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
index b084185ac2b..5f0d8f68d17 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
@@ -73,14 +73,23 @@ public class PipeTaskCoordinator {
   /**
    * Unlock the pipe task coordinator. Calling this method will clear the pipe task info holder,
    * which means that the holder will be null after calling this method.
+   *
+   * @return true if successfully unlocked, false if current thread is not holding the lock.
    */
-  public void unlock() {
+  public boolean unlock() {
     if (pipeTaskInfoHolder != null) {
       pipeTaskInfoHolder.set(null);
       pipeTaskInfoHolder = null;
     }
 
-    pipeTaskCoordinatorLock.unlock();
+    try {
+      pipeTaskCoordinatorLock.unlock();
+      return true;
+    } catch (IllegalMonitorStateException ignored) {
+      // This is thrown if unlock() is called without lock() called first.
+      LOGGER.warn("This thread is not holding the lock.");
+      return false;
+    }
   }
 
   /* Caller should ensure that the method is called in the lock {@link #lock()}. */
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
index 4ea13fe88eb..d9175797a4e 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
@@ -347,6 +347,10 @@ public class ProcedureExecutor<Env> {
           switch (executeRootStackRollback(rootProcId, rootProcStack)) {
             case LOCK_ACQUIRED:
               break;
+            case LOCK_EVENT_WAIT:
+              LOG.info("LOCK_EVENT_WAIT rollback " + proc);
+              rootProcStack.unsetRollback();
+              break;
             case LOCK_YIELD_WAIT:
               rootProcStack.unsetRollback();
               scheduler.yield(proc);
@@ -361,6 +365,7 @@ public class ProcedureExecutor<Env> {
                 break;
               case LOCK_EVENT_WAIT:
                 LOG.info("LOCK_EVENT_WAIT can't rollback child running for {}", proc);
+                break;
               case LOCK_YIELD_WAIT:
                 scheduler.yield(proc);
                 break;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index 57a8baa6503..02f62993fbf 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
 import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
+import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
 import org.apache.iotdb.confignode.procedure.state.pipe.task.OperatePipeTaskState;
 import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
 import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -68,6 +69,50 @@ public abstract class AbstractOperatePipeProcedureV2
   // putting it here is just for convenience
   protected AtomicReference<PipeTaskInfo> pipeTaskInfo;
 
+  @Override
+  protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
+    configNodeProcedureEnv.getSchedulerLock().lock();
+    try {
+      if (configNodeProcedureEnv.getNodeLock().tryLock(this)) {
+        pipeTaskInfo =
+            configNodeProcedureEnv
+                .getConfigManager()
+                .getPipeManager()
+                .getPipeTaskCoordinator()
+                .lock();
+        LOGGER.info("procedureId {} acquire lock.", getProcId());
+        return ProcedureLockState.LOCK_ACQUIRED;
+      }
+      configNodeProcedureEnv.getNodeLock().waitProcedure(this);
+      LOGGER.info("procedureId {} wait for lock.", getProcId());
+      return ProcedureLockState.LOCK_EVENT_WAIT;
+    } finally {
+      configNodeProcedureEnv.getSchedulerLock().unlock();
+    }
+  }
+
+  @Override
+  protected void releaseLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
+    configNodeProcedureEnv.getSchedulerLock().lock();
+    try {
+      LOGGER.info("procedureId {} release lock.", getProcId());
+      if (pipeTaskInfo != null) {
+        configNodeProcedureEnv
+            .getConfigManager()
+            .getPipeManager()
+            .getPipeTaskCoordinator()
+            .unlock();
+      }
+      if (configNodeProcedureEnv.getNodeLock().releaseLock(this)) {
+        configNodeProcedureEnv
+            .getNodeLock()
+            .wakeWaitingProcedures(configNodeProcedureEnv.getScheduler());
+      }
+    } finally {
+      configNodeProcedureEnv.getSchedulerLock().unlock();
+    }
+  }
+
   protected abstract PipeTaskOperation getOperation();
 
   /**
@@ -103,7 +148,6 @@ public abstract class AbstractOperatePipeProcedureV2
     try {
       switch (state) {
         case VALIDATE_TASK:
-          pipeTaskInfo = env.getConfigManager().getPipeManager().getPipeTaskCoordinator().lock();
           executeFromValidateTask(env);
           setNextState(OperatePipeTaskState.CALCULATE_INFO_FOR_TASK);
           break;
@@ -117,7 +161,6 @@ public abstract class AbstractOperatePipeProcedureV2
           break;
         case OPERATE_ON_DATA_NODES:
           executeFromOperateOnDataNodes(env);
-          env.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
           return Flow.NO_MORE_STATE;
         default:
           throw new UnsupportedOperationException(
@@ -160,11 +203,7 @@ public abstract class AbstractOperatePipeProcedureV2
       throws IOException, InterruptedException, ProcedureException {
     switch (state) {
       case VALIDATE_TASK:
-        try {
-          rollbackFromValidateTask(env);
-        } finally {
-          env.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
-        }
+        rollbackFromValidateTask(env);
         break;
       case CALCULATE_INFO_FOR_TASK:
         rollbackFromCalculateInfoForTask(env);