Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/18 04:21:35 UTC

[GitHub] stale[bot] closed pull request #2444: [AIRFLOW-1414] Add support for retriggering dependent workflows

stale[bot] closed pull request #2444: [AIRFLOW-1414] Add support for retriggering dependent workflows
URL: https://github.com/apache/incubator-airflow/pull/2444
 
 
   

This is a pull request from a forked repository. As GitHub does not show
the original diff once the pull request is closed, it is reproduced below
for the sake of provenance:

diff --git a/airflow/models.py b/airflow/models.py
index 32ad144a22..5b5f27506f 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -3055,6 +3055,34 @@ def latest_execution_date(self):
         session.close()
         return execution_date
 
+    def descendants(self, dagbag, task_ids=None, include_downstream=False,
+                    include_upstream=False, recursive=False):
+        from airflow.operators.sensors import ExternalTaskSensor
+        if not task_ids:
+            task_ids = self.task_ids
+        descendants = []
+        for _, dag in dagbag.dags.items():
+            tasks = [task for task in dag.tasks if
+                     isinstance(task, ExternalTaskSensor) and
+                     task.external_dag_id == self.dag_id and
+                     task.external_task_id in task_ids]
+            if not tasks:
+                continue
+            task_regex = "|".join(map(
+                lambda x: "^{0}$".format(x.task_id), tasks))
+            dependent_dag = dag.sub_dag(
+                task_regex=r"{0}".format(task_regex),
+                include_downstream=include_downstream,
+                include_upstream=include_upstream)
+            descendants.append(dependent_dag)
+            if recursive:
+                descendants.extend(dependent_dag.descendants(
+                    dagbag,
+                    include_downstream=include_downstream,
+                    include_upstream=include_upstream,
+                    recursive=recursive))
+        return descendants
+
     @property
     def subdags(self):
         """
diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html
index 706ed329d5..a75039608a 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -188,6 +188,10 @@ <h4 class="modal-title" id="myModalLabel">
               type="button" class="btn active" data-toggle="button">
                Recursive
             </button>
+            <button id="btn_descendants"
+              type="button" class="btn" data-toggle="button">
+               Descendants
+            </button>
           </span>
           <hr/>
           <button id="btn_success" type="button" class="btn btn-primary">
@@ -362,6 +366,7 @@ <h4 class="modal-title" id="dagModalLabel">
         "&upstream=" + $('#btn_upstream').hasClass('active') +
         "&downstream=" + $('#btn_downstream').hasClass('active') +
         "&recursive=" + $('#btn_recursive').hasClass('active') +
+        "&descendants=" + $('#btn_descendants').hasClass('active') +
         "&execution_date=" + execution_date +
         "&origin=" + encodeURIComponent(window.location);
       window.location = url;
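
The new toggle only adds one query parameter to the URL the modal builds; the
request that reaches the webserver ends up roughly like this (a sketch with
placeholder dag_id, task_id and execution_date values):

    url = ("/admin/airflow/clear?task_id=task_core&dag_id=example_core"
           "&future=false&past=false&upstream=false&downstream=true"
           "&recursive=true&descendants=true"
           "&execution_date=2016-01-01 00:00:00&origin=/admin")
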
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 6c39462072..54d8b44994 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1011,22 +1011,26 @@ def trigger(self):
             "it should start any moment now.".format(dag_id))
         return redirect(origin)
 
-    def _clear_dag_tis(self, dag, start_date, end_date, origin,
+    def _clear_dag_tis(self, dags, start_date, end_date, origin,
                        recursive=False, confirmed=False):
         if confirmed:
-            count = dag.clear(
-                start_date=start_date,
-                end_date=end_date,
-                include_subdags=recursive)
+            count = 0
+            for dag in dags:
+                count += dag.clear(
+                    start_date=start_date,
+                    end_date=end_date,
+                    include_subdags=recursive)
 
             flash("{0} task instances have been cleared".format(count))
             return redirect(origin)
 
-        tis = dag.clear(
-            start_date=start_date,
-            end_date=end_date,
-            include_subdags=recursive,
-            dry_run=True)
+        tis = []
+        for dag in dags:
+            tis.extend(dag.clear(
+                start_date=start_date,
+                end_date=end_date,
+                include_subdags=recursive,
+                dry_run=True))
         if not tis:
             flash("No task instances to clear", 'error')
             response = redirect(origin)
@@ -1059,16 +1063,22 @@ def clear(self):
         future = request.args.get('future') == "true"
         past = request.args.get('past') == "true"
         recursive = request.args.get('recursive') == "true"
+        descendants = request.args.get('descendants') == "true"
 
-        dag = dag.sub_dag(
+        dags = [dag.sub_dag(
             task_regex=r"^{0}$".format(task_id),
             include_downstream=downstream,
-            include_upstream=upstream)
+            include_upstream=upstream)]
+
+        if descendants:
+            dags.extend(dag.descendants(
+                dagbag, task_ids=[task_id], include_downstream=downstream,
+                include_upstream=upstream, recursive=recursive))
 
         end_date = execution_date if not future else None
         start_date = execution_date if not past else None
 
-        return self._clear_dag_tis(dag, start_date, end_date, origin,
+        return self._clear_dag_tis(dags, start_date, end_date, origin,
                                    recursive=recursive, confirmed=confirmed)
 
     @expose('/dagrun_clear')
@@ -1081,13 +1091,19 @@ def dagrun_clear(self):
         origin = request.args.get('origin')
         execution_date = request.args.get('execution_date')
         confirmed = request.args.get('confirmed') == "true"
+        descendants = request.args.get('descendants') == "true"
 
         dag = dagbag.get_dag(dag_id)
+        dags = [dag]
+
+        if descendants:
+            dags.extend(dag.descendants(dagbag, task_ids=[task_id], recursive=True))
+
         execution_date = dateutil.parser.parse(execution_date)
         start_date = execution_date
         end_date = execution_date
 
-        return self._clear_dag_tis(dag, start_date, end_date, origin,
+        return self._clear_dag_tis(dags, start_date, end_date, origin,
                                    recursive=True, confirmed=confirmed)
 
     @expose('/blocked')
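
Outside the web views, the flow the modified clear() endpoint now performs can
be sketched directly against the DAG API (assuming the patch is applied; the
DAG id and dates are placeholders):

    import datetime
    from airflow import models

    dagbag = models.DagBag()
    dag = dagbag.get_dag('example_core')  # placeholder DAG id
    execution_date = datetime.datetime(2016, 1, 1)

    # The root DAG plus every dependent sub-DAG discovered via descendants().
    dags = [dag] + dag.descendants(dagbag, include_downstream=True, recursive=True)

    # Mirrors _clear_dag_tis with confirmed=True: clear each DAG over the same
    # window and report the total number of task instances affected.
    count = sum(d.clear(start_date=execution_date,
                        end_date=execution_date,
                        include_subdags=True) for d in dags)
    print("{0} task instances have been cleared".format(count))
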
diff --git a/tests/core.py b/tests/core.py
index 259b61da95..5a5b47a8b1 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -1795,6 +1795,52 @@ def test_dag_views(self):
         response = self.app.get("/admin/xcom", follow_redirects=True)
         self.assertIn("Xcoms", response.data.decode('utf-8'))
 
+    def test_dag_clear_view_with_descendants(self):
+        def _run_dag(dag):
+            dag.clear()
+            dag.run(start_date=DEFAULT_DATE,
+                end_date=DEFAULT_DATE + timedelta(seconds=1))
+
+        def _parse_view(html):
+            # If the TaskInstance serialization (repr(TaskInstance)) ever changes,
+            # this regexp will stop working, but there is no cleaner way to
+            # parse the rendered HTML here.
+            pattern = r"TaskInstance:\s+(\w+)\.(\w+)\s+(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\[\w+\]"
+            return re.findall(pattern, html)
+
+        dagbag = models.DagBag(dag_folder=TEST_DAG_FOLDER)
+        dag_core_id = TEST_DAG_ID + '_core'
+        dag_first_child_id = TEST_DAG_ID + '_first_child'
+        dag_second_child_id = TEST_DAG_ID + '_second_child'
+
+        dag_core = dagbag.get_dag(dag_core_id)
+        dag_first_child = dagbag.get_dag(dag_first_child_id)
+        dag_second_child = dagbag.get_dag(dag_second_child_id)
+
+        all_dags = [dag_core, dag_first_child, dag_second_child]
+        all_dag_ids = [dag.dag_id for dag in all_dags]
+        all_task_ids = [task_id for dag in all_dags for task_id in dag.task_ids]
+
+        for dag in all_dags:
+            _run_dag(dag)
+        to_rerun_task_ids = [task_id for task_id in all_task_ids if task_id != 't2_second_child']
+
+        session = settings.Session()
+        TI = models.TaskInstance
+        tis = session.query(TI).filter(TI.dag_id.in_(all_dag_ids),
+            TI.task_id.in_(to_rerun_task_ids)).all()
+        tis = [(ti.dag_id, ti.task_id, str(ti.execution_date)) for ti in tis]
+        url = (
+            "/admin/airflow/clear?task_id=task_core&"
+            "dag_id={}&future=true&past=false&"
+            "upstream=false&downstream=true&recursive=true&"
+            "descendants=true&execution_date={}&"
+            "origin=/admin".format(dag_core_id, DEFAULT_DATE))
+        response = self.app.get(url)
+        html = response.data.decode('utf-8')
+        tasks = _parse_view(html)
+        self.assertEqual(sorted(tis), sorted(tasks))
+
     def test_charts(self):
         session = Session()
         chart_label = "Airflow task instance by type"
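
As the comment in _parse_view above warns, the test leans on the current
repr(TaskInstance) format. A quick illustration of what the regexp extracts
(the sample line mimics that repr and is illustrative only):

    import re

    pattern = (r"TaskInstance:\s+(\w+)\.(\w+)\s+"
               r"(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\[\w+\]")
    sample = "<TaskInstance: unit_tests_core.task_core 2016-01-01 00:00:00 [success]>"
    print(re.findall(pattern, sample))
    # -> [('unit_tests_core', 'task_core', '2016-01-01 00:00:00')]
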
diff --git a/tests/dags/test_descendant_dags.py b/tests/dags/test_descendant_dags.py
new file mode 100644
index 0000000000..88116d8161
--- /dev/null
+++ b/tests/dags/test_descendant_dags.py
@@ -0,0 +1,59 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow import DAG
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.sensors import ExternalTaskSensor
+from tests.core import TEST_DAG_ID, DEFAULT_DATE
+from datetime import timedelta
+
+args = {'owner': 'airflow', 'start_date': DEFAULT_DATE,
+        'depends_on_past': False, 'retries': 3}
+
+dag_core_id = TEST_DAG_ID + '_core'
+dag_core = DAG(dag_core_id, default_args=args,
+    schedule_interval=timedelta(seconds=1))
+task_core = DummyOperator(
+    task_id='task_core',
+    dag=dag_core)
+
+dag_first_child_id = TEST_DAG_ID + '_first_child'
+dag_first_child = DAG(dag_first_child_id, default_args=args,
+    schedule_interval=timedelta(seconds=1))
+t1_first_child = ExternalTaskSensor(
+    task_id='t1_first_child',
+    external_dag_id=dag_core_id,
+    external_task_id='task_core',
+    poke_interval=1,
+    dag=dag_first_child,
+    depends_on_past=True)
+t2_first_child = DummyOperator(
+    task_id='t2_first_child',
+    dag=dag_first_child,
+    depends_on_past=True)
+t2_first_child.set_upstream(t1_first_child)
+
+dag_second_child_id = TEST_DAG_ID + '_second_child'
+dag_second_child = DAG(dag_second_child_id, default_args=args,
+    schedule_interval=timedelta(seconds=1))
+t1_second_child = ExternalTaskSensor(
+    task_id='t1_second_child',
+    external_dag_id=dag_first_child_id,
+    external_task_id='t2_first_child',
+    poke_interval=1,
+    dag=dag_second_child,
+    depends_on_past=True)
+t2_second_child = DummyOperator(
+    task_id='t2_second_child',
+    dag=dag_second_child)
diff --git a/tests/models.py b/tests/models.py
index 400c659a1e..2938ab5699 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -42,7 +42,7 @@
 DEFAULT_DATE = datetime.datetime(2016, 1, 1)
 TEST_DAGS_FOLDER = os.path.join(
     os.path.dirname(os.path.realpath(__file__)), 'dags')
-
+TEST_DAG_ID = 'unit_tests'
 
 class DagTest(unittest.TestCase):
 
@@ -289,6 +289,48 @@ def jinja_udf(name):
         result = task.render_template('', "{{ 'world' | hello}}", dict())
         self.assertEqual(result, 'Hello world')
 
+    def test_descendants(self):
+        dagbag = models.DagBag(dag_folder=TEST_DAGS_FOLDER)
+        dag_core_id = TEST_DAG_ID + '_core'
+        dag_first_child = TEST_DAG_ID + '_first_child'
+        dag_core = dagbag.get_dag(dag_core_id)
+        dag_first_child = dagbag.get_dag(dag_first_child)
+        descendants = dag_core.descendants(dagbag)
+        self.assertEqual(len(descendants), 1)
+        self.assertEqual(descendants[0], dag_first_child.sub_dag(task_regex=r"^t1_first_child$"))
+
+    def test_descendants_upstream(self):
+        dagbag = models.DagBag(dag_folder=TEST_DAGS_FOLDER)
+        dag_core_id = TEST_DAG_ID + '_core'
+        dag_first_child = TEST_DAG_ID + '_first_child'
+        dag_core = dagbag.get_dag(dag_core_id)
+        dag_first_child = dagbag.get_dag(dag_first_child)
+        descendants = dag_core.descendants(dagbag, include_upstream=True)
+        self.assertEqual(len(descendants), 1)
+        self.assertEqual(descendants[0], dag_first_child.sub_dag(task_regex=r"^t1_first_child$"))
+
+    def test_descendants_downstream(self):
+        dagbag = models.DagBag(dag_folder=TEST_DAGS_FOLDER)
+        dag_core_id = TEST_DAG_ID + '_core'
+        dag_first_child = TEST_DAG_ID + '_first_child'
+        dag_core = dagbag.get_dag(dag_core_id)
+        dag_first_child = dagbag.get_dag(dag_first_child)
+        descendants = dag_core.descendants(dagbag, 'task_core', include_downstream=True)
+        self.assertEqual(len(descendants), 1)
+        self.assertEqual(descendants[0], dag_first_child.sub_dag(task_regex=r"^(t1_first_child|t2_first_child)$"))
+
+    def test_descendants_downstream_recursive(self):
+        dagbag = models.DagBag(dag_folder=TEST_DAGS_FOLDER)
+        dag_core_id = TEST_DAG_ID + '_core'
+        dag_first_child = TEST_DAG_ID + '_first_child'
+        dag_second_child = TEST_DAG_ID + '_second_child'
+        dag_core = dagbag.get_dag(dag_core_id)
+        dag_first_child = dagbag.get_dag(dag_first_child)
+        dag_second_child = dagbag.get_dag(dag_second_child)
+        descendants = dag_core.descendants(dagbag, include_downstream=True, recursive=True)
+        self.assertEqual(len(descendants), 2)
+        self.assertEqual(descendants[0], dag_first_child.sub_dag(task_regex=r"^(t1_first_child|t2_first_child)$"))
+        self.assertEqual(descendants[1], dag_second_child.sub_dag(task_regex=r"^t1_second_child$"))
 
 class DagStatTest(unittest.TestCase):
     def test_dagstats_crud(self):


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services