You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/01/10 07:00:48 UTC

[airflow] branch main updated: Refactor ti clear next method kwargs tests (#19194)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3b5adaf  Refactor ti clear next method kwargs tests (#19194)
3b5adaf is described below

commit 3b5adaff9a30bee3ee12b32c44f38c3f5148df24
Author: Rocco Pascale <75...@users.noreply.github.com>
AuthorDate: Mon Jan 10 02:00:04 2022 -0500

    Refactor ti clear next method kwargs tests (#19194)
---
 tests/models/test_taskinstance.py | 55 +++++++++++++--------------------------
 1 file changed, 18 insertions(+), 37 deletions(-)

diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 14a8c78..2d9bcd6 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -484,50 +484,31 @@ class TestTaskInstance:
         ti.state == state
 
     @pytest.mark.parametrize(
-        "state",
-        [State.FAILED, State.SKIPPED, State.SUCCESS, State.UP_FOR_RESCHEDULE, State.UP_FOR_RETRY],
+        "state, exception, retries",
+        [
+            (State.FAILED, AirflowException, 0),
+            (State.SKIPPED, AirflowSkipException, 0),
+            (State.SUCCESS, None, 0),
+            (State.UP_FOR_RESCHEDULE, AirflowRescheduleException(timezone.utcnow()), 0),
+            (State.UP_FOR_RETRY, AirflowException, 1),
+        ],
     )
-    def test_task_wipes_next_fields(self, session, state, dag_maker):
+    def test_task_wipes_next_fields(self, session, dag_maker, state, exception, retries):
         """
         Test that ensures that tasks wipe their next_method and next_kwargs
-        when they go into a state of FAILED, SKIPPED, SUCCESS, UP_FOR_RESCHEDULE, or UP_FOR_RETRY.
+        when the TI enters one of the configured states.
         """
 
-        def failure():
-            raise AirflowException
-
-        def skip():
-            raise AirflowSkipException
-
-        def success():
-            return None
-
-        def reschedule():
-            reschedule_date = timezone.utcnow()
-            raise AirflowRescheduleException(reschedule_date)
-
-        _retries = 0
-        _retry_delay = datetime.timedelta(seconds=0)
-
-        if state == State.FAILED:
-            _python_callable = failure
-        elif state == State.SKIPPED:
-            _python_callable = skip
-        elif state == State.SUCCESS:
-            _python_callable = success
-        elif state == State.UP_FOR_RESCHEDULE:
-            _python_callable = reschedule
-        elif state in [State.FAILED, State.UP_FOR_RETRY]:
-            _python_callable = failure
-            _retries = 1
-            _retry_delay = datetime.timedelta(seconds=2)
+        def _raise_if_exception():
+            if exception:
+                raise exception
 
         with dag_maker("test_deferred_method_clear"):
             task = PythonOperator(
                 task_id="test_deferred_method_clear_task",
-                python_callable=_python_callable,
-                retries=_retries,
-                retry_delay=_retry_delay,
+                python_callable=_raise_if_exception,
+                retries=retries,
+                retry_delay=datetime.timedelta(seconds=2),
             )
 
         dr = dag_maker.create_dagrun()
@@ -539,9 +520,9 @@ class TestTaskInstance:
 
         ti.task = task
         if state in [State.FAILED, State.UP_FOR_RETRY]:
-            with pytest.raises(AirflowException):
+            with pytest.raises(exception):
                 ti.run()
-        elif state in [State.SKIPPED, State.SUCCESS, State.UP_FOR_RESCHEDULE]:
+        else:
             ti.run()
         ti.refresh_from_db()