You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/02/11 12:00:55 UTC
[iotdb] 05/13: Fix deadlock caused by submitting task which has dependency lazily
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch advancePipeline
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit cf097a1bb62c06415316ec943fe58e711f48d7a1
Author: Alima777 <wx...@gmail.com>
AuthorDate: Thu Feb 9 21:51:29 2023 +0800
Fix deadlock caused by submitting task which has dependency lazily
---
.../db/mpp/execution/schedule/DriverScheduler.java | 22 ++++++++++++++++++----
1 file changed, 18 insertions(+), 4 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
index f959007648..ef191ec5cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
@@ -195,8 +195,16 @@ public class DriverScheduler implements IDriverScheduler, IService {
tasks.get(driver.getDependencyDriverIndex()).getBlockedDependencyDriver();
blockedDependencyFuture.addListener(
() -> {
- registerTaskToQueryMap(queryId, task);
- submitTaskToReadyQueue(task);
+ // Only if query is alive, we can submit this task
+ Map<FragmentInstanceId, Set<DriverTask>> queryRelatedTasks = queryMap.get(queryId);
+ if (queryRelatedTasks != null) {
+ Set<DriverTask> instanceRelatedTasks =
+ queryRelatedTasks.get(task.getDriverTaskId().getFragmentInstanceId());
+ if (instanceRelatedTasks != null) {
+ instanceRelatedTasks.add(task);
+ submitTaskToReadyQueue(task);
+ }
+ }
},
MoreExecutors.directExecutor());
} else {
@@ -473,8 +481,14 @@ public class DriverScheduler implements IDriverScheduler, IService {
}
task.updateSchedulePriority(context);
task.setStatus(DriverTaskStatus.FINISHED);
+ } finally {
+ task.unlock();
+ }
+ // Dependency driver must be submitted before this task is cleared
+ task.submitDependencyDriver();
+ task.lock();
+ try {
clearDriverTask(task);
- task.submitDependencyDriver();
} finally {
task.unlock();
}
@@ -499,7 +513,7 @@ public class DriverScheduler implements IDriverScheduler, IService {
task.unlock();
}
QueryId queryId = task.getDriverTaskId().getQueryId();
- Map<FragmentInstanceId, Set<DriverTask>> queryRelatedTasks = queryMap.get(queryId);
+ Map<FragmentInstanceId, Set<DriverTask>> queryRelatedTasks = queryMap.remove(queryId);
if (queryRelatedTasks != null) {
for (Set<DriverTask> fragmentRelatedTasks : queryRelatedTasks.values()) {
if (fragmentRelatedTasks != null) {