You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/09/10 13:11:46 UTC

[airflow] branch v2-1-test updated: Improve dag/task concurrency check (#17786)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-1-test by this push:
     new 19f32fd  Improve dag/task concurrency check (#17786)
19f32fd is described below

commit 19f32fd662a9d60a1271c92c2bca28313a500697
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Mon Aug 23 15:58:55 2021 +0100

    Improve dag/task concurrency check (#17786)
    
    Currently, tasks can be run even if their dagrun is still queued. Task instances (TIs) of a
    queued dagrun should only run once the dagrun is in the running state. This PR makes sure TIs
    of queued dagruns are not run, so that task concurrency is checked properly.
    
    Also, max_active_runs is currently checked when the DAG is parsed. This is no longer needed,
    since dagruns are created in the queued state and the scheduler decides when to move queued
    dagruns to running, taking max_active_runs into account.
    This PR therefore also removes the max_active_runs check from DAG parsing.
    
    (cherry picked from commit ffb81eae610f738fd45c88cdb27d601c0edf24fa)
---
 airflow/jobs/scheduler_job.py    |  5 ++---
 airflow/models/dag.py            | 26 +-------------------------
 tests/jobs/test_scheduler_job.py | 31 +++++++++++++++++++++++++++++++
 tests/models/test_dag.py         | 14 +++++++-------
 4 files changed, 41 insertions(+), 35 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 18ec981..8a625b9 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -323,6 +323,7 @@ class SchedulerJob(BaseJob):
             session.query(TI)
             .outerjoin(TI.dag_run)
             .filter(or_(DR.run_id.is_(None), DR.run_type != DagRunType.BACKFILL_JOB))
+            .filter(or_(DR.state.is_(None), DR.state != DagRunState.QUEUED))
             .join(TI.dag_model)
             .filter(not_(DM.is_paused))
             .filter(TI.state == State.SCHEDULED)
@@ -1011,9 +1012,7 @@ class SchedulerJob(BaseJob):
                     dag_hash=dag_hash,
                     creating_job_id=self.id,
                 )
-            dag_model.next_dagrun, dag_model.next_dagrun_create_after = dag.next_dagrun_info(
-                dag_model.next_dagrun
-            )
+            dag_model.calculate_dagrun_date_fields(dag, dag_model.next_dagrun)
 
         # TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
         # memory for larger dags? or expunge_all()
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 2e66b40..74f985f 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1867,18 +1867,6 @@ class DAG(LoggingMixin):
             .all()
         )
 
-        # Get number of active dagruns for all dags we are processing as a single query.
-        num_active_runs = dict(
-            session.query(DagRun.dag_id, func.count('*'))
-            .filter(
-                DagRun.dag_id.in_(existing_dag_ids),
-                DagRun.state == State.RUNNING,
-                DagRun.external_trigger.is_(False),
-            )
-            .group_by(DagRun.dag_id)
-            .all()
-        )
-
         for orm_dag in sorted(orm_dags, key=lambda d: d.dag_id):
             dag = dag_by_ids[orm_dag.dag_id]
             if dag.is_subdag:
@@ -1901,7 +1889,6 @@ class DAG(LoggingMixin):
             orm_dag.calculate_dagrun_date_fields(
                 dag,
                 most_recent_dag_runs.get(dag.dag_id),
-                num_active_runs.get(dag.dag_id, 0),
             )
 
             for orm_tag in list(orm_dag.tags):
@@ -2282,27 +2269,16 @@ class DagModel(Base):
         return with_row_locks(query, of=cls, session=session, **skip_locked(session=session))
 
     def calculate_dagrun_date_fields(
-        self, dag: DAG, most_recent_dag_run: Optional[pendulum.DateTime], active_runs_of_dag: int
+        self, dag: DAG, most_recent_dag_run: Optional[pendulum.DateTime]
     ) -> None:
         """
         Calculate ``next_dagrun`` and `next_dagrun_create_after``
 
         :param dag: The DAG object
         :param most_recent_dag_run: DateTime of most recent run of this dag, or none if not yet scheduled.
-        :param active_runs_of_dag: Number of currently active runs of this dag
         """
         self.next_dagrun, self.next_dagrun_create_after = dag.next_dagrun_info(most_recent_dag_run)
 
-        if dag.max_active_runs and active_runs_of_dag >= dag.max_active_runs:
-            # Since this happens every time the dag is parsed it would be quite spammy at info
-            log.debug(
-                "DAG %s is at (or above) max_active_runs (%d of %d), not creating any more runs",
-                dag.dag_id,
-                active_runs_of_dag,
-                dag.max_active_runs,
-            )
-            self.next_dagrun_create_after = None
-
         log.info("Setting next_dagrun for %s to %s", dag.dag_id, self.next_dagrun)
 
 
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 5de365e..f02ecfa 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -799,6 +799,37 @@ class TestSchedulerJob(unittest.TestCase):
         assert 0 == len(self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session))
         session.rollback()
 
