You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/11/16 02:01:58 UTC

[GitHub] asdf2014 closed pull request #6621: Enforce logging when killing a task

asdf2014 closed pull request #6621: Enforce logging when killing a task
URL: https://github.com/apache/incubator-druid/pull/6621
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub no longer shows its
diff once the request is merged, so the full diff is reproduced below:

diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
index 2aa05e39ee4..260caf0fcf6 100644
--- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
+++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
@@ -346,7 +346,7 @@ void checkSegmentsAndSubmitTasks()
           && !toBuildInterval.get(interval).equals(runningVersion.get(interval))
       ) {
         if (taskMaster.getTaskQueue().isPresent()) {
-          taskMaster.getTaskQueue().get().shutdown(runningTasks.get(interval).getId());
+          taskMaster.getTaskQueue().get().shutdown(runningTasks.get(interval).getId(), "version mismatch");
           runningTasks.remove(interval);
         }
       }
@@ -451,7 +451,7 @@ private void clearTasks()
   {
     for (HadoopIndexTask task : runningTasks.values()) {
       if (taskMaster.getTaskQueue().isPresent()) {
-        taskMaster.getTaskQueue().get().shutdown(task.getId());
+        taskMaster.getTaskQueue().get().shutdown(task.getId(), "killing all tasks");
       }
     }
     runningTasks.clear();
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 6609b2412c0..eac54be496b 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -747,7 +747,7 @@ void resetInternal(DataSourceMetadata dataSourceMetadata)
       // Reset everything
       boolean result = indexerMetadataStorageCoordinator.deleteDataSourceMetadata(dataSource);
       log.info("Reset dataSource[%s] - dataSource metadata entry deleted? [%s]", dataSource, result);
-      taskGroups.values().forEach(this::killTasksInGroup);
+      taskGroups.values().forEach(group -> killTasksInGroup(group, "DataSourceMetadata is not found while reset"));
       taskGroups.clear();
       partitionGroups.clear();
     } else if (!(dataSourceMetadata instanceof KafkaDataSourceMetadata)) {
@@ -811,7 +811,7 @@ void resetInternal(DataSourceMetadata dataSourceMetadata)
         if (metadataUpdateSuccess) {
           resetKafkaMetadata.getKafkaPartitions().getPartitionOffsetMap().keySet().forEach(partition -> {
             final int groupId = getTaskGroupIdForPartition(partition);
-            killTaskGroupForPartitions(ImmutableSet.of(partition));
+            killTaskGroupForPartitions(ImmutableSet.of(partition), "DataSourceMetadata is updated while reset");
             taskGroups.remove(groupId);
             partitionGroups.get(groupId).replaceAll((partitionId, offset) -> NOT_SET);
           });
@@ -828,19 +828,18 @@ void resetInternal(DataSourceMetadata dataSourceMetadata)
     }
   }
 
-  private void killTaskGroupForPartitions(Set<Integer> partitions)
+  private void killTaskGroupForPartitions(Set<Integer> partitions, String reasonFormat, Object... args)
   {
     for (Integer partition : partitions) {
-      killTasksInGroup(taskGroups.get(getTaskGroupIdForPartition(partition)));
+      killTasksInGroup(taskGroups.get(getTaskGroupIdForPartition(partition)), reasonFormat, args);
     }
   }
 
-  private void killTasksInGroup(TaskGroup taskGroup)
+  private void killTasksInGroup(TaskGroup taskGroup, String reasonFormat, Object... args)
   {
     if (taskGroup != null) {
       for (String taskId : taskGroup.tasks.keySet()) {
-        log.info("Killing task [%s] in the task group", taskId);
-        killTask(taskId);
+        killTask(taskId, reasonFormat, args);
       }
     }
   }
