You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/02/11 12:00:55 UTC

[iotdb] 05/13: Fix deadlock caused by lazily submitting a task that has a dependency

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch advancePipeline
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit cf097a1bb62c06415316ec943fe58e711f48d7a1
Author: Alima777 <wx...@gmail.com>
AuthorDate: Thu Feb 9 21:51:29 2023 +0800

    Fix deadlock caused by lazily submitting a task that has a dependency
---
 .../db/mpp/execution/schedule/DriverScheduler.java | 22 ++++++++++++++++++----
 1 file changed, 18 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
index f959007648..ef191ec5cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
@@ -195,8 +195,16 @@ public class DriverScheduler implements IDriverScheduler, IService {
             tasks.get(driver.getDependencyDriverIndex()).getBlockedDependencyDriver();
         blockedDependencyFuture.addListener(
             () -> {
-              registerTaskToQueryMap(queryId, task);
-              submitTaskToReadyQueue(task);
+              // Only if query is alive, we can submit this task
+              Map<FragmentInstanceId, Set<DriverTask>> queryRelatedTasks = queryMap.get(queryId);
+              if (queryRelatedTasks != null) {
+                Set<DriverTask> instanceRelatedTasks =
+                    queryRelatedTasks.get(task.getDriverTaskId().getFragmentInstanceId());
+                if (instanceRelatedTasks != null) {
+                  instanceRelatedTasks.add(task);
+                  submitTaskToReadyQueue(task);
+                }
+              }
             },
             MoreExecutors.directExecutor());
       } else {
@@ -473,8 +481,14 @@ public class DriverScheduler implements IDriverScheduler, IService {
         }
         task.updateSchedulePriority(context);
         task.setStatus(DriverTaskStatus.FINISHED);
+      } finally {
+        task.unlock();
+      }
+      // Dependency driver must be submitted before this task is cleared
+      task.submitDependencyDriver();
+      task.lock();
+      try {
         clearDriverTask(task);
-        task.submitDependencyDriver();
       } finally {
         task.unlock();
       }
@@ -499,7 +513,7 @@ public class DriverScheduler implements IDriverScheduler, IService {
           task.unlock();
         }
         QueryId queryId = task.getDriverTaskId().getQueryId();
-        Map<FragmentInstanceId, Set<DriverTask>> queryRelatedTasks = queryMap.get(queryId);
+        Map<FragmentInstanceId, Set<DriverTask>> queryRelatedTasks = queryMap.remove(queryId);
         if (queryRelatedTasks != null) {
           for (Set<DriverTask> fragmentRelatedTasks : queryRelatedTasks.values()) {
             if (fragmentRelatedTasks != null) {