You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/01/10 08:15:07 UTC

[5/9] incubator-airflow git commit: [AIRFLOW-738] Commit deleted xcom items before insert

[AIRFLOW-738] Commit deleted xcom items before insert

A delete insert sequence within one transaction can lead
to a deadlocked transaction with Mariadb / MySQL.

The deletes, in case they affected no rows, all get a shared lock
(mode IX) on the end-of-table gap. Once the insert is executed,
the shared lock is still held by all threads,
and the insert intention waits for the release of this shared lock.

The solution is to not do the following in parallel:

1. Delete the rows you want to insert, when the rows aren't there.
2. Insert the rows

In this case the risk of not executing the delete and insert
is relatively low, as it was the users intention to run the
task. In case it fails in between the two transactions
the task can be tried.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e18d67de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e18d67de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e18d67de

Branch: refs/heads/v1-8-test
Commit: e18d67dec4774946a35f7c34953bdfd7138595bf
Parents: 19ed900
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Mon Jan 9 22:04:35 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Mon Jan 9 22:10:42 2017 +0100

----------------------------------------------------------------------
 airflow/models.py | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e18d67de/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index f6f7968..1a0919a 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -3515,6 +3515,8 @@ class XCom(Base):
             cls.task_id == task_id,
             cls.dag_id == dag_id).delete()
 
+        session.commit()
+
         # insert new XCom
         session.add(XCom(
             key=key,