@@ -855,7 +854,7 @@ void gracefulShutdownInternal() throws ExecutionException, InterruptedException,
     for (TaskGroup taskGroup : taskGroups.values()) {
       for (Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
         if (taskInfoProvider.getTaskLocation(entry.getKey()).equals(TaskLocation.unknown())) {
-          killTask(entry.getKey());
+          killTask(entry.getKey(), "Killing task for graceful shutdown");
         } else {
           entry.getValue().startTime = DateTimes.EPOCH;
         }
@@ -1236,8 +1235,7 @@ public Boolean apply(KafkaIndexTask.Status status)
     for (int i = 0; i < results.size(); i++) {
       if (results.get(i) == null) {
         String taskId = futureTaskIds.get(i);
-        log.warn("Task [%s] failed to return status, killing task", taskId);
-        killTask(taskId);
+        killTask(taskId, "Task [%s] failed to return status, killing task", taskId);
       }
     }
     log.debug("Found [%d] Kafka indexing tasks for dataSource [%s]", taskCount, dataSource);
@@ -1297,7 +1295,7 @@ private void verifyAndMergeCheckpoints(final TaskGroup taskGroup)
           }
           catch (Exception e) {
             log.error(e, "Problem while getting checkpoints for task [%s], killing the task", taskId);
-            killTask(taskId);
+            killTask(taskId, "Exception[%s] while getting checkpoints", e.getClass());
             taskGroup.tasks.remove(taskId);
           }
         } else if (checkpoints.isEmpty()) {
@@ -1393,7 +1391,8 @@ private void verifyAndMergeCheckpoints(final TaskGroup taskGroup)
 
     taskSequences.stream().filter(taskIdSequences -> tasksToKill.contains(taskIdSequences.lhs)).forEach(
         sequenceCheckpoint -> {
-          log.warn(
+          killTask(
+              sequenceCheckpoint.lhs,
               "Killing task [%s], as its checkpoints [%s] are not consistent with group checkpoints[%s] or latest "
               + "persisted offsets in metadata store [%s]",
               sequenceCheckpoint.lhs,
@@ -1401,7 +1400,6 @@ private void verifyAndMergeCheckpoints(final TaskGroup taskGroup)
               taskGroup.sequenceOffsets,
               latestOffsetsFromDb
           );
-          killTask(sequenceCheckpoint.lhs);
           taskGroup.tasks.remove(sequenceCheckpoint.lhs);
         }
     );
@@ -1505,8 +1503,7 @@ public Boolean apply(@Nullable DateTime startTime)
       // request threw an exception so kill the task
       if (results.get(i) == null) {
         String taskId = futureTaskIds.get(i);
-        log.warn("Task [%s] failed to return start time, killing task", taskId);
-        killTask(taskId);
+        killTask(taskId, "Task [%s] failed to return start time, killing task", taskId);
       }
     }
   }
