You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/01/10 07:00:48 UTC
[airflow] branch main updated: Refactor ti clear next method kwargs tests (#19194)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3b5adaf Refactor ti clear next method kwargs tests (#19194)
3b5adaf is described below
commit 3b5adaff9a30bee3ee12b32c44f38c3f5148df24
Author: Rocco Pascale <75...@users.noreply.github.com>
AuthorDate: Mon Jan 10 02:00:04 2022 -0500
Refactor ti clear next method kwargs tests (#19194)
---
tests/models/test_taskinstance.py | 55 +++++++++++++--------------------------
1 file changed, 18 insertions(+), 37 deletions(-)
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 14a8c78..2d9bcd6 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -484,50 +484,31 @@ class TestTaskInstance:
ti.state == state
@pytest.mark.parametrize(
- "state",
- [State.FAILED, State.SKIPPED, State.SUCCESS, State.UP_FOR_RESCHEDULE, State.UP_FOR_RETRY],
+ "state, exception, retries",
+ [
+ (State.FAILED, AirflowException, 0),
+ (State.SKIPPED, AirflowSkipException, 0),
+ (State.SUCCESS, None, 0),
+ (State.UP_FOR_RESCHEDULE, AirflowRescheduleException(timezone.utcnow()), 0),
+ (State.UP_FOR_RETRY, AirflowException, 1),
+ ],
)
- def test_task_wipes_next_fields(self, session, state, dag_maker):
+ def test_task_wipes_next_fields(self, session, dag_maker, state, exception, retries):
"""
Test that ensures that tasks wipe their next_method and next_kwargs
- when they go into a state of FAILED, SKIPPED, SUCCESS, UP_FOR_RESCHEDULE, or UP_FOR_RETRY.
+ when the TI enters one of the configured states.
"""
- def failure():
- raise AirflowException
-
- def skip():
- raise AirflowSkipException
-
- def success():
- return None
-
- def reschedule():
- reschedule_date = timezone.utcnow()
- raise AirflowRescheduleException(reschedule_date)
-
- _retries = 0
- _retry_delay = datetime.timedelta(seconds=0)
-
- if state == State.FAILED:
- _python_callable = failure
- elif state == State.SKIPPED:
- _python_callable = skip
- elif state == State.SUCCESS:
- _python_callable = success
- elif state == State.UP_FOR_RESCHEDULE:
- _python_callable = reschedule
- elif state in [State.FAILED, State.UP_FOR_RETRY]:
- _python_callable = failure
- _retries = 1
- _retry_delay = datetime.timedelta(seconds=2)
+ def _raise_if_exception():
+ if exception:
+ raise exception
with dag_maker("test_deferred_method_clear"):
task = PythonOperator(
task_id="test_deferred_method_clear_task",
- python_callable=_python_callable,
- retries=_retries,
- retry_delay=_retry_delay,
+ python_callable=_raise_if_exception,
+ retries=retries,
+ retry_delay=datetime.timedelta(seconds=2),
)
dr = dag_maker.create_dagrun()
@@ -539,9 +520,9 @@ class TestTaskInstance:
ti.task = task
if state in [State.FAILED, State.UP_FOR_RETRY]:
- with pytest.raises(AirflowException):
+ with pytest.raises(exception):
ti.run()
- elif state in [State.SKIPPED, State.SUCCESS, State.UP_FOR_RESCHEDULE]:
+ else:
ti.run()
ti.refresh_from_db()