You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain text copy.
Posted to commits@airflow.apache.org by je...@apache.org on 2022/08/16 15:35:15 UTC
[airflow] 08/11: Clear next method when clearing TIs (#23929)
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to tag v2.3.3+astro.2
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit b1458570af44e01bc4a6a2a532e8512e0b5e9f45
Author: Tanel Kiis <ta...@users.noreply.github.com>
AuthorDate: Tue Jun 28 12:40:34 2022 +0300
Clear next method when clearing TIs (#23929)
(cherry picked from commit a5ef7a02e12071aac5d19a2a0792603c63b65adf)
---
airflow/models/taskinstance.py | 1 +
tests/models/test_cleartasks.py | 23 +++++++++++++++++++++++
2 files changed, 24 insertions(+)
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index fe3387ecf0..debd0aa6b0 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -229,6 +229,7 @@ def clear_task_instances(
ti.max_tries = max(ti.max_tries, ti.prev_attempted_tries)
ti.state = None
ti.external_executor_id = None
+ ti.clear_next_method_args()
session.merge(ti)
task_id_by_key[ti.dag_id][ti.run_id][ti.map_index][ti.try_number].add(ti.task_id)
diff --git a/tests/models/test_cleartasks.py b/tests/models/test_cleartasks.py
index ba08692d42..05ff9df458 100644
--- a/tests/models/test_cleartasks.py
+++ b/tests/models/test_cleartasks.py
@@ -106,6 +106,29 @@ class TestClearTasks:
assert ti0.state is None
assert ti0.external_executor_id is None
+ def test_clear_task_instances_next_method(self, dag_maker, session):
+ with dag_maker(
+ 'test_clear_task_instances_next_method',
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+ ) as dag:
+ EmptyOperator(task_id='task0')
+
+ ti0 = dag_maker.create_dagrun().task_instances[0]
+ ti0.state = State.DEFERRED
+ ti0.next_method = "next_method"
+ ti0.next_kwargs = {}
+
+ session.add(ti0)
+ session.commit()
+
+ clear_task_instances([ti0], session, dag=dag)
+
+ ti0.refresh_from_db()
+
+ assert ti0.next_method is None
+ assert ti0.next_kwargs is None
+
@pytest.mark.parametrize(
["state", "last_scheduling"], [(State.QUEUED, None), (State.RUNNING, DEFAULT_DATE)]
)