Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/05 18:18:14 UTC

[GitHub] [airflow] ginevragaudioso opened a new pull request #15210: AIRFLOW-15171 order query to find out tasks to queue

ginevragaudioso opened a new pull request #15210:
URL: https://github.com/apache/airflow/pull/15210


   See issue #15171.
   
   I tested the query on our Airflow instance, and it correctly sorts the results.
   
   closes: #15171
   related: #15171
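   
   For illustration, a minimal, self-contained sketch of the ordering this change adds (a toy model and SQLAlchemy 1.4+ are assumed here; `TI` below is a stand-in, not the real Airflow `TaskInstance`):
   
```
from datetime import datetime, timedelta

from sqlalchemy import Column, DateTime, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class TI(Base):
    # Stand-in for airflow.models.TaskInstance, reduced to the two sort columns.
    __tablename__ = "task_instance"
    id = Column(Integer, primary_key=True)
    task_id = Column(String)
    priority_weight = Column(Integer)
    execution_date = Column(DateTime)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

now = datetime(2021, 4, 5)
with Session(engine) as session:
    session.add_all(
        [
            TI(task_id="a", priority_weight=1, execution_date=now + timedelta(hours=1)),
            TI(task_id="b", priority_weight=1, execution_date=now),
            TI(task_id="c", priority_weight=4, execution_date=now + timedelta(hours=2)),
        ]
    )
    session.commit()

    # -TI.priority_weight sorts on the negated weight, i.e. highest priority
    # first; ties are broken by the earliest execution_date.
    ordered = session.query(TI).order_by(-TI.priority_weight, TI.execution_date).all()
    print([ti.task_id for ti in ordered])  # ['c', 'b', 'a']
```
   
   For numeric weights, negating the column orders the same way as `TI.priority_weight.desc()`; the diff in this PR uses the negation form.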
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on pull request #15210: order query to find out tasks to queue

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-814442942


   Please rebase on the latest master; that should fix the failing checks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ginevragaudioso edited a comment on pull request #15210: Queue tasks with higher priority and earlier execution_date first.

Posted by GitBox <gi...@apache.org>.
ginevragaudioso edited a comment on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-827704730


   > The tests need expanding -- the TIs it creates have the same priority and execution_date, so we aren't actually asserting that the TIs are sorted correctly.
   
   The TIs created do not have the same execution date, unless I am missing something.
   ```
           dr1 = dag_1.create_dagrun(
                run_type=DagRunType.SCHEDULED,
                execution_date=DEFAULT_DATE + timedelta(hours=1),
                state=State.RUNNING,
            )
            dr2 = dag_2.create_dagrun(
                run_type=DagRunType.SCHEDULED,
                execution_date=DEFAULT_DATE,
                state=State.RUNNING,
            )
   
           tis = [
                TaskInstance(dag1_task, dr1.execution_date),  # DEFAULT_DATE + timedelta(hours=1)  (later)
                TaskInstance(dag2_task, dr2.execution_date),  # DEFAULT_DATE                       (earlier)
            ]
   ```
   So the test checks that we pick the TI with the earliest execution date even if its dag_id is alphabetically later (which is exactly the bug being fixed).
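   
   For the record, a tiny pure-Python check of that point (hypothetical values, not the actual test code): with equal priority, the earlier execution_date wins even though its dag_id sorts later alphabetically.
   
```
from datetime import datetime, timedelta

DEFAULT_DATE = datetime(2021, 1, 1)
tis = [
    # (dag_id, priority_weight, execution_date)
    ("dag-a", 1, DEFAULT_DATE + timedelta(hours=1)),
    ("dag-b", 1, DEFAULT_DATE),
]
# Mirrors the order_by added in this PR: priority descending, execution_date ascending.
queued_first = min(tis, key=lambda ti: (-ti[1], ti[2]))
assert queued_first[0] == "dag-b"  # earlier execution_date, alphabetically later dag_id
```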


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ginevragaudioso commented on a change in pull request #15210: order query to find out tasks to queue

Posted by GitBox <gi...@apache.org>.
ginevragaudioso commented on a change in pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#discussion_r612503846



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -924,6 +924,7 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
             .filter(not_(DM.is_paused))
             .filter(TI.state == State.SCHEDULED)
             .options(selectinload('dag_model'))
+            .order_by(-TI.priority_weight, TI.execution_date)

Review comment:
       @kaxil any ideas on what is failing? I see test_scheduler_job (the one I added my test to) run, and I see dots after it, so I'd think it is passing.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on pull request #15210: Queue tasks with higher priority and earlier execution_date first.

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-828119707


   I think what Ash meant was that the currently available tests either have the same execution date *or* the same priority, and need to be extended to test more combinations of values.
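   
   A rough sketch of the kind of combined case being asked for (hypothetical values, not the actual test): when priorities differ, the higher priority_weight should win even if its execution_date is later.
   
```
from datetime import datetime, timedelta

DEFAULT_DATE = datetime(2021, 1, 1)
candidates = [
    # (dag_id, priority_weight, execution_date)
    ("dag-a", 1, DEFAULT_DATE),
    ("dag-b", 4, DEFAULT_DATE + timedelta(hours=1)),
]
# priority_weight descending first, then execution_date ascending.
queued_first = min(candidates, key=lambda c: (-c[1], c[2]))
assert queued_first[0] == "dag-b"  # higher priority beats the earlier execution_date
```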


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] boring-cyborg[bot] commented on pull request #15210: Queue tasks with higher priority and earlier execution_date first.

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-860614313


   Awesome work, congrats on your first merged pull request!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ginevragaudioso commented on pull request #15210: Queue tasks with higher priority and earlier execution_date first.

Posted by GitBox <gi...@apache.org>.
ginevragaudioso commented on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-828719342


   > The tests need expanding -- the TIs it creates have the same priority and execution_date, so we aren't actually asserting that the TIs are sorted correctly.
   
   @ashb  thanks for the feedback, I added two more tests, one for priority and one for both.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] matt-land commented on a change in pull request #15210: order query to find out tasks to queue

Posted by GitBox <gi...@apache.org>.
matt-land commented on a change in pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#discussion_r612535908



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -924,6 +924,7 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
             .filter(not_(DM.is_paused))
             .filter(TI.state == State.SCHEDULED)
             .options(selectinload('dag_model'))
+            .order_by(-TI.priority_weight, TI.execution_date)

Review comment:
       How do we determine whether the build is failing because of master or because of the new test?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ginevragaudioso commented on pull request #15210: Queue tasks with higher priority and earlier execution_date first.

Posted by GitBox <gi...@apache.org>.
ginevragaudioso commented on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-827704730


   > The tests need expanding -- the TIs it creates have the same priority and execution_date, so we aren't actually asserting that the TIs are sorted correctly.
   
   The TIs created do not have the same execution date, unless I am missing something.
   ```
           dr1 = dag_1.create_dagrun(
                run_type=DagRunType.SCHEDULED,
                execution_date=DEFAULT_DATE + timedelta(hours=1),
                state=State.RUNNING,
            )
            dr2 = dag_2.create_dagrun(
                run_type=DagRunType.SCHEDULED,
                execution_date=DEFAULT_DATE,
                state=State.RUNNING,
            )
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #15210: Queue tasks with higher priority and earlier execution_date first.

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#discussion_r616766352



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -1177,6 +1177,63 @@ def test_find_executable_task_instances_pool(self):
         assert tis[3].key in res_keys
         session.rollback()
 
+    def test_find_executable_task_instances_order(self):
+        dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order-a'
+        dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order-b'
+        task_id = 'task-a'
+        dag_1 = DAG(dag_id=dag_id_1, start_date=DEFAULT_DATE, concurrency=16)
+        dag_2 = DAG(dag_id=dag_id_2, start_date=DEFAULT_DATE, concurrency=16)
+        dag1_task = DummyOperator(dag=dag_1, task_id=task_id)
+        dag2_task = DummyOperator(dag=dag_2, task_id=task_id)
+        dag_1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_1))
+        dag_2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_2))
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        session = settings.Session()
+
+        dag_model_1 = DagModel(
+            dag_id=dag_id_1,
+            is_paused=False,
+            concurrency=dag_1.concurrency,
+            has_task_concurrency_limits=False,
+        )
+        session.add(dag_model_1)
+        dag_model_2 = DagModel(
+            dag_id=dag_id_2,
+            is_paused=False,
+            concurrency=dag_2.concurrency,
+            has_task_concurrency_limits=False,
+        )
+        session.add(dag_model_2)
+        dr1 = dag_1.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE + timedelta(hours=1),
+            state=State.RUNNING,
+        )
+        dr2 = dag_2.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+
+        tis = [
+            TaskInstance(dag1_task, dr1.execution_date),
+            TaskInstance(dag2_task, dr2.execution_date),
+        ]
+        for ti in tis:
+            ti.state = State.SCHEDULED
+            session.merge(ti)
+        session.flush()
+
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=1, session=session)
+        session.flush()
+        assert 1 == len(res)
+        res_keys = []
+        for ti in res:
+            res_keys.append(ti.key)

Review comment:
       ```suggestion
           res_keys = [ti.key for ti in res]
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on pull request #15210: order query to find out tasks to queue

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-814443400


   I just pushed; it should work now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr edited a comment on pull request #15210: Queue tasks with higher priority and earlier execution_date first.

Posted by GitBox <gi...@apache.org>.
uranusjr edited a comment on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-828119707


   I think what Ash meant was that the currently available tests either have the same execution date *or* the same priority, and need to be extended to cover more combinations of values.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ginevragaudioso commented on pull request #15210: order query to find out tasks to queue

Posted by GitBox <gi...@apache.org>.
ginevragaudioso commented on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-822530576


   @kaxil @ashb would it be possible to have this fix go into 2.0.2? It seems like an easy fix that solves an actual issue, but I don't know what's on the roadmap.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #15210: order query to find out tasks to queue

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#discussion_r609109247



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -924,6 +924,7 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
             .filter(not_(DM.is_paused))
             .filter(TI.state == State.SCHEDULED)
             .options(selectinload('dag_model'))
+            .order_by(-TI.priority_weight, TI.execution_date)

Review comment:
       Can you add a test case for this in https://github.com/apache/airflow/blob/master/tests/jobs/test_scheduler_job.py, please, to avoid a regression?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #15210: Queue tasks with higher priority and earlier execution_date first.

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#discussion_r640592820



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -1177,6 +1177,165 @@ def test_find_executable_task_instances_pool(self):
         assert tis[3].key in res_keys
         session.rollback()
 
+    def test_find_executable_task_instances_order_execution_date(self):
+        dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date-a'
+        dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date-b'
+        task_id = 'task-a'
+        dag_1 = DAG(dag_id=dag_id_1, start_date=DEFAULT_DATE, concurrency=16)
+        dag_2 = DAG(dag_id=dag_id_2, start_date=DEFAULT_DATE, concurrency=16)
+        dag1_task = DummyOperator(dag=dag_1, task_id=task_id)
+        dag2_task = DummyOperator(dag=dag_2, task_id=task_id)
+        dag_1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_1))
+        dag_2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_2))
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        session = settings.Session()
+
+        dag_model_1 = DagModel(
+            dag_id=dag_id_1,
+            is_paused=False,
+            concurrency=dag_1.concurrency,
+            has_task_concurrency_limits=False,
+        )
+        session.add(dag_model_1)
+        dag_model_2 = DagModel(
+            dag_id=dag_id_2,
+            is_paused=False,
+            concurrency=dag_2.concurrency,
+            has_task_concurrency_limits=False,
+        )
+        session.add(dag_model_2)
+        dr1 = dag_1.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE + timedelta(hours=1),
+            state=State.RUNNING,
+        )
+        dr2 = dag_2.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+
+        tis = [
+            TaskInstance(dag1_task, dr1.execution_date),
+            TaskInstance(dag2_task, dr2.execution_date),
+        ]
+        for ti in tis:
+            ti.state = State.SCHEDULED
+            session.merge(ti)
+        session.flush()
+
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=1, session=session)
+        session.flush()
+        assert [ti.key for ti in res] == [tis[1].key]
+        session.rollback()
+
+    def test_find_executable_task_instances_order_priority(self):
+        dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order_priority-a'
+        dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order_priority-b'
+        task_id = 'task-a'
+        dag_1 = DAG(dag_id=dag_id_1, start_date=DEFAULT_DATE, concurrency=16)
+        dag_2 = DAG(dag_id=dag_id_2, start_date=DEFAULT_DATE, concurrency=16)
+        dag1_task = DummyOperator(dag=dag_1, task_id=task_id, priority=1)
+        dag2_task = DummyOperator(dag=dag_2, task_id=task_id, priority=4)

Review comment:
       ```suggestion
           dag1_task = DummyOperator(dag=dag_1, task_id=task_id, priority_weight=1)
           dag2_task = DummyOperator(dag=dag_2, task_id=task_id, priority_weight=4)
   ```

##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -1177,6 +1177,165 @@ def test_find_executable_task_instances_pool(self):
         assert tis[3].key in res_keys
         session.rollback()
 
+    def test_find_executable_task_instances_order_execution_date(self):
+        dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date-a'
+        dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date-b'
+        task_id = 'task-a'
+        dag_1 = DAG(dag_id=dag_id_1, start_date=DEFAULT_DATE, concurrency=16)
+        dag_2 = DAG(dag_id=dag_id_2, start_date=DEFAULT_DATE, concurrency=16)
+        dag1_task = DummyOperator(dag=dag_1, task_id=task_id)
+        dag2_task = DummyOperator(dag=dag_2, task_id=task_id)
+        dag_1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_1))
+        dag_2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_2))
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        session = settings.Session()
+
+        dag_model_1 = DagModel(
+            dag_id=dag_id_1,
+            is_paused=False,
+            concurrency=dag_1.concurrency,
+            has_task_concurrency_limits=False,
+        )
+        session.add(dag_model_1)
+        dag_model_2 = DagModel(
+            dag_id=dag_id_2,
+            is_paused=False,
+            concurrency=dag_2.concurrency,
+            has_task_concurrency_limits=False,
+        )
+        session.add(dag_model_2)
+        dr1 = dag_1.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE + timedelta(hours=1),
+            state=State.RUNNING,
+        )
+        dr2 = dag_2.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+
+        tis = [
+            TaskInstance(dag1_task, dr1.execution_date),
+            TaskInstance(dag2_task, dr2.execution_date),
+        ]
+        for ti in tis:
+            ti.state = State.SCHEDULED
+            session.merge(ti)
+        session.flush()
+
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=1, session=session)
+        session.flush()
+        assert [ti.key for ti in res] == [tis[1].key]
+        session.rollback()
+
+    def test_find_executable_task_instances_order_priority(self):
+        dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order_priority-a'
+        dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order_priority-b'
+        task_id = 'task-a'
+        dag_1 = DAG(dag_id=dag_id_1, start_date=DEFAULT_DATE, concurrency=16)
+        dag_2 = DAG(dag_id=dag_id_2, start_date=DEFAULT_DATE, concurrency=16)
+        dag1_task = DummyOperator(dag=dag_1, task_id=task_id, priority=1)
+        dag2_task = DummyOperator(dag=dag_2, task_id=task_id, priority=4)
+        dag_1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_1))
+        dag_2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_2))
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        session = settings.Session()
+
+        dag_model_1 = DagModel(
+            dag_id=dag_id_1,
+            is_paused=False,
+            concurrency=dag_1.concurrency,
+            has_task_concurrency_limits=False,
+        )
+        session.add(dag_model_1)
+        dag_model_2 = DagModel(
+            dag_id=dag_id_2,
+            is_paused=False,
+            concurrency=dag_2.concurrency,
+            has_task_concurrency_limits=False,
+        )
+        session.add(dag_model_2)
+        dr1 = dag_1.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+        dr2 = dag_2.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+
+        tis = [
+            TaskInstance(dag1_task, dr1.execution_date),
+            TaskInstance(dag2_task, dr2.execution_date),
+        ]
+        for ti in tis:
+            ti.state = State.SCHEDULED
+            session.merge(ti)
+        session.flush()
+
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=1, session=session)
+        session.flush()
+        assert [ti.key for ti in res] == [tis[1].key]
+        session.rollback()
+
+    def test_find_executable_task_instances_order_execution_date_and_priority(self):
+        dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date_and_priority-a'
+        dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date_and_priority-b'
+        task_id = 'task-a'
+        dag_1 = DAG(dag_id=dag_id_1, start_date=DEFAULT_DATE, concurrency=16)
+        dag_2 = DAG(dag_id=dag_id_2, start_date=DEFAULT_DATE, concurrency=16)
+        dag1_task = DummyOperator(dag=dag_1, task_id=task_id, priority=1)
+        dag2_task = DummyOperator(dag=dag_2, task_id=task_id, priority=4)

Review comment:
       ```suggestion
           dag1_task = DummyOperator(dag=dag_1, task_id=task_id, priority_weight=1)
           dag2_task = DummyOperator(dag=dag_2, task_id=task_id, priority_weight=4)
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #15210: Queue tasks with higher priority and earlier execution_date first.

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#discussion_r640520885



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -1177,6 +1177,171 @@ def test_find_executable_task_instances_pool(self):
         assert tis[3].key in res_keys
         session.rollback()
 
+    def test_find_executable_task_instances_order_execution_date(self):
+        dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date-a'
+        dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date-b'
+        task_id = 'task-a'
+        dag_1 = DAG(dag_id=dag_id_1, start_date=DEFAULT_DATE, concurrency=16)
+        dag_2 = DAG(dag_id=dag_id_2, start_date=DEFAULT_DATE, concurrency=16)
+        dag1_task = DummyOperator(dag=dag_1, task_id=task_id)
+        dag2_task = DummyOperator(dag=dag_2, task_id=task_id)
+        dag_1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_1))
+        dag_2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_2))
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        session = settings.Session()
+
+        dag_model_1 = DagModel(
+            dag_id=dag_id_1,
+            is_paused=False,
+            concurrency=dag_1.concurrency,
+            has_task_concurrency_limits=False,
+        )
+        session.add(dag_model_1)
+        dag_model_2 = DagModel(
+            dag_id=dag_id_2,
+            is_paused=False,
+            concurrency=dag_2.concurrency,
+            has_task_concurrency_limits=False,
+        )
+        session.add(dag_model_2)
+        dr1 = dag_1.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE + timedelta(hours=1),
+            state=State.RUNNING,
+        )
+        dr2 = dag_2.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+
+        tis = [
+            TaskInstance(dag1_task, dr1.execution_date),
+            TaskInstance(dag2_task, dr2.execution_date),
+        ]
+        for ti in tis:
+            ti.state = State.SCHEDULED
+            session.merge(ti)
+        session.flush()
+
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=1, session=session)
+        session.flush()
+        assert 1 == len(res)
+        res_keys = [ti.key for ti in res]
+        assert tis[1].key in res_keys

Review comment:
       ```suggestion
           assert [ti.key for ti in res] == [tis[1].key]
   ```

##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -1177,6 +1177,171 @@ def test_find_executable_task_instances_pool(self):
         assert tis[3].key in res_keys
         session.rollback()
 
+    def test_find_executable_task_instances_order_execution_date(self):
+        dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date-a'
+        dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date-b'
+        task_id = 'task-a'
+        dag_1 = DAG(dag_id=dag_id_1, start_date=DEFAULT_DATE, concurrency=16)
+        dag_2 = DAG(dag_id=dag_id_2, start_date=DEFAULT_DATE, concurrency=16)
+        dag1_task = DummyOperator(dag=dag_1, task_id=task_id)
+        dag2_task = DummyOperator(dag=dag_2, task_id=task_id)
+        dag_1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_1))
+        dag_2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_2))
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        session = settings.Session()
+
+        dag_model_1 = DagModel(
+            dag_id=dag_id_1,
+            is_paused=False,
+            concurrency=dag_1.concurrency,
+            has_task_concurrency_limits=False,
+        )
+        session.add(dag_model_1)
+        dag_model_2 = DagModel(
+            dag_id=dag_id_2,
+            is_paused=False,
+            concurrency=dag_2.concurrency,
+            has_task_concurrency_limits=False,
+        )
+        session.add(dag_model_2)
+        dr1 = dag_1.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE + timedelta(hours=1),
+            state=State.RUNNING,
+        )
+        dr2 = dag_2.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+
+        tis = [
+            TaskInstance(dag1_task, dr1.execution_date),
+            TaskInstance(dag2_task, dr2.execution_date),
+        ]
+        for ti in tis:
+            ti.state = State.SCHEDULED
+            session.merge(ti)
+        session.flush()
+
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=1, session=session)
+        session.flush()
+        assert 1 == len(res)
+        res_keys = [ti.key for ti in res]
+        assert tis[1].key in res_keys
+        session.rollback()
+
+    def test_find_executable_task_instances_order_priority(self):
+        dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order_priority-a'
+        dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order_priority-b'
+        task_id = 'task-a'
+        dag_1 = DAG(dag_id=dag_id_1, start_date=DEFAULT_DATE, concurrency=16)
+        dag_2 = DAG(dag_id=dag_id_2, start_date=DEFAULT_DATE, concurrency=16)
+        dag1_task = DummyOperator(dag=dag_1, task_id=task_id, priority=1)
+        dag2_task = DummyOperator(dag=dag_2, task_id=task_id, priority=4)
+        dag_1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_1))
+        dag_2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_2))
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        session = settings.Session()
+
+        dag_model_1 = DagModel(
+            dag_id=dag_id_1,
+            is_paused=False,
+            concurrency=dag_1.concurrency,
+            has_task_concurrency_limits=False,
+        )
+        session.add(dag_model_1)
+        dag_model_2 = DagModel(
+            dag_id=dag_id_2,
+            is_paused=False,
+            concurrency=dag_2.concurrency,
+            has_task_concurrency_limits=False,
+        )
+        session.add(dag_model_2)
+        dr1 = dag_1.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+        dr2 = dag_2.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+
+        tis = [
+            TaskInstance(dag1_task, dr1.execution_date),
+            TaskInstance(dag2_task, dr2.execution_date),
+        ]
+        for ti in tis:
+            ti.state = State.SCHEDULED
+            session.merge(ti)
+        session.flush()
+
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=1, session=session)
+        session.flush()
+        assert 1 == len(res)
+        res_keys = [ti.key for ti in res]
+        assert tis[1].key in res_keys
+        session.rollback()
+
+    def test_find_executable_task_instances_order_execution_date_and_priority(self):
+        dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date_and_priority-a'
+        dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date_and_priority-b'
+        task_id = 'task-a'
+        dag_1 = DAG(dag_id=dag_id_1, start_date=DEFAULT_DATE, concurrency=16)
+        dag_2 = DAG(dag_id=dag_id_2, start_date=DEFAULT_DATE, concurrency=16)
+        dag1_task = DummyOperator(dag=dag_1, task_id=task_id, priority=1)
+        dag2_task = DummyOperator(dag=dag_2, task_id=task_id, priority=4)
+        dag_1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_1))
+        dag_2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_2))
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        session = settings.Session()
+
+        dag_model_1 = DagModel(
+            dag_id=dag_id_1,
+            is_paused=False,
+            concurrency=dag_1.concurrency,
+            has_task_concurrency_limits=False,
+        )
+        session.add(dag_model_1)
+        dag_model_2 = DagModel(
+            dag_id=dag_id_2,
+            is_paused=False,
+            concurrency=dag_2.concurrency,
+            has_task_concurrency_limits=False,
+        )
+        session.add(dag_model_2)
+        dr1 = dag_1.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+        dr2 = dag_2.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE + timedelta(hours=1),
+            state=State.RUNNING,
+        )
+
+        tis = [
+            TaskInstance(dag1_task, dr1.execution_date),
+            TaskInstance(dag2_task, dr2.execution_date),
+        ]
+        for ti in tis:
+            ti.state = State.SCHEDULED
+            session.merge(ti)
+        session.flush()
+
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=1, session=session)
+        session.flush()
+        assert 1 == len(res)
+        res_keys = [ti.key for ti in res]
+        assert tis[1].key in res_keys

Review comment:
       ```suggestion
           assert [ti.key for ti in res] == [tis[1].key]
   ```

##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -1177,6 +1177,171 @@ def test_find_executable_task_instances_pool(self):
         assert tis[3].key in res_keys
         session.rollback()
 
+    def test_find_executable_task_instances_order_execution_date(self):
+        dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date-a'
+        dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date-b'
+        task_id = 'task-a'
+        dag_1 = DAG(dag_id=dag_id_1, start_date=DEFAULT_DATE, concurrency=16)
+        dag_2 = DAG(dag_id=dag_id_2, start_date=DEFAULT_DATE, concurrency=16)
+        dag1_task = DummyOperator(dag=dag_1, task_id=task_id)
+        dag2_task = DummyOperator(dag=dag_2, task_id=task_id)
+        dag_1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_1))
+        dag_2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_2))
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        session = settings.Session()
+
+        dag_model_1 = DagModel(
+            dag_id=dag_id_1,
+            is_paused=False,
+            concurrency=dag_1.concurrency,
+            has_task_concurrency_limits=False,
+        )
+        session.add(dag_model_1)
+        dag_model_2 = DagModel(
+            dag_id=dag_id_2,
+            is_paused=False,
+            concurrency=dag_2.concurrency,
+            has_task_concurrency_limits=False,
+        )
+        session.add(dag_model_2)
+        dr1 = dag_1.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE + timedelta(hours=1),
+            state=State.RUNNING,
+        )
+        dr2 = dag_2.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+
+        tis = [
+            TaskInstance(dag1_task, dr1.execution_date),
+            TaskInstance(dag2_task, dr2.execution_date),
+        ]
+        for ti in tis:
+            ti.state = State.SCHEDULED
+            session.merge(ti)
+        session.flush()
+
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=1, session=session)
+        session.flush()
+        assert 1 == len(res)
+        res_keys = [ti.key for ti in res]
+        assert tis[1].key in res_keys
+        session.rollback()
+
+    def test_find_executable_task_instances_order_priority(self):
+        dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order_priority-a'
+        dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order_priority-b'
+        task_id = 'task-a'
+        dag_1 = DAG(dag_id=dag_id_1, start_date=DEFAULT_DATE, concurrency=16)
+        dag_2 = DAG(dag_id=dag_id_2, start_date=DEFAULT_DATE, concurrency=16)
+        dag1_task = DummyOperator(dag=dag_1, task_id=task_id, priority=1)
+        dag2_task = DummyOperator(dag=dag_2, task_id=task_id, priority=4)
+        dag_1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_1))
+        dag_2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_2))
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        session = settings.Session()
+
+        dag_model_1 = DagModel(
+            dag_id=dag_id_1,
+            is_paused=False,
+            concurrency=dag_1.concurrency,
+            has_task_concurrency_limits=False,
+        )
+        session.add(dag_model_1)
+        dag_model_2 = DagModel(
+            dag_id=dag_id_2,
+            is_paused=False,
+            concurrency=dag_2.concurrency,
+            has_task_concurrency_limits=False,
+        )
+        session.add(dag_model_2)
+        dr1 = dag_1.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+        dr2 = dag_2.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+
+        tis = [
+            TaskInstance(dag1_task, dr1.execution_date),
+            TaskInstance(dag2_task, dr2.execution_date),
+        ]
+        for ti in tis:
+            ti.state = State.SCHEDULED
+            session.merge(ti)
+        session.flush()
+
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=1, session=session)
+        session.flush()
+        assert 1 == len(res)
+        res_keys = [ti.key for ti in res]
+        assert tis[1].key in res_keys

Review comment:
       ```suggestion
           assert [ti.key for ti in res] == [tis[1].key]
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on pull request #15210: Queue tasks with higher priority and earlier execution_date first.

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-860620521


   Well done @ginevragaudioso 👍


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #15210: Queue tasks with higher priority and earlier execution_date first.

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#discussion_r616840885



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -924,6 +924,7 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
             .filter(not_(DM.is_paused))
             .filter(TI.state == State.SCHEDULED)
             .options(selectinload('dag_model'))
+            .order_by(-TI.priority_weight, TI.execution_date)

Review comment:
       @matt-land Checking the status of CI on the first commit might help to determine that




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ginevragaudioso commented on pull request #15210: AIRFLOW-15171 order query to find out tasks to queue

Posted by GitBox <gi...@apache.org>.
ginevragaudioso commented on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-813749166


   @ashb @kaxil  @XD-DENG 
   Could some of you help with the check failures? I'd love to get this PR ready, but I am having a hard time understanding how the failure is connected to my changes here. Much appreciated!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] boring-cyborg[bot] commented on pull request #15210: AIRFLOW-15171 order query to find out tasks to queue

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-813554213


   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst)
   Here are some useful points:
   - Pay attention to the quality of your code (flake8, pylint and type annotations). Our [pre-commits](https://github.com/apache/airflow/blob/master/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
   - In case of a new feature, add useful documentation (in docstrings or in the `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/master/docs/apache-airflow/howto/custom-operator.rst). Consider adding an example DAG that shows how users should use it.
   - Consider using the [Breeze environment](https://github.com/apache/airflow/blob/master/BREEZE.rst) for testing locally; it's a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Please follow the [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
   - Be sure to read the [Airflow Coding style](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#coding-style-and-best-practices).
   Apache Airflow is a community-driven project and together we are making it better 🚀.
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://s.apache.org/airflow-slack
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ginevragaudioso commented on a change in pull request #15210: order query to find out tasks to queue

Posted by GitBox <gi...@apache.org>.
ginevragaudioso commented on a change in pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#discussion_r609194927



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -924,6 +924,7 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
             .filter(not_(DM.is_paused))
             .filter(TI.state == State.SCHEDULED)
             .options(selectinload('dag_model'))
+            .order_by(-TI.priority_weight, TI.execution_date)

Review comment:
       Done, although I am not sure whether the build failure I now get is related to my test or not.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ginevragaudioso edited a comment on pull request #15210: Queue tasks with higher priority and earlier execution_date first.

Posted by GitBox <gi...@apache.org>.
ginevragaudioso edited a comment on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-827704730


   > The tests need expanding -- the TIs it creates have the same priority and execution_date, so we aren't actually asserting that the TIs are sorted correctly.
   
   The TIs created do not have the same execution date, unless I am missing something.
   ```
           dr1 = dag_1.create_dagrun(
                run_type=DagRunType.SCHEDULED,
                execution_date=DEFAULT_DATE + timedelta(hours=1),
                state=State.RUNNING,
            )
            dr2 = dag_2.create_dagrun(
                run_type=DagRunType.SCHEDULED,
                execution_date=DEFAULT_DATE,
                state=State.RUNNING,
            )
   ```
   So the test checks that we pick the TI with the earliest execution date even if its dag_id is alphabetically later (which is exactly the bug being fixed).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ginevragaudioso edited a comment on pull request #15210: Queue tasks with higher priority and earlier execution_date first.

Posted by GitBox <gi...@apache.org>.
ginevragaudioso edited a comment on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-827704730






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on pull request #15210: order query to find out tasks to queue

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-820808855


   cc @ashb 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on pull request #15210: order query to find out tasks to queue

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-823341979


   @ginevragaudioso Sorry, it was too late (even yesterday), as the RC was already being voted on.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ginevragaudioso edited a comment on pull request #15210: Queue tasks with higher priority and earlier execution_date first.

Posted by GitBox <gi...@apache.org>.
ginevragaudioso edited a comment on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-827704730


   > The tests need expanding -- the TIs it creates have the same priority and execution_date, so we aren't actually asserting that the TIs are sorted correctly.
   
   The TIs created do not have the same execution date, unless I am missing something.
   ```
           dr1 = dag_1.create_dagrun(
                run_type=DagRunType.SCHEDULED,
                execution_date=DEFAULT_DATE + timedelta(hours=1),
                state=State.RUNNING,
            )
            dr2 = dag_2.create_dagrun(
                run_type=DagRunType.SCHEDULED,
                execution_date=DEFAULT_DATE,
                state=State.RUNNING,
            )
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #15210: Queue tasks with higher priority and earlier execution_date first.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-823431034


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest master at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #15210: AIRFLOW-15171 order query to find out tasks to queue

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-813695332


   [The Workflow run](https://github.com/apache/airflow/actions/runs/720606699) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ginevragaudioso commented on pull request #15210: order query to find out tasks to queue

Posted by GitBox <gi...@apache.org>.
ginevragaudioso commented on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-815268784


   Thanks @kaxil for fixing the build. Anything else I should do here?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] jhtimmins commented on pull request #15210: Queue tasks with higher priority and earlier execution_date first.

Posted by GitBox <gi...@apache.org>.
jhtimmins commented on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-849271807


   @ashb bumping you to review the updated tests you requested here https://github.com/apache/airflow/pull/15210#pullrequestreview-645935287


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb merged pull request #15210: Queue tasks with higher priority and earlier execution_date first.

Posted by GitBox <gi...@apache.org>.
ashb merged pull request #15210:
URL: https://github.com/apache/airflow/pull/15210


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #15210: AIRFLOW-15171 order query to find out tasks to queue

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-813695553






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ginevragaudioso removed a comment on pull request #15210: order query to find out tasks to queue

Posted by GitBox <gi...@apache.org>.
ginevragaudioso removed a comment on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-813749166


   @ashb @kaxil  @XD-DENG 
   Could some of you help with the check failures? I'd love to get this PR ready, but I am having a hard time understanding how the failure is connected to my changes here. Much appreciated!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org