Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/06/29 00:35:38 UTC

[GitHub] [airflow] kaxil opened a new pull request #16700: Fix ``CeleryKubernetesExecutor``

kaxil opened a new pull request #16700:
URL: https://github.com/apache/airflow/pull/16700


   closes https://github.com/apache/airflow/issues/16326
   
   Currently, when running Celery tasks with ``CeleryKubernetesExecutor``,
   we see the following error. It occurs because ``BaseJob`` (via ``LocalTaskJob``) needlessly
   instantiates a ``KubernetesExecutor``, which in turn tries to start a multiprocessing ``Manager``
   process. That fails because the Celery worker process running the task is daemonic.
   
   ```
   [2021-06-29 00:23:45,301: ERROR/ForkPoolWorker-16] Failed to execute task daemonic processes are not allowed to have children.
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 116, in _execute_in_fork
       args.func(args)
     File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/cli_parser.py", line 48, in command
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/cli.py", line 91, in wrapper
       return f(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 237, in task_run
       _run_task_by_selected_method(args, dag, ti)
     File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 64, in _run_task_by_selected_method
       _run_task_by_local_task_job(args, ti)
     File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 117, in _run_task_by_local_task_job
       pool=args.pool,
     File "<string>", line 4, in __init__
     File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/orm/state.py", line 433, in _initialize_instance
       manager.dispatch.init_failure(self, args, kwargs)
     File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
       with_traceback=exc_tb,
     File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
       raise exception
     File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/orm/state.py", line 430, in _initialize_instance
       return manager.original_init(*mixed[1:], **kwargs)
     File "/home/airflow/.local/lib/python3.6/site-packages/airflow/jobs/local_task_job.py", line 76, in __init__
       super().__init__(*args, **kwargs)
     File "<string>", line 6, in __init__
     File "/home/airflow/.local/lib/python3.6/site-packages/airflow/jobs/base_job.py", line 97, in __init__
       self.executor = executor or ExecutorLoader.get_default_executor()
     File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/executor_loader.py", line 62, in get_default_executor
       cls._default_executor = cls.load_executor(executor_name)
     File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/executor_loader.py", line 79, in load_executor
       return cls.__load_celery_kubernetes_executor()
     File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/executor_loader.py", line 116, in __load_celery_kubernetes_executor
       kubernetes_executor = import_string(cls.executors[KUBERNETES_EXECUTOR])()
     File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/kubernetes_executor.py", line 421, in __init__
       self._manager = multiprocessing.Manager()
     File "/usr/local/lib/python3.6/multiprocessing/context.py", line 56, in Manager
       m.start()
     File "/usr/local/lib/python3.6/multiprocessing/managers.py", line 513, in start
       self._process.start()
     File "/usr/local/lib/python3.6/multiprocessing/process.py", line 103, in start
       'daemonic processes are not allowed to have children'
   AssertionError: daemonic processes are not allowed to have children
   ```
   
   We don't need to instantiate an executor when running ``LocalTaskJob``, as the executor isn't used there.
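A minimal reproduction of the failure mode, independent of Airflow and Celery, may help here. It is a sketch that makes two assumptions. First, it runs on a POSIX platform where the `fork` start method exists. Second, the worker in the traceback behaves like any daemonic `multiprocessing` process. `_child` and `reproduce` are illustrative names, not Airflow code.

```python
import multiprocessing

# Assumption: a POSIX platform where the "fork" start method exists (not Windows).
CTX = multiprocessing.get_context("fork")


def _child(result_queue):
    """Runs inside a daemonic process, like the ForkPoolWorker in the traceback."""
    try:
        # The traceback shows KubernetesExecutor.__init__ doing the equivalent of this.
        manager = CTX.Manager()
    except AssertionError as exc:
        result_queue.put(f"AssertionError: {exc}")
    else:
        manager.shutdown()
        result_queue.put("Manager started")


def reproduce() -> str:
    """Start a daemonic child that tries to start a Manager; report what happened."""
    result_queue = CTX.Queue()
    proc = CTX.Process(target=_child, args=(result_queue,), daemon=True)
    proc.start()
    message = result_queue.get(timeout=30)  # read before join() to avoid blocking
    proc.join()
    return message


if __name__ == "__main__":
    print(reproduce())
```

`reproduce()` should report the same assertion as the last line of the traceback. `Manager()` has to start a server process, and a daemonic process is not allowed to start children.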
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #16700: Fix ``CeleryKubernetesExecutor``

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660365084



##########
File path: airflow/jobs/backfill_job.py
##########
@@ -183,7 +186,8 @@ def __init__(
         self.conf = conf
         self.rerun_failed_tasks = rerun_failed_tasks
         self.run_backwards = run_backwards
-        super().__init__(*args, **kwargs)
+        self.executor = executor or ExecutorLoader.get_default_executor()
+        super().__init__(executor=self.executor, *args, **kwargs)

Review comment:
       How about, instead of doing this, we make `executor` a lazy/cached property on `BaseJob`?
   
   That way it isn't loaded until it is first accessed, which might fix the problem.
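Below is a rough sketch of the lazy/cached property idea. It uses `functools.cached_property`, which needs Python 3.8+; older Pythons would need a backport. `FakeExecutor`, `get_default_executor` and `Job` are hypothetical stand-ins for a real executor, `ExecutorLoader.get_default_executor()` and `BaseJob`.

```python
from functools import cached_property  # Python 3.8+; older Pythons need a backport


class FakeExecutor:
    """Hypothetical stand-in for an executor whose __init__ is expensive."""

    instances = 0

    def __init__(self):
        FakeExecutor.instances += 1


def get_default_executor():
    """Hypothetical stand-in for ExecutorLoader.get_default_executor()."""
    return FakeExecutor()


class Job:
    """Simplified job: nothing executor-related happens in __init__."""

    def __init__(self, heartrate=5):
        self.heartrate = heartrate

    @cached_property
    def executor(self):
        # Evaluated on first access only; the result is then stored on the instance.
        return get_default_executor()
```

Constructing `Job()` creates no executor. The first `job.executor` access creates one, and later accesses return the same cached object. A job that never reads `executor`, as `LocalTaskJob` is described as doing, never loads it at all.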

##########
File path: airflow/jobs/base_job.py
##########
@@ -104,6 +108,11 @@ def __init__(self, executor=None, heartrate=None, *args, **kwargs):
         self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
         super().__init__(*args, **kwargs)
 
+    @cached_property
+    def executor(self):
+        """Provided Executor which defaults to the one mentioned via Airflow configuration"""
+        return self._executor or ExecutorLoader.get_default_executor()
+

Review comment:
       ```suggestion
       @cached_property
       def executor(self):
           return ExecutorLoader.get_default_executor()
   
       @property
       def executor_class(self):
           return self.executor.__class__.__name__
   ```

##########
File path: airflow/jobs/base_job.py
##########
@@ -104,6 +108,11 @@ def __init__(self, executor=None, heartrate=None, *args, **kwargs):
         self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
         super().__init__(*args, **kwargs)
 
+    @cached_property
+    def executor(self):
+        """Provided Executor which defaults to the one mentioned via Airflow configuration"""
+        return self._executor or ExecutorLoader.get_default_executor()
+

Review comment:
       Ah, I didn't realise that. In which case change this to a `@cached_property` too, and then set it in the `if executor:` block.

##########
File path: airflow/jobs/base_job.py
##########
@@ -104,6 +108,11 @@ def __init__(self, executor=None, heartrate=None, *args, **kwargs):
         self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
         super().__init__(*args, **kwargs)
 
+    @cached_property
+    def executor(self):
+        """Provided Executor which defaults to the one mentioned via Airflow configuration"""
+        return self._executor or ExecutorLoader.get_default_executor()
+

Review comment:
       Oh right no. Okay.

##########
File path: airflow/jobs/base_job.py
##########
@@ -104,6 +108,11 @@ def __init__(self, executor=None, heartrate=None, *args, **kwargs):
         self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
         super().__init__(*args, **kwargs)
 
+    @cached_property
+    def executor(self):
+        """Provided Executor which defaults to the one mentioned via Airflow configuration"""
+        return self._executor or ExecutorLoader.get_default_executor()
+

Review comment:
       Leave `executor_class` set as a normal attribute (as you had it); we just don't need `_executor` and can set `self.executor` directly.

##########
File path: airflow/jobs/base_job.py
##########
@@ -94,8 +95,11 @@ class BaseJob(Base, LoggingMixin):
 
     def __init__(self, executor=None, heartrate=None, *args, **kwargs):
         self.hostname = get_hostname()
-        self.executor = executor or ExecutorLoader.get_default_executor()
-        self.executor_class = self.executor.__class__.__name__
+        self._executor = executor
+        if self._executor:
+            self.executor_class = self._executor.__class__.__name__
+        else:
+            self.executor_class = conf.get('core', 'EXECUTOR')

Review comment:
       Can be much simpler
   
   ```suggestion
           if executor:
               self.executor = executor
               self.executor_class = executor.__class__.__name__
           else:
               self.executor_class = conf.get('core', 'EXECUTOR')
   ```
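Below is a sketch of how the suggested `__init__` combines with a `@cached_property` fallback. It again assumes Python 3.8+ `functools.cached_property`. `CONFIGURED_EXECUTOR`, `load_calls` and `get_default_executor` are hypothetical stand-ins for `conf.get('core', 'EXECUTOR')` and `ExecutorLoader`. The approach relies on one detail: `cached_property` defines no `__set__`, so assigning `self.executor` in `__init__` shadows it.

```python
from functools import cached_property  # Python 3.8+; older Pythons need a backport

# Hypothetical stand-ins for conf.get('core', 'EXECUTOR') and ExecutorLoader.
CONFIGURED_EXECUTOR = "CeleryKubernetesExecutor"
load_calls = []


def get_default_executor():
    """Record each load so the laziness can be observed."""
    load_calls.append(CONFIGURED_EXECUTOR)
    return object()  # placeholder for a real executor instance


class Job:
    def __init__(self, executor=None):
        if executor:
            # cached_property has no __set__, so this instance attribute
            # shadows it and get_default_executor() is never called.
            self.executor = executor
            self.executor_class = executor.__class__.__name__
        else:
            # Record the configured name without instantiating anything.
            self.executor_class = CONFIGURED_EXECUTOR

    @cached_property
    def executor(self):
        return get_default_executor()
```

With an explicit executor, the loader is never called. Without one, `executor_class` comes from configuration, and the loader runs only if `job.executor` is actually read. This also avoids the separate `_executor` attribute that the earlier revision used.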

##########
File path: airflow/jobs/base_job.py
##########
@@ -104,6 +108,11 @@ def __init__(self, executor=None, heartrate=None, *args, **kwargs):
         self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
         super().__init__(*args, **kwargs)
 
+    @cached_property
+    def executor(self):
+        """Provided Executor which defaults to the one mentioned via Airflow configuration"""
+        return self._executor or ExecutorLoader.get_default_executor()

Review comment:
       ```suggestion
       @cached_property
       def executor(self):
           return  ExecutorLoader.get_default_executor()
   ```







[GitHub] [airflow] dstandish commented on a change in pull request #16700: Fix ``CeleryKubernetesExecutor``

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660268616



##########
File path: airflow/jobs/base_job.py
##########
@@ -94,7 +94,8 @@ class BaseJob(Base, LoggingMixin):
 
     def __init__(self, executor=None, heartrate=None, *args, **kwargs):
         self.hostname = get_hostname()
-        self.executor = executor or ExecutorLoader.get_default_executor()
+        if self.__class__.__name__ != "LocalTaskJob":
+            self.executor = executor or ExecutorLoader.get_default_executor()

Review comment:
       Another option would be to make `executor` a cached property and `executor_class` a property. This would simply get them out of `__init__`, which I think would be enough if they are not accessed in `LocalTaskJob` anyway (though, to be explicit, one could override those properties in `LocalTaskJob` to raise `NotImplementedError`).
   
   ```python
   @cached_property
   def executor(self):
       # store the init param `executor` as the private attr `_executor`
       return self._executor or ExecutorLoader.get_default_executor()
   ```
   
   This has two benefits: you don't have to reference the subclass name here (as in your first approach), and you don't have to make as many changes to the `executor` init params.
   







[GitHub] [airflow] kaxil commented on pull request #16700: Fix ``CeleryKubernetesExecutor``

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#issuecomment-870467375


   Updated the PR with the `cached_property` approach.


[GitHub] [airflow] dstandish commented on a change in pull request #16700: Fix ``CeleryKubernetesExecutor``

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660273193



##########
File path: airflow/jobs/base_job.py
##########
@@ -94,7 +94,8 @@ class BaseJob(Base, LoggingMixin):
 
     def __init__(self, executor=None, heartrate=None, *args, **kwargs):
         self.hostname = get_hostname()
-        self.executor = executor or ExecutorLoader.get_default_executor()
+        if self.__class__.__name__ != "LocalTaskJob":
+            self.executor = executor or ExecutorLoader.get_default_executor()

Review comment:
       i do also wonder.... why is it that we only get this issue with CKE?  is there perhaps something about the way in which CKE is designed that causes this problem?  







[GitHub] [airflow] github-actions[bot] commented on pull request #16700: Fix ``CeleryKubernetesExecutor``

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#issuecomment-870501769


   The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] kaxil commented on a change in pull request #16700: Fix ``CeleryKubernetesExecutor``

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660201602



##########
File path: airflow/jobs/base_job.py
##########
@@ -94,7 +94,8 @@ class BaseJob(Base, LoggingMixin):
 
     def __init__(self, executor=None, heartrate=None, *args, **kwargs):
         self.hostname = get_hostname()
-        self.executor = executor or ExecutorLoader.get_default_executor()
+        if self.__class__.__name__ != "LocalTaskJob":
+            self.executor = executor or ExecutorLoader.get_default_executor()

Review comment:
       The other option is to keep it `None` in `BaseJob` and override it in `SchedulerJob` and `BackfillJob`.
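Below is a sketch of that alternative. The classes are hypothetical and heavily simplified, and `get_default_executor()` is a stand-in; none of this is Airflow's actual code.

```python
def get_default_executor():
    """Hypothetical stand-in for ExecutorLoader.get_default_executor()."""
    return "default-executor"  # placeholder for a real executor instance


class BaseJob:
    def __init__(self, executor=None):
        # BaseJob never loads a default, so LocalTaskJob ends up with None.
        self.executor = executor


class LocalTaskJob(BaseJob):
    pass


class SchedulerJob(BaseJob):
    def __init__(self, executor=None):
        # Only jobs that actually use an executor resolve the default.
        super().__init__(executor=executor or get_default_executor())


class BackfillJob(BaseJob):
    def __init__(self, executor=None):
        # The same fallback has to be repeated in every such subclass.
        super().__init__(executor=executor or get_default_executor())
```

One trade-off is that every subclass needing an executor repeats the fallback, as the `BackfillJob` diff earlier in this thread does.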




[GitHub] [airflow] jedcunningham commented on a change in pull request #16700: Fix ``CeleryKubernetesExecutor``

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660671207



##########
File path: airflow/jobs/base_job.py
##########
@@ -104,6 +108,10 @@ def __init__(self, executor=None, heartrate=None, *args, **kwargs):
         self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
         super().__init__(*args, **kwargs)
 
+    @cached_property
+    def executor(self):
+        return  ExecutorLoader.get_default_executor()

Review comment:
       ```suggestion
           return ExecutorLoader.get_default_executor()
   ```
   nit







[GitHub] [airflow] kaxil commented on a change in pull request #16700: Fix ``CeleryKubernetesExecutor``

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660477307



##########
File path: airflow/jobs/base_job.py
##########
@@ -94,7 +94,8 @@ class BaseJob(Base, LoggingMixin):
 
     def __init__(self, executor=None, heartrate=None, *args, **kwargs):
         self.hostname = get_hostname()
-        self.executor = executor or ExecutorLoader.get_default_executor()
+        if self.__class__.__name__ != "LocalTaskJob":
+            self.executor = executor or ExecutorLoader.get_default_executor()

Review comment:
       >i do also wonder.... why is it that we only get this issue with CKE? is there perhaps something about the way in which CKE is designed that causes this problem?
   
   The issue is that it tries to create an instance of `KubernetesExecutor` inside the Celery worker (where `CeleryExecutor` runs the task), and `KubernetesExecutor` creates a multiprocessing Manager and Queue in its `__init__`. That causes problems with Celery, as explained in https://github.com/celery/celery/issues/4525




[GitHub] [airflow] kaxil merged pull request #16700: Fix ``CeleryKubernetesExecutor``

Posted by GitBox <gi...@apache.org>.
kaxil merged pull request #16700:
URL: https://github.com/apache/airflow/pull/16700


   


[GitHub] [airflow] dstandish commented on a change in pull request #16700: Fix ``CeleryKubernetesExecutor``

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660258928



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -39,7 +39,8 @@
 from airflow.dag_processing.manager import DagFileProcessorAgent
 from airflow.dag_processing.processor import DagFileProcessorProcess
 from airflow.exceptions import SerializedDagNotFound
-from airflow.executors.executor_loader import UNPICKLEABLE_EXECUTORS
+from airflow.executors.base_executor import BaseExecutor

Review comment:
       I think this could go behind `if TYPE_CHECKING:`.

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -110,11 +111,14 @@ def __init__(
         processor_poll_interval: float = conf.getfloat('scheduler', 'processor_poll_interval'),
         do_pickle: bool = False,
         log: logging.Logger = None,
+        executor: Optional[BaseExecutor] = None,

Review comment:
       `executor` is missing from the docstring.

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -39,7 +39,8 @@
 from airflow.dag_processing.manager import DagFileProcessorAgent
 from airflow.dag_processing.processor import DagFileProcessorProcess
 from airflow.exceptions import SerializedDagNotFound
-from airflow.executors.executor_loader import UNPICKLEABLE_EXECUTORS
+from airflow.executors.base_executor import BaseExecutor

Review comment:
       Though... not sure it matters, e.g. if it's already imported implicitly, but just in case I figured I'd point it out.
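Below is a sketch covering both review points on `scheduler_job.py`. It moves the import behind `TYPE_CHECKING` and writes the annotation as a string so it is not evaluated at runtime. It also adds a `:param executor:` entry to the docstring. `SchedulerJobSketch` is a hypothetical, trimmed-down class, not the real `SchedulerJob`. The docstring wording is an assumption based on the configuration fallback discussed in this thread.

```python
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
    # Only evaluated by static type checkers, so importing this module at
    # runtime does not import the executor module.
    from airflow.executors.base_executor import BaseExecutor


class SchedulerJobSketch:
    """Hypothetical, trimmed-down stand-in for SchedulerJob's constructor.

    :param executor: the executor to use; if ``None``, the one configured
        in ``[core] executor`` is used
    """

    def __init__(self, executor: Optional["BaseExecutor"] = None) -> None:
        self.executor = executor
```

As the comment above notes, whether the move saves anything depends on whether `BaseExecutor` is already imported elsewhere at runtime.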

##########
File path: airflow/jobs/base_job.py
##########
@@ -94,7 +94,8 @@ class BaseJob(Base, LoggingMixin):
 
     def __init__(self, executor=None, heartrate=None, *args, **kwargs):
         self.hostname = get_hostname()
-        self.executor = executor or ExecutorLoader.get_default_executor()
+        if self.__class__.__name__ != "LocalTaskJob":
+            self.executor = executor or ExecutorLoader.get_default_executor()

Review comment:
       another option would be to make `executor` a cached property and `executor_class` a property. this would simply get them out of init, which i think would be enough if they are not accessed in `LocalTaskJob` anyway (though if one wanted to be explicit, they could override those properties in `LocalTaskJob` to raise `NotImplementedError`)
   
   ```python
   @cached_property
   def executor(self):
       # store the init param `executor` as the private attribute `_executor`
       return self._executor or ExecutorLoader.get_default_executor()
   ```
   
   this has two benefits: one is you don't have to reference the subclass name here (as in your first approach), and the other is you don't have to make as many changes re `executor` in the init params.
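A hypothetical sketch of the explicit variant mentioned above (overriding the properties in `LocalTaskJob`): the base class resolves `executor` lazily, while the `LocalTaskJob` stand-in replaces it with a read-only property that raises `NotImplementedError`. `BaseJobSketch`, `LocalTaskJobSketch` and `load_default_executor()` are illustrative names, not Airflow APIs.

```python
from functools import cached_property


def load_default_executor():
    # Hypothetical stand-in for ExecutorLoader.get_default_executor()
    return object()


class BaseJobSketch:
    def __init__(self, executor=None):
        self._executor = executor

    @cached_property
    def executor(self):
        return self._executor or load_default_executor()


class LocalTaskJobSketch(BaseJobSketch):
    @property
    def executor(self):
        # A property is a data descriptor, so it shadows the parent's
        # cached_property; any accidental use fails loudly instead of
        # silently instantiating an executor.
        raise NotImplementedError("LocalTaskJob does not use an executor")
```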
   

##########
File path: airflow/jobs/base_job.py
##########
@@ -94,7 +94,8 @@ class BaseJob(Base, LoggingMixin):
 
     def __init__(self, executor=None, heartrate=None, *args, **kwargs):
         self.hostname = get_hostname()
-        self.executor = executor or ExecutorLoader.get_default_executor()
+        if self.__class__.__name__ != "LocalTaskJob":
+            self.executor = executor or ExecutorLoader.get_default_executor()

Review comment:
       i do also wonder.... why is it that we only get this issue with CKE?  is there perhaps something about the way in which CKE is designed that causes this problem?  




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #16700: Fix ``CeleryKubernetesExecutor``

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660483731



##########
File path: airflow/jobs/base_job.py
##########
@@ -94,8 +95,11 @@ class BaseJob(Base, LoggingMixin):
 
     def __init__(self, executor=None, heartrate=None, *args, **kwargs):
         self.hostname = get_hostname()
-        self.executor = executor or ExecutorLoader.get_default_executor()
-        self.executor_class = self.executor.__class__.__name__
+        self._executor = executor
+        if self._executor:
+            self.executor_class = self._executor.__class__.__name__
+        else:
+            self.executor_class = conf.get('core', 'EXECUTOR')

Review comment:
       Can be much simpler
   
   ```suggestion
           if executor:
               self.executor = executor
   ```







[GitHub] [airflow] kaxil commented on a change in pull request #16700: Fix ``CeleryKubernetesExecutor``

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660201602



##########
File path: airflow/jobs/base_job.py
##########
@@ -94,7 +94,8 @@ class BaseJob(Base, LoggingMixin):
 
     def __init__(self, executor=None, heartrate=None, *args, **kwargs):
         self.hostname = get_hostname()
-        self.executor = executor or ExecutorLoader.get_default_executor()
+        if self.__class__.__name__ != "LocalTaskJob":
+            self.executor = executor or ExecutorLoader.get_default_executor()

Review comment:
       The other option is to keep it `None` in `BaseJob` and override it in `SchedulerJob` and `BackfillJob`
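A hypothetical sketch of this alternative with plain-Python stand-ins (`BaseJobSketch`, `SchedulerJobSketch`, `LocalTaskJobSketch`, `FakeExecutor`, `load_default_executor()`): the base class stores whatever it was given, and only the jobs that actually dispatch tasks resolve the configured default. The `BackfillJob` hunk in this PR has a similar shape.

```python
class FakeExecutor:
    """Hypothetical stand-in for an executor."""


def load_default_executor():
    # Hypothetical stand-in for ExecutorLoader.get_default_executor()
    return FakeExecutor()


class BaseJobSketch:
    def __init__(self, executor=None):
        # The base class no longer picks a default; it keeps what it was given.
        self.executor = executor


class SchedulerJobSketch(BaseJobSketch):
    def __init__(self, executor=None):
        # Jobs that dispatch tasks resolve the configured default themselves.
        super().__init__(executor=executor or load_default_executor())


class LocalTaskJobSketch(BaseJobSketch):
    # Inherits executor=None, so no executor is ever instantiated.
    pass
```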

##########
File path: airflow/jobs/base_job.py
##########
@@ -94,7 +94,8 @@ class BaseJob(Base, LoggingMixin):
 
     def __init__(self, executor=None, heartrate=None, *args, **kwargs):
         self.hostname = get_hostname()
-        self.executor = executor or ExecutorLoader.get_default_executor()
+        if self.__class__.__name__ != "LocalTaskJob":
+            self.executor = executor or ExecutorLoader.get_default_executor()

Review comment:
       https://github.com/apache/airflow/pull/16700/commits/a7f6efbabc94512fcf6b8ada80f13b7291a0d2c8 vs https://github.com/apache/airflow/pull/16700/commits/376ffd1deff481717758281608b0764f38753956
   
   Thoughts?

##########
File path: airflow/jobs/base_job.py
##########
@@ -94,7 +94,8 @@ class BaseJob(Base, LoggingMixin):
 
     def __init__(self, executor=None, heartrate=None, *args, **kwargs):
         self.hostname = get_hostname()
-        self.executor = executor or ExecutorLoader.get_default_executor()
+        if self.__class__.__name__ != "LocalTaskJob":
+            self.executor = executor or ExecutorLoader.get_default_executor()

Review comment:
       >i do also wonder.... why is it that we only get this issue with CKE? is there perhaps something about the way in which CKE is designed that causes this problem?
   
   The issue is that it tries to create an instance of `KubernetesExecutor` inside `CeleryExecutor`, and `KubernetesExecutor` creates a multiprocessing `Manager` and `Queue` in its `__init__`, which fails inside Celery's daemonic worker processes, as explained in https://github.com/celery/celery/issues/4525
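The failure mode can be reproduced without Airflow or Celery. A minimal sketch, assuming a POSIX system (it uses the `fork` start method) and using a plain daemonic `multiprocessing.Process` as a stand-in for a Celery prefork worker: creating a `Manager` inside it should fail with the same message as in the worker traceback. The function names are hypothetical.

```python
import multiprocessing


def _create_manager(result_queue):
    # Runs inside a daemonic child, standing in for a Celery pool worker.
    # Starting a Manager (as KubernetesExecutor.__init__ does) needs to
    # spawn yet another child process.
    try:
        manager = multiprocessing.get_context("fork").Manager()
        manager.shutdown()
        result_queue.put("manager started")
    except AssertionError as err:
        # multiprocessing refuses to start children from a daemonic process
        result_queue.put(str(err))


def run_in_daemonic_child():
    ctx = multiprocessing.get_context("fork")  # POSIX only
    result_queue = ctx.Queue()
    child = ctx.Process(target=_create_manager, args=(result_queue,), daemon=True)
    child.start()
    message = result_queue.get(timeout=30)  # read before join() to avoid blocking
    child.join()
    return message


if __name__ == "__main__":
    print(run_in_daemonic_child())
```

The check is a plain `assert` inside `multiprocessing`, so running Python with `-O` would hide it; under normal settings the returned message is the error string rather than `"manager started"`.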

##########
File path: airflow/jobs/base_job.py
##########
@@ -104,6 +108,11 @@ def __init__(self, executor=None, heartrate=None, *args, **kwargs):
         self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
         super().__init__(*args, **kwargs)
 
+    @cached_property
+    def executor(self):
+        """Provided Executor which defaults to the one mentioned via Airflow configuration"""
+        return self._executor or ExecutorLoader.get_default_executor()
+

Review comment:
       Well, this will cause the same issue: `executor_class` is stored in the DB.
   
   So when the Job model is saved to the DB, it accesses `self.executor_class`, which will evaluate `self.executor`, and that will instantiate the executor, which we want to avoid.
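To illustrate the concern, a hypothetical plain-Python sketch (no SQLAlchemy): if `executor_class` were a property derived from a lazily built `executor`, anything that reads it while persisting the job would instantiate the executor anyway. Here `row_to_persist()` is a made-up helper standing in for the ORM collecting column values on flush.

```python
from functools import cached_property

BUILT = []  # every executor the hypothetical fallback creates


class FakeExecutor:
    def __init__(self):
        BUILT.append(self)


class JobSketch:
    """Hypothetical job whose executor_class is derived from a lazy executor."""

    def __init__(self, executor=None):
        self._executor = executor

    @cached_property
    def executor(self):
        return self._executor or FakeExecutor()

    @property
    def executor_class(self):
        # Reading this forces the lazy executor to be built.
        return self.executor.__class__.__name__


def row_to_persist(job):
    # Hypothetical stand-in for the ORM gathering column values on save.
    return {"executor_class": job.executor_class}
```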







[GitHub] [airflow] github-actions[bot] commented on pull request #16700: Fix ``CeleryKubernetesExecutor``

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#issuecomment-870501769


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] ashb commented on a change in pull request #16700: Fix ``CeleryKubernetesExecutor``

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660492871



##########
File path: airflow/jobs/base_job.py
##########
@@ -104,6 +108,11 @@ def __init__(self, executor=None, heartrate=None, *args, **kwargs):
         self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
         super().__init__(*args, **kwargs)
 
+    @cached_property
+    def executor(self):
+        """Provided Executor which defaults to the one mentioned via Airflow configuration"""
+        return self._executor or ExecutorLoader.get_default_executor()
+

Review comment:
       Ah, I didn't realise that. In which case change this to a `@cached_property` too, and then set it in the `if executor:` block.
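One way to combine the `if executor:` suggestion with a lazy `executor` property, as a hypothetical plain-Python sketch: `JobSketch`, `FakeExecutor`, `load_default_executor()` and the hard-coded `CONFIGURED_EXECUTOR` stand in for `BaseJob`, a real executor, `ExecutorLoader.get_default_executor()` and `conf.get('core', 'EXECUTOR')`. `executor_class` stays an ordinary attribute filled in during `__init__`, so saving the job never touches the loader.

```python
from functools import cached_property

BUILT = []  # every executor the hypothetical loader creates
CONFIGURED_EXECUTOR = "CeleryKubernetesExecutor"  # stand-in for the [core] executor setting


class FakeExecutor:
    def __init__(self):
        BUILT.append(self)


def load_default_executor():
    # Hypothetical stand-in for ExecutorLoader.get_default_executor()
    return FakeExecutor()


class JobSketch:
    def __init__(self, executor=None):
        if executor:
            # cached_property has no __set__, so this assignment lands in the
            # instance dict and the lazy loader below is never called.
            self.executor = executor
            self.executor_class = executor.__class__.__name__
        else:
            # Plain attribute (a DB column on the real model), read from
            # configuration without building an executor.
            self.executor_class = CONFIGURED_EXECUTOR

    @cached_property
    def executor(self):
        # Only reached when no executor was passed in.
        return load_default_executor()
```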







[GitHub] [airflow] ashb commented on a change in pull request #16700: Fix ``CeleryKubernetesExecutor``

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660365084



##########
File path: airflow/jobs/backfill_job.py
##########
@@ -183,7 +186,8 @@ def __init__(
         self.conf = conf
         self.rerun_failed_tasks = rerun_failed_tasks
         self.run_backwards = run_backwards
-        super().__init__(*args, **kwargs)
+        self.executor = executor or ExecutorLoader.get_default_executor()
+        super().__init__(executor=self.executor, *args, **kwargs)

Review comment:
       How about, instead of doing this, we make `executor` a lazy/cached property on `BaseJob`?
   
   That way it is not loaded until it is first accessed, which might fix the problem? 







[GitHub] [airflow] kaxil commented on a change in pull request #16700: Fix ``CeleryKubernetesExecutor``

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660491447



##########
File path: airflow/jobs/base_job.py
##########
@@ -104,6 +108,11 @@ def __init__(self, executor=None, heartrate=None, *args, **kwargs):
         self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
         super().__init__(*args, **kwargs)
 
+    @cached_property
+    def executor(self):
+        """Provided Executor which defaults to the one mentioned via Airflow configuration"""
+        return self._executor or ExecutorLoader.get_default_executor()
+

Review comment:
       Well, this will cause the same issue: `executor_class` is stored in the DB.
   
   So when the Job model is saved to the DB, it accesses `self.executor_class`, which will evaluate `self.executor`, and that will instantiate the executor, which we want to avoid.







[GitHub] [airflow] kaxil commented on a change in pull request #16700: Fix ``CeleryKubernetesExecutor``

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660213566



##########
File path: airflow/jobs/base_job.py
##########
@@ -94,7 +94,8 @@ class BaseJob(Base, LoggingMixin):
 
     def __init__(self, executor=None, heartrate=None, *args, **kwargs):
         self.hostname = get_hostname()
-        self.executor = executor or ExecutorLoader.get_default_executor()
+        if self.__class__.__name__ != "LocalTaskJob":
+            self.executor = executor or ExecutorLoader.get_default_executor()

Review comment:
       https://github.com/apache/airflow/pull/16700/commits/a7f6efbabc94512fcf6b8ada80f13b7291a0d2c8 vs https://github.com/apache/airflow/pull/16700/commits/376ffd1deff481717758281608b0764f38753956
   
   Thoughts?







[GitHub] [airflow] dstandish commented on a change in pull request #16700: Fix ``CeleryKubernetesExecutor``

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660258928



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -39,7 +39,8 @@
 from airflow.dag_processing.manager import DagFileProcessorAgent
 from airflow.dag_processing.processor import DagFileProcessorProcess
 from airflow.exceptions import SerializedDagNotFound
-from airflow.executors.executor_loader import UNPICKLEABLE_EXECUTORS
+from airflow.executors.base_executor import BaseExecutor

Review comment:
       i think this could go behind `if TYPE_CHECKING`
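A minimal sketch of the `TYPE_CHECKING` pattern, using a hypothetical `build_scheduler_args()` function rather than the real `SchedulerJob.__init__`. The import is only seen by static type checkers such as mypy, so at runtime the name does not exist and the annotation has to be a string (or the module has to use `from __future__ import annotations`).

```python
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
    # Evaluated only by static type checkers; skipped at runtime, so
    # importing this module does not import the executors package.
    from airflow.executors.base_executor import BaseExecutor


def build_scheduler_args(
    do_pickle: bool = False,
    executor: Optional["BaseExecutor"] = None,  # quoted: undefined at runtime
) -> dict:
    """Hypothetical helper mirroring two SchedulerJob.__init__ parameters."""
    return {"do_pickle": do_pickle, "executor": executor}
```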

##########
File path: airflow/jobs/base_job.py
##########
@@ -94,7 +94,8 @@ class BaseJob(Base, LoggingMixin):
 
     def __init__(self, executor=None, heartrate=None, *args, **kwargs):
         self.hostname = get_hostname()
-        self.executor = executor or ExecutorLoader.get_default_executor()
+        if self.__class__.__name__ != "LocalTaskJob":
+            self.executor = executor or ExecutorLoader.get_default_executor()

Review comment:
       another option would be to make `executor` a cached property and `executor_class` a property.  this would simply get them out of init, which i think would be enough if they are not accessed in `LocalTaskJob` anyway.
   
   ```python
   @cached_property
   def executor(self):
       # store the init param `executor` as the private attribute `_executor`
       return self._executor or ExecutorLoader.get_default_executor()
   ```
   
   this has two benefits: one is you don't have to reference the subclass name here (as in your first approach), and the other is you don't have to make as many changes re `executor` in the init params.
   

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -110,11 +111,14 @@ def __init__(
         processor_poll_interval: float = conf.getfloat('scheduler', 'processor_poll_interval'),
         do_pickle: bool = False,
         log: logging.Logger = None,
+        executor: Optional[BaseExecutor] = None,

Review comment:
       missing from docstring

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -39,7 +39,8 @@
 from airflow.dag_processing.manager import DagFileProcessorAgent
 from airflow.dag_processing.processor import DagFileProcessorProcess
 from airflow.exceptions import SerializedDagNotFound
-from airflow.executors.executor_loader import UNPICKLEABLE_EXECUTORS
+from airflow.executors.base_executor import BaseExecutor

Review comment:
       though... not sure it matters, e.g. if it's already imported implicitly... but just in case i figured i'd point it out



