Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/03/18 18:07:09 UTC

[GitHub] [airflow] RNHTTR opened a new pull request #14883: Fix celery executor bug trying to call len on map

RNHTTR opened a new pull request #14883:
URL: https://github.com/apache/airflow/pull/14883


   When the Celery executor tries to adopt task instances, and there are task instances to adopt, `bulk_state_fetcher.get_many` [is called](https://github.com/apache/airflow/blob/90ab60bba877c65cb93871b97db13a179820d28b/airflow/executors/celery_executor.py#L478-L480) with a map object. If the Celery `result_backend` is not an instance of `BaseKeyValueStoreBackend` or `DatabaseBackend`, the method `_get_many_using_multiprocessing` is called. This method takes the `len` of its parameter, but a map object has no length, so it needs to be converted to a list (or perhaps another sized collection?) first. Since there is a debug statement that takes the `len` before `_get_many_using_multiprocessing` [is called](https://github.com/apache/airflow/blob/90ab60bba877c65cb93871b97db13a179820d28b/airflow/executors/celery_executor.py#L555), it makes sense to convert the map object to a list immediately beforehand, even if it weren't handled in the `else` block.
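
A minimal sketch of the failure described above, using plain Python rather than Celery objects. The names `task_ids` and `lazy_results` are made up for illustration; only the behaviour of the built-in `map` and `len` is being shown, not the Airflow code path itself.

```python
# Hypothetical stand-ins for the values passed to get_many; not Airflow code.
task_ids = ["123", "456"]
lazy_results = map(str.upper, task_ids)  # a map object is a lazy iterator

try:
    len(lazy_results)
    len_error = None
except TypeError as exc:
    # map objects do not implement __len__, so len() raises TypeError
    len_error = exc

# Materialising the iterator first gives an object that supports len()
materialised = list(map(str.upper, task_ids))
count = len(materialised)
```

Converting to a list is the simplest way to get something sized; any collection that implements `__len__` would also do.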
   
   I wasn't able to actually reproduce the issue with Airflow, but I was able to reproduce it with the [test_try_adopt_task_instances](https://github.com/apache/airflow/blob/90ab60bba877c65cb93871b97db13a179820d28b/tests/executors/test_celery_executor.py#L319-L353) test by setting the celery result backend to `rpc://` and the broker to redis.
   
   closes: #14163


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] RNHTTR commented on a change in pull request #14883: Fix celery executor bug trying to call len on map

Posted by GitBox <gi...@apache.org>.
RNHTTR commented on a change in pull request #14883:
URL: https://github.com/apache/airflow/pull/14883#discussion_r602787469



##########
File path: tests/executors/test_celery_executor.py
##########
@@ -469,3 +470,23 @@ def test_should_support_base_backend(self):
                 )
 
         assert result == {'123': ('SUCCESS', None), '456': ("PENDING", None)}
+
+    @pytest.mark.integration("redis")
+    @pytest.mark.integration("rabbitmq")
+    @pytest.mark.backend("mysql", "postgres")
+    def test_should_support_base_backend_from_try_adopt_task_instances(self):

Review comment:
       Good call.
   
   I refactored the tests for the possible iterations of `get_many` to set the log mode to `DEBUG` and updated the debug statement to account for being a map object vs being a list object.







[GitHub] [airflow] ashb commented on a change in pull request #14883: Fix celery executor bug trying to call len on map

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #14883:
URL: https://github.com/apache/airflow/pull/14883#discussion_r605044611



##########
File path: airflow/executors/celery_executor.py
##########
@@ -543,20 +539,27 @@ def __init__(self, sync_parallelism=None):
         super().__init__()
         self._sync_parallelism = sync_parallelism
 
+    def _tasks_list_to_task_ids(self, async_tasks) -> Set[str]:
+        return {a.task_id for a in async_tasks}
+
     def get_many(self, async_results) -> Mapping[str, EventBufferValueType]:
         """Gets status for many Celery tasks using the best method available."""
         if isinstance(app.backend, BaseKeyValueStoreBackend):
             result = self._get_many_from_kv_backend(async_results)
-            return result
-        if isinstance(app.backend, DatabaseBackend):
+        elif isinstance(app.backend, DatabaseBackend):
             result = self._get_many_from_db_backend(async_results)
-            return result
-        result = self._get_many_using_multiprocessing(async_results)
-        self.log.debug("Fetched %d states for %d task", len(result), len(async_results))
+        else:
+            async_results = list(async_results) if isinstance(async_results, map) else async_results

Review comment:
       @RNHTTR I think most of the cases are not large enough for this to make much of a difference. The only map case is in code that runs once every 30s and is not performance _critical_, so for ease of understanding, let's change the one usage instead of complicating this case.







[GitHub] [airflow] RNHTTR commented on a change in pull request #14883: Fix celery executor bug trying to call len on map

Posted by GitBox <gi...@apache.org>.
RNHTTR commented on a change in pull request #14883:
URL: https://github.com/apache/airflow/pull/14883#discussion_r603577662



##########
File path: airflow/executors/celery_executor.py
##########
@@ -543,20 +539,27 @@ def __init__(self, sync_parallelism=None):
         super().__init__()
         self._sync_parallelism = sync_parallelism
 
+    def _tasks_list_to_task_ids(self, async_tasks) -> Set[str]:
+        return {a.task_id for a in async_tasks}
+
     def get_many(self, async_results) -> Mapping[str, EventBufferValueType]:
         """Gets status for many Celery tasks using the best method available."""
         if isinstance(app.backend, BaseKeyValueStoreBackend):
             result = self._get_many_from_kv_backend(async_results)
-            return result
-        if isinstance(app.backend, DatabaseBackend):
+        elif isinstance(app.backend, DatabaseBackend):
             result = self._get_many_from_db_backend(async_results)
-            return result
-        result = self._get_many_using_multiprocessing(async_results)
-        self.log.debug("Fetched %d states for %d task", len(result), len(async_results))
+        else:
+            async_results = list(async_results) if isinstance(async_results, map) else async_results

Review comment:
       That's probably the most intuitive fix, but in the case of the recommended backends (i.e. `BaseKeyValueStoreBackend` or `DatabaseBackend`), we'd be converting it to a list and then immediately creating a set based off that list instead of just creating the set based on the map object. Do you think that could have a potential performance hit if a lot of tasks are running?
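
To make the two paths in this question concrete, here is a small sketch under stated assumptions: `FakeResult` is a hypothetical stand-in for a Celery result object, and `tasks_to_task_ids` only mirrors the set comprehension shown in the diff. It makes no timing claim; it only shows that both paths yield the same set and that one of them builds an extra list.

```python
from collections import namedtuple

# Hypothetical stand-in for a Celery AsyncResult; only task_id matters here.
FakeResult = namedtuple("FakeResult", ["task_id"])


def tasks_to_task_ids(async_tasks):
    # Mirrors the set comprehension in _tasks_list_to_task_ids from the diff
    return {a.task_id for a in async_tasks}


ids = ["a", "b", "c"]

# Path 1: consume the map directly (one pass, no intermediate list)
direct = tasks_to_task_ids(map(FakeResult, ids))

# Path 2: build a list first, then the set (one extra list allocation)
via_list = tasks_to_task_ids(list(map(FakeResult, ids)))

same_ids = direct == via_list
```

The only difference is the intermediate list, whose cost grows linearly with the number of tasks.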







[GitHub] [airflow] RNHTTR commented on a change in pull request #14883: Fix celery executor bug trying to call len on map

Posted by GitBox <gi...@apache.org>.
RNHTTR commented on a change in pull request #14883:
URL: https://github.com/apache/airflow/pull/14883#discussion_r601996629



##########
File path: airflow/executors/celery_executor.py
##########
@@ -547,11 +547,12 @@ def get_many(self, async_results) -> Mapping[str, EventBufferValueType]:
         """Gets status for many Celery tasks using the best method available."""
         if isinstance(app.backend, BaseKeyValueStoreBackend):
             result = self._get_many_from_kv_backend(async_results)
-            return result
-        if isinstance(app.backend, DatabaseBackend):
+        elif isinstance(app.backend, DatabaseBackend):
             result = self._get_many_from_db_backend(async_results)
-            return result
-        result = self._get_many_using_multiprocessing(async_results)
+        else:
+            async_results = list(async_results)
+            result = self._get_many_using_multiprocessing(async_results)
+        async_results = list(async_results) if isinstance(async_results, map) else async_results

Review comment:
       ~@kaxil Looks like the failed build is due to a bug that was corrected yesterday #14986~ Actually, I just re-rebased and pushed again.







[GitHub] [airflow] ashb commented on a change in pull request #14883: Fix celery executor bug trying to call len on map

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #14883:
URL: https://github.com/apache/airflow/pull/14883#discussion_r603366574



##########
File path: airflow/executors/celery_executor.py
##########
@@ -543,20 +539,27 @@ def __init__(self, sync_parallelism=None):
         super().__init__()
         self._sync_parallelism = sync_parallelism
 
+    def _tasks_list_to_task_ids(self, async_tasks) -> Set[str]:
+        return {a.task_id for a in async_tasks}
+
     def get_many(self, async_results) -> Mapping[str, EventBufferValueType]:
         """Gets status for many Celery tasks using the best method available."""
         if isinstance(app.backend, BaseKeyValueStoreBackend):
             result = self._get_many_from_kv_backend(async_results)
-            return result
-        if isinstance(app.backend, DatabaseBackend):
+        elif isinstance(app.backend, DatabaseBackend):
             result = self._get_many_from_db_backend(async_results)
-            return result
-        result = self._get_many_using_multiprocessing(async_results)
-        self.log.debug("Fetched %d states for %d task", len(result), len(async_results))
+        else:
+            async_results = list(async_results) if isinstance(async_results, map) else async_results

Review comment:
       The other possible fix is to change the one call that passes a map (L478) to be `list(map(...))`
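
A sketch of what fixing the call site could look like, assuming a simplified stand-in for `get_many`. The function `get_many_sketch` and the task ids are hypothetical; the real call in `celery_executor.py` passes different arguments.

```python
def get_many_sketch(async_results):
    # Hypothetical stand-in for the fallback branch: it needs len()
    return {"fetched": len(async_results)}


celery_task_ids = ["123", "456"]  # made-up task ids

# Before: passing the lazy map would raise TypeError inside get_many_sketch
# get_many_sketch(map(str, celery_task_ids))

# After: materialise at the single call site and leave get_many untouched
summary = get_many_sketch(list(map(str, celery_task_ids)))
```

With this approach the callee never has to check what kind of iterable it received.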

##########
File path: airflow/executors/celery_executor.py
##########
@@ -543,20 +539,27 @@ def __init__(self, sync_parallelism=None):
         super().__init__()
         self._sync_parallelism = sync_parallelism
 
+    def _tasks_list_to_task_ids(self, async_tasks) -> Set[str]:
+        return {a.task_id for a in async_tasks}
+
     def get_many(self, async_results) -> Mapping[str, EventBufferValueType]:
         """Gets status for many Celery tasks using the best method available."""
         if isinstance(app.backend, BaseKeyValueStoreBackend):
             result = self._get_many_from_kv_backend(async_results)
-            return result
-        if isinstance(app.backend, DatabaseBackend):
+        elif isinstance(app.backend, DatabaseBackend):
             result = self._get_many_from_db_backend(async_results)
-            return result
-        result = self._get_many_using_multiprocessing(async_results)
-        self.log.debug("Fetched %d states for %d task", len(result), len(async_results))
+        else:
+            async_results = list(async_results) if isinstance(async_results, map) else async_results
+            result = self._get_many_using_multiprocessing(async_results)
+        if logging.getLevelName(self.log.level) == "DEBUG":
+            if isinstance(async_results, map):
+                self.log.debug("Fetched state for %d task(s)", len(result))
+            else:
+                self.log.debug("Fetched %d state(s) for %d task(s)", len(result), len(async_results))

Review comment:
       ```suggestion
               self.log.debug("Fetched %d state(s) for %d task(s)", len(result), len(async_results))
   ```
   
   I think this should be enough, as you have already changed the type on L552.

##########
File path: airflow/executors/celery_executor.py
##########
@@ -543,20 +539,27 @@ def __init__(self, sync_parallelism=None):
         super().__init__()
         self._sync_parallelism = sync_parallelism
 
+    def _tasks_list_to_task_ids(self, async_tasks) -> Set[str]:
+        return {a.task_id for a in async_tasks}
+
     def get_many(self, async_results) -> Mapping[str, EventBufferValueType]:
         """Gets status for many Celery tasks using the best method available."""
         if isinstance(app.backend, BaseKeyValueStoreBackend):
             result = self._get_many_from_kv_backend(async_results)
-            return result
-        if isinstance(app.backend, DatabaseBackend):
+        elif isinstance(app.backend, DatabaseBackend):
             result = self._get_many_from_db_backend(async_results)
-            return result
-        result = self._get_many_using_multiprocessing(async_results)
-        self.log.debug("Fetched %d states for %d task", len(result), len(async_results))
+        else:
+            async_results = list(async_results) if isinstance(async_results, map) else async_results

Review comment:
       ```suggestion
               if isinstance(async_results, map):
                   async_results = list(async_results)
   ```







[GitHub] [airflow] kaxil commented on a change in pull request #14883: Fix celery executor bug trying to call len on map

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #14883:
URL: https://github.com/apache/airflow/pull/14883#discussion_r606229459



##########
File path: airflow/executors/celery_executor.py
##########
@@ -543,20 +539,27 @@ def __init__(self, sync_parallelism=None):
         super().__init__()
         self._sync_parallelism = sync_parallelism
 
+    def _tasks_list_to_task_ids(self, async_tasks) -> Set[str]:
+        return {a.task_id for a in async_tasks}
+
     def get_many(self, async_results) -> Mapping[str, EventBufferValueType]:
         """Gets status for many Celery tasks using the best method available."""
         if isinstance(app.backend, BaseKeyValueStoreBackend):
             result = self._get_many_from_kv_backend(async_results)
-            return result
-        if isinstance(app.backend, DatabaseBackend):
+        elif isinstance(app.backend, DatabaseBackend):
             result = self._get_many_from_db_backend(async_results)
-            return result
-        result = self._get_many_using_multiprocessing(async_results)
-        self.log.debug("Fetched %d states for %d task", len(result), len(async_results))
+        else:
+            async_results = list(async_results) if isinstance(async_results, map) else async_results

Review comment:
       Task adoption: when a running scheduler dies, another scheduler is able to "adopt" the tasks started by that scheduler/executor. This currently works for the Celery, Kubernetes (and CeleryKubernetes) executors.
   
   Code:
   
   **Scheduler**:
   
   https://github.com/apache/airflow/blob/4d1b2e985492894e6064235688cf8f381b1e8858/airflow/jobs/scheduler_job.py#L1825
   
   **Celery**:
   
   https://github.com/apache/airflow/blob/4d1b2e985492894e6064235688cf8f381b1e8858/airflow/executors/celery_executor.py#L345







[GitHub] [airflow] RNHTTR commented on a change in pull request #14883: Fix celery executor bug trying to call len on map

Posted by GitBox <gi...@apache.org>.
RNHTTR commented on a change in pull request #14883:
URL: https://github.com/apache/airflow/pull/14883#discussion_r606302152



##########
File path: airflow/executors/celery_executor.py
##########
@@ -543,20 +539,27 @@ def __init__(self, sync_parallelism=None):
         super().__init__()
         self._sync_parallelism = sync_parallelism
 
+    def _tasks_list_to_task_ids(self, async_tasks) -> Set[str]:
+        return {a.task_id for a in async_tasks}
+
     def get_many(self, async_results) -> Mapping[str, EventBufferValueType]:
         """Gets status for many Celery tasks using the best method available."""
         if isinstance(app.backend, BaseKeyValueStoreBackend):
             result = self._get_many_from_kv_backend(async_results)
-            return result
-        if isinstance(app.backend, DatabaseBackend):
+        elif isinstance(app.backend, DatabaseBackend):
             result = self._get_many_from_db_backend(async_results)
-            return result
-        result = self._get_many_using_multiprocessing(async_results)
-        self.log.debug("Fetched %d states for %d task", len(result), len(async_results))
+        else:
+            async_results = list(async_results) if isinstance(async_results, map) else async_results

Review comment:
       Thanks Kaxil, that's helpful. Do you think it'd be a good idea to put together a GLOSSARY.rst or similar to capture some of these more nuanced concepts?







[GitHub] [airflow] RNHTTR commented on a change in pull request #14883: Fix celery executor bug trying to call len on map

Posted by GitBox <gi...@apache.org>.
RNHTTR commented on a change in pull request #14883:
URL: https://github.com/apache/airflow/pull/14883#discussion_r603576406



##########
File path: airflow/executors/celery_executor.py
##########
@@ -543,20 +539,27 @@ def __init__(self, sync_parallelism=None):
         super().__init__()
         self._sync_parallelism = sync_parallelism
 
+    def _tasks_list_to_task_ids(self, async_tasks) -> Set[str]:
+        return {a.task_id for a in async_tasks}
+
     def get_many(self, async_results) -> Mapping[str, EventBufferValueType]:
         """Gets status for many Celery tasks using the best method available."""
         if isinstance(app.backend, BaseKeyValueStoreBackend):
             result = self._get_many_from_kv_backend(async_results)
-            return result
-        if isinstance(app.backend, DatabaseBackend):
+        elif isinstance(app.backend, DatabaseBackend):
             result = self._get_many_from_db_backend(async_results)
-            return result
-        result = self._get_many_using_multiprocessing(async_results)
-        self.log.debug("Fetched %d states for %d task", len(result), len(async_results))
+        else:
+            async_results = list(async_results) if isinstance(async_results, map) else async_results
+            result = self._get_many_using_multiprocessing(async_results)
+        if logging.getLevelName(self.log.level) == "DEBUG":
+            if isinstance(async_results, map):
+                self.log.debug("Fetched state for %d task(s)", len(result))
+            else:
+                self.log.debug("Fetched %d state(s) for %d task(s)", len(result), len(async_results))

Review comment:
       Only if it's not a key-value or database Celery backend. If it is a key-value or database backend, `_tasks_list_to_task_ids` is called in `_get_many_from_kv_backend` and `_get_many_from_db_backend`, respectively. In these cases, `async_results` will be length 0 if converted to a list before calling `_get_many_from_kv_backend` (which I think would be kinda redundant).
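
One Python detail is relevant to this exchange: a `map` object can be iterated only once. The sketch below uses made-up values and a plain set comprehension in place of `_tasks_list_to_task_ids`; it illustrates the language behaviour only and is not the executor code.

```python
ids = ["123", "456"]
async_results = map(str, ids)  # stand-in for the map passed to get_many

# First pass: something like _tasks_list_to_task_ids consumes the iterator
task_ids = {r for r in async_results}

# Second pass: the iterator is now exhausted, so nothing is left to collect
leftover = list(async_results)
```

Any later `len(list(async_results))` on the same map object would therefore report zero items, whatever the original input held.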







[GitHub] [airflow] RNHTTR commented on a change in pull request #14883: Fix celery executor bug trying to call len on map

Posted by GitBox <gi...@apache.org>.
RNHTTR commented on a change in pull request #14883:
URL: https://github.com/apache/airflow/pull/14883#discussion_r599876910



##########
File path: airflow/executors/celery_executor.py
##########
@@ -547,11 +547,12 @@ def get_many(self, async_results) -> Mapping[str, EventBufferValueType]:
         """Gets status for many Celery tasks using the best method available."""
         if isinstance(app.backend, BaseKeyValueStoreBackend):
             result = self._get_many_from_kv_backend(async_results)
-            return result
-        if isinstance(app.backend, DatabaseBackend):
+        elif isinstance(app.backend, DatabaseBackend):
             result = self._get_many_from_db_backend(async_results)
-            return result
-        result = self._get_many_using_multiprocessing(async_results)
+        else:
+            async_results = list(async_results)
+            result = self._get_many_using_multiprocessing(async_results)
+        async_results = list(async_results) if isinstance(async_results, map) else async_results

Review comment:
       @kaxil Done. One of these days I'll have a PR that includes a test ;)







[GitHub] [airflow] kaxil commented on a change in pull request #14883: Fix celery executor bug trying to call len on map

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #14883:
URL: https://github.com/apache/airflow/pull/14883#discussion_r598675988



##########
File path: airflow/executors/celery_executor.py
##########
@@ -547,11 +547,12 @@ def get_many(self, async_results) -> Mapping[str, EventBufferValueType]:
         """Gets status for many Celery tasks using the best method available."""
         if isinstance(app.backend, BaseKeyValueStoreBackend):
             result = self._get_many_from_kv_backend(async_results)
-            return result
-        if isinstance(app.backend, DatabaseBackend):
+        elif isinstance(app.backend, DatabaseBackend):
             result = self._get_many_from_db_backend(async_results)
-            return result
-        result = self._get_many_using_multiprocessing(async_results)
+        else:
+            async_results = list(async_results)
+            result = self._get_many_using_multiprocessing(async_results)
+        async_results = list(async_results) if isinstance(async_results, map) else async_results

Review comment:
       Can you please add a test, similar to the one you mentioned in the PR description?







[GitHub] [airflow] RNHTTR commented on a change in pull request #14883: Fix celery executor bug trying to call len on map

Posted by GitBox <gi...@apache.org>.
RNHTTR commented on a change in pull request #14883:
URL: https://github.com/apache/airflow/pull/14883#discussion_r601996629



##########
File path: airflow/executors/celery_executor.py
##########
@@ -547,11 +547,12 @@ def get_many(self, async_results) -> Mapping[str, EventBufferValueType]:
         """Gets status for many Celery tasks using the best method available."""
         if isinstance(app.backend, BaseKeyValueStoreBackend):
             result = self._get_many_from_kv_backend(async_results)
-            return result
-        if isinstance(app.backend, DatabaseBackend):
+        elif isinstance(app.backend, DatabaseBackend):
             result = self._get_many_from_db_backend(async_results)
-            return result
-        result = self._get_many_using_multiprocessing(async_results)
+        else:
+            async_results = list(async_results)
+            result = self._get_many_using_multiprocessing(async_results)
+        async_results = list(async_results) if isinstance(async_results, map) else async_results

Review comment:
       @kaxil Looks like the failed build is due to a bug that was corrected yesterday #14986







[GitHub] [airflow] ashb commented on a change in pull request #14883: Fix celery executor bug trying to call len on map

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #14883:
URL: https://github.com/apache/airflow/pull/14883#discussion_r602394746



##########
File path: tests/executors/test_celery_executor.py
##########
@@ -469,3 +470,23 @@ def test_should_support_base_backend(self):
                 )
 
         assert result == {'123': ('SUCCESS', None), '456': ("PENDING", None)}
+
+    @pytest.mark.integration("redis")
+    @pytest.mark.integration("rabbitmq")
+    @pytest.mark.backend("mysql", "postgres")
+    def test_should_support_base_backend_from_try_adopt_task_instances(self):

Review comment:
       Do you need to set logging to debug for this test to cover the new code?
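
The question matters because the diff under review guards its debug statements with `logging.getLevelName(self.log.level) == "DEBUG"`. A minimal sketch of how that comparison behaves, using a hypothetical logger name and a helper (`debug_branch_taken`) that exists only for this illustration:

```python
import logging

log = logging.getLogger("celery_executor_sketch")  # hypothetical logger name
log.setLevel(logging.NOTSET)  # make the starting state explicit


def debug_branch_taken(logger):
    # Same comparison as the guard in the diff under review
    return logging.getLevelName(logger.level) == "DEBUG"


before = debug_branch_taken(log)  # level is NOTSET, so the guard is False

log.setLevel(logging.DEBUG)
after = debug_branch_taken(log)  # level is DEBUG, so the guard is True
```

Unless a test sets the logger's own level to `DEBUG`, the guarded lines are skipped and would not be covered.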







[GitHub] [airflow] ashb merged pull request #14883: Fix celery executor bug trying to call len on map

Posted by GitBox <gi...@apache.org>.
ashb merged pull request #14883:
URL: https://github.com/apache/airflow/pull/14883


   





[GitHub] [airflow] ashb commented on pull request #14883: Fix celery executor bug trying to call len on map

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #14883:
URL: https://github.com/apache/airflow/pull/14883#issuecomment-809459046


   @RNHTTR I've just taken another detailed look -- and I think there may be a simpler fix.





[GitHub] [airflow] RNHTTR commented on a change in pull request #14883: Fix celery executor bug trying to call len on map

Posted by GitBox <gi...@apache.org>.
RNHTTR commented on a change in pull request #14883:
URL: https://github.com/apache/airflow/pull/14883#discussion_r606009786



##########
File path: airflow/executors/celery_executor.py
##########
@@ -543,20 +539,27 @@ def __init__(self, sync_parallelism=None):
         super().__init__()
         self._sync_parallelism = sync_parallelism
 
+    def _tasks_list_to_task_ids(self, async_tasks) -> Set[str]:
+        return {a.task_id for a in async_tasks}
+
     def get_many(self, async_results) -> Mapping[str, EventBufferValueType]:
         """Gets status for many Celery tasks using the best method available."""
         if isinstance(app.backend, BaseKeyValueStoreBackend):
             result = self._get_many_from_kv_backend(async_results)
-            return result
-        if isinstance(app.backend, DatabaseBackend):
+        elif isinstance(app.backend, DatabaseBackend):
             result = self._get_many_from_db_backend(async_results)
-            return result
-        result = self._get_many_using_multiprocessing(async_results)
-        self.log.debug("Fetched %d states for %d task", len(result), len(async_results))
+        else:
+            async_results = list(async_results) if isinstance(async_results, map) else async_results

Review comment:
       Sounds good. PR updated. Any resources you recommend on diving into some of Airflow's trickier topics? I can't seem to find much on task adoption, for example. We run managed Airflow, so a lot of Airflow's challenging aspects are abstracted away.







[GitHub] [airflow] kaxil commented on a change in pull request #14883: Fix celery executor bug trying to call len on map

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #14883:
URL: https://github.com/apache/airflow/pull/14883#discussion_r606229459



##########
File path: airflow/executors/celery_executor.py
##########
@@ -543,20 +539,27 @@ def __init__(self, sync_parallelism=None):
         super().__init__()
         self._sync_parallelism = sync_parallelism
 
+    def _tasks_list_to_task_ids(self, async_tasks) -> Set[str]:
+        return {a.task_id for a in async_tasks}
+
     def get_many(self, async_results) -> Mapping[str, EventBufferValueType]:
         """Gets status for many Celery tasks using the best method available."""
         if isinstance(app.backend, BaseKeyValueStoreBackend):
             result = self._get_many_from_kv_backend(async_results)
-            return result
-        if isinstance(app.backend, DatabaseBackend):
+        elif isinstance(app.backend, DatabaseBackend):
             result = self._get_many_from_db_backend(async_results)
-            return result
-        result = self._get_many_using_multiprocessing(async_results)
-        self.log.debug("Fetched %d states for %d task", len(result), len(async_results))
+        else:
+            async_results = list(async_results) if isinstance(async_results, map) else async_results

Review comment:
       Task adoption: when a running scheduler dies, another scheduler is able to "adopt" the tasks started by that scheduler/executor. This currently works for the Celery, Kubernetes (and CeleryKubernetes) executors.
   
   Code:
   
   **Scheduler**:
   
   https://github.com/apache/airflow/blob/4d1b2e985492894e6064235688cf8f381b1e8858/airflow/jobs/scheduler_job.py#L1825
   
   **Celery**:
   
   https://github.com/apache/airflow/blob/4d1b2e985492894e6064235688cf8f381b1e8858/airflow/executors/celery_executor.py#L448-L503







[GitHub] [airflow] kaxil commented on a change in pull request #14883: Fix celery executor bug trying to call len on map

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #14883:
URL: https://github.com/apache/airflow/pull/14883#discussion_r606229459



##########
File path: airflow/executors/celery_executor.py
##########
@@ -543,20 +539,27 @@ def __init__(self, sync_parallelism=None):
         super().__init__()
         self._sync_parallelism = sync_parallelism
 
+    def _tasks_list_to_task_ids(self, async_tasks) -> Set[str]:
+        return {a.task_id for a in async_tasks}
+
     def get_many(self, async_results) -> Mapping[str, EventBufferValueType]:
         """Gets status for many Celery tasks using the best method available."""
         if isinstance(app.backend, BaseKeyValueStoreBackend):
             result = self._get_many_from_kv_backend(async_results)
-            return result
-        if isinstance(app.backend, DatabaseBackend):
+        elif isinstance(app.backend, DatabaseBackend):
             result = self._get_many_from_db_backend(async_results)
-            return result
-        result = self._get_many_using_multiprocessing(async_results)
-        self.log.debug("Fetched %d states for %d task", len(result), len(async_results))
+        else:
+            async_results = list(async_results) if isinstance(async_results, map) else async_results

Review comment:
       Task adoption: when a scheduler dies, another scheduler is able to "adopt" the tasks started by the first scheduler/executor -- this currently works for the Celery, Kubernetes (and CeleryKubernetes) executors.
   
   Code:
   
   **Scheduler**:
   
   https://github.com/apache/airflow/blob/4d1b2e985492894e6064235688cf8f381b1e8858/airflow/jobs/scheduler_job.py#L1825
   
   **Celery**:
   
   https://github.com/apache/airflow/blob/4d1b2e985492894e6064235688cf8f381b1e8858/airflow/executors/celery_executor.py#L448-L503







[GitHub] [airflow] kaxil commented on a change in pull request #14883: Fix celery executor bug trying to call len on map

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #14883:
URL: https://github.com/apache/airflow/pull/14883#discussion_r606814100



##########
File path: airflow/executors/celery_executor.py
##########
@@ -543,20 +539,27 @@ def __init__(self, sync_parallelism=None):
         super().__init__()
         self._sync_parallelism = sync_parallelism
 
+    def _tasks_list_to_task_ids(self, async_tasks) -> Set[str]:
+        return {a.task_id for a in async_tasks}
+
     def get_many(self, async_results) -> Mapping[str, EventBufferValueType]:
         """Gets status for many Celery tasks using the best method available."""
         if isinstance(app.backend, BaseKeyValueStoreBackend):
             result = self._get_many_from_kv_backend(async_results)
-            return result
-        if isinstance(app.backend, DatabaseBackend):
+        elif isinstance(app.backend, DatabaseBackend):
             result = self._get_many_from_db_backend(async_results)
-            return result
-        result = self._get_many_using_multiprocessing(async_results)
-        self.log.debug("Fetched %d states for %d task", len(result), len(async_results))
+        else:
+            async_results = list(async_results) if isinstance(async_results, map) else async_results

Review comment:
       oh yeah probably







[GitHub] [airflow] RNHTTR commented on a change in pull request #14883: Fix celery executor bug trying to call len on map

Posted by GitBox <gi...@apache.org>.
RNHTTR commented on a change in pull request #14883:
URL: https://github.com/apache/airflow/pull/14883#discussion_r602787469



##########
File path: tests/executors/test_celery_executor.py
##########
@@ -469,3 +470,23 @@ def test_should_support_base_backend(self):
                 )
 
         assert result == {'123': ('SUCCESS', None), '456': ("PENDING", None)}
+
+    @pytest.mark.integration("redis")
+    @pytest.mark.integration("rabbitmq")
+    @pytest.mark.backend("mysql", "postgres")
+    def test_should_support_base_backend_from_try_adopt_task_instances(self):

Review comment:
       Good call.
   
   I refactored the tests for the possible variants of `get_many` to set the log level to `DEBUG` and updated the debug statement to account for `async_results` being a map object vs. a list object. If it's a map object and the backend is not a base backend, the length of `async_results` will always be 0 because it comes in as an iterator and `_tasks_list_to_task_ids` consumes it when converting it to a set.
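
A minimal sketch of the behaviour described above, not the Airflow code itself. `FakeAsyncResult` and `count_remaining` are hypothetical names used only for illustration, and `tasks_to_task_ids` merely mirrors the `_tasks_list_to_task_ids` helper from the diff. It shows that a map object has no `len()`, that building a set from it exhausts it, and that converting it to a list first avoids both problems.

```python
from collections import namedtuple

# Hypothetical stand-in for a Celery AsyncResult; only `task_id` matters here.
FakeAsyncResult = namedtuple("FakeAsyncResult", ["task_id"])


def tasks_to_task_ids(async_tasks):
    """Mirror of `_tasks_list_to_task_ids` from the diff: builds a set of ids."""
    return {a.task_id for a in async_tasks}


def count_remaining(iterable):
    """Count the items still available without relying on len()."""
    return sum(1 for _ in iterable)


# A map object is a one-shot iterator and does not implement __len__.
async_results = map(FakeAsyncResult, ["123", "456"])
try:
    len(async_results)
    len_failed = False
except TypeError:
    len_failed = True

# Building the set consumes the iterator ...
task_ids = tasks_to_task_ids(async_results)
# ... so nothing is left to count afterwards.
remaining_after_set = count_remaining(async_results)

# Materializing the map first keeps the items around and makes len() work.
materialized = list(map(FakeAsyncResult, ["123", "456"]))
ids_from_list = tasks_to_task_ids(materialized)
```

Converting once at the top of the method, rather than only in the `else` branch, would keep a later `len(async_results)` meaningful for every backend; whether that fits the executor's call sites is an assumption to check against the actual code.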







[GitHub] [airflow] github-actions[bot] commented on pull request #14883: Fix celery executor bug trying to call len on map

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14883:
URL: https://github.com/apache/airflow/pull/14883#issuecomment-805204671


   [The Workflow run](https://github.com/apache/airflow/actions/runs/680827545) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] RNHTTR commented on pull request #14883: Fix celery executor bug trying to call len on map

Posted by GitBox <gi...@apache.org>.
RNHTTR commented on pull request #14883:
URL: https://github.com/apache/airflow/pull/14883#issuecomment-813036761


   @ashb @kaxil bump





[GitHub] [airflow] kaxil commented on a change in pull request #14883: Fix celery executor bug trying to call len on map

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #14883:
URL: https://github.com/apache/airflow/pull/14883#discussion_r606229459



##########
File path: airflow/executors/celery_executor.py
##########
@@ -543,20 +539,27 @@ def __init__(self, sync_parallelism=None):
         super().__init__()
         self._sync_parallelism = sync_parallelism
 
+    def _tasks_list_to_task_ids(self, async_tasks) -> Set[str]:
+        return {a.task_id for a in async_tasks}
+
     def get_many(self, async_results) -> Mapping[str, EventBufferValueType]:
         """Gets status for many Celery tasks using the best method available."""
         if isinstance(app.backend, BaseKeyValueStoreBackend):
             result = self._get_many_from_kv_backend(async_results)
-            return result
-        if isinstance(app.backend, DatabaseBackend):
+        elif isinstance(app.backend, DatabaseBackend):
             result = self._get_many_from_db_backend(async_results)
-            return result
-        result = self._get_many_using_multiprocessing(async_results)
-        self.log.debug("Fetched %d states for %d task", len(result), len(async_results))
+        else:
+            async_results = list(async_results) if isinstance(async_results, map) else async_results

Review comment:
       Task adoption: when a scheduler dies, another scheduler is able to "adopt" the tasks started by that scheduler/executor -- this currently works for the Celery, Kubernetes (and CeleryKubernetes) executors.
   
   Code:
   
   **Scheduler**:
   
   https://github.com/apache/airflow/blob/4d1b2e985492894e6064235688cf8f381b1e8858/airflow/jobs/scheduler_job.py#L1825
   
   **Celery**:
   
   https://github.com/apache/airflow/blob/4d1b2e985492894e6064235688cf8f381b1e8858/airflow/executors/celery_executor.py#L448







[GitHub] [airflow] RNHTTR commented on a change in pull request #14883: Fix celery executor bug trying to call len on map

Posted by GitBox <gi...@apache.org>.
RNHTTR commented on a change in pull request #14883:
URL: https://github.com/apache/airflow/pull/14883#discussion_r604844698



##########
File path: airflow/executors/celery_executor.py
##########
@@ -543,20 +539,27 @@ def __init__(self, sync_parallelism=None):
         super().__init__()
         self._sync_parallelism = sync_parallelism
 
+    def _tasks_list_to_task_ids(self, async_tasks) -> Set[str]:
+        return {a.task_id for a in async_tasks}
+
     def get_many(self, async_results) -> Mapping[str, EventBufferValueType]:
         """Gets status for many Celery tasks using the best method available."""
         if isinstance(app.backend, BaseKeyValueStoreBackend):
             result = self._get_many_from_kv_backend(async_results)
-            return result
-        if isinstance(app.backend, DatabaseBackend):
+        elif isinstance(app.backend, DatabaseBackend):
             result = self._get_many_from_db_backend(async_results)
-            return result
-        result = self._get_many_using_multiprocessing(async_results)
-        self.log.debug("Fetched %d states for %d task", len(result), len(async_results))
+        else:
+            async_results = list(async_results) if isinstance(async_results, map) else async_results

Review comment:
       @ashb thoughts?







[GitHub] [airflow] kaxil commented on a change in pull request #14883: Fix celery executor bug trying to call len on map

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #14883:
URL: https://github.com/apache/airflow/pull/14883#discussion_r606229459



##########
File path: airflow/executors/celery_executor.py
##########
@@ -543,20 +539,27 @@ def __init__(self, sync_parallelism=None):
         super().__init__()
         self._sync_parallelism = sync_parallelism
 
+    def _tasks_list_to_task_ids(self, async_tasks) -> Set[str]:
+        return {a.task_id for a in async_tasks}
+
     def get_many(self, async_results) -> Mapping[str, EventBufferValueType]:
         """Gets status for many Celery tasks using the best method available."""
         if isinstance(app.backend, BaseKeyValueStoreBackend):
             result = self._get_many_from_kv_backend(async_results)
-            return result
-        if isinstance(app.backend, DatabaseBackend):
+        elif isinstance(app.backend, DatabaseBackend):
             result = self._get_many_from_db_backend(async_results)
-            return result
-        result = self._get_many_using_multiprocessing(async_results)
-        self.log.debug("Fetched %d states for %d task", len(result), len(async_results))
+        else:
+            async_results = list(async_results) if isinstance(async_results, map) else async_results

Review comment:
       Task adoption: when a scheduler dies, another scheduler is able to "adopt" the tasks started by that scheduler/executor -- this currently works for the Celery, Kubernetes (and CeleryKubernetes) executors.
   
   Code:
   
   https://github.com/apache/airflow/blob/4d1b2e985492894e6064235688cf8f381b1e8858/airflow/jobs/scheduler_job.py#L1825







[GitHub] [airflow] RNHTTR commented on pull request #14883: Fix celery executor bug trying to call len on map

Posted by GitBox <gi...@apache.org>.
RNHTTR commented on pull request #14883:
URL: https://github.com/apache/airflow/pull/14883#issuecomment-803686952


   @ashb @kaxil Any chance I can get a review for this?

