You are viewing a plain-text version of this content. The canonical link to it (shown as "here" in the original archive page) was not preserved in this plain-text copy.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/08/15 03:33:09 UTC

[airflow] 34/47: Fix clear future recursive when ExternalTaskMarker is used (#9515)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit da2c957b93db60d6bf6bc719b61d69f99803ddf2
Author: yuqian90 <yu...@gmail.com>
AuthorDate: Sat Aug 15 06:39:57 2020 +0800

    Fix clear future recursive when ExternalTaskMarker is used (#9515)
    
    (cherry picked from commit 4454224b682e07a641f1a8878197170c167de03c)
---
 airflow/models/dag.py                      |  2 +-
 tests/sensors/test_external_task_sensor.py | 28 ++++++++++++++++++++++++++--
 2 files changed, 27 insertions(+), 3 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 94c6d2e..de610e2 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1055,7 +1055,7 @@ class DAG(BaseDag, LoggingMixin):
             instances = tis.all()
             for ti in instances:
                 if ti.operator == ExternalTaskMarker.__name__:
-                    ti.task = self.get_task(ti.task_id)
+                    ti.task = copy.copy(self.get_task(ti.task_id))
 
                     if recursion_depth == 0:
                         # Maximum recursion depth allowed is the recursion_depth of the first
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
index 0e5e960..e2a58ec 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -433,12 +433,12 @@ def assert_ti_state_equal(task_instance, state):
     assert task_instance.state == state
 
 
-def clear_tasks(dag_bag, dag, task):
+def clear_tasks(dag_bag, dag, task, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE):
     """
     Clear the task and its downstream tasks recursively for the dag in the given dagbag.
     """
     subdag = dag.sub_dag(task_regex="^{}$".format(task.task_id), include_downstream=True)
-    subdag.clear(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, dag_bag=dag_bag)
+    subdag.clear(start_date=start_date, end_date=end_date, dag_bag=dag_bag)
 
 
 # pylint: disable=redefined-outer-name
@@ -456,6 +456,30 @@ def test_external_task_marker_transitive(dag_bag_ext):
     assert_ti_state_equal(ti_b_3, State.NONE)
 
 
+def test_external_task_marker_future(dag_bag_ext):
+    """
+    Test clearing tasks with no end_date. This is the case when users clear tasks with
+    Future, Downstream and Recursive selected.
+    """
+    date_0 = DEFAULT_DATE
+    date_1 = DEFAULT_DATE + timedelta(days=1)
+
+    tis_date_0 = run_tasks(dag_bag_ext, execution_date=date_0)
+    tis_date_1 = run_tasks(dag_bag_ext, execution_date=date_1)
+
+    dag_0 = dag_bag_ext.get_dag("dag_0")
+    task_a_0 = dag_0.get_task("task_a_0")
+    # This should clear all tasks on dag_0 to dag_3 on both date_0 and date_1
+    clear_tasks(dag_bag_ext, dag_0, task_a_0, end_date=None)
+
+    ti_a_0_date_0 = tis_date_0["task_a_0"]
+    ti_b_3_date_0 = tis_date_0["task_b_3"]
+    ti_b_3_date_1 = tis_date_1["task_b_3"]
+    assert_ti_state_equal(ti_a_0_date_0, State.NONE)
+    assert_ti_state_equal(ti_b_3_date_0, State.NONE)
+    assert_ti_state_equal(ti_b_3_date_1, State.NONE)
+
+
 def test_external_task_marker_exception(dag_bag_ext):
     """
     Clearing across multiple DAGs should raise AirflowException if more levels are being cleared