Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/06/29 00:35:38 UTC
[GitHub] [airflow] kaxil opened a new pull request #16700: Fix ``CeleryKubernetesExecutor``
kaxil opened a new pull request #16700:
URL: https://github.com/apache/airflow/pull/16700
closes https://github.com/apache/airflow/issues/16326
Currently, when running Celery tasks with the ``CeleryKubernetesExecutor``, we see the following error.
It occurs because ``BaseJob`` (via ``LocalTaskJob``) needlessly instantiates a ``KubernetesExecutor``,
which in turn tries to create a multiprocessing process/Manager, and that fails inside the daemonic
Celery worker.
```
[2021-06-29 00:23:45,301: ERROR/ForkPoolWorker-16] Failed to execute task daemonic processes are not allowed to have children.
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 116, in _execute_in_fork
args.func(args)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/cli_parser.py", line 48, in command
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/cli.py", line 91, in wrapper
return f(*args, **kwargs)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 237, in task_run
_run_task_by_selected_method(args, dag, ti)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 64, in _run_task_by_selected_method
_run_task_by_local_task_job(args, ti)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 117, in _run_task_by_local_task_job
pool=args.pool,
File "<string>", line 4, in __init__
File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/orm/state.py", line 433, in _initialize_instance
manager.dispatch.init_failure(self, args, kwargs)
File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
with_traceback=exc_tb,
File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/orm/state.py", line 430, in _initialize_instance
return manager.original_init(*mixed[1:], **kwargs)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/jobs/local_task_job.py", line 76, in __init__
super().__init__(*args, **kwargs)
File "<string>", line 6, in __init__
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/jobs/base_job.py", line 97, in __init__
self.executor = executor or ExecutorLoader.get_default_executor()
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/executor_loader.py", line 62, in get_default_executor
cls._default_executor = cls.load_executor(executor_name)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/executor_loader.py", line 79, in load_executor
return cls.__load_celery_kubernetes_executor()
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/executor_loader.py", line 116, in __load_celery_kubernetes_executor
kubernetes_executor = import_string(cls.executors[KUBERNETES_EXECUTOR])()
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/kubernetes_executor.py", line 421, in __init__
self._manager = multiprocessing.Manager()
File "/usr/local/lib/python3.6/multiprocessing/context.py", line 56, in Manager
m.start()
File "/usr/local/lib/python3.6/multiprocessing/managers.py", line 513, in start
self._process.start()
File "/usr/local/lib/python3.6/multiprocessing/process.py", line 103, in start
'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children
```
We don't need to instantiate an executor when running ``LocalTaskJob``, as the executor is never used there.
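The resulting approach can be sketched as follows. This is a simplified illustration, not the exact Airflow code; ``load_default_executor`` stands in for ``ExecutorLoader.get_default_executor()``:

```python
from functools import cached_property


def load_default_executor():
    # Stand-in for ExecutorLoader.get_default_executor(), which reads
    # the [core] executor setting and builds the executor.
    return object()


class BaseJob:
    """Simplified sketch of the lazy-executor approach."""

    def __init__(self, executor=None):
        if executor:
            # Pre-seed the cached_property cache with an explicitly
            # passed executor.
            self.executor = executor

    @cached_property
    def executor(self):
        # Evaluated only on first access, so jobs that never use an
        # executor (e.g. LocalTaskJob) never instantiate one.
        return load_default_executor()
```

A ``LocalTaskJob`` built this way never touches ``.executor``, so ``KubernetesExecutor.__init__`` (and its ``multiprocessing.Manager()``) is never reached from inside a Celery worker.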
<!--
Thank you for contributing! Please make sure that your code changes
are covered with tests. And in case of new features or big changes
remember to adjust the documentation.
Feel free to ping committers for the review!
In case of existing issue, reference it using one of the following:
closes: #ISSUE
related: #ISSUE
How to write a good git commit message:
http://chris.beams.io/posts/git-commit/
-->
---
**^ Add meaningful description above**
Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/main/UPDATING.md).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] ashb commented on a change in pull request #16700: Fix ``CeleryKubernetesExecutor``
Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660365084
##########
File path: airflow/jobs/backfill_job.py
##########
@@ -183,7 +186,8 @@ def __init__(
self.conf = conf
self.rerun_failed_tasks = rerun_failed_tasks
self.run_backwards = run_backwards
- super().__init__(*args, **kwargs)
+ self.executor = executor or ExecutorLoader.get_default_executor()
+ super().__init__(executor=self.executor, *args, **kwargs)
Review comment:
How about instead of doing this we make `executor` a lazy/cached property on base job?
That way it is not loaded until it is first accessed, which might fix the problem?
##########
File path: airflow/jobs/base_job.py
##########
@@ -94,8 +95,11 @@ class BaseJob(Base, LoggingMixin):
def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.hostname = get_hostname()
- self.executor = executor or ExecutorLoader.get_default_executor()
- self.executor_class = self.executor.__class__.__name__
+ self._executor = executor
+ if self._executor:
+ self.executor_class = self._executor.__class__.__name__
+ else:
+ self.executor_class = conf.get('core', 'EXECUTOR')
Review comment:
Can be much simpler
```suggestion
if executor:
self.executor = executor
```
##########
File path: airflow/jobs/base_job.py
##########
@@ -104,6 +108,11 @@ def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
super().__init__(*args, **kwargs)
+ @cached_property
+ def executor(self):
+ """Provided Executor which defaults to the one mentioned via Airflow configuration"""
+ return self._executor or ExecutorLoader.get_default_executor()
+
Review comment:
```suggestion
@cached_property
def executor(self):
return ExecutorLoader.get_default_executor()
@property
def executor_class(self):
return self.executor.__class__.__name__
```
##########
File path: airflow/jobs/base_job.py
##########
@@ -94,8 +95,11 @@ class BaseJob(Base, LoggingMixin):
def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.hostname = get_hostname()
- self.executor = executor or ExecutorLoader.get_default_executor()
- self.executor_class = self.executor.__class__.__name__
+ self._executor = executor
+ if self._executor:
+ self.executor_class = self._executor.__class__.__name__
+ else:
+ self.executor_class = conf.get('core', 'EXECUTOR')
Review comment:
Can be much simpler
```suggestion
if executor:
self.executor = executor
```
##########
File path: airflow/jobs/base_job.py
##########
@@ -104,6 +108,11 @@ def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
super().__init__(*args, **kwargs)
+ @cached_property
+ def executor(self):
+ """Provided Executor which defaults to the one mentioned via Airflow configuration"""
+ return self._executor or ExecutorLoader.get_default_executor()
+
Review comment:
Ah, I didn't realise that. In which case change this to a `@cached_property` too, and then set it in the `if executor:` block.
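Setting the attribute in the ``if executor:`` block works because ``functools.cached_property`` is a non-data descriptor: a value written into the instance ``__dict__`` shadows it and the getter never runs. A toy demonstration (not Airflow code):

```python
from functools import cached_property


class Job:
    def __init__(self, executor=None):
        if executor:
            # Writing to the instance shadows the non-data descriptor,
            # so the cached_property getter below is never invoked.
            self.executor = executor

    @cached_property
    def executor(self):
        return "default-from-config"


print(Job(executor="celery").executor)  # -> celery
print(Job().executor)                   # -> default-from-config
```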
##########
File path: airflow/jobs/base_job.py
##########
@@ -104,6 +108,11 @@ def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
super().__init__(*args, **kwargs)
+ @cached_property
+ def executor(self):
+ """Provided Executor which defaults to the one mentioned via Airflow configuration"""
+ return self._executor or ExecutorLoader.get_default_executor()
+
Review comment:
Oh right no. Okay.
##########
File path: airflow/jobs/base_job.py
##########
@@ -104,6 +108,11 @@ def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
super().__init__(*args, **kwargs)
+ @cached_property
+ def executor(self):
+ """Provided Executor which defaults to the one mentioned via Airflow configuration"""
+ return self._executor or ExecutorLoader.get_default_executor()
+
Review comment:
Leave executor_class set as a normal attribute (as you had it) -- we just don't need `_executor` but can directly set `self.executor`
##########
File path: airflow/jobs/base_job.py
##########
@@ -94,8 +95,11 @@ class BaseJob(Base, LoggingMixin):
def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.hostname = get_hostname()
- self.executor = executor or ExecutorLoader.get_default_executor()
- self.executor_class = self.executor.__class__.__name__
+ self._executor = executor
+ if self._executor:
+ self.executor_class = self._executor.__class__.__name__
+ else:
+ self.executor_class = conf.get('core', 'EXECUTOR')
Review comment:
Can be much simpler
```suggestion
if executor:
self.executor = executor
self.executor_class = executor.__class__.__name__
else:
self.executor_class = conf.get('core', 'EXECUTOR')
```
##########
File path: airflow/jobs/base_job.py
##########
@@ -104,6 +108,11 @@ def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
super().__init__(*args, **kwargs)
+ @cached_property
+ def executor(self):
+ """Provided Executor which defaults to the one mentioned via Airflow configuration"""
+ return self._executor or ExecutorLoader.get_default_executor()
Review comment:
```suggestion
@cached_property
def executor(self):
return ExecutorLoader.get_default_executor()
```
[GitHub] [airflow] dstandish commented on a change in pull request #16700: Fix ``CeleryKubernetesExecutor``
Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660268616
##########
File path: airflow/jobs/base_job.py
##########
@@ -94,7 +94,8 @@ class BaseJob(Base, LoggingMixin):
def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.hostname = get_hostname()
- self.executor = executor or ExecutorLoader.get_default_executor()
+ if self.__class__.__name__ != "LocalTaskJob":
+ self.executor = executor or ExecutorLoader.get_default_executor()
Review comment:
another option would be to make `executor` a cached property and `executor_class` a property. this would simply get them out of init, which i think would be enough if they are not accessed in `LocalTaskJob` anyway (though if one wanted to be explicit they could override those properties with not implemented in LocalTaskJob)
```python
@cached_property
def executor(self):
return self._executor or ExecutorLoader.get_default_executor() # store init param `executor` as private attr `_executor`
```
this has two benefits, one is you don't have to reference the subclass name here (as in your first approach) and the other is you don't have to make as many changes re executor in init params)
[GitHub] [airflow] ashb commented on a change in pull request #16700: Fix ``CeleryKubernetesExecutor``
Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660483731
##########
File path: airflow/jobs/base_job.py
##########
@@ -94,8 +95,11 @@ class BaseJob(Base, LoggingMixin):
def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.hostname = get_hostname()
- self.executor = executor or ExecutorLoader.get_default_executor()
- self.executor_class = self.executor.__class__.__name__
+ self._executor = executor
+ if self._executor:
+ self.executor_class = self._executor.__class__.__name__
+ else:
+ self.executor_class = conf.get('core', 'EXECUTOR')
Review comment:
Can be much simpler
```suggestion
if executor:
self.executor = executor
self.executor_class = executor.__class__.__name__
else:
self.executor_class = conf.get('core', 'EXECUTOR')
```
[GitHub] [airflow] ashb commented on a change in pull request #16700: Fix ``CeleryKubernetesExecutor``
Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660483731
##########
File path: airflow/jobs/base_job.py
##########
@@ -94,8 +95,11 @@ class BaseJob(Base, LoggingMixin):
def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.hostname = get_hostname()
- self.executor = executor or ExecutorLoader.get_default_executor()
- self.executor_class = self.executor.__class__.__name__
+ self._executor = executor
+ if self._executor:
+ self.executor_class = self._executor.__class__.__name__
+ else:
+ self.executor_class = conf.get('core', 'EXECUTOR')
Review comment:
Can be much simpler
```suggestion
if executor:
self.executor = executor
```
##########
File path: airflow/jobs/base_job.py
##########
@@ -104,6 +108,11 @@ def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
super().__init__(*args, **kwargs)
+ @cached_property
+ def executor(self):
+ """Provided Executor which defaults to the one mentioned via Airflow configuration"""
+ return self._executor or ExecutorLoader.get_default_executor()
+
Review comment:
```suggestion
@cached_property
def executor(self):
return ExecutorLoader.get_default_executor()
@property
def executor_class(self):
return self.executor.__class__.__name__
```
[GitHub] [airflow] kaxil commented on pull request #16700: Fix ``CeleryKubernetesExecutor``
Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#issuecomment-870467375
Updated the PR with the `cached_property` approach.
[GitHub] [airflow] dstandish commented on a change in pull request #16700: Fix ``CeleryKubernetesExecutor``
Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660273193
##########
File path: airflow/jobs/base_job.py
##########
@@ -94,7 +94,8 @@ class BaseJob(Base, LoggingMixin):
def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.hostname = get_hostname()
- self.executor = executor or ExecutorLoader.get_default_executor()
+ if self.__class__.__name__ != "LocalTaskJob":
+ self.executor = executor or ExecutorLoader.get_default_executor()
Review comment:
i do also wonder.... why is it that we only get this issue with CKE? is there perhaps something about the way in which CKE is designed that causes this problem?
[GitHub] [airflow] github-actions[bot] commented on pull request #16700: Fix ``CeleryKubernetesExecutor``
Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#issuecomment-870501769
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.
[GitHub] [airflow] kaxil commented on a change in pull request #16700: Fix ``CeleryKubernetesExecutor``
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660201602
##########
File path: airflow/jobs/base_job.py
##########
@@ -94,7 +94,8 @@ class BaseJob(Base, LoggingMixin):
def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.hostname = get_hostname()
- self.executor = executor or ExecutorLoader.get_default_executor()
+ if self.__class__.__name__ != "LocalTaskJob":
+ self.executor = executor or ExecutorLoader.get_default_executor()
Review comment:
The other option is to keep it `None` in `BaseJob` and override it in `SchedulerJob` and `BackfillJob`
[GitHub] [airflow] ashb commented on a change in pull request #16700: Fix ``CeleryKubernetesExecutor``
Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660493536
##########
File path: airflow/jobs/base_job.py
##########
@@ -104,6 +108,11 @@ def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
super().__init__(*args, **kwargs)
+ @cached_property
+ def executor(self):
+ """Provided Executor which defaults to the one mentioned via Airflow configuration"""
+ return self._executor or ExecutorLoader.get_default_executor()
+
Review comment:
Leave executor_class set as a normal attribute (as you had it) -- we just don't need `_executor` but can directly set `self.executor`
[GitHub] [airflow] jedcunningham commented on a change in pull request #16700: Fix ``CeleryKubernetesExecutor``
Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660671207
##########
File path: airflow/jobs/base_job.py
##########
@@ -104,6 +108,10 @@ def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
super().__init__(*args, **kwargs)
+ @cached_property
+ def executor(self):
+ return ExecutorLoader.get_default_executor()
Review comment:
```suggestion
return ExecutorLoader.get_default_executor()
```
nit
[GitHub] [airflow] kaxil commented on a change in pull request #16700: Fix ``CeleryKubernetesExecutor``
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660477307
##########
File path: airflow/jobs/base_job.py
##########
@@ -94,7 +94,8 @@ class BaseJob(Base, LoggingMixin):
def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.hostname = get_hostname()
- self.executor = executor or ExecutorLoader.get_default_executor()
+ if self.__class__.__name__ != "LocalTaskJob":
+ self.executor = executor or ExecutorLoader.get_default_executor()
Review comment:
>i do also wonder.... why is it that we only get this issue with CKE? is there perhaps something about the way in which CKE is designed that causes this problem?
The issue is that it tries to create an instance of `KubernetesExecutor` inside `CeleryExecutor` and `KubernetesExecutor` creates a multiprocessing Manager & Queue in its `__init__` which creates issues with Celery as explained in https://github.com/celery/celery/issues/4525
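The failure mode can be reproduced outside Airflow. The standalone sketch below (hypothetical, not PR code) shows that a daemon process, like a Celery `ForkPoolWorker`, cannot start the child server process that `multiprocessing.Manager()` requires:

```python
import multiprocessing


def try_manager(conn):
    # Runs inside a daemon process. Manager() must spawn a child
    # server process, which daemonic processes are forbidden to do.
    try:
        multiprocessing.Manager()
        conn.send("manager started")
    except AssertionError as exc:
        conn.send(f"AssertionError: {exc}")


if __name__ == "__main__":
    parent, child = multiprocessing.Pipe()
    p = multiprocessing.Process(target=try_manager, args=(child,), daemon=True)
    p.start()
    # Prints: AssertionError: daemonic processes are not allowed to have children
    print(parent.recv())
    p.join()
```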
[GitHub] [airflow] ashb commented on a change in pull request #16700: Fix ``CeleryKubernetesExecutor``
Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660493187
##########
File path: airflow/jobs/base_job.py
##########
@@ -104,6 +108,11 @@ def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
super().__init__(*args, **kwargs)
+ @cached_property
+ def executor(self):
+ """Provided Executor which defaults to the one mentioned via Airflow configuration"""
+ return self._executor or ExecutorLoader.get_default_executor()
+
Review comment:
Oh right no. Okay.
[GitHub] [airflow] kaxil merged pull request #16700: Fix ``CeleryKubernetesExecutor``
Posted by GitBox <gi...@apache.org>.
kaxil merged pull request #16700:
URL: https://github.com/apache/airflow/pull/16700
[GitHub] [airflow] ashb commented on a change in pull request #16700: Fix ``CeleryKubernetesExecutor``
Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660494022
##########
File path: airflow/jobs/base_job.py
##########
@@ -104,6 +108,11 @@ def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
super().__init__(*args, **kwargs)
+ @cached_property
+ def executor(self):
+ """Provided Executor which defaults to the one mentioned via Airflow configuration"""
+ return self._executor or ExecutorLoader.get_default_executor()
Review comment:
```suggestion
@cached_property
def executor(self):
return ExecutorLoader.get_default_executor()
```
[GitHub] [airflow] dstandish commented on a change in pull request #16700: Fix ``CeleryKubernetesExecutor``
Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660258928
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -39,7 +39,8 @@
from airflow.dag_processing.manager import DagFileProcessorAgent
from airflow.dag_processing.processor import DagFileProcessorProcess
from airflow.exceptions import SerializedDagNotFound
-from airflow.executors.executor_loader import UNPICKLEABLE_EXECUTORS
+from airflow.executors.base_executor import BaseExecutor
Review comment:
i think this could go behind `if TYPE_CHECKING`
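The `TYPE_CHECKING` pattern suggested here keeps the import out of the runtime path entirely; a generic sketch (the function name is hypothetical, not from the PR):

```python
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Resolved only by static type checkers (mypy, etc.); never
    # imported at runtime, so there is no import cost or cycle.
    from airflow.executors.base_executor import BaseExecutor


def run_with(executor: BaseExecutor | None = None) -> str:
    # With `from __future__ import annotations`, the annotation is a
    # plain string at runtime, so BaseExecutor need not be importable.
    return type(executor).__name__
```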
##########
File path: airflow/jobs/base_job.py
##########
@@ -94,7 +94,8 @@ class BaseJob(Base, LoggingMixin):
def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.hostname = get_hostname()
- self.executor = executor or ExecutorLoader.get_default_executor()
+ if self.__class__.__name__ != "LocalTaskJob":
+ self.executor = executor or ExecutorLoader.get_default_executor()
Review comment:
another option would be to make `executor` a cached property and `executor_class` a property. this would simply get them out of init, which i think would be enough if they are not accessed in `LocalTaskJob` anyway.
```python
@cached_property
def executor(self):
return self._executor or ExecutorLoader.get_default_executor() # store init param `executor` as private attr `_executor`
```
this has two benefits, one is you don't have to reference the subclass name here (as in your first approach) and the other is you don't have to make as many changes re executor in init params)
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -110,11 +111,14 @@ def __init__(
processor_poll_interval: float = conf.getfloat('scheduler', 'processor_poll_interval'),
do_pickle: bool = False,
log: logging.Logger = None,
+ executor: Optional[BaseExecutor] = None,
Review comment:
missing from docstring
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -39,7 +39,8 @@
from airflow.dag_processing.manager import DagFileProcessorAgent
from airflow.dag_processing.processor import DagFileProcessorProcess
from airflow.exceptions import SerializedDagNotFound
-from airflow.executors.executor_loader import UNPICKLEABLE_EXECUTORS
+from airflow.executors.base_executor import BaseExecutor
Review comment:
though.... not sure it matters e.g. if already imported implicitly .... but just in case i figured i'd point it out
##########
File path: airflow/jobs/base_job.py
##########
@@ -94,7 +94,8 @@ class BaseJob(Base, LoggingMixin):
def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.hostname = get_hostname()
- self.executor = executor or ExecutorLoader.get_default_executor()
+ if self.__class__.__name__ != "LocalTaskJob":
+ self.executor = executor or ExecutorLoader.get_default_executor()
Review comment:
another option would be to make `executor` a cached property and `executor_class` a property. this would simply get them out of init, which i think would be enough if they are not accessed in `LocalTaskJob` anyway (though if one wanted to be explicit they could override those properties with not implemented in LocalTaskJob)
```python
@cached_property
def executor(self):
return self._executor or ExecutorLoader.get_default_executor() # store init param `executor` as private attr `_executor`
```
this has two benefits, one is you don't have to reference the subclass name here (as in your first approach) and the other is you don't have to make as many changes re executor in init params)
##########
File path: airflow/jobs/base_job.py
##########
@@ -94,7 +94,8 @@ class BaseJob(Base, LoggingMixin):
def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.hostname = get_hostname()
- self.executor = executor or ExecutorLoader.get_default_executor()
+ if self.__class__.__name__ != "LocalTaskJob":
+ self.executor = executor or ExecutorLoader.get_default_executor()
Review comment:
i do also wonder.... why is it that we only get this issue with CKE? is there perhaps something about the way in which CKE is designed that causes this problem?
[GitHub] [airflow] ashb commented on a change in pull request #16700: Fix ``CeleryKubernetesExecutor``
Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660483731
##########
File path: airflow/jobs/base_job.py
##########
@@ -94,8 +95,11 @@ class BaseJob(Base, LoggingMixin):
def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.hostname = get_hostname()
- self.executor = executor or ExecutorLoader.get_default_executor()
- self.executor_class = self.executor.__class__.__name__
+ self._executor = executor
+ if self._executor:
+ self.executor_class = self._executor.__class__.__name__
+ else:
+ self.executor_class = conf.get('core', 'EXECUTOR')
Review comment:
Can be much simpler
```suggestion
if executor:
self.executor = executor
```
[GitHub] [airflow] kaxil commented on a change in pull request #16700: Fix ``CeleryKubernetesExecutor``
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660201602
##########
File path: airflow/jobs/base_job.py
##########
@@ -94,7 +94,8 @@ class BaseJob(Base, LoggingMixin):
def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.hostname = get_hostname()
- self.executor = executor or ExecutorLoader.get_default_executor()
+ if self.__class__.__name__ != "LocalTaskJob":
+ self.executor = executor or ExecutorLoader.get_default_executor()
Review comment:
The other option is to keep it `None` in `BaseJob` and override it in `SchedulerJob` and `BackfillJob`
##########
File path: airflow/jobs/base_job.py
##########
@@ -94,7 +94,8 @@ class BaseJob(Base, LoggingMixin):
def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.hostname = get_hostname()
- self.executor = executor or ExecutorLoader.get_default_executor()
+ if self.__class__.__name__ != "LocalTaskJob":
+ self.executor = executor or ExecutorLoader.get_default_executor()
Review comment:
https://github.com/apache/airflow/pull/16700/commits/a7f6efbabc94512fcf6b8ada80f13b7291a0d2c8 vs https://github.com/apache/airflow/pull/16700/commits/376ffd1deff481717758281608b0764f38753956
Thoughts?
##########
File path: airflow/jobs/base_job.py
##########
@@ -94,7 +94,8 @@ class BaseJob(Base, LoggingMixin):
def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.hostname = get_hostname()
- self.executor = executor or ExecutorLoader.get_default_executor()
+ if self.__class__.__name__ != "LocalTaskJob":
+ self.executor = executor or ExecutorLoader.get_default_executor()
Review comment:
>i do also wonder.... why is it that we only get this issue with CKE? is there perhaps something about the way in which CKE is designed that causes this problem?
The issue is that it tries to create an instance of `KubernetesExecutor` inside the `CeleryExecutor` worker, and `KubernetesExecutor` creates a multiprocessing Manager & Queue in its `__init__`, which causes issues with Celery as explained in https://github.com/celery/celery/issues/4525
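A minimal, self-contained reproduction of that failure mode (plain `multiprocessing`, no Airflow or Celery involved): Celery's prefork pool runs task code in daemonic worker processes, and a daemonic process may not start children of its own, so creating a `multiprocessing.Manager()` there fails exactly as in the traceback above.

```python
import multiprocessing

def simulate_task(results: "multiprocessing.Queue") -> None:
    # Roughly what KubernetesExecutor.__init__ does: start a
    # multiprocessing Manager, which must fork a helper process.
    try:
        multiprocessing.Manager()
        results.put("manager started")
    except AssertionError as err:
        results.put(str(err))

if __name__ == "__main__":
    results = multiprocessing.Queue()
    # Celery's prefork pool runs tasks in daemonic workers like this one.
    worker = multiprocessing.Process(target=simulate_task,
                                     args=(results,), daemon=True)
    worker.start()
    worker.join()
    print(results.get())  # daemonic processes are not allowed to have children
```

Run without `daemon=True` and the Manager starts fine, which is why the same executor works outside Celery.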
##########
File path: airflow/jobs/base_job.py
##########
@@ -104,6 +108,11 @@ def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
super().__init__(*args, **kwargs)
+ @cached_property
+ def executor(self):
+ """Provided Executor which defaults to the one mentioned via Airflow configuration"""
+ return self._executor or ExecutorLoader.get_default_executor()
+
Review comment:
Well, this will cause the same issue: `executor_class` is stored in the DB.
So when the Job model is saved to the DB it accesses `self.executor_class`, which will access `self.executor`, and that will instantiate the executor, which we want to avoid.
[GitHub] [airflow] github-actions[bot] commented on pull request #16700: Fix ``CeleryKubernetesExecutor``
Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#issuecomment-870501769
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.
[GitHub] [airflow] ashb commented on a change in pull request #16700: Fix ``CeleryKubernetesExecutor``
Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660492871
##########
File path: airflow/jobs/base_job.py
##########
@@ -104,6 +108,11 @@ def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
super().__init__(*args, **kwargs)
+ @cached_property
+ def executor(self):
+ """Provided Executor which defaults to the one mentioned via Airflow configuration"""
+ return self._executor or ExecutorLoader.get_default_executor()
+
Review comment:
Ah, I didn't realise that. In which case change this to a `@cached_property` too, and then set it in the `if executor:` block.
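A sketch of that suggestion under stated assumptions (the stand-ins below replace `ExecutorLoader.get_default_executor()` and `conf.get('core', 'EXECUTOR')`, which are not imported here): both attributes become cached properties, and an explicitly passed executor pre-populates the per-instance cache by plain assignment in `__init__`, so nothing is instantiated eagerly.

```python
from functools import cached_property

class DefaultExecutor:
    """Stand-in for whatever ExecutorLoader.get_default_executor() returns."""

class Job:
    def __init__(self, executor=None):
        if executor:
            # Plain assignment seeds cached_property's per-instance cache,
            # so the defaults below are never consulted.
            self.executor = executor
            self.executor_class = executor.__class__.__name__

    @cached_property
    def executor(self):
        # Stand-in for ExecutorLoader.get_default_executor(); runs only
        # on first access, never during __init__.
        return DefaultExecutor()

    @cached_property
    def executor_class(self):
        # Stand-in for conf.get('core', 'EXECUTOR'): the name is read
        # from config instead of instantiating the executor.
        return "DefaultExecutor"

job = Job()
assert "executor" not in job.__dict__          # nothing built during __init__
assert job.executor_class == "DefaultExecutor"
assert "executor" not in job.__dict__          # reading the name built no executor
```

This works because `cached_property` is a non-data descriptor: an attribute already present in the instance `__dict__` shadows it, so seeding by assignment and lazy computation coexist cleanly.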
[GitHub] [airflow] ashb commented on a change in pull request #16700: Fix ``CeleryKubernetesExecutor``
Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660365084
##########
File path: airflow/jobs/backfill_job.py
##########
@@ -183,7 +186,8 @@ def __init__(
self.conf = conf
self.rerun_failed_tasks = rerun_failed_tasks
self.run_backwards = run_backwards
- super().__init__(*args, **kwargs)
+ self.executor = executor or ExecutorLoader.get_default_executor()
+ super().__init__(executor=self.executor, *args, **kwargs)
Review comment:
How about, instead of doing this, we make `executor` a lazy/cached property on `BaseJob`?
That way it is not loaded until it is first accessed, which might fix the problem?
[GitHub] [airflow] kaxil commented on a change in pull request #16700: Fix ``CeleryKubernetesExecutor``
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660491447
##########
File path: airflow/jobs/base_job.py
##########
@@ -104,6 +108,11 @@ def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
super().__init__(*args, **kwargs)
+ @cached_property
+ def executor(self):
+ """Provided Executor which defaults to the one mentioned via Airflow configuration"""
+ return self._executor or ExecutorLoader.get_default_executor()
+
Review comment:
Well, this will cause the same issue: `executor_class` is stored in the DB.
So when the Job model is saved to the DB it accesses `self.executor_class`, which will access `self.executor`, and that will instantiate the executor, which we want to avoid.
[GitHub] [airflow] kaxil commented on a change in pull request #16700: Fix ``CeleryKubernetesExecutor``
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660213566
##########
File path: airflow/jobs/base_job.py
##########
@@ -94,7 +94,8 @@ class BaseJob(Base, LoggingMixin):
def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.hostname = get_hostname()
- self.executor = executor or ExecutorLoader.get_default_executor()
+ if self.__class__.__name__ != "LocalTaskJob":
+ self.executor = executor or ExecutorLoader.get_default_executor()
Review comment:
https://github.com/apache/airflow/pull/16700/commits/a7f6efbabc94512fcf6b8ada80f13b7291a0d2c8 vs https://github.com/apache/airflow/pull/16700/commits/376ffd1deff481717758281608b0764f38753956
Thoughts?
[GitHub] [airflow] dstandish commented on a change in pull request #16700: Fix ``CeleryKubernetesExecutor``
Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660258928
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -39,7 +39,8 @@
from airflow.dag_processing.manager import DagFileProcessorAgent
from airflow.dag_processing.processor import DagFileProcessorProcess
from airflow.exceptions import SerializedDagNotFound
-from airflow.executors.executor_loader import UNPICKLEABLE_EXECUTORS
+from airflow.executors.base_executor import BaseExecutor
Review comment:
i think this could go behind `if TYPE_CHECKING`, since it's only needed for the annotation
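The pattern meant here, sketched (the quoted annotation is required because the name is not bound at runtime; since `TYPE_CHECKING` is `False` at runtime, the guarded import is never executed, so this snippet runs even without `airflow` installed):

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Seen only by static type checkers (mypy, etc.); never executed at
    # runtime, so it adds no import cost and cannot create an import cycle.
    from airflow.executors.base_executor import BaseExecutor

def run_job(executor: "BaseExecutor") -> None:
    # Quoted annotation: evaluated only by the type checker.
    ...
```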
##########
File path: airflow/jobs/base_job.py
##########
@@ -94,7 +94,8 @@ class BaseJob(Base, LoggingMixin):
def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.hostname = get_hostname()
- self.executor = executor or ExecutorLoader.get_default_executor()
+ if self.__class__.__name__ != "LocalTaskJob":
+ self.executor = executor or ExecutorLoader.get_default_executor()
Review comment:
another option would be to make `executor` a cached property and `executor_class` a plain property. this would simply get them out of `__init__`, which i think would be enough as long as they are not accessed in `LocalTaskJob` anyway.
```python
@cached_property
def executor(self):
    # store the init param `executor` as the private attr `_executor`
    return self._executor or ExecutorLoader.get_default_executor()
```
this has two benefits: one is you don't have to reference the subclass name here (as in your first approach), and the other is you don't have to make as many changes to the `executor` init params.
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -110,11 +111,14 @@ def __init__(
processor_poll_interval: float = conf.getfloat('scheduler', 'processor_poll_interval'),
do_pickle: bool = False,
log: logging.Logger = None,
+ executor: Optional[BaseExecutor] = None,
Review comment:
missing from docstring
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -39,7 +39,8 @@
from airflow.dag_processing.manager import DagFileProcessorAgent
from airflow.dag_processing.processor import DagFileProcessorProcess
from airflow.exceptions import SerializedDagNotFound
-from airflow.executors.executor_loader import UNPICKLEABLE_EXECUTORS
+from airflow.executors.base_executor import BaseExecutor
Review comment:
though... not sure it matters, e.g. if it's already imported implicitly... but just in case, i figured i'd point it out