You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/08/25 21:15:58 UTC

[GitHub] gianm closed pull request #6212: fix TaskQueue-HRTR deadlock

gianm closed pull request #6212: fix TaskQueue-HRTR deadlock
URL: https://github.com/apache/incubator-druid/pull/6212
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub no longer displays
its diff once it has been merged, so the diff is supplied below:

diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 36fc44a80c3..42f8d5c22e3 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -376,6 +376,7 @@ private boolean runTaskOnWorker(
       // on a worker - this avoids overflowing a worker with tasks
       long waitMs = config.getTaskAssignmentTimeout().toStandardDuration().getMillis();
       long waitStart = System.currentTimeMillis();
+      boolean isTaskAssignmentTimedOut = false;
       synchronized (statusLock) {
         while (tasks.containsKey(taskId)
                && tasks.get(taskId).getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING) {
@@ -383,29 +384,38 @@ private boolean runTaskOnWorker(
           if (remaining > 0) {
             statusLock.wait(remaining);
           } else {
-            log.makeAlert(
-                "Task assignment timed out on worker [%s], never ran task [%s] in timeout[%s]!",
-                workerHost,
-                taskId,
-                config.getTaskAssignmentTimeout()
-            ).emit();
-            taskComplete(workItem, workerHolder, TaskStatus.failure(taskId));
-            return true;
+            isTaskAssignmentTimedOut = true;
+            break;
           }
         }
-        return true;
       }
+
+      if (isTaskAssignmentTimedOut) {
+        log.makeAlert(
+            "Task assignment timed out on worker [%s], never ran task [%s] in timeout[%s]!",
+            workerHost,
+            taskId,
+            config.getTaskAssignmentTimeout()
+        ).emit();
+        taskComplete(workItem, workerHolder, TaskStatus.failure(taskId));
+      }
+
+      return true;
     } else {
       return false;
     }
   }
 
+  // CAUTION: This method calls RemoteTaskRunnerWorkItem.setResult(..) which results in TaskQueue.notifyStatus() being called
+  // because that is attached by TaskQueue to task result future. So, this method must not be called with "statusLock"
+  // held. See https://github.com/apache/incubator-druid/issues/6201
   private void taskComplete(
       HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem,
       WorkerHolder workerHolder,
       TaskStatus taskStatus
   )
   {
+    Preconditions.checkState(!Thread.holdsLock(statusLock), "Current thread must not hold statusLock.");
     Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");
     Preconditions.checkNotNull(taskStatus, "taskStatus");
     if (workerHolder != null) {
@@ -1170,6 +1180,7 @@ void taskAddedOrUpdated(final TaskAnnouncement announcement, final WorkerHolder
 
     HttpRemoteTaskRunnerWorkItem taskItem;
     boolean shouldShutdownTask = false;
+    boolean isTaskCompleted = false;
 
     synchronized (statusLock) {
       taskItem = tasks.get(taskId);
@@ -1293,7 +1304,7 @@ void taskAddedOrUpdated(final TaskAnnouncement announcement, final WorkerHolder
                     TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation());
                   }
 
-                  taskComplete(taskItem, workerHolder, announcement.getTaskStatus());
+                  isTaskCompleted = true;
                 } else {
                   log.warn(
                       "Worker[%s] reported completed task[%s] which is being run by another worker[%s]. Notification ignored.",
@@ -1327,6 +1338,10 @@ void taskAddedOrUpdated(final TaskAnnouncement announcement, final WorkerHolder
       }
     }
 
+    if (isTaskCompleted) {
+      taskComplete(taskItem, workerHolder, announcement.getTaskStatus());
+    }
+
     if (shouldShutdownTask) {
       log.warn("Killing task[%s] on worker[%s].", taskId, worker.getHost());
       workerHolder.shutdownTask(taskId);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org