You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by jl...@apache.org on 2016/05/09 23:21:14 UTC
[2/3] incubator-airflow git commit: Add logic to lock DB and avoid race condition
Add logic to lock DB and avoid race condition
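The scheduler can encounter a queued task twice before the
task actually starts to run. This change avoids that race
condition by locking the task instance's database row (a
SELECT ... FOR UPDATE) when it is refreshed from the database;
the lock is held until the session is committed.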
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c1aa93f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c1aa93f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c1aa93f1
Branch: refs/heads/master
Commit: c1aa93f1a7c9dbe88889b78b541b3abe05ded081
Parents: 43bdd7a
Author: jlowin <jl...@users.noreply.github.com>
Authored: Fri May 6 15:05:33 2016 -0400
Committer: jlowin <jl...@users.noreply.github.com>
Committed: Mon May 9 17:19:02 2016 -0400
----------------------------------------------------------------------
airflow/models.py | 19 ++++++++++++++-----
1 file changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c1aa93f1/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index a1b17ac..7549325 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -817,16 +817,25 @@ class TaskInstance(Base):
session.commit()
@provide_session
- def refresh_from_db(self, session=None):
+ def refresh_from_db(self, session=None, lock_for_update=False):
"""
Refreshes the task instance from the database based on the primary key
+
+ :param lock_for_update: if True, indicates that the database should
+ lock the TaskInstance (issuing a FOR UPDATE clause) until the session
+ is committed.
"""
TI = TaskInstance
- ti = session.query(TI).filter(
+
+ qry = session.query(TI).filter(
TI.dag_id == self.dag_id,
TI.task_id == self.task_id,
- TI.execution_date == self.execution_date,
- ).first()
+ TI.execution_date == self.execution_date)
+
+ if lock_for_update:
+ ti = qry.with_for_update().first()
+ else:
+ ti = qry.first()
if ti:
self.state = ti.state
self.start_date = ti.start_date
@@ -1159,7 +1168,7 @@ class TaskInstance(Base):
self.pool = pool or task.pool
self.test_mode = test_mode
self.force = force
- self.refresh_from_db()
+ self.refresh_from_db(session=session, lock_for_update=True)
self.clear_xcom_data()
self.job_id = job_id
iso = datetime.now().isoformat()