You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/06 19:06:27 UTC

[GitHub] [airflow] ashb commented on pull request #22794: Give up on trying to recreate task_id logic

ashb commented on PR #22794:
URL: https://github.com/apache/airflow/pull/22794#issuecomment-1090651804

   Here is a test for you, covering the failure email and the TaskFail record for a mapped TaskFlow task:
   
   ```diff
   diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
   index 38be50b12..5311cb6ed 100644
   --- a/tests/models/test_taskinstance.py
   +++ b/tests/models/test_taskinstance.py
   @@ -23,6 +23,7 @@ import pathlib
    import signal
    import sys
    import urllib
   +from contextlib import suppress
    from tempfile import NamedTemporaryFile
    from traceback import format_exception
    from typing import List, Optional, Union, cast
   @@ -55,6 +56,7 @@ from airflow.models import (
        Variable,
        XCom,
    )
   +from airflow.models.taskfail import TaskFail
    from airflow.models.taskinstance import TaskInstance, load_error_file, set_error_file
    from airflow.models.taskmap import TaskMap
    from airflow.models.xcom import XCOM_RETURN_KEY
   @@ -1358,6 +1360,39 @@ class TestTaskInstance:
            assert 'template: test_email_alert_with_config' == title
            assert 'template: test_email_alert_with_config' == body
    
   +    @patch('airflow.models.taskinstance.send_email')
   +    def test_failure_mapped_taskflow(self, mock_send_email, dag_maker, session):
   +        with dag_maker(dag_id='test_failure_email', session=session) as dag:
   +
   +            @dag.task(email='to')
   +            def test_email_alert(x):
   +                raise RuntimeError("Fail please")
   +
   +            test_email_alert.expand(x=[1, 2, 3])
   +        ti = sorted(
   +            dag_maker.create_dagrun(execution_date=timezone.utcnow()).task_instances,
   +            key=lambda ti: ti.map_index,
   +        )[0]
   +        assert ti.map_index == 0
   +
   +        with suppress(RuntimeError):
   +            ti.run(session=session)
   +
   +        (email, title, body), _ = mock_send_email.call_args
   +        assert email == 'to'
   +        assert 'test_email_alert' in title
   +        assert 'test_email_alert__1' not in title
   +        assert 'map_index=0' in title
   +        assert 'test_email_alert' in body
   +        assert 'Try 1' in body
   +
   +        tf = (
   +            session.query(TaskFail)
   +            .filter_by(dag_id=ti.dag_id, task_id=ti.task_id, run_id=ti.run_id, map_index=ti.map_index)
   +            .one()
   +        )
   +        assert tf, "TaskFail was recorded"
   +
        def test_set_duration(self):
            task = DummyOperator(task_id='op', email='test@test.test')
            ti = TI(task=task)
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org