You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by da...@apache.org on 2017/06/14 02:03:31 UTC

incubator-airflow git commit: [AIRFLOW-936] Add clear/mark success for DAG in the UI

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 6e3bcd318 -> 3c450fbe1


[AIRFLOW-936] Add clear/mark success for DAG in the UI

This PR adds a modal popup that opens when clicking
the circle DAG icon in the Airflow tree view UI. It
adds the ability to clear or mark success for the
entire DAG run. This behavior is equivalent to
individually clearing/marking each task instance in
the DAG run. The original behavior of opening the edit
DAG run page is moved to the button "Edit DAG Run".

Closes #2339 from AllisonWang/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3c450fbe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3c450fbe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3c450fbe

Branch: refs/heads/master
Commit: 3c450fbe1abad7b76b59b6b3b15c6e29b4ad8d0f
Parents: 6e3bcd3
Author: Allison Wang <al...@gmail.com>
Authored: Tue Jun 13 18:56:41 2017 -0700
Committer: Dan Davydov <da...@airbnb.com>
Committed: Tue Jun 13 18:56:44 2017 -0700

----------------------------------------------------------------------
 airflow/api/common/experimental/mark_tasks.py |  33 +++-
 airflow/www/templates/airflow/dag.html        |  60 +++++++
 airflow/www/templates/airflow/tree.html       |  11 +-
 airflow/www/views.py                          | 113 +++++++++---
 tests/api/common/mark_tasks.py                | 189 ++++++++++++++++++++-
 5 files changed, 372 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c450fbe/airflow/api/common/experimental/mark_tasks.py
----------------------------------------------------------------------
diff --git a/airflow/api/common/experimental/mark_tasks.py b/airflow/api/common/experimental/mark_tasks.py
index 0ddbf98..82eb4b5 100644
--- a/airflow/api/common/experimental/mark_tasks.py
+++ b/airflow/api/common/experimental/mark_tasks.py
@@ -22,7 +22,6 @@ from airflow.utils.state import State
 
 from sqlalchemy import or_
 
