You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/06/07 07:16:58 UTC
incubator-airflow git commit: [AIRFLOW-1166] Speed up _change_state_for_tis_without_dagrun
Repository: incubator-airflow
Updated Branches:
refs/heads/master 21c142051 -> 4764646b1
[AIRFLOW-1166] Speed up _change_state_for_tis_without_dagrun
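_change_state_for_tis_without_dagrun was unnecessarily locking a
significant number of task instances. Because those locks were held
for so long, this could end up causing a deadlock in the database.
The task instances to change are now selected with a single query that
joins against DagRun and filters on its state (using a bulk UPDATE on
non-SQLite backends), instead of locking every candidate task instance
and looking up its DagRun one at a time.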
Closes #2267 from bolkedebruin/fix_deadlock
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4764646b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4764646b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4764646b
Branch: refs/heads/master
Commit: 4764646b18f56c34a35c19bd20a1931eb3a844fe
Parents: 21c1420
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Wed Jun 7 09:16:51 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Jun 7 09:16:51 2017 +0200
----------------------------------------------------------------------
airflow/jobs.py | 76 +++++++++++++++++++++++++---------------------------
tests/jobs.py | 71 ++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 108 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4764646b/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index adc4328..508564a 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -35,7 +35,8 @@ import time
from time import sleep
import psutil
-from sqlalchemy import Column, Integer, String, DateTime, func, Index, or_
+from sqlalchemy import Column, Integer, String, DateTime, func, Index, or_, and_
+from sqlalchemy import update
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm.session import make_transient
from tabulate import tabulate
@@ -931,45 +932,42 @@ class SchedulerJob(BaseJob):
simple_dag_bag and with states in the old_state will be examined
:type simple_dag_bag: SimpleDagBag
"""
+ tis_changed = 0
+ if self.using_sqlite:
+ tis_to_change = (
+ session
+ .query(models.TaskInstance)
+ .filter(models.TaskInstance.dag_id.in_(simple_dag_bag.dag_ids))
+ .filter(models.TaskInstance.state.in_(old_states))
+ .filter(and_(
+ models.DagRun.dag_id == models.TaskInstance.dag_id,
+ models.DagRun.execution_date == models.TaskInstance.execution_date,
+ models.DagRun.state != State.RUNNING))
+ .with_for_update()
+ .all()
+ )
+ for ti in tis_to_change:
+ ti.set_state(new_state, session=session)
+ tis_changed += 1
+ else:
+ tis_changed = (
+ session
+ .query(models.TaskInstance)
+ .filter(models.TaskInstance.dag_id.in_(simple_dag_bag.dag_ids))
+ .filter(models.TaskInstance.state.in_(old_states))
+ .filter(and_(
+ models.DagRun.dag_id == models.TaskInstance.dag_id,
+ models.DagRun.execution_date == models.TaskInstance.execution_date,
+ models.DagRun.state != State.RUNNING))
+ .update({models.TaskInstance.state: new_state},
+ synchronize_session=False)
+ )
+ session.commit()
- task_instances_to_change = (
- session
- .query(models.TaskInstance)
- .filter(models.TaskInstance.dag_id.in_(simple_dag_bag.dag_ids))
- .filter(models.TaskInstance.state.in_(old_states))
- .with_for_update()
- .all()
- )
- """:type: list[TaskInstance]"""
-
- for task_instance in task_instances_to_change:
- dag_runs = DagRun.find(dag_id=task_instance.dag_id,
- execution_date=task_instance.execution_date,
- )
-
- if len(dag_runs) == 0:
- self.logger.warning("DagRun for %s %s does not exist",
- task_instance.dag_id,
- task_instance.execution_date)
- continue
-
- # There should only be one DAG run. Add some logging info if this
- # is not the case for later debugging.
- if len(dag_runs) > 1:
- self.logger.warning("Multiple DagRuns found for {} {}: {}"
- .format(task_instance.dag_id,
- task_instance.execution_date,
- dag_runs))
-
- if not any(dag_run.state == State.RUNNING for dag_run in dag_runs):
- self.logger.warning("Setting {} to state={} as it does not have "
- "a DagRun in the {} state"
- .format(task_instance,
- new_state,
- State.RUNNING))
- task_instance.state = new_state
- session.merge(task_instance)
- session.commit()
+ if tis_changed > 0:
+ self.logger.warning("Set {} task instances to state={} as their associated "
+ "DagRun was not in RUNNING state".format(
+ tis_changed, new_state))
@provide_session
def _execute_task_instances(self,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4764646b/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index b0763b9..fb6fcdd 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -613,6 +613,77 @@ class SchedulerJobTest(unittest.TestCase):
session.close()
+ def test_change_state_for_tis_without_dagrun(self):
+ dag = DAG(
+ dag_id='test_change_state_for_tis_without_dagrun',
+ start_date=DEFAULT_DATE)
+
+ DummyOperator(
+ task_id='dummy',
+ dag=dag,
+ owner='airflow')
+
+ dag2 = DAG(
+ dag_id='test_change_state_for_tis_without_dagrun_dont_change',
+ start_date=DEFAULT_DATE)
+
+ DummyOperator(
+ task_id='dummy',
+ dag=dag2,
+ owner='airflow')
+
+ session = settings.Session()
+ dr = dag.create_dagrun(run_id=DagRun.ID_PREFIX,
+ state=State.RUNNING,
+ execution_date=DEFAULT_DATE,
+ start_date=DEFAULT_DATE,
+ session=session)
+
+ dr2 = dag2.create_dagrun(run_id=DagRun.ID_PREFIX,
+ state=State.RUNNING,
+ execution_date=DEFAULT_DATE,
+ start_date=DEFAULT_DATE,
+ session=session)
+
+ ti = dr.get_task_instance(task_id='dummy', session=session)
+ ti.state = State.SCHEDULED
+ session.commit()
+
+ ti2 = dr2.get_task_instance(task_id='dummy', session=session)
+ ti2.state = State.SCHEDULED
+ session.commit()
+
+ dagbag = SimpleDagBag([dag])
+ scheduler = SchedulerJob(num_runs=0, run_duration=0)
+ scheduler._change_state_for_tis_without_dagrun(simple_dag_bag=dagbag,
+ old_states=[State.SCHEDULED, State.QUEUED],
+ new_state=State.NONE,
+ session=session)
+
+ ti.refresh_from_db(session=session)
+ self.assertEqual(ti.state, State.SCHEDULED)
+
+ ti2.refresh_from_db(session=session)
+ self.assertEqual(ti2.state, State.SCHEDULED)
+
+ dr.refresh_from_db(session=session)
+ dr.state = State.FAILED
+
+ # why o why
+ session.merge(dr)
+ session.commit()
+
+ scheduler._change_state_for_tis_without_dagrun(simple_dag_bag=dagbag,
+ old_states=[State.SCHEDULED, State.QUEUED],
+ new_state=State.NONE,
+ session=session)
+ ti.refresh_from_db(session=session)
+ self.assertEqual(ti.state, State.NONE)
+
+ # don't touch ti2
+ ti2.refresh_from_db(session=session)
+ self.assertEqual(ti2.state, State.SCHEDULED)
+
def test_execute_helper_reset_orphaned_tasks(self):
session = settings.Session()
dag = DAG(