You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2021/03/30 12:56:01 UTC

[airflow] branch v2-0-test updated: Pass queue to BaseExecutor.execute_async like in airflow 1.10 (#14861)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-0-test by this push:
     new fb81726  Pass queue to BaseExecutor.execute_async like in airflow 1.10 (#14861)
fb81726 is described below

commit fb8172628c5ed37321fa53f7b6481996d75f8364
Author: Leon Smith <em...@leonmarksmith.com>
AuthorDate: Tue Mar 30 13:55:00 2021 +0100

    Pass queue to BaseExecutor.execute_async like in airflow 1.10 (#14861)
    
    Any schedulers depending on the queue functionality that haven't overridden
    `trigger_tasks` method will see queue functionality break when upgrading to 2.0
    
    (cherry picked from commit 375d26d880338e4d86b576df466f70183085d57f)
---
 airflow/executors/base_executor.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index fc75305..8e87901 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -182,10 +182,10 @@ class BaseExecutor(LoggingMixin):
         sorted_queue = self.order_queued_tasks_by_priority()
 
         for _ in range(min((open_slots, len(self.queued_tasks)))):
-            key, (command, _, _, ti) = sorted_queue.pop(0)
+            key, (command, _, queue, ti) = sorted_queue.pop(0)
             self.queued_tasks.pop(key)
             self.running.add(key)
-            self.execute_async(key=key, command=command, queue=None, executor_config=ti.executor_config)
+            self.execute_async(key=key, command=command, queue=queue, executor_config=ti.executor_config)
 
     def change_state(self, key: TaskInstanceKey, state: str, info=None) -> None:
         """