You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/05 18:18:14 UTC
[GitHub] [airflow] ginevragaudioso opened a new pull request #15210: AIRFLOW-15171 order query to find out tasks to queue
ginevragaudioso opened a new pull request #15210:
URL: https://github.com/apache/airflow/pull/15210
See issue #15171.
I tested the query on our airflow instance and it correctly sorts the results.
closes: #15171
related: #15171
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] kaxil commented on pull request #15210: order query to find out tasks to queue
Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-814442942
Please rebase on latest master, that should fix the failing error
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] ginevragaudioso edited a comment on pull request #15210: Queue tasks with higher priority and earlier execution_date first.
Posted by GitBox <gi...@apache.org>.
ginevragaudioso edited a comment on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-827704730
> The tests need expanding -- the TIs it create have the same priority and execution_date, so we aren't actually asserting that the TIs are sorted correctly.
The TIs created do not have the same execution date, unless I am missing something.
```
dr1 = dag_1.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE + timedelta(hours=1),
state=State.RUNNING,
)
dr2 = dag_2.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
state=State.RUNNING,
)
tis = [
TaskInstance(dag1_task, dr1.execution_date), # THIS IS DEFAULT_DATE + timedelta(hours=1) (later)
TaskInstance(dag2_task, dr2.execution_date), # THIS IS DEFAULT_DATE (earlier)
]
```
So the test is testing that we pick the one with the earliest execution date even if it is alphabetically later (which is exactly the bug being fixed).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] ginevragaudioso commented on a change in pull request #15210: order query to find out tasks to queue
Posted by GitBox <gi...@apache.org>.
ginevragaudioso commented on a change in pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#discussion_r612503846
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -924,6 +924,7 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
.filter(not_(DM.is_paused))
.filter(TI.state == State.SCHEDULED)
.options(selectinload('dag_model'))
+ .order_by(-TI.priority_weight, TI.execution_date)
Review comment:
@kaxil ideas on what is failing? I see the test_scheduler_job (the one I added my test to) run, and I see dots after it, so I'd think it is passing.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] uranusjr commented on pull request #15210: Queue tasks with higher priority and earlier execution_date first.
Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-828119707
I think what Ash meant was the currently available tests either has the same execution date *or* the same priority, and need to be extended to test more combination of values.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] boring-cyborg[bot] commented on pull request #15210: Queue tasks with higher priority and earlier execution_date first.
Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-860614313
Awesome work, congrats on your first merged pull request!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] ginevragaudioso commented on pull request #15210: Queue tasks with higher priority and earlier execution_date first.
Posted by GitBox <gi...@apache.org>.
ginevragaudioso commented on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-828719342
> The tests need expanding -- the TIs it create have the same priority and execution_date, so we aren't actually asserting that the TIs are sorted correctly.
@ashb thanks for the feedback, I added two more tests, one for priority and one for both.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] matt-land commented on a change in pull request #15210: order query to find out tasks to queue
Posted by GitBox <gi...@apache.org>.
matt-land commented on a change in pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#discussion_r612535908
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -924,6 +924,7 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
.filter(not_(DM.is_paused))
.filter(TI.state == State.SCHEDULED)
.options(selectinload('dag_model'))
+ .order_by(-TI.priority_weight, TI.execution_date)
Review comment:
How do we determine if the build is failing from master vs the new test?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] ginevragaudioso commented on pull request #15210: Queue tasks with higher priority and earlier execution_date first.
Posted by GitBox <gi...@apache.org>.
ginevragaudioso commented on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-827704730
> The tests need expanding -- the TIs it create have the same priority and execution_date, so we aren't actually asserting that the TIs are sorted correctly.
The TIs created do not have the same execution date, unless I am missing something.
```
dr1 = dag_1.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE + timedelta(hours=1),
state=State.RUNNING,
)
dr2 = dag_2.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
state=State.RUNNING,
)
``
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] ashb commented on a change in pull request #15210: Queue tasks with higher priority and earlier execution_date first.
Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#discussion_r616766352
##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -1177,6 +1177,63 @@ def test_find_executable_task_instances_pool(self):
assert tis[3].key in res_keys
session.rollback()
+ def test_find_executable_task_instances_order(self):
+ dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order-a'
+ dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order-b'
+ task_id = 'task-a'
+ dag_1 = DAG(dag_id=dag_id_1, start_date=DEFAULT_DATE, concurrency=16)
+ dag_2 = DAG(dag_id=dag_id_2, start_date=DEFAULT_DATE, concurrency=16)
+ dag1_task = DummyOperator(dag=dag_1, task_id=task_id)
+ dag2_task = DummyOperator(dag=dag_2, task_id=task_id)
+ dag_1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_1))
+ dag_2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_2))
+
+ self.scheduler_job = SchedulerJob(subdir=os.devnull)
+ session = settings.Session()
+
+ dag_model_1 = DagModel(
+ dag_id=dag_id_1,
+ is_paused=False,
+ concurrency=dag_1.concurrency,
+ has_task_concurrency_limits=False,
+ )
+ session.add(dag_model_1)
+ dag_model_2 = DagModel(
+ dag_id=dag_id_2,
+ is_paused=False,
+ concurrency=dag_2.concurrency,
+ has_task_concurrency_limits=False,
+ )
+ session.add(dag_model_2)
+ dr1 = dag_1.create_dagrun(
+ run_type=DagRunType.SCHEDULED,
+ execution_date=DEFAULT_DATE + timedelta(hours=1),
+ state=State.RUNNING,
+ )
+ dr2 = dag_2.create_dagrun(
+ run_type=DagRunType.SCHEDULED,
+ execution_date=DEFAULT_DATE,
+ state=State.RUNNING,
+ )
+
+ tis = [
+ TaskInstance(dag1_task, dr1.execution_date),
+ TaskInstance(dag2_task, dr2.execution_date),
+ ]
+ for ti in tis:
+ ti.state = State.SCHEDULED
+ session.merge(ti)
+ session.flush()
+
+ res = self.scheduler_job._executable_task_instances_to_queued(max_tis=1, session=session)
+ session.flush()
+ assert 1 == len(res)
+ res_keys = []
+ for ti in res:
+ res_keys.append(ti.key)
Review comment:
```suggestion
res_keys = [ti.key for ti in res]
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] kaxil commented on pull request #15210: order query to find out tasks to queue
Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-814443400
I just pushed, should work now
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] uranusjr edited a comment on pull request #15210: Queue tasks with higher priority and earlier execution_date first.
Posted by GitBox <gi...@apache.org>.
uranusjr edited a comment on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-828119707
I think what Ash meant was the currently available tests either has the same execution date *or* the same priority, and need to be extended to cover more combination of values.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] ginevragaudioso commented on pull request #15210: order query to find out tasks to queue
Posted by GitBox <gi...@apache.org>.
ginevragaudioso commented on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-822530576
@kaxil @ashb would it be possible to have this fix go in 2.0.2? Seems like an easy fix that solves an actual issue, but I don't know what's in the roadmap.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] kaxil commented on a change in pull request #15210: order query to find out tasks to queue
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#discussion_r609109247
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -924,6 +924,7 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
.filter(not_(DM.is_paused))
.filter(TI.state == State.SCHEDULED)
.options(selectinload('dag_model'))
+ .order_by(-TI.priority_weight, TI.execution_date)
Review comment:
Can you add a test case for this in https://github.com/apache/airflow/blob/master/tests/jobs/test_scheduler_job.py please to avoid regression
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] ashb commented on a change in pull request #15210: Queue tasks with higher priority and earlier execution_date first.
Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#discussion_r640592820
##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -1177,6 +1177,165 @@ def test_find_executable_task_instances_pool(self):
assert tis[3].key in res_keys
session.rollback()
+ def test_find_executable_task_instances_order_execution_date(self):
+ dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date-a'
+ dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date-b'
+ task_id = 'task-a'
+ dag_1 = DAG(dag_id=dag_id_1, start_date=DEFAULT_DATE, concurrency=16)
+ dag_2 = DAG(dag_id=dag_id_2, start_date=DEFAULT_DATE, concurrency=16)
+ dag1_task = DummyOperator(dag=dag_1, task_id=task_id)
+ dag2_task = DummyOperator(dag=dag_2, task_id=task_id)
+ dag_1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_1))
+ dag_2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_2))
+
+ self.scheduler_job = SchedulerJob(subdir=os.devnull)
+ session = settings.Session()
+
+ dag_model_1 = DagModel(
+ dag_id=dag_id_1,
+ is_paused=False,
+ concurrency=dag_1.concurrency,
+ has_task_concurrency_limits=False,
+ )
+ session.add(dag_model_1)
+ dag_model_2 = DagModel(
+ dag_id=dag_id_2,
+ is_paused=False,
+ concurrency=dag_2.concurrency,
+ has_task_concurrency_limits=False,
+ )
+ session.add(dag_model_2)
+ dr1 = dag_1.create_dagrun(
+ run_type=DagRunType.SCHEDULED,
+ execution_date=DEFAULT_DATE + timedelta(hours=1),
+ state=State.RUNNING,
+ )
+ dr2 = dag_2.create_dagrun(
+ run_type=DagRunType.SCHEDULED,
+ execution_date=DEFAULT_DATE,
+ state=State.RUNNING,
+ )
+
+ tis = [
+ TaskInstance(dag1_task, dr1.execution_date),
+ TaskInstance(dag2_task, dr2.execution_date),
+ ]
+ for ti in tis:
+ ti.state = State.SCHEDULED
+ session.merge(ti)
+ session.flush()
+
+ res = self.scheduler_job._executable_task_instances_to_queued(max_tis=1, session=session)
+ session.flush()
+ assert [ti.key for ti in res] == [tis[1].key]
+ session.rollback()
+
+ def test_find_executable_task_instances_order_priority(self):
+ dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order_priority-a'
+ dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order_priority-b'
+ task_id = 'task-a'
+ dag_1 = DAG(dag_id=dag_id_1, start_date=DEFAULT_DATE, concurrency=16)
+ dag_2 = DAG(dag_id=dag_id_2, start_date=DEFAULT_DATE, concurrency=16)
+ dag1_task = DummyOperator(dag=dag_1, task_id=task_id, priority=1)
+ dag2_task = DummyOperator(dag=dag_2, task_id=task_id, priority=4)
Review comment:
```suggestion
dag1_task = DummyOperator(dag=dag_1, task_id=task_id, priority_weight=1)
dag2_task = DummyOperator(dag=dag_2, task_id=task_id, priority_weight=4)
```
##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -1177,6 +1177,165 @@ def test_find_executable_task_instances_pool(self):
assert tis[3].key in res_keys
session.rollback()
+ def test_find_executable_task_instances_order_execution_date(self):
+ dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date-a'
+ dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date-b'
+ task_id = 'task-a'
+ dag_1 = DAG(dag_id=dag_id_1, start_date=DEFAULT_DATE, concurrency=16)
+ dag_2 = DAG(dag_id=dag_id_2, start_date=DEFAULT_DATE, concurrency=16)
+ dag1_task = DummyOperator(dag=dag_1, task_id=task_id)
+ dag2_task = DummyOperator(dag=dag_2, task_id=task_id)
+ dag_1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_1))
+ dag_2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_2))
+
+ self.scheduler_job = SchedulerJob(subdir=os.devnull)
+ session = settings.Session()
+
+ dag_model_1 = DagModel(
+ dag_id=dag_id_1,
+ is_paused=False,
+ concurrency=dag_1.concurrency,
+ has_task_concurrency_limits=False,
+ )
+ session.add(dag_model_1)
+ dag_model_2 = DagModel(
+ dag_id=dag_id_2,
+ is_paused=False,
+ concurrency=dag_2.concurrency,
+ has_task_concurrency_limits=False,
+ )
+ session.add(dag_model_2)
+ dr1 = dag_1.create_dagrun(
+ run_type=DagRunType.SCHEDULED,
+ execution_date=DEFAULT_DATE + timedelta(hours=1),
+ state=State.RUNNING,
+ )
+ dr2 = dag_2.create_dagrun(
+ run_type=DagRunType.SCHEDULED,
+ execution_date=DEFAULT_DATE,
+ state=State.RUNNING,
+ )
+
+ tis = [
+ TaskInstance(dag1_task, dr1.execution_date),
+ TaskInstance(dag2_task, dr2.execution_date),
+ ]
+ for ti in tis:
+ ti.state = State.SCHEDULED
+ session.merge(ti)
+ session.flush()
+
+ res = self.scheduler_job._executable_task_instances_to_queued(max_tis=1, session=session)
+ session.flush()
+ assert [ti.key for ti in res] == [tis[1].key]
+ session.rollback()
+
+ def test_find_executable_task_instances_order_priority(self):
+ dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order_priority-a'
+ dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order_priority-b'
+ task_id = 'task-a'
+ dag_1 = DAG(dag_id=dag_id_1, start_date=DEFAULT_DATE, concurrency=16)
+ dag_2 = DAG(dag_id=dag_id_2, start_date=DEFAULT_DATE, concurrency=16)
+ dag1_task = DummyOperator(dag=dag_1, task_id=task_id, priority=1)
+ dag2_task = DummyOperator(dag=dag_2, task_id=task_id, priority=4)
+ dag_1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_1))
+ dag_2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_2))
+
+ self.scheduler_job = SchedulerJob(subdir=os.devnull)
+ session = settings.Session()
+
+ dag_model_1 = DagModel(
+ dag_id=dag_id_1,
+ is_paused=False,
+ concurrency=dag_1.concurrency,
+ has_task_concurrency_limits=False,
+ )
+ session.add(dag_model_1)
+ dag_model_2 = DagModel(
+ dag_id=dag_id_2,
+ is_paused=False,
+ concurrency=dag_2.concurrency,
+ has_task_concurrency_limits=False,
+ )
+ session.add(dag_model_2)
+ dr1 = dag_1.create_dagrun(
+ run_type=DagRunType.SCHEDULED,
+ execution_date=DEFAULT_DATE,
+ state=State.RUNNING,
+ )
+ dr2 = dag_2.create_dagrun(
+ run_type=DagRunType.SCHEDULED,
+ execution_date=DEFAULT_DATE,
+ state=State.RUNNING,
+ )
+
+ tis = [
+ TaskInstance(dag1_task, dr1.execution_date),
+ TaskInstance(dag2_task, dr2.execution_date),
+ ]
+ for ti in tis:
+ ti.state = State.SCHEDULED
+ session.merge(ti)
+ session.flush()
+
+ res = self.scheduler_job._executable_task_instances_to_queued(max_tis=1, session=session)
+ session.flush()
+ assert [ti.key for ti in res] == [tis[1].key]
+ session.rollback()
+
+ def test_find_executable_task_instances_order_execution_date_and_priority(self):
+ dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date_and_priority-a'
+ dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date_and_priority-b'
+ task_id = 'task-a'
+ dag_1 = DAG(dag_id=dag_id_1, start_date=DEFAULT_DATE, concurrency=16)
+ dag_2 = DAG(dag_id=dag_id_2, start_date=DEFAULT_DATE, concurrency=16)
+ dag1_task = DummyOperator(dag=dag_1, task_id=task_id, priority=1)
+ dag2_task = DummyOperator(dag=dag_2, task_id=task_id, priority=4)
Review comment:
```suggestion
dag1_task = DummyOperator(dag=dag_1, task_id=task_id, priority_weight=1)
dag2_task = DummyOperator(dag=dag_2, task_id=task_id, priority_weight=4)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] ashb commented on a change in pull request #15210: Queue tasks with higher priority and earlier execution_date first.
Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#discussion_r640520885
##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -1177,6 +1177,171 @@ def test_find_executable_task_instances_pool(self):
assert tis[3].key in res_keys
session.rollback()
+ def test_find_executable_task_instances_order_execution_date(self):
+ dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date-a'
+ dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date-b'
+ task_id = 'task-a'
+ dag_1 = DAG(dag_id=dag_id_1, start_date=DEFAULT_DATE, concurrency=16)
+ dag_2 = DAG(dag_id=dag_id_2, start_date=DEFAULT_DATE, concurrency=16)
+ dag1_task = DummyOperator(dag=dag_1, task_id=task_id)
+ dag2_task = DummyOperator(dag=dag_2, task_id=task_id)
+ dag_1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_1))
+ dag_2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_2))
+
+ self.scheduler_job = SchedulerJob(subdir=os.devnull)
+ session = settings.Session()
+
+ dag_model_1 = DagModel(
+ dag_id=dag_id_1,
+ is_paused=False,
+ concurrency=dag_1.concurrency,
+ has_task_concurrency_limits=False,
+ )
+ session.add(dag_model_1)
+ dag_model_2 = DagModel(
+ dag_id=dag_id_2,
+ is_paused=False,
+ concurrency=dag_2.concurrency,
+ has_task_concurrency_limits=False,
+ )
+ session.add(dag_model_2)
+ dr1 = dag_1.create_dagrun(
+ run_type=DagRunType.SCHEDULED,
+ execution_date=DEFAULT_DATE + timedelta(hours=1),
+ state=State.RUNNING,
+ )
+ dr2 = dag_2.create_dagrun(
+ run_type=DagRunType.SCHEDULED,
+ execution_date=DEFAULT_DATE,
+ state=State.RUNNING,
+ )
+
+ tis = [
+ TaskInstance(dag1_task, dr1.execution_date),
+ TaskInstance(dag2_task, dr2.execution_date),
+ ]
+ for ti in tis:
+ ti.state = State.SCHEDULED
+ session.merge(ti)
+ session.flush()
+
+ res = self.scheduler_job._executable_task_instances_to_queued(max_tis=1, session=session)
+ session.flush()
+ assert 1 == len(res)
+ res_keys = [ti.key for ti in res]
+ assert tis[1].key in res_keys
Review comment:
```suggestion
assert [ti.key for ti in res] == [tis[1].key]
```
##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -1177,6 +1177,171 @@ def test_find_executable_task_instances_pool(self):
assert tis[3].key in res_keys
session.rollback()
+ def test_find_executable_task_instances_order_execution_date(self):
+ dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date-a'
+ dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date-b'
+ task_id = 'task-a'
+ dag_1 = DAG(dag_id=dag_id_1, start_date=DEFAULT_DATE, concurrency=16)
+ dag_2 = DAG(dag_id=dag_id_2, start_date=DEFAULT_DATE, concurrency=16)
+ dag1_task = DummyOperator(dag=dag_1, task_id=task_id)
+ dag2_task = DummyOperator(dag=dag_2, task_id=task_id)
+ dag_1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_1))
+ dag_2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_2))
+
+ self.scheduler_job = SchedulerJob(subdir=os.devnull)
+ session = settings.Session()
+
+ dag_model_1 = DagModel(
+ dag_id=dag_id_1,
+ is_paused=False,
+ concurrency=dag_1.concurrency,
+ has_task_concurrency_limits=False,
+ )
+ session.add(dag_model_1)
+ dag_model_2 = DagModel(
+ dag_id=dag_id_2,
+ is_paused=False,
+ concurrency=dag_2.concurrency,
+ has_task_concurrency_limits=False,
+ )
+ session.add(dag_model_2)
+ dr1 = dag_1.create_dagrun(
+ run_type=DagRunType.SCHEDULED,
+ execution_date=DEFAULT_DATE + timedelta(hours=1),
+ state=State.RUNNING,
+ )
+ dr2 = dag_2.create_dagrun(
+ run_type=DagRunType.SCHEDULED,
+ execution_date=DEFAULT_DATE,
+ state=State.RUNNING,
+ )
+
+ tis = [
+ TaskInstance(dag1_task, dr1.execution_date),
+ TaskInstance(dag2_task, dr2.execution_date),
+ ]
+ for ti in tis:
+ ti.state = State.SCHEDULED
+ session.merge(ti)
+ session.flush()
+
+ res = self.scheduler_job._executable_task_instances_to_queued(max_tis=1, session=session)
+ session.flush()
+ assert 1 == len(res)
+ res_keys = [ti.key for ti in res]
+ assert tis[1].key in res_keys
+ session.rollback()
+
+ def test_find_executable_task_instances_order_priority(self):
+ dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order_priority-a'
+ dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order_priority-b'
+ task_id = 'task-a'
+ dag_1 = DAG(dag_id=dag_id_1, start_date=DEFAULT_DATE, concurrency=16)
+ dag_2 = DAG(dag_id=dag_id_2, start_date=DEFAULT_DATE, concurrency=16)
+ dag1_task = DummyOperator(dag=dag_1, task_id=task_id, priority=1)
+ dag2_task = DummyOperator(dag=dag_2, task_id=task_id, priority=4)
+ dag_1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_1))
+ dag_2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_2))
+
+ self.scheduler_job = SchedulerJob(subdir=os.devnull)
+ session = settings.Session()
+
+ dag_model_1 = DagModel(
+ dag_id=dag_id_1,
+ is_paused=False,
+ concurrency=dag_1.concurrency,
+ has_task_concurrency_limits=False,
+ )
+ session.add(dag_model_1)
+ dag_model_2 = DagModel(
+ dag_id=dag_id_2,
+ is_paused=False,
+ concurrency=dag_2.concurrency,
+ has_task_concurrency_limits=False,
+ )
+ session.add(dag_model_2)
+ dr1 = dag_1.create_dagrun(
+ run_type=DagRunType.SCHEDULED,
+ execution_date=DEFAULT_DATE,
+ state=State.RUNNING,
+ )
+ dr2 = dag_2.create_dagrun(
+ run_type=DagRunType.SCHEDULED,
+ execution_date=DEFAULT_DATE,
+ state=State.RUNNING,
+ )
+
+ tis = [
+ TaskInstance(dag1_task, dr1.execution_date),
+ TaskInstance(dag2_task, dr2.execution_date),
+ ]
+ for ti in tis:
+ ti.state = State.SCHEDULED
+ session.merge(ti)
+ session.flush()
+
+ res = self.scheduler_job._executable_task_instances_to_queued(max_tis=1, session=session)
+ session.flush()
+ assert 1 == len(res)
+ res_keys = [ti.key for ti in res]
+ assert tis[1].key in res_keys
+ session.rollback()
+
+ def test_find_executable_task_instances_order_execution_date_and_priority(self):
+ dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date_and_priority-a'
+ dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date_and_priority-b'
+ task_id = 'task-a'
+ dag_1 = DAG(dag_id=dag_id_1, start_date=DEFAULT_DATE, concurrency=16)
+ dag_2 = DAG(dag_id=dag_id_2, start_date=DEFAULT_DATE, concurrency=16)
+ dag1_task = DummyOperator(dag=dag_1, task_id=task_id, priority=1)
+ dag2_task = DummyOperator(dag=dag_2, task_id=task_id, priority=4)
+ dag_1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_1))
+ dag_2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_2))
+
+ self.scheduler_job = SchedulerJob(subdir=os.devnull)
+ session = settings.Session()
+
+ dag_model_1 = DagModel(
+ dag_id=dag_id_1,
+ is_paused=False,
+ concurrency=dag_1.concurrency,
+ has_task_concurrency_limits=False,
+ )
+ session.add(dag_model_1)
+ dag_model_2 = DagModel(
+ dag_id=dag_id_2,
+ is_paused=False,
+ concurrency=dag_2.concurrency,
+ has_task_concurrency_limits=False,
+ )
+ session.add(dag_model_2)
+ dr1 = dag_1.create_dagrun(
+ run_type=DagRunType.SCHEDULED,
+ execution_date=DEFAULT_DATE,
+ state=State.RUNNING,
+ )
+ dr2 = dag_2.create_dagrun(
+ run_type=DagRunType.SCHEDULED,
+ execution_date=DEFAULT_DATE + timedelta(hours=1),
+ state=State.RUNNING,
+ )
+
+ tis = [
+ TaskInstance(dag1_task, dr1.execution_date),
+ TaskInstance(dag2_task, dr2.execution_date),
+ ]
+ for ti in tis:
+ ti.state = State.SCHEDULED
+ session.merge(ti)
+ session.flush()
+
+ res = self.scheduler_job._executable_task_instances_to_queued(max_tis=1, session=session)
+ session.flush()
+ assert 1 == len(res)
+ res_keys = [ti.key for ti in res]
+ assert tis[1].key in res_keys
Review comment:
```suggestion
assert [ti.key for ti in res] == [tis[1].key]
```
##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -1177,6 +1177,171 @@ def test_find_executable_task_instances_pool(self):
assert tis[3].key in res_keys
session.rollback()
+ def test_find_executable_task_instances_order_execution_date(self):
+ dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date-a'
+ dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date-b'
+ task_id = 'task-a'
+ dag_1 = DAG(dag_id=dag_id_1, start_date=DEFAULT_DATE, concurrency=16)
+ dag_2 = DAG(dag_id=dag_id_2, start_date=DEFAULT_DATE, concurrency=16)
+ dag1_task = DummyOperator(dag=dag_1, task_id=task_id)
+ dag2_task = DummyOperator(dag=dag_2, task_id=task_id)
+ dag_1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_1))
+ dag_2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_2))
+
+ self.scheduler_job = SchedulerJob(subdir=os.devnull)
+ session = settings.Session()
+
+ dag_model_1 = DagModel(
+ dag_id=dag_id_1,
+ is_paused=False,
+ concurrency=dag_1.concurrency,
+ has_task_concurrency_limits=False,
+ )
+ session.add(dag_model_1)
+ dag_model_2 = DagModel(
+ dag_id=dag_id_2,
+ is_paused=False,
+ concurrency=dag_2.concurrency,
+ has_task_concurrency_limits=False,
+ )
+ session.add(dag_model_2)
+ dr1 = dag_1.create_dagrun(
+ run_type=DagRunType.SCHEDULED,
+ execution_date=DEFAULT_DATE + timedelta(hours=1),
+ state=State.RUNNING,
+ )
+ dr2 = dag_2.create_dagrun(
+ run_type=DagRunType.SCHEDULED,
+ execution_date=DEFAULT_DATE,
+ state=State.RUNNING,
+ )
+
+ tis = [
+ TaskInstance(dag1_task, dr1.execution_date),
+ TaskInstance(dag2_task, dr2.execution_date),
+ ]
+ for ti in tis:
+ ti.state = State.SCHEDULED
+ session.merge(ti)
+ session.flush()
+
+ res = self.scheduler_job._executable_task_instances_to_queued(max_tis=1, session=session)
+ session.flush()
+ assert 1 == len(res)
+ res_keys = [ti.key for ti in res]
+ assert tis[1].key in res_keys
+ session.rollback()
+
+ def test_find_executable_task_instances_order_priority(self):
+ dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order_priority-a'
+ dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order_priority-b'
+ task_id = 'task-a'
+ dag_1 = DAG(dag_id=dag_id_1, start_date=DEFAULT_DATE, concurrency=16)
+ dag_2 = DAG(dag_id=dag_id_2, start_date=DEFAULT_DATE, concurrency=16)
+ dag1_task = DummyOperator(dag=dag_1, task_id=task_id, priority=1)
+ dag2_task = DummyOperator(dag=dag_2, task_id=task_id, priority=4)
+ dag_1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_1))
+ dag_2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_2))
+
+ self.scheduler_job = SchedulerJob(subdir=os.devnull)
+ session = settings.Session()
+
+ dag_model_1 = DagModel(
+ dag_id=dag_id_1,
+ is_paused=False,
+ concurrency=dag_1.concurrency,
+ has_task_concurrency_limits=False,
+ )
+ session.add(dag_model_1)
+ dag_model_2 = DagModel(
+ dag_id=dag_id_2,
+ is_paused=False,
+ concurrency=dag_2.concurrency,
+ has_task_concurrency_limits=False,
+ )
+ session.add(dag_model_2)
+ dr1 = dag_1.create_dagrun(
+ run_type=DagRunType.SCHEDULED,
+ execution_date=DEFAULT_DATE,
+ state=State.RUNNING,
+ )
+ dr2 = dag_2.create_dagrun(
+ run_type=DagRunType.SCHEDULED,
+ execution_date=DEFAULT_DATE,
+ state=State.RUNNING,
+ )
+
+ tis = [
+ TaskInstance(dag1_task, dr1.execution_date),
+ TaskInstance(dag2_task, dr2.execution_date),
+ ]
+ for ti in tis:
+ ti.state = State.SCHEDULED
+ session.merge(ti)
+ session.flush()
+
+ res = self.scheduler_job._executable_task_instances_to_queued(max_tis=1, session=session)
+ session.flush()
+ assert 1 == len(res)
+ res_keys = [ti.key for ti in res]
+ assert tis[1].key in res_keys
Review comment:
```suggestion
assert [ti.key for ti in res] == [tis[1].key]
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] kaxil commented on pull request #15210: Queue tasks with higher priority and earlier execution_date first.
Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-860620521
Well done @ginevragaudioso π
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] kaxil commented on a change in pull request #15210: Queue tasks with higher priority and earlier execution_date first.
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#discussion_r616840885
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -924,6 +924,7 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
.filter(not_(DM.is_paused))
.filter(TI.state == State.SCHEDULED)
.options(selectinload('dag_model'))
+ .order_by(-TI.priority_weight, TI.execution_date)
Review comment:
@matt-land Checking the status of CI on the first commit might help to determine that
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] ginevragaudioso commented on pull request #15210: AIRFLOW-15171 order query to find out tasks to queue
Posted by GitBox <gi...@apache.org>.
ginevragaudioso commented on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-813749166
@ashb @kaxil @XD-DENG
Could some of you help with the check failures? I'd love to get this PR ready, but I am having a hard time understanding how the failure is connected to my changes here. Much appreciated!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] boring-cyborg[bot] commented on pull request #15210: AIRFLOW-15171 order query to find out tasks to queue
Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-813554213
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contribution Guide (https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst)
Here are some useful points:
- Pay attention to the quality of your code (flake8, pylint and type annotations). Our [pre-commits]( https://github.com/apache/airflow/blob/master/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
- In case of a new feature add useful documentation (in docstrings or in `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/master/docs/apache-airflow/howto/custom-operator.rst) Consider adding an example DAG that shows how users should use it.
- Consider using [Breeze environment](https://github.com/apache/airflow/blob/master/BREEZE.rst) for testing locally, itβs a heavy docker but it ships with a working Airflow and a lot of integrations.
- Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
- Please follow [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
- Be sure to read the [Airflow Coding style]( https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#coding-style-and-best-practices).
Apache Airflow is a community-driven project and together we are making it better π.
In case of doubts contact the developers at:
Mailing List: dev@airflow.apache.org
Slack: https://s.apache.org/airflow-slack
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] ginevragaudioso commented on a change in pull request #15210: order query to find out tasks to queue
Posted by GitBox <gi...@apache.org>.
ginevragaudioso commented on a change in pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#discussion_r609194927
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -924,6 +924,7 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
.filter(not_(DM.is_paused))
.filter(TI.state == State.SCHEDULED)
.options(selectinload('dag_model'))
+ .order_by(-TI.priority_weight, TI.execution_date)
Review comment:
Done, although I am not sure if the build failure I now get is related to my test or not.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] ginevragaudioso edited a comment on pull request #15210: Queue tasks with higher priority and earlier execution_date first.
Posted by GitBox <gi...@apache.org>.
ginevragaudioso edited a comment on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-827704730
> The tests need expanding -- the TIs it create have the same priority and execution_date, so we aren't actually asserting that the TIs are sorted correctly.
The TIs created do not have the same execution date, unless I am missing something.
```
dr1 = dag_1.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE + timedelta(hours=1),
state=State.RUNNING,
)
dr2 = dag_2.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
state=State.RUNNING,
)
```
So the test is testing that we pick the one with the earliest execution date even if it is alphabetically later (which is exactly the bug being fixed).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] ginevragaudioso edited a comment on pull request #15210: Queue tasks with higher priority and earlier execution_date first.
Posted by GitBox <gi...@apache.org>.
ginevragaudioso edited a comment on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-827704730
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] kaxil commented on pull request #15210: order query to find out tasks to queue
Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-820808855
cc @ashb
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] ashb commented on pull request #15210: order query to find out tasks to queue
Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-823341979
@ginevragaudioso Sorry, was too late (even yesterday) as the RC was already being voted upon.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] ginevragaudioso edited a comment on pull request #15210: Queue tasks with higher priority and earlier execution_date first.
Posted by GitBox <gi...@apache.org>.
ginevragaudioso edited a comment on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-827704730
> The tests need expanding -- the TIs it create have the same priority and execution_date, so we aren't actually asserting that the TIs are sorted correctly.
The TIs created do not have the same execution date, unless I am missing something.
```
dr1 = dag_1.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE + timedelta(hours=1),
state=State.RUNNING,
)
dr2 = dag_2.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
state=State.RUNNING,
)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] github-actions[bot] commented on pull request #15210: Queue tasks with higher priority and earlier execution_date first.
Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-823431034
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest master at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] github-actions[bot] commented on pull request #15210: AIRFLOW-15171 order query to find out tasks to queue
Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-813695332
[The Workflow run](https://github.com/apache/airflow/actions/runs/720606699) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] ginevragaudioso commented on pull request #15210: order query to find out tasks to queue
Posted by GitBox <gi...@apache.org>.
ginevragaudioso commented on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-815268784
Thanks @kaxil for fixing the build. Anything else I should do here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] jhtimmins commented on pull request #15210: Queue tasks with higher priority and earlier execution_date first.
Posted by GitBox <gi...@apache.org>.
jhtimmins commented on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-849271807
@ashb bumping you to review the updated tests you requested here https://github.com/apache/airflow/pull/15210#pullrequestreview-645935287
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] ashb merged pull request #15210: Queue tasks with higher priority and earlier execution_date first.
Posted by GitBox <gi...@apache.org>.
ashb merged pull request #15210:
URL: https://github.com/apache/airflow/pull/15210
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] github-actions[bot] commented on pull request #15210: AIRFLOW-15171 order query to find out tasks to queue
Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-813695553
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] ginevragaudioso removed a comment on pull request #15210: order query to find out tasks to queue
Posted by GitBox <gi...@apache.org>.
ginevragaudioso removed a comment on pull request #15210:
URL: https://github.com/apache/airflow/pull/15210#issuecomment-813749166
@ashb @kaxil @XD-DENG
Could some of you help with the check failures? I'd love to get this PR ready, but I am having a hard time understanding how the failure is connected to my changes here. Much appreciated!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org