You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/03/17 20:02:11 UTC

[GitHub] [airflow] leonsmith opened a new pull request #14861: Pass queue in BaseExecutor.execute_async like in airflow 1.10

leonsmith opened a new pull request #14861:
URL: https://github.com/apache/airflow/pull/14861


   [1.10](https://github.com/apache/airflow/blob/1.10.15/airflow/executors/base_executor.py#L153) passes the Task Instance queue, but the refactor in [2.0](https://github.com/apache/airflow/blob/2.0.1/airflow/executors/base_executor.py#L188) looks to have missed this.
   
   Any schedulers depending on the queue functionality that haven't overridden `trigger_tasks` will see queue functionality break when upgrading to 2.0


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] leonsmith commented on a change in pull request #14861: Pass queue in BaseExecutor.execute_async like in airflow 1.10

Posted by GitBox <gi...@apache.org>.
leonsmith commented on a change in pull request #14861:
URL: https://github.com/apache/airflow/pull/14861#discussion_r596377787



##########
File path: airflow/executors/base_executor.py
##########
@@ -182,10 +182,10 @@ def trigger_tasks(self, open_slots: int) -> None:
         sorted_queue = self.order_queued_tasks_by_priority()
 
         for _ in range(min((open_slots, len(self.queued_tasks)))):
-            key, (command, _, _, ti) = sorted_queue.pop(0)
+            key, (command, _, queue, ti) = sorted_queue.pop(0)
             self.queued_tasks.pop(key)
             self.running.add(key)
-            self.execute_async(key=key, command=command, queue=None, executor_config=ti.executor_config)
+            self.execute_async(key=key, command=command, queue=queue, executor_config=ti.executor_config)

Review comment:
       Yep celery is not broken as it does override the `trigger_tasks` to pass the queue.
   
   The BaseExecutor should expose all the functionality (like it used to) though without having to have executors override this function? Otherwise the queue argument to `execute_async` is redundant?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #14861: Pass queue in BaseExecutor.execute_async like in airflow 1.10

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14861:
URL: https://github.com/apache/airflow/pull/14861#issuecomment-801497457


   [The Workflow run](https://github.com/apache/airflow/actions/runs/662713522) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on pull request #14861: Pass queue in BaseExecutor.execute_async like in airflow 1.10

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #14861:
URL: https://github.com/apache/airflow/pull/14861#issuecomment-801822272


   Can you rebase on latest master please


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #14861: Pass queue in BaseExecutor.execute_async like in airflow 1.10

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #14861:
URL: https://github.com/apache/airflow/pull/14861#discussion_r596743371



##########
File path: airflow/executors/base_executor.py
##########
@@ -182,10 +182,10 @@ def trigger_tasks(self, open_slots: int) -> None:
         sorted_queue = self.order_queued_tasks_by_priority()
 
         for _ in range(min((open_slots, len(self.queued_tasks)))):
-            key, (command, _, _, ti) = sorted_queue.pop(0)
+            key, (command, _, queue, ti) = sorted_queue.pop(0)
             self.queued_tasks.pop(key)
             self.running.add(key)
-            self.execute_async(key=key, command=command, queue=None, executor_config=ti.executor_config)
+            self.execute_async(key=key, command=command, queue=queue, executor_config=ti.executor_config)

Review comment:
       Oh yeah 👍 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] leonsmith commented on pull request #14861: Pass queue in BaseExecutor.execute_async like in airflow 1.10

Posted by GitBox <gi...@apache.org>.
leonsmith commented on pull request #14861:
URL: https://github.com/apache/airflow/pull/14861#issuecomment-801855381


   Rebased & pushed 👍 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #14861: Pass queue in BaseExecutor.execute_async like in airflow 1.10

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14861:
URL: https://github.com/apache/airflow/pull/14861#issuecomment-801814123


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest master at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb merged pull request #14861: Pass queue in BaseExecutor.execute_async like in airflow 1.10

Posted by GitBox <gi...@apache.org>.
ashb merged pull request #14861:
URL: https://github.com/apache/airflow/pull/14861


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] boring-cyborg[bot] commented on pull request #14861: Pass queue in BaseExecutor.execute_async like in airflow 1.10

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #14861:
URL: https://github.com/apache/airflow/pull/14861#issuecomment-810207622


   Awesome work, congrats on your first merged pull request!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #14861: Pass queue in BaseExecutor.execute_async like in airflow 1.10

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #14861:
URL: https://github.com/apache/airflow/pull/14861#discussion_r596371734



##########
File path: airflow/executors/base_executor.py
##########
@@ -182,10 +182,10 @@ def trigger_tasks(self, open_slots: int) -> None:
         sorted_queue = self.order_queued_tasks_by_priority()
 
         for _ in range(min((open_slots, len(self.queued_tasks)))):
-            key, (command, _, _, ti) = sorted_queue.pop(0)
+            key, (command, _, queue, ti) = sorted_queue.pop(0)
             self.queued_tasks.pop(key)
             self.running.add(key)
-            self.execute_async(key=key, command=command, queue=None, executor_config=ti.executor_config)
+            self.execute_async(key=key, command=command, queue=queue, executor_config=ti.executor_config)

Review comment:
       Celery Executor already overrides this method:
   
   https://github.com/apache/airflow/blob/2a2adb3f94cc165014d746102e12f9620f271391/airflow/executors/celery_executor.py#L244-L263




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org