You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/10 13:56:37 UTC
[GitHub] ajbosco closed pull request #4078: [AIRFLOW-3234] add
dagbag_import_failure_handler
ajbosco closed pull request #4078: [AIRFLOW-3234] add dagbag_import_failure_handler
URL: https://github.com/apache/incubator-airflow/pull/4078
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise display it):
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 12e5a16f21..4b572974dd 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -137,6 +137,11 @@ donot_pickle = False
# How long before timing out a python file import while filling the DagBag
dagbag_import_timeout = 30
+# Dotted import path of a callable that handles dagbag import failures;
+# it is called with the dag_id and must be importable from the Python path
+# dagbag_import_failure_handler = my.path.dagbag_import_failure_handler
+dagbag_import_failure_handler =
+
# The class to use for running task instances in a subprocess
task_runner = StandardTaskRunner
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 0bcb131c72..4cc8fadcc7 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -60,6 +60,7 @@
from airflow.utils.db import create_session, provide_session
from airflow.utils.email import send_email, get_email_address_list
from airflow.utils.log.logging_mixin import LoggingMixin, set_context, StreamLogWriter
+from airflow.utils.module_loading import import_string
from airflow.utils.net import get_hostname
from airflow.utils.state import State
from airflow.utils.sqlalchemy import UtcDateTime
@@ -579,6 +580,15 @@ def __init__(
self.heartrate = conf.getint('scheduler', 'SCHEDULER_HEARTBEAT_SEC')
self.max_threads = conf.getint('scheduler', 'max_threads')
+ dagbag_import_failure_handler_path = conf.get('core', 'DAGBAG_IMPORT_FAILURE_HANDLER')
+ if dagbag_import_failure_handler_path:
+ try:
+ self.dagbag_import_failure_handler = import_string(dagbag_import_failure_handler_path)
+ except Exception as e:
+ self.log.error("Could not import dagbag failure handler, err: %s", e)
+ else:
+ self.dagbag_import_failure_handler = None
+
if log:
self._log = log
@@ -1480,6 +1490,12 @@ def _process_executor_events(self, simple_dag_bag, session=None):
ti.state = State.FAILED
session.merge(ti)
session.commit()
+ if self.dagbag_import_failure_handler:
+ try:
+ self.dagbag_import_failure_handler(dag_id)
+ except Exception as e:
+ self.log.error("Could not call dagbag_import_failure_handler"
+ "for DAG %s, err: %s", dag_id, e)
def _log_file_processing_stats(self,
known_file_paths,
diff --git a/tests/jobs.py b/tests/jobs.py
index 9b265724b6..284cd6fcc3 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -2760,6 +2760,45 @@ def do_schedule(function, function2):
do_schedule()
self.assertEquals(2, len(executor.queued_tasks))
+ @mock.patch("airflow.utils.dag_processing.SimpleDagBag.get_dag")  # outer patch: supplies the second mock argument
+ @mock.patch("airflow.utils.timeout.timeout")  # inner patch: supplies the first mock argument (stands in for the handler)
+ def test_scheduler_dagbag_import_failure_handler(
+ self, dagbag_import_failure_mock, get_dag_mock
+ ):
+ """
+ Test that the scheduler calls the configured dagbag_import_failure_handler when a queued task is reported FAILED by the executor and SimpleDagBag.get_dag raises
+ """
+ session = settings.Session()
+
+ configuration.set(  # NOTE(review): this setting is not restored after the test and may leak into later tests
+ "core", "dagbag_import_failure_handler", "airflow.utils.timeout.timeout"  # same path as the inner patch, so the handler resolves to that mock
+ )
+ get_dag_mock.side_effect = Exception()  # make the DAG lookup fail
+
+ dag_id = "test_scheduler_dagbag_import_failure_handler"
+ dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE)
+ task1 = DummyOperator(dag=dag, task_id="dummy_task")
+
+ dagbag = self._make_simple_dag_bag([dag])
+ scheduler = SchedulerJob()  # reads the handler path configured above in its __init__
+ session = settings.Session()  # NOTE(review): rebinds the session created above
+
+ ti1 = TI(task1, DEFAULT_DATE)
+ ti1.state = State.QUEUED
+ session.merge(ti1)
+ session.commit()
+
+ executor = TestExecutor()
+ executor.event_buffer[ti1.key] = State.FAILED  # executor reports the queued task as failed
+
+ scheduler.executor = executor
+
+ scheduler._process_executor_events(simple_dag_bag=dagbag)
+
+ ti1.refresh_from_db()  # NOTE(review): the refreshed state is never asserted
+
+ dagbag_import_failure_mock.assert_called()  # the handler (the patched mock) was invoked
+
def test_scheduler_sla_miss_callback(self):
"""
Test that the scheduler does not call the sla_miss_callback when a notification has already been sent
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services