You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by el...@apache.org on 2023/03/08 22:28:19 UTC
[airflow] branch main updated: Avoid considering EmptyOperator in mini scheduler (#29979)
This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a15792dd42 Avoid considering EmptyOperator in mini scheduler (#29979)
a15792dd42 is described below
commit a15792dd4216a1ae8c83c8c18ab255d2c558636c
Author: eladkal <45...@users.noreply.github.com>
AuthorDate: Thu Mar 9 00:28:08 2023 +0200
Avoid considering EmptyOperator in mini scheduler (#29979)
* Avoid considering EmptyOperator in mini scheduler
EmptyOperator should not be executed on workers, so it should not be considered for the mini scheduler optimization.
closes: https://github.com/apache/airflow/issues/29974
---
airflow/models/taskinstance.py | 12 ++++++++++-
tests/models/test_taskinstance.py | 44 +++++++++++++++++++++++++++++++++++++++
2 files changed, 55 insertions(+), 1 deletion(-)
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 3f6ebefd95..1a826bc918 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -2651,7 +2651,17 @@ class TaskInstance(Base, LoggingMixin):
task_id for task_id in partial_dag.task_ids if task_id not in task.downstream_task_ids
}
- schedulable_tis = [ti for ti in info.schedulable_tis if ti.task_id not in skippable_task_ids]
+ schedulable_tis = [
+ ti
+ for ti in info.schedulable_tis
+ if ti.task_id not in skippable_task_ids
+ and not (
+ ti.task.inherits_from_empty_operator
+ and not ti.task.on_execute_callback
+ and not ti.task.on_success_callback
+ and not ti.task.outlets
+ )
+ ]
for schedulable_ti in schedulable_tis:
if not hasattr(schedulable_ti, "task"):
schedulable_ti.task = task.dag.get_task(schedulable_ti.task_id)
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 54884f633e..50cac05296 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -3859,6 +3859,50 @@ def test_mapped_task_does_not_error_in_mini_scheduler_if_upstreams_are_not_done(
assert "0 downstream tasks scheduled from follow-on schedule" in caplog.text
+def test_empty_operator_is_not_considered_in_mini_scheduler(dag_maker, caplog, session):
+ """
+ This test verifies that operators with inherits_from_empty_operator are not considered by the mini
+ scheduler. Such operators should not run on workers, so the mini scheduler optimization should skip
+ them and not submit them directly to a worker.
+ """
+ with dag_maker() as dag:
+
+ @dag.task
+ def first_task():
+ print(2)
+
+ @dag.task
+ def second_task():
+ print(2)
+
+ third_task = EmptyOperator(task_id="third_task")
+ forth_task = EmptyOperator(task_id="forth_task", on_success_callback=lambda x: print("hi"))
+
+ first_task() >> [second_task(), third_task, forth_task]
+ dag_run = dag_maker.create_dagrun()
+ first_ti = dag_run.get_task_instance(task_id="first_task")
+ second_ti = dag_run.get_task_instance(task_id="second_task")
+ third_ti = dag_run.get_task_instance(task_id="third_task")
+ forth_ti = dag_run.get_task_instance(task_id="forth_task")
+ first_ti.state = State.SUCCESS
+ second_ti.state = State.NONE
+ third_ti.state = State.NONE
+ forth_ti.state = State.NONE
+ session.merge(first_ti)
+ session.merge(second_ti)
+ session.merge(third_ti)
+ session.merge(forth_ti)
+ session.commit()
+ first_ti.schedule_downstream_tasks(session=session)
+ second_task = dag_run.get_task_instance(task_id="second_task")
+ third_task = dag_run.get_task_instance(task_id="third_task")
+ forth_task = dag_run.get_task_instance(task_id="forth_task")
+ assert second_task.state == State.SCHEDULED
+ assert third_task.state == State.NONE
+ assert forth_task.state == State.SCHEDULED
+ assert "2 downstream tasks scheduled from follow-on schedule" in caplog.text
+
+
def test_mapped_task_expands_in_mini_scheduler_if_upstreams_are_done(dag_maker, caplog, session):
"""Test that mini scheduler expands mapped task"""
with dag_maker() as dag: