Posted to commits@airflow.apache.org by jh...@apache.org on 2021/07/09 19:57:40 UTC

[airflow] 01/07: Fix ``CeleryKubernetesExecutor`` (#16700)

This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-stable
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit e264ef1eb226fbe1424d240839989e0e4dae041a
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Tue Jun 29 23:39:34 2021 +0100

    Fix ``CeleryKubernetesExecutor`` (#16700)
    
    closes https://github.com/apache/airflow/issues/16326
    
    Currently, when running Celery tasks under the ``CeleryKubernetesExecutor``, we see
    the following error. It occurs because ``BaseJob`` (via ``LocalTaskJob``) needlessly
    instantiates a ``KubernetesExecutor``, which in turn tries to create a multiprocessing
    process/Manager; that fails inside Celery's daemonic worker processes (a minimal
    repro follows the traceback below).
    
    ```
    [2021-06-29 00:23:45,301: ERROR/ForkPoolWorker-16] Failed to execute task daemonic processes are not allowed to have children.
    Traceback (most recent call last):
      File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 116, in _execute_in_fork
        args.func(args)
      File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/cli_parser.py", line 48, in command
        return func(*args, **kwargs)
      File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/cli.py", line 91, in wrapper
        return f(*args, **kwargs)
      File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 237, in task_run
        _run_task_by_selected_method(args, dag, ti)
      File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 64, in _run_task_by_selected_method
        _run_task_by_local_task_job(args, ti)
      File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 117, in _run_task_by_local_task_job
        pool=args.pool,
      File "<string>", line 4, in __init__
      File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/orm/state.py", line 433, in _initialize_instance
        manager.dispatch.init_failure(self, args, kwargs)
      File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
        with_traceback=exc_tb,
      File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
        raise exception
      File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/orm/state.py", line 430, in _initialize_instance
        return manager.original_init(*mixed[1:], **kwargs)
      File "/home/airflow/.local/lib/python3.6/site-packages/airflow/jobs/local_task_job.py", line 76, in __init__
        super().__init__(*args, **kwargs)
      File "<string>", line 6, in __init__
      File "/home/airflow/.local/lib/python3.6/site-packages/airflow/jobs/base_job.py", line 97, in __init__
        self.executor = executor or ExecutorLoader.get_default_executor()
      File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/executor_loader.py", line 62, in get_default_executor
        cls._default_executor = cls.load_executor(executor_name)
      File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/executor_loader.py", line 79, in load_executor
        return cls.__load_celery_kubernetes_executor()
      File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/executor_loader.py", line 116, in __load_celery_kubernetes_executor
        kubernetes_executor = import_string(cls.executors[KUBERNETES_EXECUTOR])()
      File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/kubernetes_executor.py", line 421, in __init__
        self._manager = multiprocessing.Manager()
      File "/usr/local/lib/python3.6/multiprocessing/context.py", line 56, in Manager
        m.start()
      File "/usr/local/lib/python3.6/multiprocessing/managers.py", line 513, in start
        self._process.start()
      File "/usr/local/lib/python3.6/multiprocessing/process.py", line 103, in start
        'daemonic processes are not allowed to have children'
    AssertionError: daemonic processes are not allowed to have children
    ```
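    
    For illustration only (not part of the original commit), here is a minimal sketch
    of the underlying Python restriction: Celery's ``ForkPoolWorker`` processes are
    daemonic, and ``multiprocessing.Manager()`` starts a server process under the
    hood, which a daemonic process is not allowed to do.
    
    ```python
    import multiprocessing
    
    
    def init_manager():
        # Manager() forks a server process; inside a daemonic process this
        # raises "daemonic processes are not allowed to have children".
        multiprocessing.Manager()
    
    
    if __name__ == "__main__":
        worker = multiprocessing.Process(target=init_manager, daemon=True)
        worker.start()  # the child dies with the AssertionError shown above
        worker.join()
    ```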
    
    We don't need to instantiate an executor when running a ``LocalTaskJob``, as the
    executor isn't used there. Instead, ``BaseJob.executor`` becomes a lazily evaluated
    ``cached_property``, so the default executor is only created on first access.
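    
    As a simplified, standalone sketch of the lazy-loading pattern the diff below
    applies (the config read and executor loader are stubbed out here; only the
    shape of the change is real):
    
    ```python
    from functools import cached_property  # Airflow 2.1 ships a backport in airflow.compat.functools
    
    
    def load_default_executor():
        # Stand-in for ExecutorLoader.get_default_executor(), which may spawn
        # a multiprocessing.Manager when loading the KubernetesExecutor.
        return object()
    
    
    class BaseJob:
        def __init__(self, executor=None):
            if executor:
                # An explicit executor is stored eagerly; the instance attribute
                # shadows the cached_property (a non-data descriptor) below.
                self.executor = executor
                self.executor_class = executor.__class__.__name__
            else:
                # Record only the configured class name; defer instantiation.
                self.executor_class = "CeleryKubernetesExecutor"  # stand-in for conf.get('core', 'EXECUTOR')
    
        @cached_property
        def executor(self):
            # Evaluated on first attribute access only -- never during plain
            # construction, so LocalTaskJob no longer starts an executor.
            return load_default_executor()
    ```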
    
    (cherry picked from commit 7857a9bde2e189881f87fe4dc0cdce7503895c03)
---
 airflow/jobs/base_job.py    | 12 ++++++++++--
 tests/jobs/test_base_job.py |  7 ++++++-
 2 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/airflow/jobs/base_job.py b/airflow/jobs/base_job.py
index 1837c27..4edb692 100644
--- a/airflow/jobs/base_job.py
+++ b/airflow/jobs/base_job.py
@@ -25,6 +25,7 @@ from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm import backref, foreign, relationship
 from sqlalchemy.orm.session import make_transient
 
+from airflow.compat.functools import cached_property
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.executors.executor_loader import ExecutorLoader
@@ -94,8 +95,11 @@ class BaseJob(Base, LoggingMixin):
 
     def __init__(self, executor=None, heartrate=None, *args, **kwargs):
         self.hostname = get_hostname()
-        self.executor = executor or ExecutorLoader.get_default_executor()
-        self.executor_class = self.executor.__class__.__name__
+        if executor:
+            self.executor = executor
+            self.executor_class = executor.__class__.__name__
+        else:
+            self.executor_class = conf.get('core', 'EXECUTOR')
         self.start_date = timezone.utcnow()
         self.latest_heartbeat = timezone.utcnow()
         if heartrate is not None:
@@ -104,6 +108,10 @@ class BaseJob(Base, LoggingMixin):
         self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
         super().__init__(*args, **kwargs)
 
+    @cached_property
+    def executor(self):
+        return ExecutorLoader.get_default_executor()
+
     @classmethod
     @provide_session
     def most_recent_job(cls, session=None) -> Optional['BaseJob']:
diff --git a/tests/jobs/test_base_job.py b/tests/jobs/test_base_job.py
index 093386b..93f2630 100644
--- a/tests/jobs/test_base_job.py
+++ b/tests/jobs/test_base_job.py
@@ -118,7 +118,12 @@ class TestBaseJob:
 
             assert job.latest_heartbeat == when, "attribute not updated when heartbeat fails"
 
-    @conf_vars({('scheduler', 'max_tis_per_query'): '100'})
+    @conf_vars(
+        {
+            ('scheduler', 'max_tis_per_query'): '100',
+            ('core', 'executor'): 'SequentialExecutor',
+        }
+    )
     @patch('airflow.jobs.base_job.ExecutorLoader.get_default_executor')
     @patch('airflow.jobs.base_job.get_hostname')
     @patch('airflow.jobs.base_job.getuser')
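
A note on the test change: because ``BaseJob.__init__`` now reads ``conf.get('core', 'EXECUTOR')`` when no executor is passed, the test pins ``('core', 'executor')`` so that ``executor_class`` is deterministic. A hedged sketch of the resulting behaviour (illustrative, not from the patch; assumes a configured ``SequentialExecutor``):

```python
from airflow.jobs.base_job import BaseJob

job = BaseJob()                  # constructing the job no longer loads an executor
print(job.executor_class)        # 'SequentialExecutor', read from [core] executor
executor = job.executor          # first access calls ExecutorLoader.get_default_executor()
assert job.executor is executor  # cached_property caches the instance afterwards
```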