You are viewing a plain-text version of this content. (The canonical link referred to as "here" was not preserved in this plain-text copy.)
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/08/31 23:49:40 UTC

[GitHub] kaxil closed pull request #3798: [AIRFLOW-2951] Update dag_run table end_date when state change

kaxil closed pull request #3798: [AIRFLOW-2951] Update dag_run table end_date when state change
URL: https://github.com/apache/incubator-airflow/pull/3798
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise show it:

diff --git a/airflow/api/common/experimental/mark_tasks.py b/airflow/api/common/experimental/mark_tasks.py
index 88c5275f5a..2fac1254cd 100644
--- a/airflow/api/common/experimental/mark_tasks.py
+++ b/airflow/api/common/experimental/mark_tasks.py
@@ -208,6 +208,7 @@ def _set_dag_run_state(dag_id, execution_date, state, session=None):
     dr.state = state
     if state == State.RUNNING:
         dr.start_date = timezone.utcnow()
+        dr.end_date = None
     else:
         dr.end_date = timezone.utcnow()
     session.commit()
diff --git a/airflow/models.py b/airflow/models.py
index 55badf4828..6c8031c18c 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -4840,6 +4840,8 @@ def get_state(self):
     def set_state(self, state):
         if self._state != state:
             self._state = state
+            self.end_date = timezone.utcnow() if self._state in State.finished() else None
+
             if self.dag_id is not None:
                 # FIXME: Due to the scoped_session factor we we don't get a clean
                 # session here, so something really weird goes on:
@@ -5063,7 +5065,7 @@ def update_state(self, session=None):
             if (not unfinished_tasks and
                     any(r.state in (State.FAILED, State.UPSTREAM_FAILED) for r in roots)):
                 self.log.info('Marking run %s failed', self)
-                self.state = State.FAILED
+                self.set_state(State.FAILED)
                 dag.handle_callback(self, success=False, reason='task_failure',
                                     session=session)
 
@@ -5071,20 +5073,20 @@ def update_state(self, session=None):
             elif not unfinished_tasks and all(r.state in (State.SUCCESS, State.SKIPPED)
                                               for r in roots):
                 self.log.info('Marking run %s successful', self)
-                self.state = State.SUCCESS
+                self.set_state(State.SUCCESS)
                 dag.handle_callback(self, success=True, reason='success', session=session)
 
             # if *all tasks* are deadlocked, the run failed
             elif (unfinished_tasks and none_depends_on_past and
                   none_task_concurrency and no_dependencies_met):
                 self.log.info('Deadlock; marking run %s failed', self)
-                self.state = State.FAILED
+                self.set_state(State.FAILED)
                 dag.handle_callback(self, success=False, reason='all_tasks_deadlocked',
                                     session=session)
 
             # finally, if the roots aren't done, the dag is still running
             else:
-                self.state = State.RUNNING
+                self.set_state(State.RUNNING)
 
         # todo: determine we want to use with_for_update to make sure to lock the run
         session.merge(self)
diff --git a/tests/models.py b/tests/models.py
index a1fd1e9912..7adeb3acdd 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -896,6 +896,124 @@ def on_failure_callable(context):
         updated_dag_state = dag_run.update_state()
         self.assertEqual(State.FAILED, updated_dag_state)
 
+    def test_dagrun_set_state_end_date(self):
+        session = settings.Session()
+
+        dag = DAG(
+            'test_dagrun_set_state_end_date',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        dag.clear()
+
+        now = timezone.utcnow()
+        dr = dag.create_dagrun(run_id='test_dagrun_set_state_end_date',
+                               state=State.RUNNING,
+                               execution_date=now,
+                               start_date=now)
+
+        # Initial end_date should be NULL
+        # State.SUCCESS and State.FAILED are all ending state and should set end_date
+        # State.RUNNING set end_date back to NULL
+        session.add(dr)
+        session.commit()
+        self.assertIsNone(dr.end_date)
+
+        dr.set_state(State.SUCCESS)
+        session.merge(dr)
+        session.commit()
+
+        dr_database = session.query(DagRun).filter(
+            DagRun.run_id == 'test_dagrun_set_state_end_date'
+        ).one()
+        self.assertIsNotNone(dr_database.end_date)
+        self.assertEqual(dr.end_date, dr_database.end_date)
+
+        dr.set_state(State.RUNNING)
+        session.merge(dr)
+        session.commit()
+
+        dr_database = session.query(DagRun).filter(
+            DagRun.run_id == 'test_dagrun_set_state_end_date'
+        ).one()
+
+        self.assertIsNone(dr_database.end_date)
+
+        dr.set_state(State.FAILED)
+        session.merge(dr)
+        session.commit()
+        dr_database = session.query(DagRun).filter(
+            DagRun.run_id == 'test_dagrun_set_state_end_date'
+        ).one()
+
+        self.assertIsNotNone(dr_database.end_date)
+        self.assertEqual(dr.end_date, dr_database.end_date)
+
+    def test_dagrun_update_state_end_date(self):
+        session = settings.Session()
+
+        dag = DAG(
+            'test_dagrun_update_state_end_date',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        # A -> B
+        with dag:
+            op1 = DummyOperator(task_id='A')
+            op2 = DummyOperator(task_id='B')
+            op1.set_upstream(op2)
+
+        dag.clear()
+
+        now = timezone.utcnow()
+        dr = dag.create_dagrun(run_id='test_dagrun_update_state_end_date',
+                               state=State.RUNNING,
+                               execution_date=now,
+                               start_date=now)
+
+        # Initial end_date should be NULL
+        # State.SUCCESS and State.FAILED are all ending state and should set end_date
+        # State.RUNNING set end_date back to NULL
+        session.merge(dr)
+        session.commit()
+        self.assertIsNone(dr.end_date)
+
+        ti_op1 = dr.get_task_instance(task_id=op1.task_id)
+        ti_op1.set_state(state=State.SUCCESS, session=session)
+        ti_op2 = dr.get_task_instance(task_id=op2.task_id)
+        ti_op2.set_state(state=State.SUCCESS, session=session)
+
+        dr.update_state()
+
+        dr_database = session.query(DagRun).filter(
+            DagRun.run_id == 'test_dagrun_update_state_end_date'
+        ).one()
+        self.assertIsNotNone(dr_database.end_date)
+        self.assertEqual(dr.end_date, dr_database.end_date)
+
+        ti_op1.set_state(state=State.RUNNING, session=session)
+        ti_op2.set_state(state=State.RUNNING, session=session)
+        dr.update_state()
+
+        dr_database = session.query(DagRun).filter(
+            DagRun.run_id == 'test_dagrun_update_state_end_date'
+        ).one()
+
+        self.assertEqual(dr._state, State.RUNNING)
+        self.assertIsNone(dr.end_date)
+        self.assertIsNone(dr_database.end_date)
+
+        ti_op1.set_state(state=State.FAILED, session=session)
+        ti_op2.set_state(state=State.FAILED, session=session)
+        dr.update_state()
+
+        dr_database = session.query(DagRun).filter(
+            DagRun.run_id == 'test_dagrun_update_state_end_date'
+        ).one()
+
+        self.assertIsNotNone(dr_database.end_date)
+        self.assertEqual(dr.end_date, dr_database.end_date)
+
     def test_get_task_instance_on_empty_dagrun(self):
         """
         Make sure that a proper value is returned when a dagrun has no task instances


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to this message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services