-
 def _create_dagruns(dag, execution_dates, state, run_id_template):
     """
     Infers from the dates which dag runs need to be created and does so.
@@ -181,7 +180,39 @@ def set_state(task, execution_date, upstream=False, downstream=False,
         if len(sub_dag_ids) > 0:
             tis_altered += qry_sub_dag.all()
 
+    session.expunge_all()
     session.close()
 
     return tis_altered
 
+def set_dag_run_state(dag, execution_date, state=State.SUCCESS, commit=False):
+    """
+    Set the state of a dag run and all task instances associated with the dag
+    run for a specific execution date.
+    :param dag: the DAG of which to alter state
+    :param execution_date: the execution date from which to start looking
+    :param state: the state to which the DAG need to be set
+    :param commit: commit DAG and tasks to be altered to the database
+    :return: list of tasks that have been created and updated
+    :raises: AssertionError if dag or execution_date is invalid
+    """
+    res = []
+
+    if not dag or not execution_date:
+        return res
+
+    # Mark all task instances in the dag run
+    for task in dag.tasks:
+        task.dag = dag
+        new_state = set_state(task=task, execution_date=execution_date,
+                              state=state, commit=commit)
+        res.extend(new_state)
+
+    # Mark the dag run
+    if commit:
+        drs = DagRun.find(dag.dag_id, execution_date=execution_date)
+        for dr in drs:
+            dr.dag = dag
+            dr.update_state()
+
+    return res

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c450fbe/airflow/www/templates/airflow/dag.html
----------------------------------------------------------------------
diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html
index e5a305c..706ed32 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -216,6 +216,36 @@
       </div>
     </div>
   </div>
+  <!-- Modal for dag -->
+  <div class="modal fade" id="dagModal"
+        tabindex="-1" role="dialog"
+      aria-labelledby="dagModalLabel" aria-hidden="true">
+    <div class="modal-dialog">
+      <div class="modal-content">
+        <div class="modal-header">
+          <h4 class="modal-title" id="dagModalLabel">
+            <span id='dag_id'></span>
+          </h4>
+        </div>
+        <div class="modal-body">
+          <button id="btn_edit_dagrun" type="button" class="btn btn-primary">
+            Edit
+          </button>
+          <button id="btn_dagrun_clear" type="button" class="btn btn-primary">
+            Clear
+          </button>
+          <button id="btn_dagrun_success" type="button" class="btn btn-primary">
+            Mark Success
+          </button>
+        </div>
+        <div class="modal-footer">
+          <button type="button" class="btn btn-default" data-dismiss="modal">
+            Close
+          </button>
+        </div>
+      </div>
+    </div>
+  </div>
 {% endblock %}
 {% block tail %}
   {{ lib.form_js() }}
@@ -239,6 +269,7 @@ function updateQueryStringParameter(uri, key, value) {
       $('.never_active').removeClass('active');
     });
 
+    var id = '';
     var dag_id = '{{ dag.dag_id }}';
     var task_id = '';
     var exection_date = '';
@@ -263,6 +294,14 @@ function updateQueryStringParameter(uri, key, value) {
         }
     }
 
+    function call_modal_dag(dag) {
+      id = dag && dag.id;
+      execution_date = dag && dag.execution_date;
+      $('#dag_id').html(dag_id);
+      $('#dagModal').modal({});
+      $("#dagModal").css("margin-top","0px");
+    }
+
     $("#btn_rendered").click(function(){
       url = "{{ url_for('airflow.rendered') }}" +
         "?task_id=" + task_id +
@@ -328,6 +367,15 @@ function updateQueryStringParameter(uri, key, value) {
       window.location = url;
     });
 
+    $("#btn_dagrun_clear").click(function(){
+      url = "{{ url_for('airflow.dagrun_clear') }}" +
+        "?task_id=" + encodeURIComponent(task_id) +
+        "&dag_id=" + encodeURIComponent(dag_id) +
+        "&execution_date=" + execution_date +
+        "&origin=" + encodeURIComponent(window.location);
+      window.location = url;
+    });
+
     $("#btn_success").click(function(){
       url = "{{ url_for('airflow.success') }}" +
         "?task_id=" + encodeURIComponent(task_id) +
@@ -342,6 +390,14 @@ function updateQueryStringParameter(uri, key, value) {
       window.location = url;
     });
 
+    $('#btn_dagrun_success').click(function(){
+      url = "{{ url_for('airflow.dagrun_success') }}" +
+        "?dag_id=" + encodeURIComponent(dag_id) +
+        "&execution_date=" + execution_date +
+        "&origin=" + encodeURIComponent(window.location);
+      window.location = url;
+    });
+
     $("#btn_gantt").click(function(){
       url = "{{ url_for('airflow.gantt') }}" +
         "?dag_id=" + dag_id +
@@ -367,5 +423,9 @@ function updateQueryStringParameter(uri, key, value) {
       $.post(url);
     });
 
+    $('#btn_edit_dagrun').click(function(){
+      window.location = '/admin/dagrun/edit/?id=' + id;
+    });
+
   </script>
 {% endblock %}

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c450fbe/airflow/www/templates/airflow/tree.html
----------------------------------------------------------------------
diff --git a/airflow/www/templates/airflow/tree.html b/airflow/www/templates/airflow/tree.html
index be3655a..b570fae 100644
--- a/airflow/www/templates/airflow/tree.html
+++ b/airflow/www/templates/airflow/tree.html
@@ -1,13 +1,13 @@
-{# 
+{#
   Licensed to the Apache Software Foundation (ASF) under one or more
   contributor license agreements.  See the NOTICE file distributed with
   this work for additional information regarding copyright ownership.
   The ASF licenses this file to You under the Apache License, Version 2.0
   (the "License"); you may not use this file except in compliance with
   the License.  You may obtain a copy of the License at
-  
+
     http://www.apache.org/licenses/LICENSE-2.0
-  
+
   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -232,9 +232,8 @@ function update(source) {
       .enter()
       .append('rect')
       .on("click", function(d){
-        if(d.task_id === undefined){
-          window.location = '/admin/dagrun/edit/?id=' + d.id;
-        }
+        if(d.task_id === undefined)
+            call_modal_dag(d);
         else if(nodeobj[d.task_id].operator=='SubDagOperator')
             call_modal(d.task_id, d.execution_date, true);
         else

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c450fbe/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index b401434..541c3ff 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -58,6 +58,7 @@ import airflow
 from airflow import configuration as conf
 from airflow import models
 from airflow import settings
+from airflow.api.common.experimental.mark_tasks import set_dag_run_state
 from airflow.exceptions import AirflowException
 from airflow.settings import Session
 from airflow.models import XCom, DagRun
@@ -1026,6 +1027,36 @@ class Airflow(BaseView):
             "it should start any moment now.".format(dag_id))
         return redirect(origin)
 
+    def _clear_dag_tis(self, dag, start_date, end_date, origin,
+                       recursive=False, confirmed=False):
+        if confirmed:
+            count = dag.clear(
+                start_date=start_date,
+                end_date=end_date,
+                include_subdags=recursive)
+
+            flash("{0} task instances have been cleared".format(count))
+            return redirect(origin)
+
+        tis = dag.clear(
+            start_date=start_date,
+            end_date=end_date,
+            include_subdags=recursive,
+            dry_run=True)
+        if not tis:
+            flash("No task instances to clear", 'error')
+            response = redirect(origin)
+        else:
+            details = "\n".join([str(t) for t in tis])
+
+            response = self.render(
+                'airflow/confirm.html',
+                message=("Here's the list of task instances you are about "
+                         "to clear:"),
+                details=details)
+
+        return response
+
     @expose('/clear')
     @login_required
     @wwwutils.action_logging
@@ -1052,34 +1083,28 @@ class Airflow(BaseView):
 
         end_date = execution_date if not future else None
         start_date = execution_date if not past else None
-        if confirmed:
-            count = dag.clear(
-                start_date=start_date,
-                end_date=end_date,
-                include_subdags=recursive)
 
-            flash("{0} task instances have been cleared".format(count))
-            return redirect(origin)
-        else:
-            tis = dag.clear(
-                start_date=start_date,
-                end_date=end_date,
-                include_subdags=recursive,
-                dry_run=True)
-            if not tis:
-                flash("No task instances to clear", 'error')
-                response = redirect(origin)
-            else:
-                details = "\n".join([str(t) for t in tis])
+        return self._clear_dag_tis(dag, start_date, end_date, origin,
+                                   recursive=recursive, confirmed=confirmed)
 
-                response = self.render(
-                    'airflow/confirm.html',
-                    message=(
-                        "Here's the list of task instances you are about "
-                        "to clear:"),
-                    details=details,)
+    @expose('/dagrun_clear')
+    @login_required
+    @wwwutils.action_logging
+    @wwwutils.notify_owner
+    def dagrun_clear(self):
+        dag_id = request.args.get('dag_id')
+        task_id = request.args.get('task_id')
+        origin = request.args.get('origin')
+        execution_date = request.args.get('execution_date')
+        confirmed = request.args.get('confirmed') == "true"
 
-            return response
+        dag = dagbag.get_dag(dag_id)
+        execution_date = dateutil.parser.parse(execution_date)
+        start_date = execution_date
+        end_date = execution_date
+
+        return self._clear_dag_tis(dag, start_date, end_date, origin,
+                                   recursive=True, confirmed=confirmed)
 
     @expose('/blocked')
     @login_required
@@ -1104,6 +1129,44 @@ class Airflow(BaseView):
             })
         return wwwutils.json_response(payload)
 
+    @expose('/dagrun_success')
+    @login_required
+    @wwwutils.action_logging
+    @wwwutils.notify_owner
+    def dagrun_success(self):
+        dag_id = request.args.get('dag_id')
+        execution_date = request.args.get('execution_date')
+        confirmed = request.args.get('confirmed') == 'true'
+        origin = request.args.get('origin')
+
+        if not execution_date:
+            flash('Invalid execution date', 'error')
+            return redirect(origin)
+
+        execution_date = dateutil.parser.parse(execution_date)
+        dag = dagbag.get_dag(dag_id)
+
+        if not dag:
+            flash('Cannot find DAG: {}'.format(dag_id), 'error')
+            return redirect(origin)
+
+        new_dag_state = set_dag_run_state(dag, execution_date, state=State.SUCCESS,
+                                          commit=confirmed)
+
+        if confirmed:
+            flash('Marked success on {} task instances'.format(len(new_dag_state)))
+            return redirect(origin)
+
+        else:
+            details = '\n'.join([str(t) for t in new_dag_state])
+
+            response = self.render('airflow/confirm.html',
+                                   message=("Here's the list of task instances you are "
+                                            "about to mark as successful:"),
+                                   details=details)
+
+            return response
+
     @expose('/success')
     @login_required
     @wwwutils.action_logging

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c450fbe/tests/api/common/mark_tasks.py
----------------------------------------------------------------------
diff --git a/tests/api/common/mark_tasks.py b/tests/api/common/mark_tasks.py
index e01f3ad..8a3759f 100644
--- a/tests/api/common/mark_tasks.py
+++ b/tests/api/common/mark_tasks.py
@@ -16,11 +16,12 @@
 import unittest
 
 from airflow import models
-from airflow.api.common.experimental.mark_tasks import set_state, _create_dagruns
+from airflow.api.common.experimental.mark_tasks import (
+    set_state, _create_dagruns, set_dag_run_state)
 from airflow.settings import Session
 from airflow.utils.dates import days_ago
 from airflow.utils.state import State
-
+from datetime import datetime, timedelta
 
 DEV_NULL = "/dev/null"
 
@@ -207,5 +208,189 @@ class TestMarkTasks(unittest.TestCase):
 
         self.session.close()
 
+class TestMarkDAGRun(unittest.TestCase):
+    def setUp(self):
+        self.dagbag = models.DagBag(include_examples=True)
+        self.dag1 = self.dagbag.dags['test_example_bash_operator']
+        self.dag2 = self.dagbag.dags['example_subdag_operator']
+
+        self.execution_dates = [days_ago(3), days_ago(2), days_ago(1)]
+
+        self.session = Session()
+
+    def verify_dag_run_states(self, dag, date, state=State.SUCCESS):
+        drs = models.DagRun.find(dag_id=dag.dag_id, execution_date=date)
+        dr = drs[0]
+        self.assertEqual(dr.get_state(), state)
+        tis = dr.get_task_instances(session=self.session)
+        for ti in tis:
+            self.assertEqual(ti.state, state)
+
+    def test_set_running_dag_run_state(self):
+        date = self.execution_dates[0]
+        dr = self.dag1.create_dagrun(
+            run_id='manual__' + datetime.now().isoformat(),
+            state=State.RUNNING,
+            execution_date=date,
+            session=self.session
+        )
+        for ti in dr.get_task_instances(session=self.session):
+            ti.set_state(State.RUNNING, self.session)
+
+        altered = set_dag_run_state(self.dag1, date, state=State.SUCCESS, commit=True)
+
+        # All of the task should be altered
+        self.assertEqual(len(altered), len(self.dag1.tasks))
+        self.verify_dag_run_states(self.dag1, date)
+
+    def test_set_success_dag_run_state(self):
+        date = self.execution_dates[0]
+
+        dr = self.dag1.create_dagrun(
+            run_id='manual__' + datetime.now().isoformat(),
+            state=State.SUCCESS,
+            execution_date=date,
+            session=self.session
+        )
+        for ti in dr.get_task_instances(session=self.session):
+            ti.set_state(State.SUCCESS, self.session)
+
+        altered = set_dag_run_state(self.dag1, date, state=State.SUCCESS, commit=True)
+
+        # None of the task should be altered
+        self.assertEqual(len(altered), 0)
+        self.verify_dag_run_states(self.dag1, date)
+
+    def test_set_failed_dag_run_state(self):
+        date = self.execution_dates[0]
+        dr = self.dag1.create_dagrun(
+            run_id='manual__' + datetime.now().isoformat(),
+            state=State.FAILED,
+            execution_date=date,
+            session=self.session
+        )
+        dr.get_task_instance('runme_0').set_state(State.FAILED, self.session)
+
+        altered = set_dag_run_state(self.dag1, date, state=State.SUCCESS, commit=True)
+
+        # All of the task should be altered
+        self.assertEqual(len(altered), len(self.dag1.tasks))
+        self.verify_dag_run_states(self.dag1, date)
+
+    def test_set_mixed_dag_run_state(self):
+        """
+        This test checks function set_dag_run_state with mixed task instance
+        state.
+        """
+        date = self.execution_dates[0]
+        dr = self.dag1.create_dagrun(
+            run_id='manual__' + datetime.now().isoformat(),
+            state=State.FAILED,
+            execution_date=date,
+            session=self.session
+        )
+        # success task
+        dr.get_task_instance('runme_0').set_state(State.SUCCESS, self.session)
+        # skipped task
+        dr.get_task_instance('runme_1').set_state(State.SKIPPED, self.session)
+        # retry task
+        dr.get_task_instance('runme_2').set_state(State.UP_FOR_RETRY, self.session)
+        # queued task
+        dr.get_task_instance('also_run_this').set_state(State.QUEUED, self.session)
+        # running task
+        dr.get_task_instance('run_after_loop').set_state(State.RUNNING, self.session)
+        # failed task
+        dr.get_task_instance('run_this_last').set_state(State.FAILED, self.session)
+
+        altered = set_dag_run_state(self.dag1, date, state=State.SUCCESS, commit=True)
+
+        self.assertEqual(len(altered), len(self.dag1.tasks) - 1) # only 1 task succeeded
+        self.verify_dag_run_states(self.dag1, date)
+
+    def test_set_state_without_commit(self):
+        date = self.execution_dates[0]
+
+        # Running dag run and task instances
+        dr = self.dag1.create_dagrun(
+            run_id='manual__' + datetime.now().isoformat(),
+            state=State.RUNNING,
+            execution_date=date,
+            session=self.session
+        )
+        for ti in dr.get_task_instances(session=self.session):
+            ti.set_state(State.RUNNING, self.session)
+
+        altered = set_dag_run_state(self.dag1, date, state=State.SUCCESS, commit=False)
+
+        # All of the task should be altered
+        self.assertEqual(len(altered), len(self.dag1.tasks))
+
+        # Both dag run and task instances' states should remain the same
+        self.verify_dag_run_states(self.dag1, date, State.RUNNING)
+
+    def test_set_state_with_multiple_dagruns(self):
+        dr1 = self.dag2.create_dagrun(
+            run_id='manual__' + datetime.now().isoformat(),
+            state=State.FAILED,
+            execution_date=self.execution_dates[0],
+            session=self.session
+        )
+        dr2 = self.dag2.create_dagrun(
+            run_id='manual__' + datetime.now().isoformat(),
+            state=State.FAILED,
+            execution_date=self.execution_dates[1],
+            session=self.session
+        )
+        dr3 = self.dag2.create_dagrun(
+            run_id='manual__' + datetime.now().isoformat(),
+            state=State.RUNNING,
+            execution_date=self.execution_dates[2],
+            session=self.session
+        )
+
+        altered = set_dag_run_state(self.dag2, self.execution_dates[1],
+                                state=State.SUCCESS, commit=True)
+
+        # Recursively count number of tasks in the dag
+        def count_dag_tasks(dag):
+            count = len(dag.tasks)
+            subdag_counts = [count_dag_tasks(subdag) for subdag in dag.subdags]
+            count += sum(subdag_counts)
+            return count
+
+        self.assertEqual(len(altered), count_dag_tasks(self.dag2))
+        self.verify_dag_run_states(self.dag2, self.execution_dates[1])
+
+        # Make sure other dag status are not changed
+        dr1 = models.DagRun.find(dag_id=self.dag2.dag_id, execution_date=self.execution_dates[0])
+        dr1 = dr1[0]
+        self.assertEqual(dr1.get_state(), State.FAILED)
+        dr3 = models.DagRun.find(dag_id=self.dag2.dag_id, execution_date=self.execution_dates[2])
+        dr3 = dr3[0]
+        self.assertEqual(dr3.get_state(), State.RUNNING)
+
+    def test_set_dag_run_state_edge_cases(self):
+        # Dag does not exist
+        altered = set_dag_run_state(None, self.execution_dates[0])
+        self.assertEqual(len(altered), 0)
+
+        # Invalid execution date
+        altered = set_dag_run_state(self.dag1, None)
+        self.assertEqual(len(altered), 0)
+        self.assertRaises(AssertionError, set_dag_run_state, self.dag1, timedelta(microseconds=-1))
+
+        # DagRun does not exist
+        # This will throw AssertionError since dag.latest_execution_date does not exist
+        self.assertRaises(AssertionError, set_dag_run_state, self.dag1, self.execution_dates[0])
+
+    def tearDown(self):
+        self.dag1.clear()
+        self.dag2.clear()
+
+        self.session.query(models.DagRun).delete()
+        self.session.query(models.TaskInstance).delete()
+        self.session.query(models.DagStat).delete()
+        self.session.commit()
+
 if __name__ == '__main__':
     unittest.main()