You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by da...@apache.org on 2017/10/11 21:06:01 UTC

incubator-airflow git commit: [AIRFLOW-1681] Add batch clear in task instance view

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 98b4df945 -> 9f2c16a0a


[AIRFLOW-1681] Add batch clear in task instance view

Allow users to batch-clear the selected task
instance(s) in the task instance view. Only the
state(s) of the selected task instance(s) will be
cleared; neither upstream nor downstream task
instances will be affected.

The DAG run(s) involved will be set to the "RUNNING"
state, the same as with the existing "clear" operation.

Keeping both the "Delete" and "Clear" operations for
a smoother transition of user habits; the DAG state
change is announced in the pop-up (see screenshots).

Closes #2681 from yrqls21/add-batch-clear-in-task-
instance-view


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9f2c16a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9f2c16a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9f2c16a0

Branch: refs/heads/master
Commit: 9f2c16a0ac261888fe2ee4671538201c273f82d5
Parents: 98b4df9
Author: Kevin Yang <ke...@airbnb.com>
Authored: Wed Oct 11 14:05:33 2017 -0700
Committer: Dan Davydov <da...@airbnb.com>
Committed: Wed Oct 11 14:05:35 2017 -0700

----------------------------------------------------------------------
 airflow/www/views.py | 62 +++++++++++++++++++++++------------------------
 1 file changed, 31 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9f2c16a0/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index bc63b5b..81ee61f 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -37,7 +37,7 @@ import sqlalchemy as sqla
 from sqlalchemy import or_, desc, and_, union_all
 
 from flask import (
-    abort, redirect, url_for, request, Markup, Response, current_app, render_template, 
+    abort, redirect, url_for, request, Markup, Response, current_app, render_template,
     make_response)
 from flask_admin import BaseView, expose, AdminIndexView
 from flask_admin.contrib.sqla import ModelView
@@ -2488,7 +2488,6 @@ class TaskInstanceModelView(ModelViewOnly):
         'start_date', 'end_date', 'duration', 'job_id', 'hostname',
         'unixname', 'priority_weight', 'queue', 'queued_dttm', 'try_number',
         'pool', 'log_url')
-    can_delete = True
     page_size = PAGE_SIZE
 
     @action('set_running', "Set state to 'running'", None)
@@ -2507,58 +2506,59 @@ class TaskInstanceModelView(ModelViewOnly):
     def action_set_retry(self, ids):
         self.set_task_instance_state(ids, State.UP_FOR_RETRY)
 
-    @action('delete',
-            lazy_gettext('Delete'),
-            lazy_gettext('Are you sure you want to delete selected records?'))
-    def action_delete(self, ids):
-        """
-        As a workaround for AIRFLOW-277, this method overrides Flask-Admin's ModelView.action_delete().
-
-        TODO: this method should be removed once the below bug is fixed on Flask-Admin side.
-        https://github.com/flask-admin/flask-admin/issues/1226
-        """
-        if 'sqlite' in conf.get('core', 'sql_alchemy_conn'):
-            self.delete_task_instances(ids)
-        else:
-            super(TaskInstanceModelView, self).action_delete(ids)
-
     @provide_session
-    def set_task_instance_state(self, ids, target_state, session=None):
+    @action('clear',
+            lazy_gettext('Clear'),
+            lazy_gettext(
+                'Are you sure you want to clear the state of the selected task instance(s)'
+                ' and set their dagruns to the running state?'))
+    def action_clear(self, ids, session=None):
         try:
             TI = models.TaskInstance
-            count = len(ids)
+
+            dag_to_tis = {}
+
             for id in ids:
                 task_id, dag_id, execution_date = id.split(',')
-                execution_date = datetime.strptime(execution_date, '%Y-%m-%d %H:%M:%S')
+
                 ti = session.query(TI).filter(TI.task_id == task_id,
                                               TI.dag_id == dag_id,
                                               TI.execution_date == execution_date).one()
-                ti.state = target_state
+
+                dag = dagbag.get_dag(dag_id)
+                tis = dag_to_tis.setdefault(dag, [])
+                tis.append(ti)
+
+            for dag, tis in dag_to_tis.items():
+                models.clear_task_instances(tis, session, dag=dag)
+
             session.commit()
-            flash(
-                "{count} task instances were set to '{target_state}'".format(**locals()))
+            flash("{0} task instances have been cleared".format(len(ids)))
+
         except Exception as ex:
             if not self.handle_view_exception(ex):
                 raise Exception("Ooops")
-            flash('Failed to set state', 'error')
+            flash('Failed to clear task instances', 'error')
 
     @provide_session
-    def delete_task_instances(self, ids, session=None):
+    def set_task_instance_state(self, ids, target_state, session=None):
         try:
             TI = models.TaskInstance
-            count = 0
+            count = len(ids)
             for id in ids:
                 task_id, dag_id, execution_date = id.split(',')
                 execution_date = datetime.strptime(execution_date, '%Y-%m-%d %H:%M:%S')
-                count += session.query(TI).filter(TI.task_id == task_id,
-                                                  TI.dag_id == dag_id,
-                                                  TI.execution_date == execution_date).delete()
+                ti = session.query(TI).filter(TI.task_id == task_id,
+                                              TI.dag_id == dag_id,
+                                              TI.execution_date == execution_date).one()
+                ti.state = target_state
             session.commit()
-            flash("{count} task instances were deleted".format(**locals()))
+            flash(
+                "{count} task instances were set to '{target_state}'".format(**locals()))
         except Exception as ex:
             if not self.handle_view_exception(ex):
                 raise Exception("Ooops")
-            flash('Failed to delete', 'error')
+            flash('Failed to set state', 'error')
 
     def get_one(self, id):
         """