You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "jedcunningham (via GitHub)" <gi...@apache.org> on 2023/03/08 21:37:41 UTC

[GitHub] [airflow] jedcunningham commented on a diff in pull request #29979: Avoid considering EmptyOperator in mini scheduler

jedcunningham commented on code in PR #29979:
URL: https://github.com/apache/airflow/pull/29979#discussion_r1130049541


##########
tests/models/test_taskinstance.py:
##########
@@ -3859,6 +3859,50 @@ def last_task():
     assert "0 downstream tasks scheduled from follow-on schedule" in caplog.text
 
 
+def test_empty_operator_is_not_considered_in_mini_scheduler(dag_maker, caplog, session):
+    """
+    This tests verify that operators with inherits_from_empty_operator are not considered by mini scheduler.
+    Such operators should not run on workers thus the mini scheduler optimization should skip them and not
+    submit them directly to worker.
+    """
+    with dag_maker() as dag:
+
+        @dag.task
+        def first_task():
+            print(2)
+
+        @dag.task
+        def second_task():
+            print(2)
+
+        third_task = EmptyOperator(task_id="third_task")
+        forth_task = EmptyOperator(task_id="forth_task", on_success_callback=lambda x: print("hi"))
+
+        first_task() >> [second_task(), third_task, forth_task]
+        dag_run = dag_maker.create_dagrun()
+        first_ti = dag_run.get_task_instance(task_id="first_task")
+        second_ti = dag_run.get_task_instance(task_id="second_task")
+        third_ti = dag_run.get_task_instance(task_id="third_task")
+        forth_ti = dag_run.get_task_instance(task_id="forth_task")
+        first_ti.state = State.SUCCESS
+        second_ti.state = State.NONE
+        third_ti.state = State.NONE
+        forth_ti.state = State.NONE
+        session.merge(first_ti)
+        session.merge(second_ti)
+        session.merge(third_ti)
+        session.merge(forth_ti)
+        session.commit()
+        first_ti.schedule_downstream_tasks(session=session)
+        second_task = dag_run.get_task_instance(task_id="second_task")
+        third_task = dag_run.get_task_instance(task_id="third_task")
+        forth_task = dag_run.get_task_instance(task_id="forth_task")
+        assert second_task.state != State.NONE
+        assert third_task.state == State.NONE
+        assert forth_task.state != State.NONE

Review Comment:
   ```suggestion
           assert second_task.state == State.SCHEDULED
           assert third_task.state == State.NONE
           assert forth_task.state == State.SCHEDULED
   ```
   
   nit: it feels more natural to check what the state is, not what it isn't.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org