You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by jh...@apache.org on 2021/08/13 18:20:49 UTC
[airflow] 29/38: Handle and log exceptions raised during task callback (#17347)
This is an automated email from the ASF dual-hosted git repository.
jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 0678a94220f90940a3d84b5c89b17e3c7e9d2dae
Author: Sam Wheating <sa...@shopify.com>
AuthorDate: Fri Aug 6 03:35:13 2021 -0700
Handle and log exceptions raised during task callback (#17347)
Add missing exception handling in success/retry/failure callbacks
(cherry picked from commit faf9f731fa8810e05f868ffec989ea042381ada4)
---
airflow/models/taskinstance.py | 15 ++++++++++++---
tests/models/test_taskinstance.py | 39 +++++++++++++++++++++++++++++++++++++++
2 files changed, 51 insertions(+), 3 deletions(-)
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 41fa661..b99fa34 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1390,18 +1390,27 @@ class TaskInstance(Base, LoggingMixin):
if task.on_failure_callback is not None:
context = self.get_template_context()
context["exception"] = error
- task.on_failure_callback(context)
+ try:
+ task.on_failure_callback(context)
+ except Exception:
+ self.log.exception("Error when executing on_failure_callback")
elif self.state == State.SUCCESS:
task = self.task
if task.on_success_callback is not None:
context = self.get_template_context()
- task.on_success_callback(context)
+ try:
+ task.on_success_callback(context)
+ except Exception:
+ self.log.exception("Error when executing on_success_callback")
elif self.state == State.UP_FOR_RETRY:
task = self.task
if task.on_retry_callback is not None:
context = self.get_template_context()
context["exception"] = error
- task.on_retry_callback(context)
+ try:
+ task.on_retry_callback(context)
+ except Exception:
+ self.log.exception("Error when executing on_retry_callback")
@provide_session
def run(
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 63f0479..021809b 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -1657,6 +1657,45 @@ class TestTaskInstance(unittest.TestCase):
ti.refresh_from_db()
assert ti.state == State.SUCCESS
+ @parameterized.expand(
+ [
+ (State.SUCCESS, "Error when executing on_success_callback"),
+ (State.UP_FOR_RETRY, "Error when executing on_retry_callback"),
+ (State.FAILED, "Error when executing on_failure_callback"),
+ ]
+ )
+ def test_finished_callbacks_handle_and_log_exception(self, finished_state, expected_message):
+ called = completed = False
+
+ def on_finish_callable(context):
+ nonlocal called, completed
+ called = True
+ raise KeyError
+ completed = True
+
+ dag = DAG(
+ 'test_success_callback_handles_exception',
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+ )
+ task = DummyOperator(
+ task_id='op',
+ email='test@test.test',
+ on_success_callback=on_finish_callable,
+ on_retry_callback=on_finish_callable,
+ on_failure_callback=on_finish_callable,
+ dag=dag,
+ )
+
+ ti = TI(task=task, execution_date=datetime.datetime.now())
+ ti._log = mock.Mock()
+ ti.state = finished_state
+ ti._run_finished_callback()
+
+ assert called
+ assert not completed
+ ti.log.exception.assert_called_once_with(expected_message)
+
def test_handle_failure(self):
start_date = timezone.datetime(2016, 6, 1)
dag = models.DAG(dag_id="test_handle_failure", schedule_interval=None, start_date=start_date)