Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/10 13:56:37 UTC

[GitHub] ajbosco closed pull request #4078: [AIRFLOW-3234] add dagbag_import_failure_handler

URL: https://github.com/apache/incubator-airflow/pull/4078

As this is a foreign pull request (from a fork) and GitHub hides the original
diff once the PR is closed, the diff is reproduced below for the sake of
provenance. A few short, illustrative sketches of how the new hook would be
used follow the diff.

diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 12e5a16f21..4b572974dd 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -137,6 +137,11 @@ donot_pickle = False
 # How long before timing out a python file import while filling the DagBag
 dagbag_import_timeout = 30
 
+# The callable invoked to handle dagbag import failures (it is passed the
+# failing dag_id). It must be importable from the scheduler's Python path, e.g.:
+# dagbag_import_failure_handler = my.path.dagbag_import_failure_handler
+dagbag_import_failure_handler = 
+
 # The class to use for running task instances in a subprocess
 task_runner = StandardTaskRunner
 
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 0bcb131c72..4cc8fadcc7 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -60,6 +60,7 @@
 from airflow.utils.db import create_session, provide_session
 from airflow.utils.email import send_email, get_email_address_list
 from airflow.utils.log.logging_mixin import LoggingMixin, set_context, StreamLogWriter
+from airflow.utils.module_loading import import_string
 from airflow.utils.net import get_hostname
 from airflow.utils.state import State
 from airflow.utils.sqlalchemy import UtcDateTime
@@ -579,6 +580,15 @@ def __init__(
         self.heartrate = conf.getint('scheduler', 'SCHEDULER_HEARTBEAT_SEC')
         self.max_threads = conf.getint('scheduler', 'max_threads')
 
+        dagbag_import_failure_handler_path = conf.get('core', 'DAGBAG_IMPORT_FAILURE_HANDLER')
+        # Default to None so the attribute always exists, even if the import fails
+        self.dagbag_import_failure_handler = None
+        if dagbag_import_failure_handler_path:
+            try:
+                self.dagbag_import_failure_handler = import_string(dagbag_import_failure_handler_path)
+            except Exception as e:
+                self.log.error("Could not import dagbag failure handler, err: %s", e)
+
         if log:
             self._log = log
 
@@ -1480,6 +1490,12 @@ def _process_executor_events(self, simple_dag_bag, session=None):
                         ti.state = State.FAILED
                         session.merge(ti)
                         session.commit()
+                        if self.dagbag_import_failure_handler:
+                            try:
+                                self.dagbag_import_failure_handler(dag_id)
+                            except Exception as e:
+                                self.log.error("Could not call dagbag_import_failure_handler "
+                                               "for DAG %s, err: %s", dag_id, e)
 
     def _log_file_processing_stats(self,
                                    known_file_paths,
diff --git a/tests/jobs.py b/tests/jobs.py
index 9b265724b6..284cd6fcc3 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -2760,6 +2760,45 @@ def do_schedule(function, function2):
         do_schedule()
         self.assertEquals(2, len(executor.queued_tasks))
 
+    @mock.patch("airflow.utils.dag_processing.SimpleDagBag.get_dag")
+    @mock.patch("airflow.utils.timeout.timeout")
+    def test_scheduler_dagbag_import_failure_handler(
+        self, dagbag_import_failure_mock, get_dag_mock
+    ):
+        """
+        Test that the scheduler calls the dagbag_import_failure_handler if the dagbag import failed
+        """
+        session = settings.Session()
+
+        configuration.set(
+            "core", "dagbag_import_failure_handler", "airflow.utils.timeout.timeout"
+        )
+        get_dag_mock.side_effect = Exception()
+
+        dag_id = "test_scheduler_dagbag_import_failure_handler"
+        dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE)
+        task1 = DummyOperator(dag=dag, task_id="dummy_task")
+
+        dagbag = self._make_simple_dag_bag([dag])
+        scheduler = SchedulerJob()
+
+
+        ti1 = TI(task1, DEFAULT_DATE)
+        ti1.state = State.QUEUED
+        session.merge(ti1)
+        session.commit()
+
+        executor = TestExecutor()
+        executor.event_buffer[ti1.key] = State.FAILED
+
+        scheduler.executor = executor
+
+        scheduler._process_executor_events(simple_dag_bag=dagbag)
+
+        ti1.refresh_from_db()
+
+        dagbag_import_failure_mock.assert_called()
+
     def test_scheduler_sla_miss_callback(self):
         """
         Test that the scheduler does not call the sla_miss_callback when a notification has already been sent

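For context beyond the diff itself, here is how the new option might be wired
up in airflow.cfg. The dotted path my.path.dagbag_import_failure_handler is
hypothetical; any importable callable that accepts the failing dag_id should
work:

    [core]
    # Leave the value empty (the default) to disable the hook entirely
    dagbag_import_failure_handler = my.path.dagbag_import_failure_handler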

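A minimal sketch of what such a handler could look like (the module path and
the alerting logic are illustrative, not part of the PR):

    # my/path.py -- hypothetical module referenced by the setting above
    import logging

    log = logging.getLogger(__name__)

    def dagbag_import_failure_handler(dag_id):
        """Called by the scheduler when a DAG fails to import.

        Swap the body for whatever alerting fits your deployment
        (Slack webhook, PagerDuty event, StatsD counter, ...).
        """
        log.warning("DAG %s could not be imported from the DagBag", dag_id)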
 
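And how the scheduler turns that setting into a callable: import_string is the
helper the diff imports from airflow.utils.module_loading, and it raises
ImportError if the dotted path cannot be resolved, which the __init__ change
above catches and logs. The dotted path is still the hypothetical one:

    from airflow.utils.module_loading import import_string

    # Same dotted-path string the scheduler reads from [core]
    handler = import_string("my.path.dagbag_import_failure_handler")
    handler("example_dag_id")  # the scheduler passes the failing dag_id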

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services