Posted to commits@airflow.apache.org by "jedcunningham (via GitHub)" <gi...@apache.org> on 2023/03/08 21:37:41 UTC
[GitHub] [airflow] jedcunningham commented on a diff in pull request #29979: Avoid considering EmptyOperator in mini scheduler
jedcunningham commented on code in PR #29979:
URL: https://github.com/apache/airflow/pull/29979#discussion_r1130049541
##########
tests/models/test_taskinstance.py:
##########
@@ -3859,6 +3859,50 @@ def last_task():
assert "0 downstream tasks scheduled from follow-on schedule" in caplog.text
+def test_empty_operator_is_not_considered_in_mini_scheduler(dag_maker, caplog, session):
+ """
+ This test verifies that operators with inherits_from_empty_operator are not considered by the mini scheduler.
+ Such operators should not run on workers, so the mini scheduler optimization should skip them and not
+ submit them directly to a worker.
+ """
+ with dag_maker() as dag:
+
+ @dag.task
+ def first_task():
+ print(2)
+
+ @dag.task
+ def second_task():
+ print(2)
+
+ third_task = EmptyOperator(task_id="third_task")
+ forth_task = EmptyOperator(task_id="forth_task", on_success_callback=lambda x: print("hi"))
+
+ first_task() >> [second_task(), third_task, forth_task]
+ dag_run = dag_maker.create_dagrun()
+ first_ti = dag_run.get_task_instance(task_id="first_task")
+ second_ti = dag_run.get_task_instance(task_id="second_task")
+ third_ti = dag_run.get_task_instance(task_id="third_task")
+ forth_ti = dag_run.get_task_instance(task_id="forth_task")
+ first_ti.state = State.SUCCESS
+ second_ti.state = State.NONE
+ third_ti.state = State.NONE
+ forth_ti.state = State.NONE
+ session.merge(first_ti)
+ session.merge(second_ti)
+ session.merge(third_ti)
+ session.merge(forth_ti)
+ session.commit()
+ first_ti.schedule_downstream_tasks(session=session)
+ second_task = dag_run.get_task_instance(task_id="second_task")
+ third_task = dag_run.get_task_instance(task_id="third_task")
+ forth_task = dag_run.get_task_instance(task_id="forth_task")
+ assert second_task.state != State.NONE
+ assert third_task.state == State.NONE
+ assert forth_task.state != State.NONE
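The rule in the test's docstring can be illustrated with a plain-Python sketch of which downstream tasks a mini scheduler would pick up. `FakeTask` and `mini_scheduler_candidates` are hypothetical names, not Airflow APIs. The filter condition is inferred only from what this test expects: an EmptyOperator without a callback is left alone, while one with `on_success_callback` is still scheduled. The real implementation may inspect additional attributes.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class FakeTask:
    """Hypothetical stand-in for an Airflow operator; only the fields this sketch needs."""

    task_id: str
    inherits_from_empty_operator: bool = False
    on_success_callback: Optional[Callable] = None


def mini_scheduler_candidates(downstream: List[FakeTask]) -> List[str]:
    """Return the task_ids the mini scheduler would consider.

    Assumption: a task is skipped only when it is an EmptyOperator with no
    on_success_callback; the real Airflow condition may check more attributes.
    """
    return [
        t.task_id
        for t in downstream
        if not (t.inherits_from_empty_operator and t.on_success_callback is None)
    ]


# Mirrors the three downstream tasks in the test (task IDs kept as in the diff).
tasks = [
    FakeTask("second_task"),
    FakeTask("third_task", inherits_from_empty_operator=True),
    FakeTask("forth_task", inherits_from_empty_operator=True, on_success_callback=lambda ctx: print("hi")),
]
print(mini_scheduler_candidates(tasks))  # ['second_task', 'forth_task']
```

Under this assumption, `third_task` is left in state NONE for the main scheduler to handle, which matches the test's middle assertion.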
Review Comment:
```suggestion
assert second_task.state == State.SCHEDULED
assert third_task.state == State.NONE
assert forth_task.state == State.SCHEDULED
```
Nit: doesn't it feel more natural to check what the state is, rather than what it isn't?
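The reviewer's point can be shown with a small stand-in for the state enum. An assertion of `!= State.NONE` also passes if the task ended up in some unexpected state, whereas `== State.SCHEDULED` pins the exact outcome the test cares about. `FakeState`, `weak_check`, and `strict_check` are hypothetical names for illustration only, not Airflow code.

```python
from enum import Enum


class FakeState(Enum):
    """Hypothetical stand-in for a few Airflow task-instance states."""

    NONE = None
    SCHEDULED = "scheduled"
    QUEUED = "queued"
    FAILED = "failed"


def weak_check(state: FakeState) -> bool:
    # Original style: passes for any state other than NONE.
    return state != FakeState.NONE


def strict_check(state: FakeState) -> bool:
    # Suggested style: passes only for the single expected state.
    return state == FakeState.SCHEDULED


for s in FakeState:
    print(f"{s.name:<9} weak={weak_check(s)!s:<5} strict={strict_check(s)}")
```

Only SCHEDULED passes the strict check, while QUEUED and FAILED would slip through the weak one and hide a regression.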
--
This is an automated message from the Apache Git Service.