You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2019/07/23 18:47:34 UTC

[incubator-druid] branch master updated: fix forking task runner task shutdown to be more graceful (#8085)

This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new cb82d72  fix forking task runner task shutdown to be more graceful (#8085)
cb82d72 is described below

commit cb82d725475c7f1cc623b2d1b414d7c7124cab60
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Tue Jul 23 11:47:23 2019 -0700

    fix forking task runner task shutdown to be more graceful (#8085)
    
    * fix forking task runner shutdown to be more graceful
    
    * javadoc
---
 .../druid/indexing/overlord/ForkingTaskRunner.java | 42 +++++++++++++---------
 1 file changed, 25 insertions(+), 17 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
index 53db87d..88da445 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
@@ -527,16 +527,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
 
     synchronized (tasks) {
       for (ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) {
-        if (taskWorkItem.processHolder != null) {
-          log.info("Closing output stream to task[%s].", taskWorkItem.getTask().getId());
-          try {
-            taskWorkItem.processHolder.process.getOutputStream().close();
-          }
-          catch (Exception e) {
-            log.warn(e, "Failed to close stdout to task[%s]. Destroying task.", taskWorkItem.getTask().getId());
-            taskWorkItem.processHolder.process.destroy();
-          }
-        }
+        shutdownTaskProcess(taskWorkItem);
       }
     }
 
@@ -593,12 +584,8 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
       }
 
       taskInfo.shutdown = true;
-    }
 
-    if (taskInfo.processHolder != null) {
-      // Will trigger normal failure mechanisms due to process exit
-      log.info("Killing process for task: %s", taskid);
-      taskInfo.processHolder.process.destroy();
+      shutdownTaskProcess(taskInfo);
     }
   }
 
@@ -694,8 +681,10 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
     );
   }
 
-  // Save running tasks to a file, so they can potentially be restored on next startup. Suppresses exceptions that
-  // occur while saving.
+  /**
+   * Save running tasks to a file, so they can potentially be restored on next startup. Suppresses exceptions that occur
+   * while saving.
+   */
   @GuardedBy("tasks")
   private void saveRunningTasks()
   {
@@ -714,6 +703,25 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
     }
   }
 
+  /**
+   * Closes the task's output stream (the input stream of the process), sending EOF to tell the process to terminate;
+   * destroys the process if an exception is encountered while closing.
+   */
+  private void shutdownTaskProcess(ForkingTaskRunnerWorkItem taskInfo)
+  {
+    if (taskInfo.processHolder != null) {
+      // Will trigger normal failure mechanisms due to process exit
+      log.info("Closing output stream to task[%s].", taskInfo.getTask().getId());
+      try {
+        taskInfo.processHolder.process.getOutputStream().close();
+      }
+      catch (Exception e) {
+        log.warn(e, "Failed to close stdout to task[%s]. Destroying task.", taskInfo.getTask().getId());
+        taskInfo.processHolder.process.destroy();
+      }
+    }
+  }
+
   private File getRestoreFile()
   {
     return new File(taskConfig.getBaseTaskDir(), TASK_RESTORE_FILENAME);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org