Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/08/05 02:48:09 UTC

[GitHub] [airflow] georborodin opened a new pull request #17426: Allow using default celery commands for custom Celery executors subclassed from existing

georborodin opened a new pull request #17426:
URL: https://github.com/apache/airflow/pull/17426


   Currently the Airflow CLI does not allow using Celery commands with custom executors subclassed from CeleryExecutor or CeleryKubernetesExecutor. With this PR, an additional check is run to determine whether a custom executor is subclassed from one of the pre-existing Celery executors, which means that it supports the CLI Celery commands.
   
   Fixes: #17373 
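
The check described above can be sketched in isolation. This is a minimal illustration, not the code from the PR: the classes below are hypothetical stand-ins for Airflow's executors, and `supports_celery_commands` is a name invented for this example.

```python
class CeleryExecutor:
    """Stand-in for Airflow's CeleryExecutor (hypothetical, for illustration)."""


class CeleryKubernetesExecutor:
    """Stand-in for Airflow's CeleryKubernetesExecutor (hypothetical)."""


class MyCeleryExecutor(CeleryExecutor):
    """A custom executor that should be allowed to use celery commands."""


class MyLocalExecutor:
    """A custom executor unrelated to Celery."""


# Base classes whose subclasses are assumed to support the celery subcommand.
CELERY_BASES = (CeleryExecutor, CeleryKubernetesExecutor)


def supports_celery_commands(executor_cls: type) -> bool:
    # issubclass accepts a tuple and returns True if any entry is a base class.
    return issubclass(executor_cls, CELERY_BASES)


print(supports_celery_commands(MyCeleryExecutor))
print(supports_celery_commands(MyLocalExecutor))
```

Because `issubclass` treats a class as a subclass of itself, the built-in executors pass the same check as the custom ones.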
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on a change in pull request #17426: Allow using default celery commands for custom Celery executors subclassed from existing

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #17426:
URL: https://github.com/apache/airflow/pull/17426#discussion_r684113177



##########
File path: airflow/cli/cli_parser.py
##########
@@ -60,10 +62,17 @@ def _check_value(self, action, value):
         if action.dest == 'subcommand' and value == 'celery':
             executor = conf.get('core', 'EXECUTOR')
             if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
-                message = (
-                    f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'
-                )
-                raise ArgumentError(action, message)
+                executor_cls = import_string(ExecutorLoader.executors.get(executor, executor))

Review comment:
       @mik-laj is right. Loading the executor here might not be a good idea, because instantiating the executor twice could have side effects. But I think there is an easier way: simply calling `ExecutorLoader.get_default_executor()` should return the executor object properly (as a singleton, so it won't be initialized twice).
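
The singleton behaviour mentioned in this comment can be illustrated with a small sketch. It is an assumption-laden example and not Airflow's implementation: `DemoExecutor` and `DemoExecutorLoader` are hypothetical names, and only the caching idea is taken from the comment.

```python
from typing import Optional


class DemoExecutor:
    """Hypothetical executor that counts how often it is instantiated."""

    instances_created = 0

    def __init__(self) -> None:
        DemoExecutor.instances_created += 1


class DemoExecutorLoader:
    """Hypothetical loader that caches the default executor instance."""

    _default_executor: Optional[DemoExecutor] = None

    @classmethod
    def get_default_executor(cls) -> DemoExecutor:
        # Create the executor on first use only; later calls reuse the cache.
        if cls._default_executor is None:
            cls._default_executor = DemoExecutor()
        return cls._default_executor


first = DemoExecutorLoader.get_default_executor()
second = DemoExecutorLoader.get_default_executor()
print(first is second, DemoExecutor.instances_created)
```

Note that even a cached loader of this kind still constructs the executor once, which matters if construction itself is expensive or needs optional dependencies.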







[GitHub] [airflow] github-actions[bot] commented on pull request #17426: Allow using default celery commands for custom Celery executors subclassed from existing

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #17426:
URL: https://github.com/apache/airflow/pull/17426#issuecomment-893219756


   The PR is likely OK to be merged with just a subset of tests for default Python and Database versions without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full tests matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest main or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #17426: Allow using default celery commands for custom Celery executors subclassed from existing

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #17426:
URL: https://github.com/apache/airflow/pull/17426#discussion_r683184327



##########
File path: airflow/cli/cli_parser.py
##########
@@ -60,10 +62,15 @@ def _check_value(self, action, value):
         if action.dest == 'subcommand' and value == 'celery':
             executor = conf.get('core', 'EXECUTOR')
             if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
-                message = (
-                    f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'
-                )
-                raise ArgumentError(action, message)
+                executor_cls = import_string(ExecutorLoader.executors.get(executor, executor))
+                if not issubclass(
+                    executor_cls,
+                    (celery_executor.CeleryExecutor, celery_kubernetes_executor.CeleryKubernetesExecutor),
+                ):
+                    message = (
+                        f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'

Review comment:
       ```suggestion
                           f'celery subcommand works only with CeleryExecutor, CeleryKubernetesExecutor, and the subclasses, your current executor: {executor}'
   ```
   Should we add this?







[GitHub] [airflow] potiuk commented on a change in pull request #17426: Allow using default celery commands for custom Celery executors subclassed from existing

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #17426:
URL: https://github.com/apache/airflow/pull/17426#discussion_r683182521



##########
File path: airflow/cli/cli_parser.py
##########
@@ -60,10 +62,15 @@ def _check_value(self, action, value):
         if action.dest == 'subcommand' and value == 'celery':
             executor = conf.get('core', 'EXECUTOR')
             if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
-                message = (
-                    f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'
-                )
-                raise ArgumentError(action, message)
+                executor_cls = import_string(ExecutorLoader.executors.get(executor, executor))
+                if not issubclass(
+                    executor_cls,
+                    (celery_executor.CeleryExecutor, celery_kubernetes_executor.CeleryKubernetesExecutor),
+                ):
+                    message = (
+                        f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'

Review comment:
       ```suggestion
                           'celery subcommand works only with executors derived from CeleryExecutor'
                           f'your current executor class hierarchy: {type(executor).mro()}'
   ```







[GitHub] [airflow] mik-laj commented on a change in pull request #17426: Allow using default celery commands for custom Celery executors subclassed from existing

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #17426:
URL: https://github.com/apache/airflow/pull/17426#discussion_r683844854



##########
File path: airflow/cli/cli_parser.py
##########
@@ -60,10 +62,17 @@ def _check_value(self, action, value):
         if action.dest == 'subcommand' and value == 'celery':
             executor = conf.get('core', 'EXECUTOR')
             if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
-                message = (
-                    f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'
-                )
-                raise ArgumentError(action, message)
+                executor_cls = import_string(ExecutorLoader.executors.get(executor, executor))

Review comment:
       I am not sure if this will allow the executor to load in all cases, and in particular if it works with executors provided by plugins.







[GitHub] [airflow] georborodin commented on pull request #17426: Allow using default celery commands for custom Celery executors subclassed from existing

Posted by GitBox <gi...@apache.org>.
georborodin commented on pull request #17426:
URL: https://github.com/apache/airflow/pull/17426#issuecomment-893309025


   @potiuk @ephraimbuddy I tried to merge your suggestions into one, please see 1921b98





[GitHub] [airflow] boring-cyborg[bot] commented on pull request #17426: Allow using default celery commands for custom Celery executors subclassed from existing

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #17426:
URL: https://github.com/apache/airflow/pull/17426#issuecomment-893120800


   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
   Here are some useful points:
   - Pay attention to the quality of your code (flake8, mypy and type annotations). Our [pre-commits]( https://github.com/apache/airflow/blob/main/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
   - In case of a new feature add useful documentation (in docstrings or in `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/main/docs/apache-airflow/howto/custom-operator.rst). Consider adding an example DAG that shows how users should use it.
   - Consider using [Breeze environment](https://github.com/apache/airflow/blob/main/BREEZE.rst) for testing locally, it’s a heavy docker but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Please follow [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
   - Be sure to read the [Airflow Coding style]( https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#coding-style-and-best-practices).
   Apache Airflow is a community-driven project and together we are making it better 🚀.
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://s.apache.org/airflow-slack
   





[GitHub] [airflow] potiuk commented on a change in pull request #17426: Allow using default celery commands for custom Celery executors subclassed from existing

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #17426:
URL: https://github.com/apache/airflow/pull/17426#discussion_r683185788



##########
File path: airflow/cli/cli_parser.py
##########
@@ -60,10 +62,15 @@ def _check_value(self, action, value):
         if action.dest == 'subcommand' and value == 'celery':
             executor = conf.get('core', 'EXECUTOR')
             if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
-                message = (
-                    f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'
-                )
-                raise ArgumentError(action, message)
+                executor_cls = import_string(ExecutorLoader.executors.get(executor, executor))
+                if not issubclass(
+                    executor_cls,
+                    (celery_executor.CeleryExecutor, celery_kubernetes_executor.CeleryKubernetesExecutor),
+                ):
+                    message = (
+                        f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'

Review comment:
       NIT. but should be more helpful in case of more complex hierarchies 







[GitHub] [airflow] georborodin commented on a change in pull request #17426: Allow using default celery commands for custom Celery executors subclassed from existing

Posted by GitBox <gi...@apache.org>.
georborodin commented on a change in pull request #17426:
URL: https://github.com/apache/airflow/pull/17426#discussion_r684090726



##########
File path: airflow/cli/cli_parser.py
##########
@@ -60,10 +62,17 @@ def _check_value(self, action, value):
         if action.dest == 'subcommand' and value == 'celery':
             executor = conf.get('core', 'EXECUTOR')
             if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
-                message = (
-                    f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'
-                )
-                raise ArgumentError(action, message)
+                executor_cls = import_string(ExecutorLoader.executors.get(executor, executor))

Review comment:
       Should I maybe change that part to this?
   ```suggestion
                   executor_cls = ExecutorLoader.load_executor(ExecutorLoader.executors.get(executor, executor))
   ```







[GitHub] [airflow] georborodin edited a comment on pull request #17426: Allow using default celery commands for custom Celery executors subclassed from existing

Posted by GitBox <gi...@apache.org>.
georborodin edited a comment on pull request #17426:
URL: https://github.com/apache/airflow/pull/17426#issuecomment-893309025


   @potiuk @ephraimbuddy I tried to merge your suggestions into one, please see ~~1921b98~~ 54ce5ff (used plain string in first commit, would've returned mro for str type, not for executor class)
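
The difference noted here, between the MRO of a plain string and the MRO of the executor class, can be shown with a short sketch. The class names are hypothetical stand-ins and `my_package.CustomExecutor` is an invented path used only for illustration.

```python
class BaseExecutor:
    """Hypothetical root of the executor hierarchy."""


class CeleryExecutor(BaseExecutor):
    """Hypothetical stand-in for the Celery executor."""


class CustomExecutor(CeleryExecutor):
    """Hypothetical user-defined executor."""


# The configuration value is a string, so type(...) of it is str.
executor_name = "my_package.CustomExecutor"
str_hierarchy = [cls.__name__ for cls in type(executor_name).mro()]

# Calling mro() on the class itself reports the real hierarchy.
class_hierarchy = [cls.__name__ for cls in CustomExecutor.mro()]

print(str_hierarchy)
print(class_hierarchy)
```

The first list describes `str`, which tells the user nothing about their executor; only the second is useful in an error message.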








[GitHub] [airflow] potiuk commented on a change in pull request #17426: Allow using default celery commands for custom Celery executors subclassed from existing

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #17426:
URL: https://github.com/apache/airflow/pull/17426#discussion_r683182521



##########
File path: airflow/cli/cli_parser.py
##########
@@ -60,10 +62,15 @@ def _check_value(self, action, value):
         if action.dest == 'subcommand' and value == 'celery':
             executor = conf.get('core', 'EXECUTOR')
             if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
-                message = (
-                    f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'
-                )
-                raise ArgumentError(action, message)
+                executor_cls = import_string(ExecutorLoader.executors.get(executor, executor))
+                if not issubclass(
+                    executor_cls,
+                    (celery_executor.CeleryExecutor, celery_kubernetes_executor.CeleryKubernetesExecutor),
+                ):
+                    message = (
+                        f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'

Review comment:
       ```suggestion
                           'celery subcommand works only with '
                           'executors derived from CeleryExecutor, your current executor: {executor}'
   ```

##########
File path: airflow/cli/cli_parser.py
##########
@@ -60,10 +62,15 @@ def _check_value(self, action, value):
         if action.dest == 'subcommand' and value == 'celery':
             executor = conf.get('core', 'EXECUTOR')
             if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
-                message = (
-                    f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'
-                )
-                raise ArgumentError(action, message)
+                executor_cls = import_string(ExecutorLoader.executors.get(executor, executor))
+                if not issubclass(
+                    executor_cls,
+                    (celery_executor.CeleryExecutor, celery_kubernetes_executor.CeleryKubernetesExecutor),
+                ):
+                    message = (
+                        f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'

Review comment:
       ```suggestion
                           'celery subcommand works only with '
                           f'executors derived from CeleryExecutor, your current executor: {executor}'
   ```

##########
File path: airflow/cli/cli_parser.py
##########
@@ -60,10 +62,15 @@ def _check_value(self, action, value):
         if action.dest == 'subcommand' and value == 'celery':
             executor = conf.get('core', 'EXECUTOR')
             if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
-                message = (
-                    f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'
-                )
-                raise ArgumentError(action, message)
+                executor_cls = import_string(ExecutorLoader.executors.get(executor, executor))
+                if not issubclass(
+                    executor_cls,
+                    (celery_executor.CeleryExecutor, celery_kubernetes_executor.CeleryKubernetesExecutor),
+                ):
+                    message = (
+                        f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'

Review comment:
       ```suggestion
                           'celery subcommand works only with executors derived from CeleryExecutor'
                           f'your current executor class hierarchy: {type(executor).mro()}'
   ```

##########
File path: airflow/cli/cli_parser.py
##########
@@ -60,10 +62,15 @@ def _check_value(self, action, value):
         if action.dest == 'subcommand' and value == 'celery':
             executor = conf.get('core', 'EXECUTOR')
             if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
-                message = (
-                    f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'
-                )
-                raise ArgumentError(action, message)
+                executor_cls = import_string(ExecutorLoader.executors.get(executor, executor))
+                if not issubclass(
+                    executor_cls,
+                    (celery_executor.CeleryExecutor, celery_kubernetes_executor.CeleryKubernetesExecutor),
+                ):
+                    message = (
+                        f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'

Review comment:
       NIT. but should be more helpful in case of more complex hierarchies 

##########
File path: airflow/cli/cli_parser.py
##########
@@ -60,10 +62,15 @@ def _check_value(self, action, value):
         if action.dest == 'subcommand' and value == 'celery':
             executor = conf.get('core', 'EXECUTOR')
             if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
-                message = (
-                    f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'
-                )
-                raise ArgumentError(action, message)
+                executor_cls = import_string(ExecutorLoader.executors.get(executor, executor))
+                if not issubclass(
+                    executor_cls,
+                    (celery_executor.CeleryExecutor, celery_kubernetes_executor.CeleryKubernetesExecutor),
+                ):
+                    message = (
+                        f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'

Review comment:
       Great minds think alike ;) 

##########
File path: airflow/cli/cli_parser.py
##########
@@ -60,10 +62,15 @@ def _check_value(self, action, value):
         if action.dest == 'subcommand' and value == 'celery':
             executor = conf.get('core', 'EXECUTOR')
             if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
-                message = (
-                    f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'
-                )
-                raise ArgumentError(action, message)
+                executor_cls = import_string(ExecutorLoader.executors.get(executor, executor))
+                if not issubclass(
+                    executor_cls,
+                    (celery_executor.CeleryExecutor, celery_kubernetes_executor.CeleryKubernetesExecutor),
+                ):
+                    message = (
+                        f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'

Review comment:
       ```suggestion
                           'celery subcommand works only with executors derived from CeleryExecutor. '
                           f'Your current executor class hierarchy: {type(executor).mro()}'
   ```
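
The successive suggestions above differ only in how adjacent string literals are combined. A small sketch of the two pitfalls, using a made-up executor name: the `f` prefix applies to each literal separately, and implicit concatenation inserts no separator.

```python
executor = "LocalExecutor"

# No f prefix on the second literal: the placeholder is kept verbatim.
without_prefix = (
    'celery subcommand works only with '
    'executors derived from CeleryExecutor, your current executor: {executor}'
)

# The f prefix on the second literal makes the placeholder interpolate.
with_prefix = (
    'celery subcommand works only with '
    f'executors derived from CeleryExecutor, your current executor: {executor}'
)

# Adjacent literals are joined as-is, so a missing trailing space or
# punctuation mark glues the two sentences together.
no_separator = (
    'executors derived from CeleryExecutor'
    f'your current executor: {executor}'
)

print(without_prefix)
print(with_prefix)
print(no_separator)
```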







[GitHub] [airflow] georborodin commented on a change in pull request #17426: Allow using default celery commands for custom Celery executors subclassed from existing

Posted by GitBox <gi...@apache.org>.
georborodin commented on a change in pull request #17426:
URL: https://github.com/apache/airflow/pull/17426#discussion_r684139638



##########
File path: airflow/cli/cli_parser.py
##########
@@ -60,10 +62,17 @@ def _check_value(self, action, value):
         if action.dest == 'subcommand' and value == 'celery':
             executor = conf.get('core', 'EXECUTOR')
             if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
-                message = (
-                    f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'
-                )
-                raise ArgumentError(action, message)
+                executor_cls = import_string(ExecutorLoader.executors.get(executor, executor))

Review comment:
       @potiuk here's why I went with the `import_string` approach at first: `CeleryKubernetesExecutor`-based executors require celery_executor and kubernetes_executor to be instantiated and passed to it at `__init__(self, celery_executor, kubernetes_executor)`, so the Celery-related command would still require Kubernetes dependencies installed. `ExecutorLoader.get_default_executor()` would still instantiate an executor object.
   
   May I suggest splitting up `ExecutorLoader.load_executor` method so that there would be a separate method for getting executor class by string? 
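
Resolving a class from its dotted path without instantiating it, as argued for in this comment, can be sketched with the standard library. The `import_string` below is a simplified stand-in written for this example, not Airflow's own helper, and `collections.OrderedDict` is used only because it is always importable.

```python
from importlib import import_module


def import_string(dotted_path: str):
    """Return the attribute named by a 'package.module.Name' path."""
    module_path, _, attr_name = dotted_path.rpartition(".")
    if not module_path:
        raise ImportError(f"{dotted_path!r} does not look like a module path")
    module = import_module(module_path)
    try:
        return getattr(module, attr_name)
    except AttributeError as err:
        raise ImportError(
            f"Module {module_path!r} does not define {attr_name!r}"
        ) from err


# Only the class object is obtained; its __init__ is never called, so no
# constructor arguments or optional dependencies of an instance are needed.
cls = import_string("collections.OrderedDict")
print(cls.__name__, isinstance(cls, type))
```

Importing the module can still fail if the module itself imports missing packages at top level, so this approach avoids instantiation costs but not import-time dependencies.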







[GitHub] [airflow] georborodin commented on a change in pull request #17426: Allow using default celery commands for custom Celery executors subclassed from existing

Posted by GitBox <gi...@apache.org>.
georborodin commented on a change in pull request #17426:
URL: https://github.com/apache/airflow/pull/17426#discussion_r684139638



##########
File path: airflow/cli/cli_parser.py
##########
@@ -60,10 +62,17 @@ def _check_value(self, action, value):
         if action.dest == 'subcommand' and value == 'celery':
             executor = conf.get('core', 'EXECUTOR')
             if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
-                message = (
-                    f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'
-                )
-                raise ArgumentError(action, message)
+                executor_cls = import_string(ExecutorLoader.executors.get(executor, executor))

Review comment:
       @potiuk here's why I went with the `import_string` approach at first: `CeleryKubernetesExecutor`-based executors require celery_executor and kubernetes_executor to be instantiated and passed to it at `__init__(self, celery_executor, kubernetes_executor)`, so the Celery-related command would still require Kubernetes dependencies installed. `ExecutorLoader.get_default_executor()` would still instantiate an executor object.
   
   May I suggest splitting up methods in `ExecutorLoader` so that there would be a method for getting executor class by string name?










[GitHub] [airflow] georborodin commented on a change in pull request #17426: Allow using default celery commands for custom Celery executors subclassed from existing

Posted by GitBox <gi...@apache.org>.
georborodin commented on a change in pull request #17426:
URL: https://github.com/apache/airflow/pull/17426#discussion_r684139638



##########
File path: airflow/cli/cli_parser.py
##########
@@ -60,10 +62,17 @@ def _check_value(self, action, value):
         if action.dest == 'subcommand' and value == 'celery':
             executor = conf.get('core', 'EXECUTOR')
             if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
-                message = (
-                    f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'
-                )
-                raise ArgumentError(action, message)
+                executor_cls = import_string(ExecutorLoader.executors.get(executor, executor))

Review comment:
       @potiuk here's why I went with the `import_string` approach at first: `CeleryKubernetesExecutor`-based executors require `celery_executor` and `kubernetes_executor` to be instantiated and passed to `__init__(self, celery_executor, kubernetes_executor)`, so a Celery-related command would still require the Kubernetes dependencies to be installed. `ExecutorLoader.get_default_executor()` would still instantiate an executor object.
   
   May I suggest splitting up the `ExecutorLoader.load_executor` method so that there is a separate method for getting the executor class by string?
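
A minimal sketch of what such a split could look like, not the actual Airflow implementation. The names `EXECUTOR_ALIASES`, `get_executor_class`, and this local `import_string` are hypothetical stand-ins for illustration, and standard-library classes take the place of real executors so the example runs without Airflow installed. The idea is that resolving the class never calls the constructor, so a CLI check would not need the constructor's dependencies.

```python
from importlib import import_module

# Hypothetical stand-in for ExecutorLoader.executors: short alias -> dotted import path.
# A standard-library class is used so the sketch runs without Airflow installed.
EXECUTOR_ALIASES = {
    "OrderedDictExecutor": "collections.OrderedDict",
}


def import_string(dotted_path: str):
    """Import a dotted path and return the attribute named by its last component."""
    module_path, _, attr_name = dotted_path.rpartition(".")
    if not module_path:
        raise ImportError(f"{dotted_path!r} doesn't look like a module path")
    module = import_module(module_path)
    try:
        return getattr(module, attr_name)
    except AttributeError as err:
        raise ImportError(f"Module {module_path!r} has no attribute {attr_name!r}") from err


def get_executor_class(name: str) -> type:
    """Resolve an alias or a dotted path to a class without instantiating it."""
    return import_string(EXECUTOR_ALIASES.get(name, name))


def load_executor(name: str):
    """Instantiate the executor; only this step runs the constructor."""
    return get_executor_class(name)()
```

With a split like this, the CLI check could call `issubclass(get_executor_class(executor), ...)` and leave instantiation to `load_executor`.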







[GitHub] [airflow] georborodin closed pull request #17426: Allow using default celery commands for custom Celery executors subclassed from existing

Posted by GitBox <gi...@apache.org>.
georborodin closed pull request #17426:
URL: https://github.com/apache/airflow/pull/17426


   





[GitHub] [airflow] mik-laj commented on a change in pull request #17426: Allow using default celery commands for custom Celery executors subclassed from existing

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #17426:
URL: https://github.com/apache/airflow/pull/17426#discussion_r683844854



##########
File path: airflow/cli/cli_parser.py
##########
@@ -60,10 +62,17 @@ def _check_value(self, action, value):
         if action.dest == 'subcommand' and value == 'celery':
             executor = conf.get('core', 'EXECUTOR')
             if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
-                message = (
-                    f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'
-                )
-                raise ArgumentError(action, message)
+                executor_cls = import_string(ExecutorLoader.executors.get(executor, executor))

Review comment:
       I am not sure if this will allow the executor to load in all cases, and in particular if it works with executors provided by plugins. `ExecutorLoader.load_executor` supports one more syntax:
   https://github.com/apache/airflow/blob/8505d2f0a4524313e3eff7a4f16b9a9439c7a79f/airflow/executors/executor_loader.py#L73
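
To make the concern concrete, here is a rough sketch of the name forms involved. It assumes the extra syntax at the linked line is the plugin form `plugin_name.ExecutorName` (a name with exactly one dot); the function `classify_executor_name` and the alias set are hypothetical and only illustrate the branching, they are not Airflow code.

```python
def classify_executor_name(name: str, builtin_aliases: set) -> str:
    """Classify an executor name by the syntax it uses (illustrative only)."""
    if name in builtin_aliases:
        # Short alias of a core executor, e.g. "CeleryExecutor".
        return "builtin"
    if name.count(".") == 1:
        # Assumed plugin form: "plugin_name.ExecutorName".
        return "plugin"
    # Anything else is treated as a full dotted import path.
    return "import_path"


ALIASES = {"CeleryExecutor", "CeleryKubernetesExecutor"}  # assumed subset of aliases
kinds = [
    classify_executor_name("CeleryExecutor", ALIASES),
    classify_executor_name("my_plugin.MyExecutor", ALIASES),
    classify_executor_name("my_company.executors.custom.MyExecutor", ALIASES),
]
```

Under that assumption, `import_string(ExecutorLoader.executors.get(executor, executor))` covers the first and third forms, while a plugin-style name would be passed to `import_string` as if it were a module path.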
   







[GitHub] [airflow] potiuk commented on a change in pull request #17426: Allow using default celery commands for custom Celery executors subclassed from existing

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #17426:
URL: https://github.com/apache/airflow/pull/17426#discussion_r683186353



##########
File path: airflow/cli/cli_parser.py
##########
@@ -60,10 +62,15 @@ def _check_value(self, action, value):
         if action.dest == 'subcommand' and value == 'celery':
             executor = conf.get('core', 'EXECUTOR')
             if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
-                message = (
-                    f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'
-                )
-                raise ArgumentError(action, message)
+                executor_cls = import_string(ExecutorLoader.executors.get(executor, executor))
+                if not issubclass(
+                    executor_cls,
+                    (celery_executor.CeleryExecutor, celery_kubernetes_executor.CeleryKubernetesExecutor),
+                ):
+                    message = (
+                        f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'

Review comment:
       Great minds think alike ;) 







[GitHub] [airflow] georborodin commented on a change in pull request #17426: Allow using default celery commands for custom Celery executors subclassed from existing

Posted by GitBox <gi...@apache.org>.
georborodin commented on a change in pull request #17426:
URL: https://github.com/apache/airflow/pull/17426#discussion_r684090726



##########
File path: airflow/cli/cli_parser.py
##########
@@ -60,10 +62,17 @@ def _check_value(self, action, value):
         if action.dest == 'subcommand' and value == 'celery':
             executor = conf.get('core', 'EXECUTOR')
             if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
-                message = (
-                    f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'
-                )
-                raise ArgumentError(action, message)
+                executor_cls = import_string(ExecutorLoader.executors.get(executor, executor))

Review comment:
       ~~Should I maybe change that part to this?~~ (Invalid suggestion)
   ```suggestion
                   executor_cls = ExecutorLoader.load_executor(ExecutorLoader.executors.get(executor, executor))
   ```
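
A small sketch of why this suggestion would not fit the check, assuming `load_executor` returns an executor instance rather than a class (as the discussion of `get_default_executor()` in this thread implies). The classes and `load_executor_instance` below are illustrative stand-ins, not Airflow code.

```python
class CeleryExecutor:
    """Stand-in for the real CeleryExecutor."""


class CustomCeleryExecutor(CeleryExecutor):
    """Stand-in for a user-defined subclass."""


def load_executor_instance():
    """Hypothetical loader that returns an instance, not a class."""
    return CustomCeleryExecutor()


loaded = load_executor_instance()

# issubclass() requires a class as its first argument, so an instance raises TypeError.
try:
    issubclass(loaded, CeleryExecutor)
    issubclass_error = None
except TypeError as err:
    issubclass_error = err

# These checks work on an instance, but only after the constructor has already run.
instance_check = isinstance(loaded, CeleryExecutor)
class_check = issubclass(type(loaded), CeleryExecutor)
```

Either working variant still requires building the executor first, which is the cost the `import_string` approach avoids.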







[GitHub] [airflow] georborodin commented on pull request #17426: Allow using default celery commands for custom Celery executors subclassed from existing

Posted by GitBox <gi...@apache.org>.
georborodin commented on pull request #17426:
URL: https://github.com/apache/airflow/pull/17426#issuecomment-893309025


   @potiuk @ephraimbuddy I tried to merge your suggestions into one, please see 1921b98





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #17426: Allow using default celery commands for custom Celery executors subclassed from existing

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #17426:
URL: https://github.com/apache/airflow/pull/17426#discussion_r683184327



##########
File path: airflow/cli/cli_parser.py
##########
@@ -60,10 +62,15 @@ def _check_value(self, action, value):
         if action.dest == 'subcommand' and value == 'celery':
             executor = conf.get('core', 'EXECUTOR')
             if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
-                message = (
-                    f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'
-                )
-                raise ArgumentError(action, message)
+                executor_cls = import_string(ExecutorLoader.executors.get(executor, executor))
+                if not issubclass(
+                    executor_cls,
+                    (celery_executor.CeleryExecutor, celery_kubernetes_executor.CeleryKubernetesExecutor),
+                ):
+                    message = (
+                        f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'

Review comment:
       ```suggestion
                           f'celery subcommand works only with CeleryExecutor, CeleryKubernetesExecutor, and the subclasses, your current executor: {executor}'
   ```
   Should we add this?







[GitHub] [airflow] potiuk commented on a change in pull request #17426: Allow using default celery commands for custom Celery executors subclassed from existing

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #17426:
URL: https://github.com/apache/airflow/pull/17426#discussion_r683182521



##########
File path: airflow/cli/cli_parser.py
##########
@@ -60,10 +62,15 @@ def _check_value(self, action, value):
         if action.dest == 'subcommand' and value == 'celery':
             executor = conf.get('core', 'EXECUTOR')
             if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
-                message = (
-                    f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'
-                )
-                raise ArgumentError(action, message)
+                executor_cls = import_string(ExecutorLoader.executors.get(executor, executor))
+                if not issubclass(
+                    executor_cls,
+                    (celery_executor.CeleryExecutor, celery_kubernetes_executor.CeleryKubernetesExecutor),
+                ):
+                    message = (
+                        f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'

Review comment:
       ```suggestion
                           'celery subcommand works only with executors derived from CeleryExecutor. '
                           f'Your current executor class hierarchy: {type(executor).mro()}'
   ```
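
One caveat with this wording, sketched below: in the diff above, `executor` is the string returned by `conf.get('core', 'EXECUTOR')`, so `type(executor).mro()` describes `str` rather than the executor class. The classes here are illustrative stand-ins, and `my_package.MyExecutor` is a made-up path.

```python
class BaseExecutor:
    """Stand-in base class."""


class CeleryExecutor(BaseExecutor):
    """Stand-in for the real CeleryExecutor."""


class MyExecutor(CeleryExecutor):
    """Stand-in for a custom executor."""


executor = "my_package.MyExecutor"  # what the config lookup yields: a plain string
executor_cls = MyExecutor           # what resolving that string would yield

# MRO of the string's type: only str and object.
str_mro = [cls.__name__ for cls in type(executor).mro()]

# MRO of the resolved class: the hierarchy the message is meant to show.
cls_mro = [cls.__name__ for cls in executor_cls.mro()]
```

Calling `.mro()` on the resolved class (here `executor_cls`) gives the hierarchy that is useful in the error message.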







[GitHub] [airflow] georborodin commented on a change in pull request #17426: Allow using default celery commands for custom Celery executors subclassed from existing

Posted by GitBox <gi...@apache.org>.
georborodin commented on a change in pull request #17426:
URL: https://github.com/apache/airflow/pull/17426#discussion_r684139638



##########
File path: airflow/cli/cli_parser.py
##########
@@ -60,10 +62,17 @@ def _check_value(self, action, value):
         if action.dest == 'subcommand' and value == 'celery':
             executor = conf.get('core', 'EXECUTOR')
             if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
-                message = (
-                    f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'
-                )
-                raise ArgumentError(action, message)
+                executor_cls = import_string(ExecutorLoader.executors.get(executor, executor))

Review comment:
       @potiuk that's why I went with the `import_string` approach at first: `CeleryKubernetesExecutor`-based executors require `celery_executor` and `kubernetes_executor` to be instantiated and passed to `__init__(self, celery_executor, kubernetes_executor)`, so a Celery-related command would still require the Kubernetes dependencies to be installed. `ExecutorLoader.get_default_executor()` would still instantiate an executor object.
   
   May I suggest splitting up methods in `ExecutorLoader` so that there would be a method for getting executor class by string name?







[GitHub] [airflow] potiuk commented on a change in pull request #17426: Allow using default celery commands for custom Celery executors subclassed from existing

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #17426:
URL: https://github.com/apache/airflow/pull/17426#discussion_r683182521



##########
File path: airflow/cli/cli_parser.py
##########
@@ -60,10 +62,15 @@ def _check_value(self, action, value):
         if action.dest == 'subcommand' and value == 'celery':
             executor = conf.get('core', 'EXECUTOR')
             if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
-                message = (
-                    f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'
-                )
-                raise ArgumentError(action, message)
+                executor_cls = import_string(ExecutorLoader.executors.get(executor, executor))
+                if not issubclass(
+                    executor_cls,
+                    (celery_executor.CeleryExecutor, celery_kubernetes_executor.CeleryKubernetesExecutor),
+                ):
+                    message = (
+                        f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'

Review comment:
       ```suggestion
                           'celery subcommand works only with '
                           f'executors derived from CeleryExecutor, your current executor: {executor}'
   ```










[GitHub] [airflow] georborodin commented on a change in pull request #17426: Allow using default celery commands for custom Celery executors subclassed from existing

Posted by GitBox <gi...@apache.org>.
georborodin commented on a change in pull request #17426:
URL: https://github.com/apache/airflow/pull/17426#discussion_r684090726



##########
File path: airflow/cli/cli_parser.py
##########
@@ -60,10 +62,17 @@ def _check_value(self, action, value):
         if action.dest == 'subcommand' and value == 'celery':
             executor = conf.get('core', 'EXECUTOR')
             if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
-                message = (
-                    f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'
-                )
-                raise ArgumentError(action, message)
+                executor_cls = import_string(ExecutorLoader.executors.get(executor, executor))

Review comment:
       Should I maybe change that part to:
   ```suggestion
                   executor_cls = ExecutorLoader.load_executor(ExecutorLoader.executors.get(executor, executor))
   ```







[GitHub] [airflow] github-actions[bot] commented on pull request #17426: Allow using default celery commands for custom Celery executors subclassed from existing

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #17426:
URL: https://github.com/apache/airflow/pull/17426#issuecomment-893219756


   The PR is likely OK to be merged with just a subset of tests for the default Python and Database versions, without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full test matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest main or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] georborodin edited a comment on pull request #17426: Allow using default celery commands for custom Celery executors subclassed from existing

Posted by GitBox <gi...@apache.org>.
georborodin edited a comment on pull request #17426:
URL: https://github.com/apache/airflow/pull/17426#issuecomment-893309025


   @potiuk @ephraimbuddy I tried to merge your suggestions into one, please see ~~1921b98~~ 54ce5ff (used plain string in first commit, would've returned mro for str type, not for executor class)

