You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/12/21 02:36:34 UTC

[GitHub] [airflow] uranusjr commented on a change in pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

uranusjr commented on a change in pull request #20391:
URL: https://github.com/apache/airflow/pull/20391#discussion_r772788453



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -645,6 +645,89 @@ def test_find_executable_task_instances_in_default_pool(self, dag_maker):
         session.rollback()
         session.close()
 
+    def test_max_active_task_in_a_dag_with_large_task_does_not_affect_other_dags(self, dag_maker, session):
+        dag_id_1 = 'first_dag'
+        dag_id_2 = 'second_dag'
+        with dag_maker(dag_id_1, max_active_tasks=1):
+            for i in range(10):
+                BashOperator(task_id=f"mytask{i}", bash_command='sleep 1d')
+        dr1 = dag_maker.create_dagrun(run_id=dag_id_1, state=DagRunState.RUNNING)
+        tis1 = dr1.get_task_instances()
+        for ti in tis1:
+            ti.state = TaskInstanceState.SCHEDULED
+            session.merge(ti)
+        session.flush()
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
+        assert 1 == len(res)
+        assert session.query(TaskInstance).filter(TaskInstance.state == TaskInstanceState.QUEUED).count() == 1
+        self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
+        session.flush()
+        # assert last_scheduling_decision is updated
+        assert (
+            session.query(TaskInstance)
+            .filter(TaskInstance.state == State.SCHEDULED, TaskInstance.last_scheduling_decision.__ne__(None))

Review comment:
       I think `isnot()` works in all 1.x versions.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org