You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/08/14 18:59:38 UTC

[airflow] branch v1-10-test updated: Handle IntegrityError while creating TIs (#10136)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v1-10-test by this push:
     new 6a729c5  Handle IntegrityError while creating TIs (#10136)
6a729c5 is described below

commit 6a729c506ad3807e0c2adb49bb014338b5872292
Author: Sumit Maheshwari <sm...@twitter.com>
AuthorDate: Fri Aug 7 18:25:10 2020 +0530

    Handle IntegrityError while creating TIs (#10136)
    
    When a trigger_dag is done from the UI, the DagRun gets created first and then the webserver starts creating its TIs. Meanwhile, the scheduler can also pick up the DagRun and start creating the same TIs, which results in an IntegrityError because the primary key constraint gets violated. This happens when a DAG has a large number of tasks.
    
    Also, replace the list of task IDs with a set for faster lookups in DAGs with many tasks.
    
    (cherry picked from commit 21021228759da8d3e98ca3f6d0922a6e9a0b5e68)
---
 airflow/models/dagrun.py    | 16 +++++++++++++---
 tests/models/test_dagrun.py | 19 +++++++++++++++++++
 2 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index ec9ecc8..a908a5b 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -23,6 +23,7 @@ from sqlalchemy import (
     Column, Integer, String, Boolean, PickleType, Index, UniqueConstraint, func, DateTime, or_,
     and_
 )
+from sqlalchemy.exc import IntegrityError
 from sqlalchemy.ext.declarative import declared_attr
 from sqlalchemy.orm import synonym
 from sqlalchemy.orm.session import Session
@@ -362,10 +363,10 @@ class DagRun(Base, LoggingMixin):
         tis = self.get_task_instances(session=session)
 
         # check for removed or restored tasks
-        task_ids = []
+        task_ids = set()
         for ti in tis:
             task_instance_mutation_hook(ti)
-            task_ids.append(ti.task_id)
+            task_ids.add(ti.task_id)
             task = None
             try:
                 task = dag.get_task(ti.task_id)
@@ -401,7 +402,16 @@ class DagRun(Base, LoggingMixin):
                 task_instance_mutation_hook(ti)
                 session.add(ti)
 
-        session.commit()
+        try:
+            session.commit()
+        except IntegrityError as err:
+            self.log.info(str(err))
+            self.log.info(
+                'Hit IntegrityError while creating the TIs for %s - %s',
+                dag.dag_id, self.execution_date
+            )
+            self.log.info('Doing session rollback.')
+            session.rollback()
 
     @staticmethod
     def get_run(session, dag_id, execution_date):
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index c149c00..431cf71 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -564,6 +564,25 @@ class DagRunTest(unittest.TestCase):
         flaky_ti.refresh_from_db()
         self.assertEqual(State.NONE, flaky_ti.state)
 
+    def test_already_added_task_instances_can_be_ignored(self):
+        dag = DAG('triggered_dag', start_date=DEFAULT_DATE)
+        dag.add_task(DummyOperator(task_id='first_task', owner='test'))
+
+        dagrun = self.create_dag_run(dag)
+        first_ti = dagrun.get_task_instances()[0]
+        self.assertEqual('first_task', first_ti.task_id)
+        self.assertEqual(State.NONE, first_ti.state)
+
+        # Let's assume that the above TI was added to the DB by the webserver. If the
+        # scheduler runs the same method at the same time, it would find 0 TIs for this
+        # dag and proceed to create them. Hence we mock DagRun.get_task_instances
+        # to return an empty list of TIs.
+        with mock.patch.object(DagRun, 'get_task_instances') as mock_gtis:
+            mock_gtis.return_value = []
+            dagrun.verify_integrity()
+            first_ti.refresh_from_db()
+            self.assertEqual(State.NONE, first_ti.state)
+
     @parameterized.expand([(state,) for state in State.task_states])
     @mock.patch('airflow.models.dagrun.task_instance_mutation_hook')
     def test_task_instance_mutation_hook(self, state, mock_hook):