You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by el...@apache.org on 2023/03/08 22:28:19 UTC

[airflow] branch main updated: Avoid considering EmptyOperator in mini scheduler (#29979)

This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a15792dd42 Avoid considering EmptyOperator in mini scheduler (#29979)
a15792dd42 is described below

commit a15792dd4216a1ae8c83c8c18ab255d2c558636c
Author: eladkal <45...@users.noreply.github.com>
AuthorDate: Thu Mar 9 00:28:08 2023 +0200

    Avoid considering EmptyOperator in mini scheduler (#29979)
    
    * Avoid considering EmptyOperator in mini scheduler
    
    An EmptyOperator (with no on_execute_callback, on_success_callback, or outlets) should not be executed on workers, so it should not be considered for the mini scheduler optimization.
    closes: https://github.com/apache/airflow/issues/29974
---
 airflow/models/taskinstance.py    | 12 ++++++++++-
 tests/models/test_taskinstance.py | 44 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 55 insertions(+), 1 deletion(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 3f6ebefd95..1a826bc918 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -2651,7 +2651,17 @@ class TaskInstance(Base, LoggingMixin):
                 task_id for task_id in partial_dag.task_ids if task_id not in task.downstream_task_ids
             }
 
-            schedulable_tis = [ti for ti in info.schedulable_tis if ti.task_id not in skippable_task_ids]
+            schedulable_tis = [
+                ti
+                for ti in info.schedulable_tis
+                if ti.task_id not in skippable_task_ids
+                and not (
+                    ti.task.inherits_from_empty_operator
+                    and not ti.task.on_execute_callback
+                    and not ti.task.on_success_callback
+                    and not ti.task.outlets
+                )
+            ]
             for schedulable_ti in schedulable_tis:
                 if not hasattr(schedulable_ti, "task"):
                     schedulable_ti.task = task.dag.get_task(schedulable_ti.task_id)
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 54884f633e..50cac05296 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -3859,6 +3859,50 @@ def test_mapped_task_does_not_error_in_mini_scheduler_if_upstreams_are_not_done(
     assert "0 downstream tasks scheduled from follow-on schedule" in caplog.text
 
 
+def test_empty_operator_is_not_considered_in_mini_scheduler(dag_maker, caplog, session):
+    """
+    This test verifies that operators with inherits_from_empty_operator are not considered by the mini scheduler.
+    Such operators should not run on workers, so the mini scheduler optimization should skip them rather than
+    submit them directly to a worker. An EmptyOperator with an on_success_callback is still scheduled.
+    """
+    with dag_maker() as dag:
+
+        @dag.task
+        def first_task():
+            print(2)
+
+        @dag.task
+        def second_task():
+            print(2)
+
+        third_task = EmptyOperator(task_id="third_task")  # no callbacks/outlets: mini scheduler should skip it
+        forth_task = EmptyOperator(task_id="forth_task", on_success_callback=lambda x: print("hi"))  # callback set
+
+        first_task() >> [second_task(), third_task, forth_task]
+        dag_run = dag_maker.create_dagrun()
+        first_ti = dag_run.get_task_instance(task_id="first_task")
+        second_ti = dag_run.get_task_instance(task_id="second_task")
+        third_ti = dag_run.get_task_instance(task_id="third_task")
+        forth_ti = dag_run.get_task_instance(task_id="forth_task")
+        first_ti.state = State.SUCCESS  # upstream finished; the three downstream TIs start with no state
+        second_ti.state = State.NONE
+        third_ti.state = State.NONE
+        forth_ti.state = State.NONE
+        session.merge(first_ti)
+        session.merge(second_ti)
+        session.merge(third_ti)
+        session.merge(forth_ti)
+        session.commit()
+        first_ti.schedule_downstream_tasks(session=session)  # mini scheduler pass over first_ti's downstreams
+        second_task = dag_run.get_task_instance(task_id="second_task")  # NOTE(review): rebinds task names to TIs
+        third_task = dag_run.get_task_instance(task_id="third_task")
+        forth_task = dag_run.get_task_instance(task_id="forth_task")
+        assert second_task.state == State.SCHEDULED
+        assert third_task.state == State.NONE  # plain EmptyOperator is not scheduled by the mini scheduler
+        assert forth_task.state == State.SCHEDULED  # EmptyOperator with on_success_callback is still scheduled
+        assert "2 downstream tasks scheduled from follow-on schedule" in caplog.text  # second_task + forth_task
+
+
 def test_mapped_task_expands_in_mini_scheduler_if_upstreams_are_done(dag_maker, caplog, session):
     """Test that mini scheduler expands mapped task"""
     with dag_maker() as dag: