Posted to commits@airflow.apache.org by "eladkal (via GitHub)" <gi...@apache.org> on 2023/03/08 15:03:20 UTC

[GitHub] [airflow] eladkal opened a new pull request, #29979: Avoid considering EmptyOperator in mini scheduler

eladkal opened a new pull request, #29979:
URL: https://github.com/apache/airflow/pull/29979

   EmptyOperator should not be executed on workers, so it should not be considered for the mini scheduler optimization. Closes: https://github.com/apache/airflow/issues/29974
   
   
   TODO:
   add test
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] eladkal merged pull request #29979: Avoid considering EmptyOperator in mini scheduler

Posted by "eladkal (via GitHub)" <gi...@apache.org>.
eladkal merged PR #29979:
URL: https://github.com/apache/airflow/pull/29979




[GitHub] [airflow] jedcunningham commented on a diff in pull request #29979: Avoid considering EmptyOperator in mini scheduler

Posted by "jedcunningham (via GitHub)" <gi...@apache.org>.
jedcunningham commented on code in PR #29979:
URL: https://github.com/apache/airflow/pull/29979#discussion_r1130049541


##########
tests/models/test_taskinstance.py:
##########
@@ -3859,6 +3859,50 @@ def last_task():
     assert "0 downstream tasks scheduled from follow-on schedule" in caplog.text
 
 
+def test_empty_operator_is_not_considered_in_mini_scheduler(dag_maker, caplog, session):
+    """
+    This test verifies that operators with inherits_from_empty_operator are not considered by the mini scheduler.
+    Such operators should not run on workers, so the mini scheduler optimization should skip them and not
+    submit them directly to a worker.
+    """
+    with dag_maker() as dag:
+
+        @dag.task
+        def first_task():
+            print(2)
+
+        @dag.task
+        def second_task():
+            print(2)
+
+        third_task = EmptyOperator(task_id="third_task")
+        forth_task = EmptyOperator(task_id="forth_task", on_success_callback=lambda x: print("hi"))
+
+        first_task() >> [second_task(), third_task, forth_task]
+        dag_run = dag_maker.create_dagrun()
+        first_ti = dag_run.get_task_instance(task_id="first_task")
+        second_ti = dag_run.get_task_instance(task_id="second_task")
+        third_ti = dag_run.get_task_instance(task_id="third_task")
+        forth_ti = dag_run.get_task_instance(task_id="forth_task")
+        first_ti.state = State.SUCCESS
+        second_ti.state = State.NONE
+        third_ti.state = State.NONE
+        forth_ti.state = State.NONE
+        session.merge(first_ti)
+        session.merge(second_ti)
+        session.merge(third_ti)
+        session.merge(forth_ti)
+        session.commit()
+        first_ti.schedule_downstream_tasks(session=session)
+        second_task = dag_run.get_task_instance(task_id="second_task")
+        third_task = dag_run.get_task_instance(task_id="third_task")
+        forth_task = dag_run.get_task_instance(task_id="forth_task")
+        assert second_task.state != State.NONE
+        assert third_task.state == State.NONE
+        assert forth_task.state != State.NONE

Review Comment:
   ```suggestion
           assert second_task.state == State.SCHEDULED
           assert third_task.state == State.NONE
           assert forth_task.state == State.SCHEDULED
   ```
   
   nit: it feels more natural to check what it is, not what it isn't.





[GitHub] [airflow] eladkal commented on a diff in pull request #29979: Avoid considering EmptyOperator in mini scheduler

Posted by "eladkal (via GitHub)" <gi...@apache.org>.
eladkal commented on code in PR #29979:
URL: https://github.com/apache/airflow/pull/29979#discussion_r1129911387


##########
airflow/models/taskinstance.py:
##########
@@ -2651,7 +2651,11 @@ def schedule_downstream_tasks(self, session=None):
                 task_id for task_id in partial_dag.task_ids if task_id not in task.downstream_task_ids
             }
 
-            schedulable_tis = [ti for ti in info.schedulable_tis if ti.task_id not in skippable_task_ids]
+            schedulable_tis = [
+                ti
+                for ti in info.schedulable_tis
+                if ti.task_id not in skippable_task_ids and not ti.task.inherits_from_empty_operator
+            ]

Review Comment:
   This is `schedule_downstream_tasks`, which implements the mini scheduler.
   It is called in
   https://github.com/apache/airflow/blob/f9e9d23457cba5d3e18b5bdb7b65ecc63735b65b/airflow/jobs/local_task_job.py#L225
   as part of `handle_task_exit`, which runs when the task finishes. Basically, when a task finishes, the mini scheduler kicks in. We already have the DAG loaded, and we assume the direct downstream tasks are next to be scheduled, so setting them here saves the scheduler from having to scan them in its next loop.
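
The filter added in this hunk can be illustrated in isolation. The following is a minimal sketch, not the Airflow implementation: `StubTask`, `StubTI`, and `filter_for_mini_scheduler` are hypothetical stand-ins, and only the `inherits_from_empty_operator` attribute and the shape of the comprehension come from the diff above.

```python
from dataclasses import dataclass


@dataclass
class StubTask:
    """Hypothetical stand-in for an operator; not an Airflow class."""

    task_id: str
    inherits_from_empty_operator: bool = False


@dataclass
class StubTI:
    """Hypothetical stand-in for a task instance; not an Airflow class."""

    task: StubTask

    @property
    def task_id(self) -> str:
        return self.task.task_id


def filter_for_mini_scheduler(schedulable_tis, skippable_task_ids):
    # Same condition as the comprehension in the diff: drop task instances
    # that are skippable, and drop those whose task derives from EmptyOperator.
    return [
        ti
        for ti in schedulable_tis
        if ti.task_id not in skippable_task_ids and not ti.task.inherits_from_empty_operator
    ]


tis = [
    StubTI(StubTask("second_task")),
    StubTI(StubTask("third_task", inherits_from_empty_operator=True)),
    StubTI(StubTask("unrelated_task")),
]
kept = filter_for_mini_scheduler(tis, skippable_task_ids={"unrelated_task"})
print([ti.task_id for ti in kept])
```

In this sketch only `second_task` survives the filter; the empty-operator task is left for the regular scheduler loop to handle.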





[GitHub] [airflow] potiuk commented on pull request #29979: Avoid considering EmptyOperator in mini scheduler

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #29979:
URL: https://github.com/apache/airflow/pull/29979#issuecomment-1460375543

   Nice one.




[GitHub] [airflow] jedcunningham commented on pull request #29979: Avoid considering EmptyOperator in mini scheduler

Posted by "jedcunningham (via GitHub)" <gi...@apache.org>.
jedcunningham commented on PR #29979:
URL: https://github.com/apache/airflow/pull/29979#issuecomment-1460458268

   It should probably check for callbacks and outlets as well: https://github.com/apache/airflow/blob/c3867781e09b7e0e0d19c0991865a2453194d9a8/airflow/models/dagrun.py#L1262-L1264
   
   Or, just letting the scheduler deal with it instead is probably fine too. I'm good either way.
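
A rough sketch of what such a check could look like, assuming the linked lines test for callbacks and outlets on the task. `StubOperator` and `can_skip_worker` are hypothetical names, not Airflow APIs, and the `on_execute_callback` attribute is an assumption; only `inherits_from_empty_operator`, `on_success_callback`, and the mention of outlets appear in this thread.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class StubOperator:
    """Hypothetical stand-in for an operator; not an Airflow class."""

    task_id: str
    inherits_from_empty_operator: bool = False
    on_execute_callback: Optional[Callable] = None  # assumed attribute name
    on_success_callback: Optional[Callable] = None
    outlets: list = field(default_factory=list)


def can_skip_worker(task) -> bool:
    # An empty operator can bypass the worker only if nothing has to run
    # for it: no callbacks to fire and no outlets to record.
    return bool(
        task.inherits_from_empty_operator
        and not task.on_execute_callback
        and not task.on_success_callback
        and not task.outlets
    )


plain = StubOperator("third_task", inherits_from_empty_operator=True)
with_callback = StubOperator(
    "forth_task",
    inherits_from_empty_operator=True,
    on_success_callback=lambda context: None,
)
regular = StubOperator("second_task")

for op in (plain, with_callback, regular):
    print(op.task_id, can_skip_worker(op))
```

Under this sketch only `third_task` would be excluded from the mini scheduler, which is consistent with the test in this PR, where `forth_task` (an `EmptyOperator` with an `on_success_callback`) is still expected to be scheduled.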




[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #29979: Avoid considering EmptyOperator in mini scheduler

Posted by "ephraimbuddy (via GitHub)" <gi...@apache.org>.
ephraimbuddy commented on code in PR #29979:
URL: https://github.com/apache/airflow/pull/29979#discussion_r1129725652


##########
airflow/models/taskinstance.py:
##########
@@ -2651,7 +2651,11 @@ def schedule_downstream_tasks(self, session=None):
                 task_id for task_id in partial_dag.task_ids if task_id not in task.downstream_task_ids
             }
 
-            schedulable_tis = [ti for ti in info.schedulable_tis if ti.task_id not in skippable_task_ids]
+            schedulable_tis = [
+                ti
+                for ti in info.schedulable_tis
+                if ti.task_id not in skippable_task_ids and not ti.task.inherits_from_empty_operator
+            ]

Review Comment:
   I have some doubts: `dagrun.schedule_tis` marks empty operators as successful when called by the scheduler, so how do they still get to this point?





[GitHub] [airflow] eladkal commented on pull request #29979: Avoid considering EmptyOperator in mini scheduler

Posted by "eladkal (via GitHub)" <gi...@apache.org>.
eladkal commented on PR #29979:
URL: https://github.com/apache/airflow/pull/29979#issuecomment-1460737535

   @jedcunningham good point. Added handling for those cases.

