You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/06/07 07:16:58 UTC

incubator-airflow git commit: [AIRFLOW-1166] Speed up _change_state_for_tis_without_dagrun

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 21c142051 -> 4764646b1


[AIRFLOW-1166] Speed up _change_state_for_tis_without_dagrun

_change_state_for_tis_without_dagrun was locking a
significant number of tasks unnecessarily. This could
end up in a deadlock in the database because of how
long the lock was held.

Closes #2267 from bolkedebruin/fix_deadlock


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4764646b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4764646b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4764646b

Branch: refs/heads/master
Commit: 4764646b18f56c34a35c19bd20a1931eb3a844fe
Parents: 21c1420
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Wed Jun 7 09:16:51 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Jun 7 09:16:51 2017 +0200

----------------------------------------------------------------------
 airflow/jobs.py | 76 +++++++++++++++++++++++++---------------------------
 tests/jobs.py   | 71 ++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 108 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4764646b/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index adc4328..508564a 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -35,7 +35,8 @@ import time
 from time import sleep
 
 import psutil
-from sqlalchemy import Column, Integer, String, DateTime, func, Index, or_
+from sqlalchemy import Column, Integer, String, DateTime, func, Index, or_, and_
+from sqlalchemy import update
 from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm.session import make_transient
 from tabulate import tabulate
@@ -931,45 +932,42 @@ class SchedulerJob(BaseJob):
         simple_dag_bag and with states in the old_state will be examined
         :type simple_dag_bag: SimpleDagBag
         """
+        tis_changed = 0
+        if self.using_sqlite:
+            tis_to_change = (
+                session
+                    .query(models.TaskInstance)
+                    .filter(models.TaskInstance.dag_id.in_(simple_dag_bag.dag_ids))
+                    .filter(models.TaskInstance.state.in_(old_states))
+                    .filter(and_(
+                        models.DagRun.dag_id == models.TaskInstance.dag_id,
+                        models.DagRun.execution_date == models.TaskInstance.execution_date,
+                        models.DagRun.state != State.RUNNING))
+                    .with_for_update()
+                    .all()
+            )
+            for ti in tis_to_change:
+                ti.set_state(new_state, session=session)
+                tis_changed += 1
+        else:
+            tis_changed = (
+                session
+                    .query(models.TaskInstance)
+                    .filter(models.TaskInstance.dag_id.in_(simple_dag_bag.dag_ids))
+                    .filter(models.TaskInstance.state.in_(old_states))
+                    .filter(and_(
+                        models.DagRun.dag_id == models.TaskInstance.dag_id,
+                        models.DagRun.execution_date == models.TaskInstance.execution_date,
+                        models.DagRun.state != State.RUNNING))
+                    .update({models.TaskInstance.state: new_state},
+                            synchronize_session=False)
+            )
+            session.commit()
 
-        task_instances_to_change = (
-            session
-            .query(models.TaskInstance)
-            .filter(models.TaskInstance.dag_id.in_(simple_dag_bag.dag_ids))
-            .filter(models.TaskInstance.state.in_(old_states))
-            .with_for_update()
-            .all()
-        )
-        """:type: list[TaskInstance]"""
-
-        for task_instance in task_instances_to_change:
-            dag_runs = DagRun.find(dag_id=task_instance.dag_id,
-                                   execution_date=task_instance.execution_date,
-                                   )
-
-            if len(dag_runs) == 0:
-                self.logger.warning("DagRun for %s %s does not exist",
-                                    task_instance.dag_id,
-                                    task_instance.execution_date)
-                continue
-
-            # There should only be one DAG run. Add some logging info if this
-            # is not the case for later debugging.
-            if len(dag_runs) > 1:
-                self.logger.warning("Multiple DagRuns found for {} {}: {}"
-                                    .format(task_instance.dag_id,
-                                            task_instance.execution_date,
-                                            dag_runs))
-
-            if not any(dag_run.state == State.RUNNING for dag_run in dag_runs):
-                self.logger.warning("Setting {} to state={} as it does not have "
-                                    "a DagRun in the {} state"
-                                    .format(task_instance,
-                                            new_state,
-                                            State.RUNNING))
-                task_instance.state = new_state
-                session.merge(task_instance)
-        session.commit()
+        if tis_changed > 0:
+            self.logger.warning("Set {} task instances to state={} as their associated "
+                                "DagRun was not in RUNNING state".format(
+                tis_changed, new_state))
 
     @provide_session
     def _execute_task_instances(self,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4764646b/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index b0763b9..fb6fcdd 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -613,6 +613,77 @@ class SchedulerJobTest(unittest.TestCase):
 
         session.close()
 
+    def test_change_state_for_tis_without_dagrun(self):
+        dag = DAG(
+            dag_id='test_change_state_for_tis_without_dagrun',
+            start_date=DEFAULT_DATE)
+
+        DummyOperator(
+            task_id='dummy',
+            dag=dag,
+            owner='airflow')
+
+        dag2 = DAG(
+            dag_id='test_change_state_for_tis_without_dagrun_dont_change',
+            start_date=DEFAULT_DATE)
+
+        DummyOperator(
+            task_id='dummy',
+            dag=dag2,
+            owner='airflow')
+
+        session = settings.Session()
+        dr = dag.create_dagrun(run_id=DagRun.ID_PREFIX,
+                               state=State.RUNNING,
+                               execution_date=DEFAULT_DATE,
+                               start_date=DEFAULT_DATE,
+                               session=session)
+
+        dr2 = dag2.create_dagrun(run_id=DagRun.ID_PREFIX,
+                                 state=State.RUNNING,
+                                 execution_date=DEFAULT_DATE,
+                                 start_date=DEFAULT_DATE,
+                                 session=session)
+
+        ti = dr.get_task_instance(task_id='dummy', session=session)
+        ti.state = State.SCHEDULED
+        session.commit()
+
+        ti2 = dr2.get_task_instance(task_id='dummy', session=session)
+        ti2.state = State.SCHEDULED
+        session.commit()
+
+        dagbag = SimpleDagBag([dag])
+        scheduler = SchedulerJob(num_runs=0, run_duration=0)
+        scheduler._change_state_for_tis_without_dagrun(simple_dag_bag=dagbag,
+                                                       old_states=[State.SCHEDULED, State.QUEUED],
+                                                       new_state=State.NONE,
+                                                       session=session)
+
+        ti.refresh_from_db(session=session)
+        self.assertEqual(ti.state, State.SCHEDULED)
+
+        ti2.refresh_from_db(session=session)
+        self.assertEqual(ti2.state, State.SCHEDULED)
+
+        dr.refresh_from_db(session=session)
+        dr.state = State.FAILED
+
+        # why o why
+        session.merge(dr)
+        session.commit()
+
+        scheduler._change_state_for_tis_without_dagrun(simple_dag_bag=dagbag,
+                                                       old_states=[State.SCHEDULED, State.QUEUED],
+                                                       new_state=State.NONE,
+                                                       session=session)
+        ti.refresh_from_db(session=session)
+        self.assertEqual(ti.state, State.NONE)
+
+        # don't touch ti2
+        ti2.refresh_from_db(session=session)
+        self.assertEqual(ti2.state, State.SCHEDULED)
+
     def test_execute_helper_reset_orphaned_tasks(self):
         session = settings.Session()
         dag = DAG(