You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2019/07/23 18:47:34 UTC
[incubator-druid] branch master updated: fix forking task runner task shutdown to be more graceful (#8085)
This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new cb82d72 fix forking task runner task shutdown to be more graceful (#8085)
cb82d72 is described below
commit cb82d725475c7f1cc623b2d1b414d7c7124cab60
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Tue Jul 23 11:47:23 2019 -0700
fix forking task runner task shutdown to be more graceful (#8085)
* fix forking task runner shutdown to be more graceful
* javadoc
---
.../druid/indexing/overlord/ForkingTaskRunner.java | 42 +++++++++++++---------
1 file changed, 25 insertions(+), 17 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
index 53db87d..88da445 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
@@ -527,16 +527,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
synchronized (tasks) {
for (ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) {
- if (taskWorkItem.processHolder != null) {
- log.info("Closing output stream to task[%s].", taskWorkItem.getTask().getId());
- try {
- taskWorkItem.processHolder.process.getOutputStream().close();
- }
- catch (Exception e) {
- log.warn(e, "Failed to close stdout to task[%s]. Destroying task.", taskWorkItem.getTask().getId());
- taskWorkItem.processHolder.process.destroy();
- }
- }
+ shutdownTaskProcess(taskWorkItem);
}
}
@@ -593,12 +584,8 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
}
taskInfo.shutdown = true;
- }
- if (taskInfo.processHolder != null) {
- // Will trigger normal failure mechanisms due to process exit
- log.info("Killing process for task: %s", taskid);
- taskInfo.processHolder.process.destroy();
+ shutdownTaskProcess(taskInfo);
}
}
@@ -694,8 +681,10 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
);
}
- // Save running tasks to a file, so they can potentially be restored on next startup. Suppresses exceptions that
- // occur while saving.
+ /**
+ * Save running tasks to a file, so they can potentially be restored on next startup. Suppresses exceptions that occur
+ * while saving.
+ */
@GuardedBy("tasks")
private void saveRunningTasks()
{
@@ -714,6 +703,25 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
}
}
+ /**
+ * Close task output stream (input stream of process) sending EOF telling process to terminate, destroying the process
+ * if an exception is encountered.
+ */
+ private void shutdownTaskProcess(ForkingTaskRunnerWorkItem taskInfo)
+ {
+ if (taskInfo.processHolder != null) {
+ // Will trigger normal failure mechanisms due to process exit
+ log.info("Closing output stream to task[%s].", taskInfo.getTask().getId());
+ try {
+ taskInfo.processHolder.process.getOutputStream().close();
+ }
+ catch (Exception e) {
+ log.warn(e, "Failed to close stdout to task[%s]. Destroying task.", taskInfo.getTask().getId());
+ taskInfo.processHolder.process.destroy();
+ }
+ }
+ }
+
private File getRestoreFile()
{
return new File(taskConfig.getBaseTaskDir(), TASK_RESTORE_FILENAME);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org