You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by je...@apache.org on 2022/08/16 15:35:15 UTC

[airflow] 08/11: Clear next method when clearing TIs (#23929)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to tag v2.3.3+astro.2
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit b1458570af44e01bc4a6a2a532e8512e0b5e9f45
Author: Tanel Kiis <ta...@users.noreply.github.com>
AuthorDate: Tue Jun 28 12:40:34 2022 +0300

    Clear next method when clearing TIs (#23929)
    
    (cherry picked from commit a5ef7a02e12071aac5d19a2a0792603c63b65adf)
---
 airflow/models/taskinstance.py  |  1 +
 tests/models/test_cleartasks.py | 23 +++++++++++++++++++++++
 2 files changed, 24 insertions(+)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index fe3387ecf0..debd0aa6b0 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -229,6 +229,7 @@ def clear_task_instances(
                 ti.max_tries = max(ti.max_tries, ti.prev_attempted_tries)
             ti.state = None
             ti.external_executor_id = None
+            ti.clear_next_method_args()
             session.merge(ti)
 
         task_id_by_key[ti.dag_id][ti.run_id][ti.map_index][ti.try_number].add(ti.task_id)
diff --git a/tests/models/test_cleartasks.py b/tests/models/test_cleartasks.py
index ba08692d42..05ff9df458 100644
--- a/tests/models/test_cleartasks.py
+++ b/tests/models/test_cleartasks.py
@@ -106,6 +106,29 @@ class TestClearTasks:
             assert ti0.state is None
             assert ti0.external_executor_id is None
 
+    def test_clear_task_instances_next_method(self, dag_maker, session):
+        with dag_maker(
+            'test_clear_task_instances_next_method',
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+        ) as dag:
+            EmptyOperator(task_id='task0')
+
+        ti0 = dag_maker.create_dagrun().task_instances[0]
+        ti0.state = State.DEFERRED
+        ti0.next_method = "next_method"
+        ti0.next_kwargs = {}
+
+        session.add(ti0)
+        session.commit()
+
+        clear_task_instances([ti0], session, dag=dag)
+
+        ti0.refresh_from_db()
+
+        assert ti0.next_method is None
+        assert ti0.next_kwargs is None
+
     @pytest.mark.parametrize(
         ["state", "last_scheduling"], [(State.QUEUED, None), (State.RUNNING, DEFAULT_DATE)]
     )