+    def test_tis_for_queued_dagruns_are_not_run(self, dag_maker):
+        """
+        This tests that tis from queued dagruns are not queued
+        """
+        dag_id = "test_tis_for_queued_dagruns_are_not_run"
+        task_id_1 = 'dummy'
+
+        with dag_maker(dag_id) as dag:
+            task1 = DummyOperator(task_id=task_id_1)
+        dr1 = dag_maker.create_dagrun(state=State.QUEUED)
+        dr2 = dag_maker.create_dagrun(
+            run_id='test2', execution_date=dag.following_schedule(dr1.execution_date)
+        )
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        session = settings.Session()
+        ti1 = TaskInstance(task1, dr1.execution_date)
+        ti2 = TaskInstance(task1, dr2.execution_date)
+        ti1.state = State.SCHEDULED
+        ti2.state = State.SCHEDULED
+        session.merge(ti1)
+        session.merge(ti2)
+        session.flush()
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
+
+        assert 1 == len(res)
+        assert ti2.key == res[0].key
+        ti1.refresh_from_db()
+        ti2.refresh_from_db()
+        assert ti1.state == State.SCHEDULED
+        assert ti2.state == State.QUEUED
+
     def test_find_executable_task_instances_concurrency(self):
         dag_id = 'SchedulerJobTest.test_find_executable_task_instances_concurrency'
         task_id_1 = 'dummy'
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 34d761d..118dec4 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -674,7 +674,7 @@ class TestDag(unittest.TestCase):
         clear_db_dags()
         dags = [DAG(f'dag-bulk-sync-{i}', start_date=DEFAULT_DATE, tags=["test-dag"]) for i in range(0, 4)]
 
-        with assert_queries_count(5):
+        with assert_queries_count(4):
             DAG.bulk_write_to_db(dags)
         with create_session() as session:
             assert {'dag-bulk-sync-0', 'dag-bulk-sync-1', 'dag-bulk-sync-2', 'dag-bulk-sync-3'} == {
@@ -691,14 +691,14 @@ class TestDag(unittest.TestCase):
                 assert row[0] is not None
 
         # Re-sync should do fewer queries
-        with assert_queries_count(4):
+        with assert_queries_count(3):
             DAG.bulk_write_to_db(dags)
-        with assert_queries_count(4):
+        with assert_queries_count(3):
             DAG.bulk_write_to_db(dags)
         # Adding tags
         for dag in dags:
             dag.tags.append("test-dag2")
-        with assert_queries_count(5):
+        with assert_queries_count(4):
             DAG.bulk_write_to_db(dags)
         with create_session() as session:
             assert {'dag-bulk-sync-0', 'dag-bulk-sync-1', 'dag-bulk-sync-2', 'dag-bulk-sync-3'} == {
@@ -717,7 +717,7 @@ class TestDag(unittest.TestCase):
         # Removing tags
         for dag in dags:
             dag.tags.remove("test-dag")
-        with assert_queries_count(5):
+        with assert_queries_count(4):
             DAG.bulk_write_to_db(dags)
         with create_session() as session:
             assert {'dag-bulk-sync-0', 'dag-bulk-sync-1', 'dag-bulk-sync-2', 'dag-bulk-sync-3'} == {
@@ -764,8 +764,8 @@ class TestDag(unittest.TestCase):
 
         model = session.query(DagModel).get((dag.dag_id,))
         assert model.next_dagrun == period_end
-        # We signal "at max active runs" by saying this run is never eligible to be created
-        assert model.next_dagrun_create_after is None
+        # Next dagrun after is not None because the dagrun would be in queued state
+        assert model.next_dagrun_create_after is not None
 
     def test_sync_to_db(self):
         dag = DAG(