You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by jh...@apache.org on 2021/08/13 18:20:49 UTC

[airflow] 29/38: Handle and log exceptions raised during task callback (#17347)

This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 0678a94220f90940a3d84b5c89b17e3c7e9d2dae
Author: Sam Wheating <sa...@shopify.com>
AuthorDate: Fri Aug 6 03:35:13 2021 -0700

    Handle and log exceptions raised during task callback (#17347)
    
    Add missing exception handling in success/retry/failure callbacks
    
    (cherry picked from commit faf9f731fa8810e05f868ffec989ea042381ada4)
---
 airflow/models/taskinstance.py    | 15 ++++++++++++---
 tests/models/test_taskinstance.py | 39 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 51 insertions(+), 3 deletions(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 41fa661..b99fa34 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1390,18 +1390,27 @@ class TaskInstance(Base, LoggingMixin):
             if task.on_failure_callback is not None:
                 context = self.get_template_context()
                 context["exception"] = error
-                task.on_failure_callback(context)
+                try:
+                    task.on_failure_callback(context)
+                except Exception:
+                    self.log.exception("Error when executing on_failure_callback")
         elif self.state == State.SUCCESS:
             task = self.task
             if task.on_success_callback is not None:
                 context = self.get_template_context()
-                task.on_success_callback(context)
+                try:
+                    task.on_success_callback(context)
+                except Exception:
+                    self.log.exception("Error when executing on_success_callback")
         elif self.state == State.UP_FOR_RETRY:
             task = self.task
             if task.on_retry_callback is not None:
                 context = self.get_template_context()
                 context["exception"] = error
-                task.on_retry_callback(context)
+                try:
+                    task.on_retry_callback(context)
+                except Exception:
+                    self.log.exception("Error when executing on_retry_callback")
 
     @provide_session
     def run(
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 63f0479..021809b 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -1657,6 +1657,45 @@ class TestTaskInstance(unittest.TestCase):
         ti.refresh_from_db()
         assert ti.state == State.SUCCESS
 
+    @parameterized.expand(
+        [
+            (State.SUCCESS, "Error when executing on_success_callback"),
+            (State.UP_FOR_RETRY, "Error when executing on_retry_callback"),
+            (State.FAILED, "Error when executing on_failure_callback"),
+        ]
+    )
+    def test_finished_callbacks_handle_and_log_exception(self, finished_state, expected_message):
+        called = completed = False
+
+        def on_finish_callable(context):
+            nonlocal called, completed
+            called = True
+            raise KeyError
+            completed = True
+
+        dag = DAG(
+            'test_success_callback_handles_exception',
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+        )
+        task = DummyOperator(
+            task_id='op',
+            email='test@test.test',
+            on_success_callback=on_finish_callable,
+            on_retry_callback=on_finish_callable,
+            on_failure_callback=on_finish_callable,
+            dag=dag,
+        )
+
+        ti = TI(task=task, execution_date=datetime.datetime.now())
+        ti._log = mock.Mock()
+        ti.state = finished_state
+        ti._run_finished_callback()
+
+        assert called
+        assert not completed
+        ti.log.exception.assert_called_once_with(expected_message)
+
     def test_handle_failure(self):
         start_date = timezone.datetime(2016, 6, 1)
         dag = models.DAG(dag_id="test_handle_failure", schedule_interval=None, start_date=start_date)