@@ -1553,13 +1550,12 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
           partitionGroups.get(groupId).put(entry.getKey(), entry.getValue());
         }
       } else {
-        log.warn(
-            "All tasks in group [%s] failed to transition to publishing state, killing tasks [%s]",
-            groupId,
-            group.taskIds()
-        );
         for (String id : group.taskIds()) {
-          killTask(id);
+          killTask(
+              id,
+              "All tasks in group [%s] failed to transition to publishing state",
+              groupId
+          );
         }
         // clear partitionGroups, so that latest offsets from db is used as start offsets not the stale ones
         // if tasks did some successful incremental handoffs
@@ -1589,7 +1585,7 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
             // metadata store (which will have advanced if we succeeded in publishing and will remain the same if
             // publishing failed and we need to re-ingest)
             return Futures.transform(
-                stopTasksInGroup(taskGroup),
+                stopTasksInGroup(taskGroup, "task[%s] succeeded in the taskGroup", task.status.getId()),
                 new Function<Object, Map<Integer, Long>>()
                 {
                   @Nullable
@@ -1604,8 +1600,7 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
 
           if (task.status.isRunnable()) {
             if (taskInfoProvider.getTaskLocation(taskId).equals(TaskLocation.unknown())) {
-              log.info("Killing task [%s] which hasn't been assigned to a worker", taskId);
-              killTask(taskId);
+              killTask(taskId, "Killing task [%s] which hasn't been assigned to a worker", taskId);
               i.remove();
             }
           }
@@ -1634,8 +1629,7 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
 
               if (result == null || result.isEmpty()) { // kill tasks that didn't return a value
                 String taskId = pauseTaskIds.get(i);
-                log.warn("Task [%s] failed to respond to [pause] in a timely manner, killing task", taskId);
-                killTask(taskId);
+                killTask(taskId, "Task [%s] failed to respond to [pause] in a timely manner, killing task", taskId);
                 taskGroup.tasks.remove(taskId);
 
               } else { // otherwise build a map of the highest offsets seen
@@ -1683,8 +1677,11 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
               for (int i = 0; i < results.size(); i++) {
                 if (results.get(i) == null || !results.get(i)) {
                   String taskId = setEndOffsetTaskIds.get(i);
-                  log.warn("Task [%s] failed to respond to [set end offsets] in a timely manner, killing task", taskId);
-                  killTask(taskId);
+                  killTask(
+                      taskId,
+                      "Task [%s] failed to respond to [set end offsets] in a timely manner, killing task",
+                      taskId
+                  );
                   taskGroup.tasks.remove(taskId);
                 }
               }
@@ -1730,7 +1727,12 @@ private void checkPendingCompletionTasks() throws ExecutionException, Interrupte
         if (stopTasksInTaskGroup) {
           // One of the earlier groups that was handling the same partition set timed out before the segments were
           // published so stop any additional groups handling the same partition set that are pending completion.
-          futures.add(stopTasksInGroup(group));
+          futures.add(
+              stopTasksInGroup(
+                  group,
+                  "one of earlier groups that was handling the same partition set timed out before publishing segments"
+              )
+          );
           toRemove.add(group);
           continue;
         }
@@ -1755,8 +1757,9 @@ private void checkPendingCompletionTasks() throws ExecutionException, Interrupte
           if (taskData.status.isSuccess()) {
             // If one of the pending completion tasks was successful, stop the rest of the tasks in the group as
             // we no longer need them to publish their segment.
-            log.info("Task [%s] completed successfully, stopping tasks %s", taskId, group.taskIds());
-            futures.add(stopTasksInGroup(group));
+            futures.add(
+                stopTasksInGroup(group, "Task [%s] completed successfully, stopping tasks %s", taskId, group.taskIds())
+            );
             foundSuccess = true;
             toRemove.add(group); // remove the TaskGroup from the list of pending completion task groups
             break; // skip iterating the rest of the tasks in this group as they've all been stopped now
@@ -1778,12 +1781,20 @@ private void checkPendingCompletionTasks() throws ExecutionException, Interrupte
           // reset partitions offsets for this task group so that they will be re-read from metadata storage
           partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
           // kill all the tasks in this pending completion group
-          killTasksInGroup(group);
+          killTasksInGroup(
+              group,
+              "No task in pending completion taskGroup[%d] succeeded before completion timeout elapsed",
+              groupId
+          );
           // set a flag so the other pending completion groups for this set of partitions will also stop
           stopTasksInTaskGroup = true;
 
           // kill all the tasks in the currently reading task group and remove the bad task group
-          killTasksInGroup(taskGroups.remove(groupId));
+          killTasksInGroup(
+              taskGroups.remove(groupId),
+              "No task in the corresponding pending completion taskGroup[%d] succeeded before completion timeout elapsed",
+              groupId
+          );
           toRemove.add(group);
         }
       }
@@ -1837,7 +1848,7 @@ private void checkCurrentTaskState() throws ExecutionException, InterruptedExcep
         // check for successful tasks, and if we find one, stop all tasks in the group and remove the group so it can
         // be recreated with the next set of offsets
         if (taskData.status.isSuccess()) {
-          futures.add(stopTasksInGroup(taskGroup));
+          futures.add(stopTasksInGroup(taskGroup, "task[%s] succeeded in the same taskGroup", taskData.status.getId()));
           iTaskGroups.remove();
           break;
         }
@@ -2099,18 +2110,24 @@ private boolean isTaskCurrent(int taskGroupId, String taskId)
     }
   }
 
-  private ListenableFuture<?> stopTasksInGroup(@Nullable TaskGroup taskGroup)
+  private ListenableFuture<?> stopTasksInGroup(@Nullable TaskGroup taskGroup, String stopReasonFormat, Object... args)
   {
     if (taskGroup == null) {
       return Futures.immediateFuture(null);
     }
 
+    log.info(
+        "Stopping all tasks in taskGroup[%s] because: [%s]",
+        taskGroup.groupId,
+        StringUtils.format(stopReasonFormat, args)
+    );
+
     final List<ListenableFuture<Void>> futures = new ArrayList<>();
     for (Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
       final String taskId = entry.getKey();
       final TaskData taskData = entry.getValue();
       if (taskData.status == null) {
-        killTask(taskId);
+        killTask(taskId, "Killing task since task status is not known to supervisor");
       } else if (!taskData.status.isComplete()) {
         futures.add(stopTask(taskId, false));
       }
@@ -2129,8 +2146,7 @@ private boolean isTaskCurrent(int taskGroupId, String taskId)
           public Void apply(@Nullable Boolean result)
           {
             if (result == null || !result) {
-              log.info("Task [%s] failed to stop in a timely manner, killing task", id);
-              killTask(id);
+              killTask(id, "Task [%s] failed to stop in a timely manner, killing task", id);
             }
             return null;
           }
@@ -2138,11 +2154,11 @@ public Void apply(@Nullable Boolean result)
     );
   }
 
-  private void killTask(final String id)
+  private void killTask(final String id, String reasonFormat, Object... args)
   {
     Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
     if (taskQueue.isPresent()) {
-      taskQueue.get().shutdown(id);
+      taskQueue.get().shutdown(id, reasonFormat, args);
     } else {
       log.error("Failed to get task queue because I'm not the leader!");
     }
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 237912346ec..9b4a5a6cf4a 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -654,7 +654,7 @@ public void testKillIncompatibleTasks() throws Exception
     expect(taskClient.stopAsync("id1", false)).andReturn(Futures.immediateFuture(true));
     expect(taskClient.stopAsync("id3", false)).andReturn(Futures.immediateFuture(false));
     taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
-    taskQueue.shutdown("id3");
+    taskQueue.shutdown("id3", "Task [%s] failed to stop in a timely manner, killing task", "id3");
 
     expect(taskQueue.add(anyObject(Task.class))).andReturn(true);
 
@@ -763,8 +763,8 @@ public void testKillBadPartitionAssignment() throws Exception
         .times(1);
 
     taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
-    taskQueue.shutdown("id4");
-    taskQueue.shutdown("id5");
+    taskQueue.shutdown("id4", "Task [%s] failed to stop in a timely manner, killing task", "id4");
+    taskQueue.shutdown("id5", "Task [%s] failed to stop in a timely manner, killing task", "id5");
     replayAll();
 
     supervisor.start();
@@ -1464,7 +1464,7 @@ public void testKillUnresponsiveTasksWhileGettingStartTime() throws Exception
           .andReturn(Futures.immediateFuture(KafkaIndexTask.Status.NOT_STARTED));
       expect(taskClient.getStartTimeAsync(task.getId()))
           .andReturn(Futures.immediateFailedFuture(new RuntimeException()));
-      taskQueue.shutdown(task.getId());
+      taskQueue.shutdown(task.getId(), "Task [%s] failed to return start time, killing task", task.getId());
     }
     replay(taskStorage, taskClient, taskQueue);
 
@@ -1535,7 +1535,11 @@ public void testKillUnresponsiveTasksWhilePausing() throws Exception
         .times(2);
     expect(taskClient.pauseAsync(EasyMock.contains("sequenceName-0")))
         .andReturn(Futures.immediateFailedFuture(new RuntimeException())).times(2);
-    taskQueue.shutdown(EasyMock.contains("sequenceName-0"));
+    taskQueue.shutdown(
+        EasyMock.contains("sequenceName-0"),
+        EasyMock.eq("Task [%s] failed to respond to [pause] in a timely manner, killing task"),
+        EasyMock.contains("sequenceName-0")
+    );
     expectLastCall().times(2);
     expect(taskQueue.add(capture(captured))).andReturn(true).times(2);
 
@@ -1622,7 +1626,11 @@ public void testKillUnresponsiveTasksWhileSettingEndOffsets() throws Exception
             EasyMock.eq(true)
         )
     ).andReturn(Futures.immediateFailedFuture(new RuntimeException())).times(2);
