You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/10/09 14:22:52 UTC

[airflow] branch master updated: Set start_date, end_date & duration for tasks failing without DagRun (#11358)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new ff1a2aa  Set start_date, end_date & duration for tasks failing without DagRun (#11358)
ff1a2aa is described below

commit ff1a2aaff8f1cf508106648b56fc5b28b7ce4fa8
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Fri Oct 9 15:21:39 2020 +0100

    Set start_date, end_date & duration for tasks failing without DagRun (#11358)
---
 airflow/jobs/scheduler_job.py    | 15 ++++++++++++++-
 airflow/models/taskinstance.py   |  7 +++++--
 tests/jobs/test_scheduler_job.py |  8 ++++++++
 3 files changed, 27 insertions(+), 3 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 2a81f0f..dd31f36 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1162,6 +1162,19 @@ class SchedulerJob(BaseJob):  # pylint: disable=too-many-instance-attributes
                 tis_changed += 1
         else:
             subq = query.subquery()
+            current_time = timezone.utcnow()
+            ti_prop_update = {
+                models.TaskInstance.state: new_state,
+                models.TaskInstance.start_date: current_time,
+            }
+
+            # Only add end_date and duration if the new_state is 'success', 'failed' or 'skipped'
+            if new_state in State.finished():
+                ti_prop_update.update({
+                    models.TaskInstance.end_date: current_time,
+                    models.TaskInstance.duration: 0,
+                })
+
             tis_changed = session \
                 .query(models.TaskInstance) \
                 .filter(
@@ -1169,7 +1182,7 @@ class SchedulerJob(BaseJob):  # pylint: disable=too-many-instance-attributes
                     models.TaskInstance.task_id == subq.c.task_id,
                     models.TaskInstance.execution_date ==
                     subq.c.execution_date) \
-                .update({models.TaskInstance.state: new_state}, synchronize_session=False)
+                .update(ti_prop_update, synchronize_session=False)
             session.commit()
 
         if tis_changed > 0:
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 6254d9e..335b4dd 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -611,10 +611,13 @@ class TaskInstance(Base, LoggingMixin):     # pylint: disable=R0902,R0904
         :param session: SQLAlchemy ORM Session
         :type session: Session
         """
+        current_time = timezone.utcnow()
         self.log.debug("Setting task state for %s to %s", self, state)
         self.state = state
-        self.start_date = timezone.utcnow()
-        self.end_date = timezone.utcnow()
+        self.start_date = current_time
+        if self.state in State.finished():
+            self.end_date = current_time
+            self.duration = 0
         session.merge(self)
 
     @property
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 3b713b4..cfbdbab 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -2235,6 +2235,9 @@ class TestSchedulerJob(unittest.TestCase):
 
         ti3.refresh_from_db(session=session)
         self.assertEqual(ti3.state, State.NONE)
+        self.assertIsNotNone(ti3.start_date)
+        self.assertIsNone(ti3.end_date)
+        self.assertIsNone(ti3.duration)
 
         dr1.refresh_from_db(session=session)
         dr1.state = State.FAILED
@@ -2396,6 +2399,11 @@ class TestSchedulerJob(unittest.TestCase):
 
         ti = dr.get_task_instance(task_id=op1.task_id, session=session)
         self.assertEqual(ti.state, expected_task_state)
+        self.assertIsNotNone(ti.start_date)
+        if expected_task_state in State.finished():
+            self.assertIsNotNone(ti.end_date)
+            self.assertEqual(ti.start_date, ti.end_date)
+            self.assertIsNotNone(ti.duration)
 
     @provide_session
     def evaluate_dagrun(