Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/07 18:12:31 UTC

[GitHub] [airflow] potiuk opened a new issue #15255: [QUARANTINED] TestSchedulerJob.test_scheduler_keeps_scheduling_pool_full is flaky

potiuk opened a new issue #15255:
URL: https://github.com/apache/airflow/issues/15255


   
   For example here:
   
   https://github.com/apache/airflow/runs/2288380184?check_suite_focus=true#step:6:8759
   
   ```
   
    =================================== FAILURES ===================================
     __________ TestSchedulerJob.test_scheduler_keeps_scheduling_pool_full __________
     
     self = <tests.jobs.test_scheduler_job.TestSchedulerJob testMethod=test_scheduler_keeps_scheduling_pool_full>
     
         def test_scheduler_keeps_scheduling_pool_full(self):
             """
             Test task instances in a pool that isn't full keep getting scheduled even when a pool is full.
             """
             dag_d1 = DAG(dag_id='test_scheduler_keeps_scheduling_pool_full_d1', start_date=DEFAULT_DATE)
             BashOperator(
                 task_id='test_scheduler_keeps_scheduling_pool_full_t1',
                 dag=dag_d1,
                 owner='airflow',
                 pool='test_scheduler_keeps_scheduling_pool_full_p1',
                 bash_command='echo hi',
             )
         
             dag_d2 = DAG(dag_id='test_scheduler_keeps_scheduling_pool_full_d2', start_date=DEFAULT_DATE)
             BashOperator(
                 task_id='test_scheduler_keeps_scheduling_pool_full_t2',
                 dag=dag_d2,
                 owner='airflow',
                 pool='test_scheduler_keeps_scheduling_pool_full_p2',
                 bash_command='echo hi',
             )
             dagbag = DagBag(
                 dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
                 include_examples=False,
                 read_dags_from_db=True,
             )
             dagbag.bag_dag(dag=dag_d1, root_dag=dag_d1)
             dagbag.bag_dag(dag=dag_d2, root_dag=dag_d2)
             dagbag.sync_to_db()
         
             session = settings.Session()
             pool_p1 = Pool(pool='test_scheduler_keeps_scheduling_pool_full_p1', slots=1)
             pool_p2 = Pool(pool='test_scheduler_keeps_scheduling_pool_full_p2', slots=10)
             session.add(pool_p1)
             session.add(pool_p2)
             session.commit()
         
             dag_d1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_d1))
         
             scheduler = SchedulerJob(executor=self.null_exec)
             scheduler.processor_agent = mock.MagicMock()
         
             # Create 5 dagruns for each DAG.
             # To increase the chances the TIs from the "full" pool will get retrieved first, we schedule all
             # TIs from the first dag first.
             date = DEFAULT_DATE
             for _ in range(5):
                 dr = dag_d1.create_dagrun(
                     run_type=DagRunType.SCHEDULED,
                     execution_date=date,
                     state=State.RUNNING,
                 )
                 scheduler._schedule_dag_run(dr, {}, session)
                 date = dag_d1.following_schedule(date)
         
             date = DEFAULT_DATE
             for _ in range(5):
                 dr = dag_d2.create_dagrun(
                     run_type=DagRunType.SCHEDULED,
                     execution_date=date,
                     state=State.RUNNING,
                 )
                 scheduler._schedule_dag_run(dr, {}, session)
                 date = dag_d2.following_schedule(date)
         
             scheduler._executable_task_instances_to_queued(max_tis=2, session=session)
             task_instances_list2 = scheduler._executable_task_instances_to_queued(max_tis=2, session=session)
         
             # Make sure we get TIs from a non-full pool in the 2nd list
             assert len(task_instances_list2) > 0
     >       assert all(
                 task_instance.pool != 'test_scheduler_keeps_scheduling_pool_full_p1'
                 for task_instance in task_instances_list2
             )
     E       AssertionError: assert False
     E        +  where False = all(<generator object TestSchedulerJob.test_scheduler_keeps_scheduling_pool_full.<locals>.<genexpr> at 0x7fb6ecb90c10>)
    ``` 
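
The failing assertion only holds if the first `_executable_task_instances_to_queued` call has already taken the single slot in pool `p1`; the test's own comment says the setup merely "increase[s] the chances" of that. The toy model below is a sketch of that dependency on retrieval order, not Airflow's scheduler code and not a confirmed diagnosis of this failure. `TI`, `queue_tis`, `second_batch`, and the short pool names `p1`/`p2` are all hypothetical stand-ins.

```python
from collections import namedtuple

# Hypothetical stand-in for a task instance; NOT Airflow's TaskInstance.
TI = namedtuple("TI", ["dag_id", "run", "pool"])


def queue_tis(candidates, open_slots, max_tis):
    """Toy queueing step (an assumption, not Airflow's implementation).

    Walks candidates in the given order, queues at most max_tis of them,
    and skips any whose pool has no open slot. Mutates both arguments.
    """
    queued = []
    for ti in list(candidates):
        if len(queued) == max_tis:
            break
        if open_slots[ti.pool] <= 0:
            continue  # pool is full, try the next candidate
        open_slots[ti.pool] -= 1
        candidates.remove(ti)
        queued.append(ti)
    return queued


def second_batch(order):
    """Run two queueing passes, as the test does, and return the second."""
    slots = {"p1": 1, "p2": 10}  # same slot counts as the pools in the test
    candidates = list(order)
    queue_tis(candidates, slots, max_tis=2)
    return queue_tis(candidates, slots, max_tis=2)


p1_tis = [TI("d1", i, "p1") for i in range(5)]
p2_tis = [TI("d2", i, "p2") for i in range(5)]

# Order the test hopes for: TIs from the soon-to-be-full pool come first.
p1_first = p1_tis + p2_tis
# A hypothetical order in which the first pass sees only p2 TIs.
p2_lead = p2_tis[:2] + p1_tis + p2_tis[2:]

print(all(ti.pool != "p1" for ti in second_batch(p1_first)))  # True
print(all(ti.pool != "p1" for ti in second_batch(p2_lead)))   # False
```

In this model the second order leaves `p1` with a free slot after the first pass, so the second pass legitimately returns a `p1` task instance and the test's `all(...)` check fails. Whether the real scheduler query can return rows in such an order is an assumption here.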


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk closed issue #15255: [QUARANTINED] TestSchedulerJob.test_scheduler_keeps_scheduling_pool_full is flaky

Posted by GitBox <gi...@apache.org>.
potiuk closed issue #15255:
URL: https://github.com/apache/airflow/issues/15255


   