-    taskQueue.shutdown(EasyMock.contains("sequenceName-0"));
+    taskQueue.shutdown(
+        EasyMock.contains("sequenceName-0"),
+        EasyMock.eq("All tasks in group [%s] failed to transition to publishing state"),
+        EasyMock.eq(0)
+    );
     expectLastCall().times(2);
     expect(taskQueue.add(capture(captured))).andReturn(true).times(2);
 
@@ -1749,8 +1757,10 @@ public void testStopGracefully() throws Exception
         .andReturn(Futures.immediateFuture((Map<Integer, Long>) ImmutableMap.of(0, 15L, 1, 25L, 2, 30L)));
     expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of(0, 15L, 1, 25L, 2, 30L), true))
         .andReturn(Futures.immediateFuture(true));
-    taskQueue.shutdown("id3");
-    expectLastCall().times(2);
+    taskQueue.shutdown("id3", "Killing task for graceful shutdown");
+    expectLastCall().times(1);
+    taskQueue.shutdown("id3", "Killing task [%s] which hasn't been assigned to a worker", "id3");
+    expectLastCall().times(1);
 
     replay(taskRunner, taskClient, taskQueue);
 
@@ -1950,8 +1960,8 @@ public void testResetRunningTasks() throws Exception
 
     reset(taskQueue, indexerMetadataStorageCoordinator);
     expect(indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE)).andReturn(true);
