You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/10/09 14:22:52 UTC
[airflow] branch master updated: Set start_date, end_date & duration for tasks failing without DagRun (#11358)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new ff1a2aa Set start_date, end_date & duration for tasks failing without DagRun (#11358)
ff1a2aa is described below
commit ff1a2aaff8f1cf508106648b56fc5b28b7ce4fa8
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Fri Oct 9 15:21:39 2020 +0100
Set start_date, end_date & duration for tasks failing without DagRun (#11358)
---
airflow/jobs/scheduler_job.py | 15 ++++++++++++++-
airflow/models/taskinstance.py | 7 +++++--
tests/jobs/test_scheduler_job.py | 8 ++++++++
3 files changed, 27 insertions(+), 3 deletions(-)
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 2a81f0f..dd31f36 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1162,6 +1162,19 @@ class SchedulerJob(BaseJob): # pylint: disable=too-many-instance-attributes
tis_changed += 1
else:
subq = query.subquery()
+ current_time = timezone.utcnow()
+ ti_prop_update = {
+ models.TaskInstance.state: new_state,
+ models.TaskInstance.start_date: current_time,
+ }
+
+ # Only add end_date and duration if the new_state is 'success', 'failed' or 'skipped'
+ if new_state in State.finished():
+ ti_prop_update.update({
+ models.TaskInstance.end_date: current_time,
+ models.TaskInstance.duration: 0,
+ })
+
tis_changed = session \
.query(models.TaskInstance) \
.filter(
@@ -1169,7 +1182,7 @@ class SchedulerJob(BaseJob): # pylint: disable=too-many-instance-attributes
models.TaskInstance.task_id == subq.c.task_id,
models.TaskInstance.execution_date ==
subq.c.execution_date) \
- .update({models.TaskInstance.state: new_state}, synchronize_session=False)
+ .update(ti_prop_update, synchronize_session=False)
session.commit()
if tis_changed > 0:
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 6254d9e..335b4dd 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -611,10 +611,13 @@ class TaskInstance(Base, LoggingMixin): # pylint: disable=R0902,R0904
:param session: SQLAlchemy ORM Session
:type session: Session
"""
+ current_time = timezone.utcnow()
self.log.debug("Setting task state for %s to %s", self, state)
self.state = state
- self.start_date = timezone.utcnow()
- self.end_date = timezone.utcnow()
+ self.start_date = current_time
+ if self.state in State.finished():
+ self.end_date = current_time
+ self.duration = 0
session.merge(self)
@property
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 3b713b4..cfbdbab 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -2235,6 +2235,9 @@ class TestSchedulerJob(unittest.TestCase):
ti3.refresh_from_db(session=session)
self.assertEqual(ti3.state, State.NONE)
+ self.assertIsNotNone(ti3.start_date)
+ self.assertIsNone(ti3.end_date)
+ self.assertIsNone(ti3.duration)
dr1.refresh_from_db(session=session)
dr1.state = State.FAILED
@@ -2396,6 +2399,11 @@ class TestSchedulerJob(unittest.TestCase):
ti = dr.get_task_instance(task_id=op1.task_id, session=session)
self.assertEqual(ti.state, expected_task_state)
+ self.assertIsNotNone(ti.start_date)
+ if expected_task_state in State.finished():
+ self.assertIsNotNone(ti.end_date)
+ self.assertEqual(ti.start_date, ti.end_date)
+ self.assertIsNotNone(ti.duration)
@provide_session
def evaluate_dagrun(