You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this text version.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/09/17 19:10:06 UTC

[GitHub] kaxil closed pull request #3907: [AIRFLOW-1195] Add feature to clear tasks in Parent Dag

kaxil closed pull request #3907: [AIRFLOW-1195] Add feature to clear tasks in Parent Dag
URL: https://github.com/apache/incubator-airflow/pull/3907
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub does not otherwise display it after the merge):

diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index fd8765588a..fb9ddbe2b0 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -705,7 +705,9 @@ def clear(args):
         only_failed=args.only_failed,
         only_running=args.only_running,
         confirm_prompt=not args.no_confirm,
-        include_subdags=not args.exclude_subdags)
+        include_subdags=not args.exclude_subdags,
+        include_parentdag=not args.exclude_parentdag,
+    )
 
 
 def get_num_ready_workers_running(gunicorn_master_proc):
@@ -1604,6 +1606,10 @@ class CLIFactory(object):
         'exclude_subdags': Arg(
             ("-x", "--exclude_subdags"),
             "Exclude subdags", "store_true"),
+        'exclude_parentdag': Arg(
+            ("-xp", "--exclude_parentdag"),
+            "Exclude ParentDAGS if the task cleared is a part of a SubDAG",
+            "store_true"),
         'dag_regex': Arg(
             ("-dx", "--dag_regex"),
             "Search dag_id as regex instead of exact string", "store_true"),
