You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by da...@apache.org on 2017/11/07 19:23:18 UTC

incubator-airflow git commit: [AIRFLOW-1787] Fix task instance batch clear and set state bugs

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 1a7b63eb1 -> 313f5bac4


[AIRFLOW-1787] Fix task instance batch clear and set state bugs

Fixes Batch clear in Task Instances view is not working
for task instances in RUNNING state and all batch
operations in Task instances view cannot work when
manually triggered task instances are selected
because they have a different execution date
format.

Closes #2759 from yrqls21/fix-ti-batch-clear-n
-set-state-bugs


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/313f5bac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/313f5bac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/313f5bac

Branch: refs/heads/master
Commit: 313f5bac4a3f804094bcd583e0e5fbc3b5f405bb
Parents: 1a7b63e
Author: Kevin Yang <ke...@airbnb.com>
Authored: Tue Nov 7 11:22:58 2017 -0800
Committer: Dan Davydov <da...@airbnb.com>
Committed: Tue Nov 7 11:23:00 2017 -0800

----------------------------------------------------------------------
 airflow/utils/dates.py    | 14 ++++++++++++++
 airflow/www/views.py      | 30 ++++++++++++++++++++----------
 tests/utils/test_dates.py |  9 +++++++++
 3 files changed, 43 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/313f5bac/airflow/utils/dates.py
----------------------------------------------------------------------
diff --git a/airflow/utils/dates.py b/airflow/utils/dates.py
index 43b87f4..81e1c2c 100644
--- a/airflow/utils/dates.py
+++ b/airflow/utils/dates.py
@@ -225,3 +225,17 @@ def days_ago(n, hour=0, minute=0, second=0, microsecond=0):
         second=second,
         microsecond=microsecond)
     return today - timedelta(days=n)
+
+
+def parse_execution_date(execution_date_str):
+    """
+    Parse execution date string to datetime object.
+    """
+    try:
+        # Execution date follows execution date format of scheduled executions,
+        # e.g. '2017-11-02 00:00:00'
+        return datetime.strptime(execution_date_str, '%Y-%m-%d %H:%M:%S')
+    except ValueError:
+        # Execution date follows execution date format of manually triggered executions,
+        # e.g. '2017-11-05 16:18:30..989729'
+        return datetime.strptime(execution_date_str, '%Y-%m-%d %H:%M:%S..%f')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/313f5bac/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 81c44b6..a6d788e 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -76,7 +76,7 @@ from airflow.utils.json import json_ser
 from airflow.utils.state import State
 from airflow.utils.db import create_session, provide_session
 from airflow.utils.helpers import alchemy_to_dict
-from airflow.utils.dates import infer_time_unit, scale_time_units
+from airflow.utils.dates import infer_time_unit, scale_time_units, parse_execution_date
 from airflow.www import utils as wwwutils
 from airflow.www.forms import DateTimeForm, DateTimeWithNumRunsForm
 from airflow.www.validators import GreaterEqualThan
@@ -2502,23 +2502,32 @@ class TaskInstanceModelView(ModelViewOnly):
         try:
             TI = models.TaskInstance
 
+            dag_to_task_details = {}
             dag_to_tis = {}
 
-            for id in ids:
-                task_id, dag_id, execution_date = id.split(',')
+            # Collect dags upfront as dagbag.get_dag() will reset the session
+            for id_str in ids:
+                task_id, dag_id, execution_date = id_str.split(',')
+                dag = dagbag.get_dag(dag_id)
+                task_details = dag_to_task_details.setdefault(dag, [])
+                task_details.append((task_id, execution_date))
 
-                ti = session.query(TI).filter(TI.task_id == task_id,
-                                              TI.dag_id == dag_id,
-                                              TI.execution_date == execution_date).one()
+            for dag, task_details in dag_to_task_details.items():
+                for task_id, execution_date in task_details:
+                    execution_date = parse_execution_date(execution_date)
 
-                dag = dagbag.get_dag(dag_id)
-                tis = dag_to_tis.setdefault(dag, [])
-                tis.append(ti)
+                    ti = session.query(TI).filter(TI.task_id == task_id,
+                                                  TI.dag_id == dag.dag_id,
+                                                  TI.execution_date == execution_date).one()
+
+                    tis = dag_to_tis.setdefault(dag, [])
+                    tis.append(ti)
 
             for dag, tis in dag_to_tis.items():
                 models.clear_task_instances(tis, session, dag=dag)
 
             session.commit()
+
             flash("{0} task instances have been cleared".format(len(ids)))
 
         except Exception as ex:
@@ -2533,7 +2542,8 @@ class TaskInstanceModelView(ModelViewOnly):
             count = len(ids)
             for id in ids:
                 task_id, dag_id, execution_date = id.split(',')
-                execution_date = datetime.strptime(execution_date, '%Y-%m-%d %H:%M:%S')
+                execution_date = parse_execution_date(execution_date)
+
                 ti = session.query(TI).filter(TI.task_id == task_id,
                                               TI.dag_id == dag_id,
                                               TI.execution_date == execution_date).one()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/313f5bac/tests/utils/test_dates.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_dates.py b/tests/utils/test_dates.py
index 1323034..50e76ba 100644
--- a/tests/utils/test_dates.py
+++ b/tests/utils/test_dates.py
@@ -40,3 +40,12 @@ class Dates(unittest.TestCase):
         self.assertTrue(
             dates.days_ago(0, microsecond=3)
             == today_midnight + timedelta(microseconds=3))
+
+    def test_parse_execution_date(self):
+        execution_date_str_wo_ms = '2017-11-02 00:00:00'
+        execution_date_str_w_ms = '2017-11-05 16:18:30..989729'
+        bad_execution_date_str = '2017-11-06T00:00:00Z'
+
+        self.assertEqual(datetime(2017, 11, 2, 0, 0, 0), dates.parse_execution_date(execution_date_str_wo_ms))
+        self.assertEqual(datetime(2017, 11, 5, 16, 18, 30, 989729), dates.parse_execution_date(execution_date_str_w_ms))
+        self.assertRaises(ValueError, dates.parse_execution_date, bad_execution_date_str)