You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by jl...@apache.org on 2016/05/09 23:21:14 UTC

[2/3] incubator-airflow git commit: Add logic to lock DB and avoid race condition

Add logic to lock DB and avoid race condition

The scheduler can encounter the same queued task twice before the
task actually starts to run. This change locks the task instance's
database row (via a FOR UPDATE query) while its state is refreshed,
which avoids that race condition.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c1aa93f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c1aa93f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c1aa93f1

Branch: refs/heads/master
Commit: c1aa93f1a7c9dbe88889b78b541b3abe05ded081
Parents: 43bdd7a
Author: jlowin <jl...@users.noreply.github.com>
Authored: Fri May 6 15:05:33 2016 -0400
Committer: jlowin <jl...@users.noreply.github.com>
Committed: Mon May 9 17:19:02 2016 -0400

----------------------------------------------------------------------
 airflow/models.py | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c1aa93f1/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index a1b17ac..7549325 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -817,16 +817,25 @@ class TaskInstance(Base):
         session.commit()
 
     @provide_session
-    def refresh_from_db(self, session=None):
+    def refresh_from_db(self, session=None, lock_for_update=False):
         """
         Refreshes the task instance from the database based on the primary key
+
+        :param lock_for_update: if True, indicates that the database should
+        lock the TaskInstance (issuing a FOR UPDATE clause) until the session
+        is committed.
         """
         TI = TaskInstance
-        ti = session.query(TI).filter(
+
+        qry = session.query(TI).filter(
             TI.dag_id == self.dag_id,
             TI.task_id == self.task_id,
-            TI.execution_date == self.execution_date,
-        ).first()
+            TI.execution_date == self.execution_date)
+
+        if lock_for_update:
+            ti = qry.with_for_update().first()
+        else:
+            ti = qry.first()
         if ti:
             self.state = ti.state
             self.start_date = ti.start_date
@@ -1159,7 +1168,7 @@ class TaskInstance(Base):
         self.pool = pool or task.pool
         self.test_mode = test_mode
         self.force = force
-        self.refresh_from_db()
+        self.refresh_from_db(session=session, lock_for_update=True)
         self.clear_xcom_data()
         self.job_id = job_id
         iso = datetime.now().isoformat()