You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/02/19 09:50:02 UTC

[GitHub] [airflow] potiuk commented on a change in pull request #21351: Add and use supports_celery attribute for cli celery command validation

potiuk commented on a change in pull request #21351:
URL: https://github.com/apache/airflow/pull/21351#discussion_r810471560



##########
File path: tests/cli/test_cli_parser.py
##########
@@ -206,8 +206,9 @@ def test_dag_parser_celery_command_require_celery_executor(self):
             stderr = stderr.getvalue()
         assert (
             "airflow command error: argument GROUP_OR_COMMAND: celery subcommand "
-            "works only with CeleryExecutor, CeleryKubernetesExecutor and executors derived from them, "
-            "your current executor: SequentialExecutor, subclassed from: BaseExecutor, see help above."
+            "works only with executors that has 'supports_celery' set to True, "

Review comment:
       ```suggestion
               "works only with executors that have 'supports_celery' set to True, "
   ```

##########
File path: airflow/cli/cli_parser.py
##########
@@ -59,31 +59,23 @@ class DefaultHelpParser(argparse.ArgumentParser):
     def _check_value(self, action, value):
         """Override _check_value and check conditionally added command"""
         if action.dest == 'subcommand' and value == 'celery':
+            try:
+                from airflow.executors.celery_executor import CeleryExecutor  # noqa
+            except ImportError:
+                message = (
+                    "The celery subcommand requires that you pip install the celery module. "
+                    "To do it, run: pip install 'apache-airflow[celery]'"
+                )
+                raise ArgumentError(action, message)
+
             executor = conf.get('core', 'EXECUTOR')
             if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
                 executor_cls, _ = ExecutorLoader.import_executor_cls(executor)
-                classes = ()
-                try:
-                    from airflow.executors.celery_executor import CeleryExecutor
-
-                    classes += (CeleryExecutor,)
-                except ImportError:
-                    message = (
-                        "The celery subcommand requires that you pip install the celery module. "
-                        "To do it, run: pip install 'apache-airflow[celery]'"
-                    )
-                    raise ArgumentError(action, message)
-                try:
-                    from airflow.executors.celery_kubernetes_executor import CeleryKubernetesExecutor
-
-                    classes += (CeleryKubernetesExecutor,)
-                except ImportError:
-                    pass
-                if not issubclass(executor_cls, classes):
+                if not getattr(executor_cls, "supports_celery", False):
                     message = (
-                        f'celery subcommand works only with CeleryExecutor, CeleryKubernetesExecutor and '
-                        f'executors derived from them, your current executor: {executor}, subclassed from: '
-                        f'{", ".join([base_cls.__qualname__ for base_cls in executor_cls.__bases__])}'
+                        f"celery subcommand works only with executors that has 'supports_celery' "

Review comment:
       ```suggestion
                           f"celery subcommand works only with executors that have 'supports_celery' "
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org