@@ -1936,7 +1942,7 @@ class CLIFactory(object):
             'args': (
                 'dag_id', 'task_regex', 'start_date', 'end_date', 'subdir',
                 'upstream', 'downstream', 'no_confirm', 'only_failed',
-                'only_running', 'exclude_subdags', 'dag_regex'),
+                'only_running', 'exclude_subdags', 'exclude_parentdag', 'dag_regex'),
         }, {
             'func': pause,
             'help': "Pause a DAG",
diff --git a/airflow/models.py b/airflow/models.py
index d703810a77..1e4949e563 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -3798,9 +3798,11 @@ def clear(
             only_running=False,
             confirm_prompt=False,
             include_subdags=True,
+            include_parentdag=True,
             reset_dag_runs=True,
             dry_run=False,
             session=None,
+            get_tis=False,
     ):
         """
         Clears a set of task instances associated with the current dag for
@@ -3821,6 +3823,25 @@ def clear(
             tis = session.query(TI).filter(TI.dag_id == self.dag_id)
             tis = tis.filter(TI.task_id.in_(self.task_ids))
 
+        if include_parentdag and self.is_subdag:
+
+            p_dag = self.parent_dag.sub_dag(
+                task_regex=self.dag_id.split('.')[1],
+                include_upstream=False,
+                include_downstream=True)
+
+            tis = tis.union(p_dag.clear(
+                start_date=start_date, end_date=end_date,
+                only_failed=only_failed,
+                only_running=only_running,
+                confirm_prompt=confirm_prompt,
+                include_subdags=include_subdags,
+                include_parentdag=False,
+                reset_dag_runs=reset_dag_runs,
+                get_tis=True,
+                session=session,
+            ))
+
         if start_date:
             tis = tis.filter(TI.execution_date >= start_date)
         if end_date:
@@ -3832,6 +3853,9 @@ def clear(
         if only_running:
             tis = tis.filter(TI.state == State.RUNNING)
 
+        if get_tis:
+            return tis
+
         if dry_run:
             tis = tis.all()
             session.expunge_all()
@@ -3875,6 +3899,7 @@ def clear_dags(
             only_running=False,
             confirm_prompt=False,
             include_subdags=True,
+            include_parentdag=False,
             reset_dag_runs=True,
             dry_run=False,
     ):
@@ -3887,6 +3912,7 @@ def clear_dags(
                 only_running=only_running,
                 confirm_prompt=False,
                 include_subdags=include_subdags,
+                include_parentdag=include_parentdag,
                 reset_dag_runs=reset_dag_runs,
                 dry_run=True)
             all_tis.extend(tis)
diff --git a/airflow/www/views.py b/airflow/www/views.py
index aa2530e458..be11b11376 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1111,7 +1111,9 @@ def _clear_dag_tis(self, dag, start_date, end_date, origin,
             count = dag.clear(
                 start_date=start_date,
                 end_date=end_date,
-                include_subdags=recursive)
+                include_subdags=recursive,
+                include_parentdag=recursive,
+            )
 
             flash("{0} task instances have been cleared".format(count))
             return redirect(origin)
@@ -1120,7 +1122,9 @@ def _clear_dag_tis(self, dag, start_date, end_date, origin,
             start_date=start_date,
             end_date=end_date,
             include_subdags=recursive,
-            dry_run=True)
+            dry_run=True,
+            include_parentdag=recursive,
+        )
         if not tis:
             flash("No task instances to clear", 'error')
             response = redirect(origin)
diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py
index 3dc3400968..38835998e8 100644
--- a/airflow/www_rbac/views.py
+++ b/airflow/www_rbac/views.py
@@ -837,7 +837,9 @@ def _clear_dag_tis(self, dag, start_date, end_date, origin,
             count = dag.clear(
                 start_date=start_date,
                 end_date=end_date,
-                include_subdags=recursive)
+                include_subdags=recursive,
+                include_parentdag=recursive,
+            )
 
             flash("{0} task instances have been cleared".format(count))
             return redirect(origin)
@@ -846,7 +848,9 @@ def _clear_dag_tis(self, dag, start_date, end_date, origin,
             start_date=start_date,
             end_date=end_date,
             include_subdags=recursive,
-            dry_run=True)
+            include_parentdag=recursive,
+            dry_run=True,
+        )
         if not tis:
             flash("No task instances to clear", 'error')
             response = redirect(origin)
diff --git a/tests/core.py b/tests/core.py
index a517070614..b937178a9e 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -1429,8 +1429,18 @@ def test_subdag_clear(self):
             'clear', 'example_subdag_operator', '--no_confirm', '--exclude_subdags'])
         cli.clear(args)
 
+    def test_parentdag_downstream_clear(self):
+        args = self.parser.parse_args([
+            'clear', 'example_subdag_operator.section-1', '--no_confirm'])
+        cli.clear(args)
+        args = self.parser.parse_args([
+            'clear', 'example_subdag_operator.section-1', '--no_confirm',
+            '--exclude_parentdag'])
+        cli.clear(args)
+
     def test_get_dags(self):
-        dags = cli.get_dags(self.parser.parse_args(['clear', 'example_subdag_operator', '-c']))
+        dags = cli.get_dags(self.parser.parse_args(['clear', 'example_subdag_operator',
+                                                    '-c']))
         self.assertEqual(len(dags), 1)
 
         dags = cli.get_dags(self.parser.parse_args(['clear', 'subdag', '-dx', '-c']))
@@ -1942,6 +1952,34 @@ def test_dag_views(self):
         response = self.app.get(url)
         self.assertIn("Wait a minute", response.data.decode('utf-8'))
         response = self.app.get(url + "&confirmed=true")
+        url = (
+            "/admin/airflow/clear?task_id=section-1-task-1&"
+            "dag_id=example_subdag_operator.section-1&future=false&past=false&"
+            "upstream=false&downstream=true&recursive=true&"
+            "execution_date={}&"
+            "origin=/admin".format(DEFAULT_DATE_DS))
+        response = self.app.get(url)
+        self.assertIn("Wait a minute", response.data.decode('utf-8'))
+        self.assertIn("example_subdag_operator.end",
+                      response.data.decode('utf-8'))
+        self.assertIn("example_subdag_operator.section-1.section-1-task-1",
+                      response.data.decode('utf-8'))
+        self.assertIn("example_subdag_operator.section-1",
+                      response.data.decode('utf-8'))
+        self.assertIn("example_subdag_operator.section-2",
+                      response.data.decode('utf-8'))
+        self.assertIn("example_subdag_operator.section-2.section-2-task-1",
+                      response.data.decode('utf-8'))
+        self.assertIn("example_subdag_operator.section-2.section-2-task-2",
+                      response.data.decode('utf-8'))
+        self.assertIn("example_subdag_operator.section-2.section-2-task-3",
+                      response.data.decode('utf-8'))
+        self.assertIn("example_subdag_operator.section-2.section-2-task-4",
+                      response.data.decode('utf-8'))
+        self.assertIn("example_subdag_operator.section-2.section-2-task-5",
+                      response.data.decode('utf-8'))
+        self.assertIn("example_subdag_operator.some-other-task",
+                      response.data.decode('utf-8'))
         url = (
             "/admin/airflow/run?task_id=runme_0&"
             "dag_id=example_bash_operator&ignore_all_deps=false&ignore_ti_state=true&"
diff --git a/tests/jobs.py b/tests/jobs.py
index dc3381e8e0..e18a87e6cf 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -901,6 +901,59 @@ def test_backfill_execute_subdag(self):
         subdag.clear()
         dag.clear()
 
+    def test_subdag_clear_parentdag_downstream_clear(self):
+        dag = self.dagbag.get_dag('example_subdag_operator')
+        subdag_op_task = dag.get_task('section-1')
+
+        subdag = subdag_op_task.subdag
+        subdag.schedule_interval = '@daily'
+
+        executor = TestExecutor(do_update=True)
+        job = BackfillJob(dag=subdag,
+                          start_date=DEFAULT_DATE,
+                          end_date=DEFAULT_DATE,
+                          executor=executor,
+                          donot_pickle=True)
+
+        with timeout(seconds=30):
+            job.run()
+
+        ti0 = TI(
+            task=subdag.get_task('section-1-task-1'),
+            execution_date=DEFAULT_DATE)
+        ti0.refresh_from_db()
+        self.assertEqual(ti0.state, State.SUCCESS)
+
+        sdag = subdag.sub_dag(
+            task_regex='section-1-task-1',
+            include_downstream=True,
+            include_upstream=False)
+
+        sdag.clear(
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE,
+            include_parentdag=True)
+
+        ti0.refresh_from_db()
+        self.assertEquals(State.NONE, ti0.state)
+
+        ti1 = TI(
+            task=dag.get_task('some-other-task'),
+            execution_date=DEFAULT_DATE)
+        self.assertEquals(State.NONE, ti1.state)
+
+        # Checks that all the Downstream tasks for Parent DAG
+        # have been cleared
+        for task in subdag_op_task.downstream_list:
+            ti = TI(
+                task=dag.get_task(task.task_id),
+                execution_date=DEFAULT_DATE
+            )
+            self.assertEquals(State.NONE, ti.state)
+
+        subdag.clear()
+        dag.clear()
+
     def test_backfill_execute_subdag_with_removed_task(self):
         """
         Ensure that subdag operators execute properly in the case where


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to this message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services