Posted to commits@airflow.apache.org by da...@apache.org on 2016/12/08 23:27:30 UTC
incubator-airflow git commit: [AIRFLOW-677] Kill task if it fails to heartbeat
Repository: incubator-airflow
Updated Branches:
refs/heads/master b67465631 -> 23068924c
[AIRFLOW-677] Kill task if it fails to heartbeat
https://issues.apache.org/jira/browse/AIRFLOW-677
Testing Done:
- We've been running this in production at Airbnb for a bit,
  although off a different merge base
[AIRFLOW-677] Kill task if it fails to heartbeat
If there's a connection error while heartbeating, it should retry.
Also, if it hasn't been able to heartbeat for a while, it should kill
the child processes so that we don't have 2 of the same task running.
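In outline, the change turns the LocalTaskJob run loop into a watchdog:
a transient DB error during a heartbeat is logged and retried, and if no
heartbeat has succeeded within the configured threshold the job raises and
kills its child processes. A condensed, simplified sketch of that loop (the
real code in airflow/jobs.py below uses self.heartbeat(), conf.getint() and
kill_descendant_processes(); `kill_children` here is a stand-in for the
latter):

    import time

    def monitor_task(process, heartbeat, kill_children, heartrate,
                     time_limit, logger):
        # `heartbeat` is expected to raise on transient DB errors (e.g. a
        # sqlalchemy OperationalError); `kill_children` stands in for
        # airflow.utils.helpers.kill_descendant_processes.
        last_heartbeat = time.time()
        try:
            while process.poll() is None:   # task subprocess still running
                try:
                    heartbeat()
                    last_heartbeat = time.time()
                except Exception:
                    logger.exception("Heartbeat failed, retrying")
                time.sleep(heartrate)
                if time.time() - last_heartbeat > time_limit:
                    # The scheduler may already consider this task a zombie
                    # and have rescheduled it, so stop running our copy.
                    raise RuntimeError(
                        "No successful heartbeat for over {}s".format(time_limit))
        finally:
            kill_children(logger)           # always reap leftover children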
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/23068924
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/23068924
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/23068924
Branch: refs/heads/master
Commit: 23068924c09d6cebb0af6151b1950197ac16e67b
Parents: b674656
Author: Alex Guziel <al...@airbnb.com>
Authored: Thu Dec 8 15:26:55 2016 -0800
Committer: Dan Davydov <da...@airbnb.com>
Committed: Thu Dec 8 15:27:05 2016 -0800
----------------------------------------------------------------------
airflow/configuration.py | 5 ++++
airflow/jobs.py | 66 ++++++++++++++++++++++++++++++++-----------
airflow/models.py | 2 +-
airflow/utils/helpers.py | 61 +++++++++++++++++++++++++++++++++++++++
4 files changed, 117 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/23068924/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 265f728..1adf748 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -350,6 +350,11 @@ print_stats_interval = 30
child_process_log_directory = /tmp/airflow/scheduler/logs
+# Local task jobs periodically heartbeat to the DB. If the job has not
+# heartbeated in this many seconds, the scheduler will mark the
+# associated task instance as failed and will re-schedule the task.
+scheduler_zombie_task_threshold = 300
+
# Statsd (https://github.com/etsy/statsd) integration settings
statsd_on = False
statsd_host = localhost
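For reference, the new key lives in the [scheduler] section alongside the
other scheduler settings and is read like any other config value; a minimal
illustrative read (matching how jobs.py below consumes it):

    from airflow import configuration as conf

    # Seconds a LocalTaskJob may go without a successful heartbeat before
    # the scheduler treats it as a zombie.
    zombie_threshold = conf.getint('scheduler', 'scheduler_zombie_task_threshold')

In models.py below this same value replaces the old
job_heartbeat_sec * 3 + 120 zombie window; assuming the stock
job_heartbeat_sec default of 5, that old window was 135 seconds, versus the
new 300-second default, and it is now tunable independently of the
heartbeat rate.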
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/23068924/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 7eb4b99..22cdeb0 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -36,6 +36,7 @@ from time import sleep
import psutil
from sqlalchemy import Column, Integer, String, DateTime, func, Index, or_
+from sqlalchemy.exc import OperationalError
from sqlalchemy.orm.session import make_transient
from tabulate import tabulate
@@ -53,6 +54,7 @@ from airflow.utils.dag_processing import (AbstractDagFileProcessor,
SimpleDagBag,
list_py_file_paths)
from airflow.utils.email import send_email
+from airflow.utils.helpers import kill_descendant_processes
from airflow.utils.logging import LoggingMixin
from airflow.utils import asciiart
@@ -1967,22 +1969,54 @@ class LocalTaskJob(BaseJob):
super(LocalTaskJob, self).__init__(*args, **kwargs)
def _execute(self):
- command = self.task_instance.command(
- raw=True,
- ignore_all_deps=self.ignore_all_deps,
- ignore_depends_on_past=self.ignore_depends_on_past,
- ignore_task_deps=self.ignore_task_deps,
- ignore_ti_state=self.ignore_ti_state,
- pickle_id=self.pickle_id,
- mark_success=self.mark_success,
- job_id=self.id,
- pool=self.pool,
- )
- self.process = subprocess.Popen(['bash', '-c', command])
- return_code = None
- while return_code is None:
- self.heartbeat()
- return_code = self.process.poll()
+ try:
+ command = self.task_instance.command(
+ raw=True,
+ ignore_all_deps=self.ignore_all_deps,
+ ignore_depends_on_past=self.ignore_depends_on_past,
+ ignore_task_deps=self.ignore_task_deps,
+ ignore_ti_state=self.ignore_ti_state,
+ pickle_id=self.pickle_id,
+ mark_success=self.mark_success,
+ job_id=self.id,
+ pool=self.pool
+ )
+ self.process = subprocess.Popen(['bash', '-c', command])
+ self.logger.info("Subprocess PID is {}".format(self.process.pid))
+
+ last_heartbeat_time = time.time()
+ heartbeat_time_limit = conf.getint('scheduler',
+ 'scheduler_zombie_task_threshold')
+ while True:
+ # Monitor the task to see if it's done
+ return_code = self.process.poll()
+ if return_code is not None:
+ return
+
+ # Periodically heartbeat so that the scheduler doesn't think this
+ # is a zombie
+ try:
+ self.heartbeat()
+ last_heartbeat_time = time.time()
+ except OperationalError:
+ Stats.incr('local_task_job_heartbeat_failure', 1, 1)
+ self.logger.exception("Exception while trying to heartbeat! "
+ "Sleeping for {}s".format(self.heartrate))
+ time.sleep(self.heartrate)
+
+ # If it's been too long since the last heartbeat, it's possible that
+ # the scheduler rescheduled this task, so kill launched processes.
+ time_since_last_heartbeat = time.time() - last_heartbeat_time
+ if time_since_last_heartbeat > heartbeat_time_limit:
+ Stats.incr('local_task_job_prolonged_heartbeat_failure', 1, 1)
+ self.logger.error("Heartbeat time limit exceeded!")
+ raise AirflowException("Time since last heartbeat ({:.2f}s) "
+ "exceeded limit ({}s)."
+ .format(time_since_last_heartbeat,
+ heartbeat_time_limit))
+ finally:
+ # Kill processes that were left running
+ kill_descendant_processes(self.logger)
def on_kill(self):
self.process.terminate()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/23068924/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 5763b96..f46a352 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -321,7 +321,7 @@ class DagBag(BaseDagBag, LoggingMixin):
self.logger.info("Finding 'running' jobs without a recent heartbeat")
TI = TaskInstance
secs = (
- configuration.getint('scheduler', 'job_heartbeat_sec') * 3) + 120
+ configuration.getint('scheduler', 'scheduler_zombie_task_threshold'))
limit_dttm = datetime.now() - timedelta(seconds=secs)
self.logger.info(
"Failing jobs without heartbeat after {}".format(limit_dttm))
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/23068924/airflow/utils/helpers.py
----------------------------------------------------------------------
diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index bd26f20..24fc310 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -17,6 +17,8 @@ from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
+import psutil
+
from builtins import input
from past.builtins import basestring
from datetime import datetime
@@ -29,6 +31,9 @@ import warnings
from airflow.exceptions import AirflowException
+# When killing processes, time to wait after issuing a SIGTERM before issuing a
+# SIGKILL.
+TIME_TO_WAIT_AFTER_SIGTERM = 5
def validate_key(k, max_length=250):
if not isinstance(k, basestring):
@@ -174,6 +179,62 @@ def pprinttable(rows):
return s
+def kill_descendant_processes(logger, pids_to_kill=None):
+ """
+ Kills all descendant processes of this process.
+
+ :param logger: logger
+ :type logger: logging.Logger
+ :param pids_to_kill: if specified, kill only these PIDs
+ :type pids_to_kill: list[int]
+ """
+ # First try SIGTERM
+ this_process = psutil.Process(os.getpid())
+
+ # Only check child processes to ensure that we don't have a case
+ # where a child process died but the PID got reused.
+ descendant_processes = [x for x in this_process.children(recursive=True)
+ if x.is_running()]
+ if pids_to_kill:
+ descendant_processes = [x for x in descendant_processes
+ if x.pid in pids_to_kill]
+
+ if len(descendant_processes) == 0:
+ logger.debug("There are no descendant processes that can be killed")
+ return
+ logger.warn("Terminating descendant processes of {} PID: {}"
+ .format(this_process.cmdline(),
+ this_process.pid))
+ for descendant in descendant_processes:
+ logger.warn("Terminating descendant process {} PID: {}"
+ .format(descendant.cmdline(), descendant.pid))
+ descendant.terminate()
+ logger.warn("Waiting up to {}s for processes to exit..."
+ .format(TIME_TO_WAIT_AFTER_SIGTERM))
+ # wait_procs() returns a (gone, alive) tuple rather than raising
+ # TimeoutExpired, so check `alive` to tell whether the wait timed out.
+ gone, alive = psutil.wait_procs(descendant_processes,
+ TIME_TO_WAIT_AFTER_SIGTERM)
+ if alive:
+ logger.warn("Ran out of time while waiting for "
+ "processes to exit")
+ else:
+ logger.warn("Done waiting")
+ # Then SIGKILL
+ descendant_processes = [x for x in this_process.children(recursive=True)
+ if x.is_running()]
+ if pids_to_kill:
+ descendant_processes = [x for x in descendant_processes
+ if x.pid in pids_to_kill]
+
+ if len(descendant_processes) > 0:
+ for descendant in descendant_processes:
+ logger.warn("Killing descendant process {} PID: {}"
+ .format(descendant.cmdline(), descendant.pid))
+ descendant.kill()
+ descendant.wait()
+ logger.warn("Killed all descendant processes of {} PID: {}"
+ .format(this_process.cmdline(),
+ this_process.pid))
+
+
class AirflowImporter(object):
"""
Importer that dynamically loads a class and module from its parent. This
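As a usage illustration for the new helper (a hypothetical driver script,
not part of the patch):

    import logging
    import subprocess
    import time

    from airflow.utils.helpers import kill_descendant_processes

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("kill_demo")

    # Start a child that itself spawns a grandchild, mimicking a task's
    # subprocesses.
    proc = subprocess.Popen(['bash', '-c', 'sleep 600 & wait'])
    time.sleep(1)

    # SIGTERM every live descendant of this process, wait up to
    # TIME_TO_WAIT_AFTER_SIGTERM seconds, then SIGKILL anything still alive.
    kill_descendant_processes(logger)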