You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2019/07/29 21:09:18 UTC
[incubator-druid] branch 0.15.1-incubating updated: fix forking task runner task shutdown to be more graceful (#8085) (#8152)
This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch 0.15.1-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.15.1-incubating by this push:
new 43c300b fix forking task runner task shutdown to be more graceful (#8085) (#8152)
43c300b is described below
commit 43c300b1fc4a4172a9878106a7705b85578ffad2
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Mon Jul 29 14:09:09 2019 -0700
fix forking task runner task shutdown to be more graceful (#8085) (#8152)
* fix forking task runner shutdown to be more graceful
* javadoc
---
.../druid/indexing/overlord/ForkingTaskRunner.java | 42 +++++++++++++---------
1 file changed, 25 insertions(+), 17 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
index 05f2f52..e0b9291 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
@@ -531,16 +531,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
synchronized (tasks) {
for (ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) {
- if (taskWorkItem.processHolder != null) {
- log.info("Closing output stream to task[%s].", taskWorkItem.getTask().getId());
- try {
- taskWorkItem.processHolder.process.getOutputStream().close();
- }
- catch (Exception e) {
- log.warn(e, "Failed to close stdout to task[%s]. Destroying task.", taskWorkItem.getTask().getId());
- taskWorkItem.processHolder.process.destroy();
- }
- }
+ shutdownTaskProcess(taskWorkItem);
}
}
@@ -597,12 +588,8 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
}
taskInfo.shutdown = true;
- }
- if (taskInfo.processHolder != null) {
- // Will trigger normal failure mechanisms due to process exit
- log.info("Killing process for task: %s", taskid);
- taskInfo.processHolder.process.destroy();
+ shutdownTaskProcess(taskInfo);
}
}
@@ -698,8 +685,10 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
);
}
- // Save running tasks to a file, so they can potentially be restored on next startup. Suppresses exceptions that
- // occur while saving.
+ /**
+ * Save running tasks to a file, so they can potentially be restored on next startup. Suppresses exceptions that occur
+ * while saving.
+ */
@GuardedBy("tasks")
private void saveRunningTasks()
{
@@ -718,6 +707,25 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
}
}
+ /**
+ * Close task output stream (input stream of process) sending EOF telling process to terminate, destroying the process
+ * if an exception is encountered.
+ */
+ private void shutdownTaskProcess(ForkingTaskRunnerWorkItem taskInfo)
+ {
+ if (taskInfo.processHolder != null) {
+ // Will trigger normal failure mechanisms due to process exit
+ log.info("Closing output stream to task[%s].", taskInfo.getTask().getId());
+ try {
+ taskInfo.processHolder.process.getOutputStream().close();
+ }
+ catch (Exception e) {
+ log.warn(e, "Failed to close stdout to task[%s]. Destroying task.", taskInfo.getTask().getId());
+ taskInfo.processHolder.process.destroy();
+ }
+ }
+ }
+
private File getRestoreFile()
{
return new File(taskConfig.getBaseTaskDir(), TASK_RESTORE_FILENAME);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org