-    taskQueue.shutdown("id2");
-    taskQueue.shutdown("id3");
+    taskQueue.shutdown("id2", "DataSourceMetadata is not found while reset");
+    taskQueue.shutdown("id3", "DataSourceMetadata is not found while reset");
     replay(taskQueue, indexerMetadataStorageCoordinator);
 
     supervisor.resetInternal(null);
@@ -2036,9 +2046,9 @@ public void testNoDataIngestionTasks() throws Exception
 
     reset(taskQueue, indexerMetadataStorageCoordinator);
     expect(indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE)).andReturn(true);
-    taskQueue.shutdown("id1");
-    taskQueue.shutdown("id2");
-    taskQueue.shutdown("id3");
+    taskQueue.shutdown("id1", "DataSourceMetadata is not found while reset");
+    taskQueue.shutdown("id2", "DataSourceMetadata is not found while reset");
+    taskQueue.shutdown("id3", "DataSourceMetadata is not found while reset");
     replay(taskQueue, indexerMetadataStorageCoordinator);
 
     supervisor.resetInternal(null);
@@ -2424,8 +2434,10 @@ public void testSuspendedRunningTasks() throws Exception
         .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 15L, 1, 25L, 2, 30L)));
     expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of(0, 15L, 1, 25L, 2, 30L), true))
         .andReturn(Futures.immediateFuture(true));
-    taskQueue.shutdown("id3");
-    expectLastCall().times(2);
+    taskQueue.shutdown("id3", "Killing task for graceful shutdown");
+    expectLastCall().times(1);
+    taskQueue.shutdown("id3", "Killing task [%s] which hasn't been assigned to a worker", "id3");
+    expectLastCall().times(1);
 
     replayAll();
     supervisor.start();
diff --git a/extensions-core/kafka-indexing-service/src/test/resources/log4j2.xml b/extensions-core/kafka-indexing-service/src/test/resources/log4j2.xml
new file mode 100644
index 00000000000..bca6c69fdb1
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/test/resources/log4j2.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<Configuration status="WARN">
+  <Appenders>
+    <Console name="Console" target="SYSTEM_OUT">
+      <PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/>
+    </Console>
+  </Appenders>
+  <Loggers>
+    <Root level="info">
+      <AppenderRef ref="Console"/>
+    </Root>
+    <Logger level="debug" name="org.apache.druid" additivity="false">
+      <AppenderRef ref="Console"/>
+    </Logger>
+  </Loggers>
+</Configuration>
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
index 03af8570060..c3a0bc94ded 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
@@ -583,8 +583,9 @@ public void stop()
   }
 
   @Override
