You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/02/26 12:47:51 UTC

[GitHub] [airflow] ashb commented on a change in pull request #14476: Scheduler: Remove TIs from starved pools from the critical path.

ashb commented on a change in pull request #14476:
URL: https://github.com/apache/airflow/pull/14476#discussion_r583614358



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2645,6 +2645,79 @@ def test_scheduler_verify_pool_full_2_slots_per_task(self):
         # As tasks require 2 slots, only 3 can fit into 6 available
         assert len(task_instances_list) == 3
 
+    def test_scheduler_keeps_scheduling_when_a_pool_is_full(self):
+        """
+        Test task instances in a pool that isn't full keep getting scheduled even when a pool is full.
+        """
+        dag_d1 = DAG(dag_id='test_scheduler_keeps_scheduling_when_a_pool_is_full_d1', start_date=DEFAULT_DATE)
+        BashOperator(
+            task_id='test_scheduler_keeps_scheduling_when_a_pool_is_full_t1',
+            dag=dag_d1,
+            owner='airflow',
+            pool='test_scheduler_keeps_scheduling_when_a_pool_is_full_p1',
+            bash_command='echo hi',
+        )
+
+        dag_d2 = DAG(dag_id='test_scheduler_keeps_scheduling_when_a_pool_is_full_d2', start_date=DEFAULT_DATE)
+        BashOperator(
+            task_id='test_scheduler_keeps_scheduling_when_a_pool_is_full_t2',
+            dag=dag_d2,
+            owner='airflow',
+            pool='test_scheduler_keeps_scheduling_when_a_pool_is_full_p2',
+            bash_command='echo hi',
+        )
+        dagbag = DagBag(
+            dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
+            include_examples=False,
+            read_dags_from_db=True,
+        )
+        dagbag.bag_dag(dag=dag_d1, root_dag=dag_d1)
+        dagbag.bag_dag(dag=dag_d2, root_dag=dag_d2)
+        dagbag.sync_to_db()
+
+        session = settings.Session()
+        pool_p1 = Pool(pool='test_scheduler_keeps_scheduling_when_a_pool_is_full_p1', slots=1)
+        pool_p2 = Pool(pool='test_scheduler_keeps_scheduling_when_a_pool_is_full_p2', slots=10)
+        session.add(pool_p1)
+        session.add(pool_p2)
+        session.commit()
+
+        dag_d1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_d1))
+
+        scheduler = SchedulerJob(executor=self.null_exec)
+        scheduler.processor_agent = mock.MagicMock()
+
+        # Create 5 dagruns for each DAG.
+        # To increase the chances the TIs from the "full" pool will get retrieved first, we schedule all TIs from the

Review comment:
       "Chances"? Does this mean the test is non-deterministic and is going to fail some of the time?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org