You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/06 19:06:27 UTC
[GitHub] [airflow] ashb commented on pull request #22794: Give up on trying to recreate task_id logic
ashb commented on PR #22794:
URL: https://github.com/apache/airflow/pull/22794#issuecomment-1090651804
A test for you
```diff
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 38be50b12..5311cb6ed 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -23,6 +23,7 @@ import pathlib
import signal
import sys
import urllib
+from contextlib import suppress
from tempfile import NamedTemporaryFile
from traceback import format_exception
from typing import List, Optional, Union, cast
@@ -55,6 +56,7 @@ from airflow.models import (
Variable,
XCom,
)
+from airflow.models.taskfail import TaskFail
from airflow.models.taskinstance import TaskInstance, load_error_file, set_error_file
from airflow.models.taskmap import TaskMap
from airflow.models.xcom import XCOM_RETURN_KEY
@@ -1358,6 +1360,39 @@ class TestTaskInstance:
assert 'template: test_email_alert_with_config' == title
assert 'template: test_email_alert_with_config' == body
+ @patch('airflow.models.taskinstance.send_email')
+ def test_failure_mapped_taskflow(self, mock_send_email, dag_maker, session):
+ with dag_maker(dag_id='test_failure_email', session=session) as dag:
+
+ @dag.task(email='to')
+ def test_email_alert(x):
+ raise RuntimeError("Fail please")
+
+ test_email_alert.expand(x=[1, 2, 3])
+ ti = sorted(
+ dag_maker.create_dagrun(execution_date=timezone.utcnow()).task_instances,
+ key=lambda ti: ti.map_index,
+ )[0]
+ assert ti.map_index == 0
+
+ with suppress(RuntimeError):
+ ti.run(session=session)
+
+ (email, title, body), _ = mock_send_email.call_args
+ assert email == 'to'
+ assert 'test_email_alert' in title
+ assert 'test_email_alert__1' not in title
+ assert 'map_index=0' in title
+ assert 'test_email_alert' in body
+ assert 'Try 1' in body
+
+ tf = (
+ session.query(TaskFail)
+ .filter_by(dag_id=ti.dag_id, task_id=ti.task_id, run_id=ti.run_id, map_index=ti.map_index)
+ .one()
+ )
+ assert tf, "TaskFail was recorded"
+
def test_set_duration(self):
task = DummyOperator(task_id='op', email='test@test.test')
ti = TI(task=task)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org