-  public void shutdown(final String taskid)
+  public void shutdown(final String taskid, String reason)
   {
+    log.info("Shutdown [%s] because: [%s]", taskid, reason);
     final ForkingTaskRunnerWorkItem taskInfo;
 
     synchronized (tasks) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
index bde17ef25dc..b83ea673c9f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
@@ -541,8 +541,9 @@ public boolean isWorkerRunningTask(ZkWorker worker, String taskId)
    * @param taskId - task id to shutdown
    */
   @Override
-  public void shutdown(final String taskId)
+  public void shutdown(final String taskId, String reason)
   {
+    log.info("Shutdown [%s] because: [%s]", taskId, reason);
     if (!lifecycleLock.awaitStarted(1, TimeUnit.SECONDS)) {
       log.info("This TaskRunner is stopped or not yet started. Ignoring shutdown command for task: %s", taskId);
     } else if (pendingTasks.remove(taskId) != null) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
index a0fb43ab7d2..a9b631713d3 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
@@ -278,8 +278,9 @@ public void stop()
    * @param taskid task ID to clean up resources for
    */
   @Override
-  public void shutdown(final String taskid)
+  public void shutdown(final String taskid, String reason)
   {
+    log.info("Shutdown [%s] because: [%s]", taskid, reason);
     if (runningItem != null && runningItem.getTask().getId().equals(taskid)) {
       runningItem.getResult().cancel(true);
     }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index 082e1572872..9a9c41dfbb8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -252,7 +252,7 @@ private void manage() throws InterruptedException
               }
               catch (Exception e) {
                 log.warn(e, "Exception thrown during isReady for task: %s", task.getId());
-                notifyStatus(task, TaskStatus.failure(task.getId()));
+                notifyStatus(task, TaskStatus.failure(task.getId()), "failed because of exception[%s]", e.getClass());
                 continue;
               }
               if (taskIsReady) {
@@ -286,7 +286,11 @@ public String apply(Task task)
           log.info("Asking taskRunner to clean up %,d tasks.", tasksToKill.size());
           for (final String taskId : tasksToKill) {
             try {
-              taskRunner.shutdown(taskId);
+              taskRunner.shutdown(
+                  taskId,
+                  "task is not in runnerTaskFutures[%s]",
+                  runnerTaskFutures.keySet()
+              );
             }
             catch (Exception e) {
               log.warn(e, "TaskRunner failed to clean up task: %s", taskId);
@@ -356,7 +360,7 @@ private void removeTaskInternal(final Task task)
    *
    * @param taskId task to kill
    */
-  public void shutdown(final String taskId)
+  public void shutdown(final String taskId, String reasonFormat, Object... args)
   {
     giant.lock();
 
@@ -364,7 +368,7 @@ public void shutdown(final String taskId)
       Preconditions.checkNotNull(taskId, "taskId");
       for (final Task task : tasks) {
         if (task.getId().equals(taskId)) {
-          notifyStatus(task, TaskStatus.failure(taskId));
+          notifyStatus(task, TaskStatus.failure(taskId), reasonFormat, args);
           break;
         }
       }
@@ -386,7 +390,7 @@ public void shutdown(final String taskId)
    * @throws IllegalArgumentException if the task ID does not match the status ID
    * @throws IllegalStateException    if this queue is currently shut down
    */
-  private void notifyStatus(final Task task, final TaskStatus taskStatus)
+  private void notifyStatus(final Task task, final TaskStatus taskStatus, String reasonFormat, Object... args)
   {
     giant.lock();
 
@@ -402,7 +406,7 @@ private void notifyStatus(final Task task, final TaskStatus taskStatus)
       );
       // Inform taskRunner that this task can be shut down
       try {
-        taskRunner.shutdown(task.getId());
+        taskRunner.shutdown(task.getId(), reasonFormat, args);
       }
       catch (Exception e) {
         log.warn(e, "TaskRunner failed to cleanup task after completion: %s", task.getId());
@@ -493,7 +497,7 @@ private void handleStatus(final TaskStatus status)
                 return;
               }
 
-              notifyStatus(task, status);
+              notifyStatus(task, status, "notified status change from task");
 
               // Emit event and log, if the task is done
               if (status.isComplete()) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java
index 6d4c867727a..81596596daa 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java
@@ -27,6 +27,7 @@
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
 import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
 
 import javax.annotation.Nullable;
 import java.util.Collection;
@@ -81,8 +82,14 @@
    * currently-running tasks.
    *
    * @param taskid task ID to clean up resources for
+   * @param reason reason to kill this task
    */
-  void shutdown(String taskid);
+  void shutdown(String taskid, String reason);
+
+  default void shutdown(String taskid, String reasonFormat, Object... args)
+  {
+    shutdown(taskid, StringUtils.format(reasonFormat, args));
+  }
 
   /**
    * Stop this task runner. This may block until currently-running tasks can be gracefully stopped. After calling
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 3d36000c9af..bf96b106ed8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -1041,7 +1041,7 @@ private void addPendingTaskToExecutor(final String taskId)
   }
 
   @Override
-  public void shutdown(String taskId)
+  public void shutdown(String taskId, String reason)
   {
     if (!lifecycleLock.awaitStarted(1, TimeUnit.SECONDS)) {
       log.info("This TaskRunner is stopped or not yet started. Ignoring shutdown command for task: %s", taskId);
@@ -1050,6 +1050,7 @@ public void shutdown(String taskId)
 
     WorkerHolder workerHolderRunningTask = null;
     synchronized (statusLock) {
+      log.info("Shutdown [%s] because: [%s]", taskId, reason);
       HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem = tasks.remove(taskId);
       if (taskRunnerWorkItem != null) {
         if (taskRunnerWorkItem.getState() == HttpRemoteTaskRunnerWorkItem.State.RUNNING) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
index 6434dfef05c..6f8875f138c 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
@@ -330,7 +330,7 @@ public Response doShutdown(@PathParam("taskid") final String taskid)
           @Override
           public Response apply(TaskQueue taskQueue)
           {
-            taskQueue.shutdown(taskid);
+            taskQueue.shutdown(taskid, "Shutdown request from user");
             return Response.ok(ImmutableMap.of("task", taskid)).build();
           }
         }
@@ -351,7 +351,7 @@ public Response apply(TaskQueue taskQueue)
           {
             final List<TaskInfo<Task, TaskStatus>> tasks = taskStorageQueryAdapter.getActiveTaskInfo(dataSource);
             for (final TaskInfo<Task, TaskStatus> task : tasks) {
-              taskQueue.shutdown(task.getId());
+              taskQueue.shutdown(task.getId(), "Shutdown request from user");
             }
             return Response.ok(ImmutableMap.of("dataSource", dataSource)).build();
           }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java
index d37b2bf06f2..bef73ce9e7f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java
@@ -168,7 +168,7 @@ public String apply(TaskRunnerWorkItem input)
   public Response doShutdown(@PathParam("taskid") String taskid)
   {
     try {
-      taskRunner.shutdown(taskid);
+      taskRunner.shutdown(taskid, "shut down request via HTTP endpoint");
     }
     catch (Exception e) {
       log.error(e, "Failed to issue shutdown for task: %s", taskid);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java
index 3aed5a59c74..4cc352fe9ee 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java
@@ -101,19 +101,19 @@ public int getPendingTasksRunnerNumThreads()
 
     if (remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[2].getId())
         && remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[3].getId())) {
-      remoteTaskRunner.shutdown("task4");
+      remoteTaskRunner.shutdown("task4", "test");
       mockWorkerRunningAndCompletionSuccessfulTasks(tasks[3], tasks[2]);
       Assert.assertEquals(TaskState.SUCCESS, results[3].get().getStatusCode());
       Assert.assertEquals(TaskState.SUCCESS, results[2].get().getStatusCode());
     } else if (remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[3].getId())
                && remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[4].getId())) {
-      remoteTaskRunner.shutdown("task2");
+      remoteTaskRunner.shutdown("task2", "test");
       mockWorkerRunningAndCompletionSuccessfulTasks(tasks[4], tasks[3]);
       Assert.assertEquals(TaskState.SUCCESS, results[4].get().getStatusCode());
       Assert.assertEquals(TaskState.SUCCESS, results[3].get().getStatusCode());
     } else if (remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[4].getId())
                && remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[2].getId())) {
-      remoteTaskRunner.shutdown("task3");
+      remoteTaskRunner.shutdown("task3", "test");
       mockWorkerRunningAndCompletionSuccessfulTasks(tasks[4], tasks[2]);
       Assert.assertEquals(TaskState.SUCCESS, results[4].get().getStatusCode());
       Assert.assertEquals(TaskState.SUCCESS, results[2].get().getStatusCode());
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java
index fbc5b2408ab..cb6cffee0d9 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java
@@ -236,7 +236,7 @@ public void onFailure(Throwable t)
   }
 
   @Override
-  public void shutdown(final String taskid)
+  public void shutdown(final String taskid, String reason)
   {
     for (final TaskRunnerWorkItem runningItem : runningItems) {
       if (runningItem.getTaskId().equals(taskid)) {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
index 3a4c210c200..ecf2af43524 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
@@ -408,7 +408,7 @@ public String getDataSource()
     }
 
     @Override
-    public void shutdown(String taskid) {}
+    public void shutdown(String taskid, String reason) {}
 
     @Override
     public synchronized Collection<? extends TaskRunnerWorkItem> getRunningTasks()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org