Posted to commits@airflow.apache.org by da...@apache.org on 2016/12/08 23:27:30 UTC

incubator-airflow git commit: [AIRFLOW-677] Kill task if it fails to heartbeat

Repository: incubator-airflow
Updated Branches:
  refs/heads/master b67465631 -> 23068924c


[AIRFLOW-677] Kill task if it fails to heartbeat

https://issues.apache.org/jira/browse/AIRFLOW-677

Testing Done:
- We've been running this in production at Airbnb for a bit,
  although off a different merge base

If there's a connection error while heartbeating, it should retry.
Also, if it hasn't been able to heartbeat for a while, it should
kill the child processes so that we don't have two copies of the
same task running.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/23068924
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/23068924
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/23068924

Branch: refs/heads/master
Commit: 23068924c09d6cebb0af6151b1950197ac16e67b
Parents: b674656
Author: Alex Guziel <al...@airbnb.com>
Authored: Thu Dec 8 15:26:55 2016 -0800
Committer: Dan Davydov <da...@airbnb.com>
Committed: Thu Dec 8 15:27:05 2016 -0800

----------------------------------------------------------------------
 airflow/configuration.py |  5 ++++
 airflow/jobs.py          | 66 ++++++++++++++++++++++++++++++++-----------
 airflow/models.py        |  2 +-
 airflow/utils/helpers.py | 61 +++++++++++++++++++++++++++++++++++++++
 4 files changed, 117 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/23068924/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 265f728..1adf748 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -350,6 +350,11 @@ print_stats_interval = 30
 
 child_process_log_directory = /tmp/airflow/scheduler/logs
 
+# Local task jobs periodically heartbeat to the DB. If the job has
+# not sent a heartbeat in this many seconds, the scheduler will mark
+# the associated task instance as failed and will re-schedule the task.
+scheduler_zombie_task_threshold = 300
+
 # Statsd (https://github.com/etsy/statsd) integration settings
 statsd_on = False
 statsd_host = localhost

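For reference, a minimal sketch of how a component reads the new option at
runtime (the jobs.py change below performs this same conf.getint lookup);
the print() is illustrative only:

    from airflow import configuration as conf

    # Look up the zombie threshold (seconds), matching the lookup that
    # LocalTaskJob._execute gains in the jobs.py hunk below.
    heartbeat_time_limit = conf.getint('scheduler',
                                       'scheduler_zombie_task_threshold')
    print("Tasks are considered zombies after {}s without a heartbeat"
          .format(heartbeat_time_limit))
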
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/23068924/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 7eb4b99..22cdeb0 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -36,6 +36,7 @@ from time import sleep
 
 import psutil
 from sqlalchemy import Column, Integer, String, DateTime, func, Index, or_
+from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm.session import make_transient
 from tabulate import tabulate
 
@@ -53,6 +54,7 @@ from airflow.utils.dag_processing import (AbstractDagFileProcessor,
                                           SimpleDagBag,
                                           list_py_file_paths)
 from airflow.utils.email import send_email
+from airflow.utils.helpers import kill_descendant_processes
 from airflow.utils.logging import LoggingMixin
 from airflow.utils import asciiart
 
@@ -1967,22 +1969,54 @@ class LocalTaskJob(BaseJob):
         super(LocalTaskJob, self).__init__(*args, **kwargs)
 
     def _execute(self):
-        command = self.task_instance.command(
-            raw=True,
-            ignore_all_deps=self.ignore_all_deps,
-            ignore_depends_on_past=self.ignore_depends_on_past,
-            ignore_task_deps=self.ignore_task_deps,
-            ignore_ti_state=self.ignore_ti_state,
-            pickle_id=self.pickle_id,
-            mark_success=self.mark_success,
-            job_id=self.id,
-            pool=self.pool,
-        )
-        self.process = subprocess.Popen(['bash', '-c', command])
-        return_code = None
-        while return_code is None:
-            self.heartbeat()
-            return_code = self.process.poll()
+        try:
+            command = self.task_instance.command(
+                raw=True,
+                ignore_all_deps=self.ignore_all_deps,
+                ignore_depends_on_past=self.ignore_depends_on_past,
+                ignore_task_deps=self.ignore_task_deps,
+                ignore_ti_state=self.ignore_ti_state,
+                pickle_id=self.pickle_id,
+                mark_success=self.mark_success,
+                job_id=self.id,
+                pool=self.pool,
+            )
+            self.process = subprocess.Popen(['bash', '-c', command])
+            self.logger.info("Subprocess PID is {}".format(self.process.pid))
+
+            last_heartbeat_time = time.time()
+            heartbeat_time_limit = conf.getint('scheduler',
+                                               'scheduler_zombie_task_threshold')
+            while True:
+                # Monitor the task to see if it's done
+                return_code = self.process.poll()
+                if return_code is not None:
+                    return
+
+                # Periodically heartbeat so that the scheduler doesn't think this
+                # is a zombie
+                try:
+                    self.heartbeat()
+                    last_heartbeat_time = time.time()
+                except OperationalError:
+                    Stats.incr('local_task_job_heartbeat_failure', 1, 1)
+                    self.logger.exception("Exception while trying to heartbeat! "
+                                          "Sleeping for {}s".format(self.heartrate))
+                    time.sleep(self.heartrate)
+
+                # If it's been too long since the last successful heartbeat,
+                # the scheduler may have rescheduled this task, so kill any
+                # processes we launched.
+                time_since_last_heartbeat = time.time() - last_heartbeat_time
+                if time_since_last_heartbeat > heartbeat_time_limit:
+                    Stats.incr('local_task_job_prolonged_heartbeat_failure', 1, 1)
+                    self.logger.error("Heartbeat time limit exceeded!")
+                    raise AirflowException("Time since last heartbeat ({:.2f}s) "
+                                           "exceeded limit ({}s)."
+                                           .format(time_since_last_heartbeat,
+                                                   heartbeat_time_limit))
+        finally:
+            # Kill processes that were left running
+            kill_descendant_processes(self.logger)
 
     def on_kill(self):
         self.process.terminate()

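Stripped of the Airflow plumbing, the loop added above follows a general
watchdog pattern: poll the child process, heartbeat with a retry on
transient database errors, and abort (cleaning up children) once heartbeats
have been stale for too long. A self-contained sketch of that pattern,
where heartbeat(), HEARTRATE, and HEARTBEAT_TIME_LIMIT are hypothetical
stand-ins for their Airflow counterparts:

    import subprocess
    import time

    HEARTRATE = 5                # stand-in for BaseJob.heartrate
    HEARTBEAT_TIME_LIMIT = 300   # stand-in for scheduler_zombie_task_threshold

    def heartbeat():
        """Hypothetical stand-in for BaseJob.heartbeat(); may raise."""
        pass

    def run_and_watch(command):
        process = subprocess.Popen(['bash', '-c', command])
        last_heartbeat_time = time.time()
        try:
            while True:
                # Done? Stop watching and report the exit code.
                if process.poll() is not None:
                    return process.returncode
                # Heartbeat; on a transient error, sleep and retry.
                try:
                    heartbeat()
                    last_heartbeat_time = time.time()
                    # The real BaseJob.heartbeat() paces itself; sleep
                    # here explicitly so the sketch doesn't spin.
                    time.sleep(HEARTRATE)
                except IOError:
                    time.sleep(HEARTRATE)
                # Heartbeats stale for too long: assume the scheduler has
                # declared this task a zombie and rescheduled it.
                if time.time() - last_heartbeat_time > HEARTBEAT_TIME_LIMIT:
                    raise RuntimeError("heartbeat went stale; scheduler "
                                       "may have rescheduled this task")
        finally:
            # Mirror the finally clause above: don't leave orphans behind.
            if process.poll() is None:
                process.terminate()

    # e.g. run_and_watch('sleep 3')
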
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/23068924/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 5763b96..f46a352 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -321,7 +321,7 @@ class DagBag(BaseDagBag, LoggingMixin):
         self.logger.info("Finding 'running' jobs without a recent heartbeat")
         TI = TaskInstance
         secs = (
-            configuration.getint('scheduler', 'job_heartbeat_sec') * 3) + 120
+            configuration.getint('scheduler', 'scheduler_zombie_task_threshold'))
         limit_dttm = datetime.now() - timedelta(seconds=secs)
         self.logger.info(
             "Failing jobs without heartbeat after {}".format(limit_dttm))

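The practical effect: the zombie cutoff is now the configurable threshold
directly, rather than the old job_heartbeat_sec * 3 + 120 formula. A small
standalone sketch of the cutoff arithmetic (the example timestamps are
illustrative, not Airflow API):

    from datetime import datetime, timedelta

    scheduler_zombie_task_threshold = 300  # seconds, the new default
    limit_dttm = datetime.now() - timedelta(
        seconds=scheduler_zombie_task_threshold)

    # A 'running' job whose latest heartbeat is older than limit_dttm
    # is failed and its task instance gets rescheduled.
    latest_heartbeat = datetime.now() - timedelta(seconds=400)
    print("zombie" if latest_heartbeat < limit_dttm else "alive")
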
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/23068924/airflow/utils/helpers.py
----------------------------------------------------------------------
diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index bd26f20..24fc310 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -17,6 +17,8 @@ from __future__ import division
 from __future__ import print_function
 from __future__ import unicode_literals
 
+import psutil
+
 from builtins import input
 from past.builtins import basestring
 from datetime import datetime
@@ -29,6 +31,9 @@ import warnings
 
 from airflow.exceptions import AirflowException
 
+# When killing processes, time to wait after issuing a SIGTERM before issuing a
+# SIGKILL.
+TIME_TO_WAIT_AFTER_SIGTERM = 5
 
 def validate_key(k, max_length=250):
     if not isinstance(k, basestring):
@@ -174,6 +179,62 @@ def pprinttable(rows):
     return s
 
 
+def kill_descendant_processes(logger, pids_to_kill=None):
+    """
+    Kills all descendant processes of this process.
+
+    :param logger: logger
+    :type logger: logging.Logger
+    :param pids_to_kill: if specified, kill only these PIDs
+    :type pids_to_kill: list[int]
+    """
+    # First try SIGTERM
+    this_process = psutil.Process(os.getpid())
+
+    # Only check child processes to ensure that we don't have a case
+    # where a child process died but the PID got reused.
+    descendant_processes = [x for x in this_process.children(recursive=True)
+                            if x.is_running()]
+    if pids_to_kill:
+        descendant_processes = [x for x in descendant_processes
+                                if x.pid in pids_to_kill]
+
+    if len(descendant_processes) == 0:
+        logger.debug("There are no descendant processes that can be killed")
+        return
+    logger.warn("Terminating descendant processes of {} PID: {}"
+                .format(this_process.cmdline(),
+                        this_process.pid))
+    for descendant in descendant_processes:
+        logger.warn("Terminating descendant process {} PID: {}"
+                    .format(descendant.cmdline(), descendant.pid))
+        descendant.terminate()
+    logger.warn("Waiting up to {}s for processes to exit..."
+                .format(TIME_TO_WAIT_AFTER_SIGTERM))
+    # Note: psutil.wait_procs does not raise TimeoutExpired; it returns a
+    # (gone, alive) tuple, so check for survivors explicitly.
+    gone, alive = psutil.wait_procs(descendant_processes,
+                                    timeout=TIME_TO_WAIT_AFTER_SIGTERM)
+    if alive:
+        logger.warn("Ran out of time while waiting for processes to exit")
+    else:
+        logger.warn("Done waiting")
+    # Then SIGKILL
+    descendant_processes = [x for x in this_process.children(recursive=True)
+                            if x.is_running()]
+    if pids_to_kill:
+        descendant_processes = [x for x in descendant_processes
+                                if x.pid in pids_to_kill]
+
+    if len(descendant_processes) > 0:
+        for descendant in descendant_processes:
+            logger.warn("Killing descendant process {} PID: {}"
+                        .format(descendant.cmdline(), descendant.pid))
+            descendant.kill()
+            descendant.wait()
+        logger.warn("Killed all descendant processes of {} PID: {}"
+                    .format(this_process.cmdline(),
+                            this_process.pid))
+
+
 class AirflowImporter(object):
     """
     Importer that dynamically loads a class and module from its parent. This
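
A usage sketch for the new helper: spawn a shell that forks a grandchild,
then reap the whole tree. The logger setup and the sleep commands are
illustrative assumptions, not part of this commit:

    import logging
    import subprocess
    import time

    from airflow.utils.helpers import kill_descendant_processes

    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger(__name__)

    # A child shell with a backgrounded grandchild and a foreground sleep.
    subprocess.Popen(['bash', '-c', 'sleep 600 & sleep 600'])
    time.sleep(1)  # give the shell a moment to fork

    # SIGTERM every descendant, wait up to TIME_TO_WAIT_AFTER_SIGTERM
    # seconds, then SIGKILL anything still running.
    kill_descendant_processes(logger)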