Posted to commits@airflow.apache.org by as...@apache.org on 2020/10/08 16:38:32 UTC

[airflow] branch master updated: Reduce "start-up" time for tasks in LocalExecutor (#11327)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 4839a5b  Reduce "start-up" time for tasks in LocalExecutor (#11327)
4839a5b is described below

commit 4839a5bc6ed7af7d0f836360e4ea3c6fd421e0fa
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Thu Oct 8 17:37:51 2020 +0100

    Reduce "start-up" time for tasks in LocalExecutor (#11327)
    
    Spawning a whole new python process and then re-loading all of Airflow
    is expensive. Although this time fades to insignificance for long-running
    tasks, this delay gives a "bad" experience for new users when they are
    just trying out Airflow for the first time.
    
    For the LocalExecutor this cuts the "queued time" down from 1.5s to 0.1s
    on average.
---
 airflow/config_templates/config.yml              | 10 ++++
 airflow/config_templates/default_airflow.cfg     |  5 ++
 airflow/executors/local_executor.py              | 70 ++++++++++++++++++++++--
 airflow/settings.py                              |  8 +++
 airflow/task/task_runner/standard_task_runner.py |  3 +-
 docs/configurations-ref.rst                      |  4 ++
 docs/modules_management.rst                      |  2 +
 docs/plugins.rst                                 | 19 +++++++
 tests/executors/test_local_executor.py           | 50 ++++++++++++++---
 9 files changed, 154 insertions(+), 17 deletions(-)
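
The diff below replaces the per-task subprocess invocation of the airflow CLI with an os.fork() of the
already-initialized worker process. A minimal, POSIX-only sketch of that fork-and-wait pattern, simplified
from the LocalExecutor change (run_task is a hypothetical stand-in for parsing and executing the
"airflow tasks run ..." command in the child):

    import os
    import signal


    def run_in_fork(run_task) -> bool:
        """Run a task callable in a forked child; the parent reports success/failure."""
        pid = os.fork()
        if pid:
            # Parent: wait for the child and map its exit status to a result.
            _, status = os.waitpid(pid, 0)
            return status == 0

        # Child: restore the default signal handling inherited from the parent.
        signal.signal(signal.SIGINT, signal.SIG_DFL)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)

        exit_code = 1
        try:
            run_task()  # the real executor calls Airflow's CLI entrypoint here
            exit_code = 0
        except Exception as exc:
            print(f"task failed: {exc}")
        finally:
            # _exit() skips interpreter teardown that could interfere with the parent.
            os._exit(exit_code)


    if __name__ == "__main__":
        print("succeeded:", run_in_fork(lambda: print("hello from the forked child")))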

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 8c3330f..f58ea7b 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -208,6 +208,16 @@
       type: string
       example: ~
       default: "{AIRFLOW_HOME}/plugins"
+    - name: execute_tasks_new_python_interpreter
+      description: |
+        Should tasks be executed by forking the parent process ("False",
+        the speedier option) or by spawning a new python process ("True",
+        slower, but means plugin changes are picked up by tasks straight away)
+      default: "False"
+      example: ~
+      version_added: "2.0.0"
+      see_also: ":ref:`plugins:loading`"
+      type: boolean
     - name: fernet_key
       description: |
         Secret key to save connection passwords in the db
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index f5adb43..2665749 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -133,6 +133,11 @@ load_default_connections = True
 # Where your Airflow plugins are stored
 plugins_folder = {AIRFLOW_HOME}/plugins
 
+# Should tasks be executed by forking the parent process ("False",
+# the speedier option) or by spawning a new python process ("True",
+# slower, but means plugin changes are picked up by tasks straight away)
+execute_tasks_new_python_interpreter = False
+
 # Secret key to save connection passwords in the db
 fernet_key = {FERNET_KEY}
 
diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index d9061f7..c3c8358 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -22,12 +22,17 @@ LocalExecutor
     For more information on how the LocalExecutor works, take a look at the guide:
     :ref:`executor:LocalExecutor`
 """
+import os
 import subprocess
+from abc import abstractmethod
 from multiprocessing import Manager, Process
 from multiprocessing.managers import SyncManager
 from queue import Empty, Queue  # pylint: disable=unused-import  # noqa: F401
 from typing import Any, List, Optional, Tuple, Union  # pylint: disable=unused-import # noqa: F401
 
+from setproctitle import setproctitle  # pylint: disable=no-name-in-module
+
+from airflow import settings
 from airflow.exceptions import AirflowException
 from airflow.executors.base_executor import NOT_STARTED_MESSAGE, PARALLELISM, BaseExecutor, CommandType
 from airflow.models.taskinstance import (  # pylint: disable=unused-import # noqa: F401
@@ -51,10 +56,16 @@ class LocalWorkerBase(Process, LoggingMixin):
     """
 
     def __init__(self, result_queue: 'Queue[TaskInstanceStateType]'):
-        super().__init__()
+        super().__init__(target=self.do_work)
         self.daemon: bool = True
         self.result_queue: 'Queue[TaskInstanceStateType]' = result_queue
 
+    def run(self):
+        # We know we've just started a new process, so let's disconnect from the metadata db now
+        settings.engine.pool.dispose()
+        settings.engine.dispose()
+        return super().run()
+
     def execute_work(self, key: TaskInstanceKey, command: CommandType) -> None:
         """
         Executes command received and stores result state in queue.
@@ -64,14 +75,61 @@ class LocalWorkerBase(Process, LoggingMixin):
         """
         if key is None:
             return
+
         self.log.info("%s running %s", self.__class__.__name__, command)
+        if settings.EXECUTE_TASKS_NEW_PYTHON_INTERPRETER:
+            state = self._execute_work_in_subprocess(command)
+        else:
+            state = self._execute_work_in_fork(command)
+
+        self.result_queue.put((key, state))
+
+    def _execute_work_in_subprocess(self, command: CommandType) -> str:
         try:
             subprocess.check_call(command, close_fds=True)
-            state = State.SUCCESS
+            return State.SUCCESS
         except subprocess.CalledProcessError as e:
-            state = State.FAILED
             self.log.error("Failed to execute task %s.", str(e))
-        self.result_queue.put((key, state))
+            return State.FAILED
+
+    def _execute_work_in_fork(self, command: CommandType) -> str:
+        pid = os.fork()
+        if pid:
+            # In parent, wait for the child
+            pid, ret = os.waitpid(pid, 0)
+            return State.SUCCESS if ret == 0 else State.FAILED
+
+        from airflow.sentry import Sentry
+        ret = 1
+        try:
+            import signal
+
+            from airflow.cli.cli_parser import get_parser
+
+            signal.signal(signal.SIGINT, signal.SIG_DFL)
+            signal.signal(signal.SIGTERM, signal.SIG_DFL)
+
+            parser = get_parser()
+            # [1:] - remove "airflow" from the start of the command
+            args = parser.parse_args(command[1:])
+
+            setproctitle(f"airflow task supervisor: {command}")
+
+            args.func(args)
+            ret = 0
+            return State.SUCCESS
+        except Exception as e:  # pylint: disable=broad-except
+            self.log.error("Failed to execute task %s.", str(e))
+        finally:
+            Sentry.flush()
+            os._exit(ret)  # pylint: disable=protected-access
+
+    @abstractmethod
+    def do_work(self):
+        """
+        Called in the subprocess and should then execute tasks
+        """
+        raise NotImplementedError()
 
 
 class LocalWorker(LocalWorkerBase):
@@ -91,7 +149,7 @@ class LocalWorker(LocalWorkerBase):
         self.key: TaskInstanceKey = key
         self.command: CommandType = command
 
-    def run(self) -> None:
+    def do_work(self) -> None:
         self.execute_work(key=self.key, command=self.command)
 
 
@@ -111,7 +169,7 @@ class QueuedLocalWorker(LocalWorkerBase):
         super().__init__(result_queue=result_queue)
         self.task_queue = task_queue
 
-    def run(self) -> None:
+    def do_work(self) -> None:
         while True:
             key, command = self.task_queue.get()
             try:
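
The run() override above drops the SQLAlchemy engine and connection pool inherited from the parent, so a
forked worker never reuses the scheduler's pooled metadata-DB connections. A minimal sketch of that pattern
with a plain SQLAlchemy engine (the in-memory sqlite URL is only for illustration):

    import os

    from sqlalchemy import create_engine, text

    engine = create_engine("sqlite://")  # stand-in for Airflow's metadata DB engine

    pid = os.fork()
    if pid == 0:
        # Child: dispose of any pooled connections inherited across the fork;
        # sharing one DB connection between two processes corrupts its state.
        engine.dispose()
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))  # a fresh connection owned by the child
        os._exit(0)
    else:
        os.waitpid(pid, 0)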
diff --git a/airflow/settings.py b/airflow/settings.py
index 1e5ca06..0fe1b93 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -365,3 +365,11 @@ STORE_DAG_CODE = conf.getboolean("core", "store_dag_code", fallback=STORE_SERIAL
 # to get all the logs from the print & log statements in the DAG files before a task is run
 # The handlers are restored after the task completes execution.
 DONOT_MODIFY_HANDLERS = conf.getboolean('logging', 'donot_modify_handlers', fallback=False)
+
+CAN_FORK = hasattr(os, "fork")
+
+EXECUTE_TASKS_NEW_PYTHON_INTERPRETER = not CAN_FORK or conf.getboolean(
+    'core',
+    'execute_tasks_new_python_interpreter',
+    fallback=False,
+)
diff --git a/airflow/task/task_runner/standard_task_runner.py b/airflow/task/task_runner/standard_task_runner.py
index 32a0c84..daeb267 100644
--- a/airflow/task/task_runner/standard_task_runner.py
+++ b/airflow/task/task_runner/standard_task_runner.py
@@ -21,11 +21,10 @@ import os
 import psutil
 from setproctitle import setproctitle  # pylint: disable=no-name-in-module
 
+from airflow.settings import CAN_FORK
 from airflow.task.task_runner.base_task_runner import BaseTaskRunner
 from airflow.utils.process_utils import reap_process_group
 
-CAN_FORK = hasattr(os, "fork")
-
 
 class StandardTaskRunner(BaseTaskRunner):
     """
diff --git a/docs/configurations-ref.rst b/docs/configurations-ref.rst
index e5b2cda..1fba461 100644
--- a/docs/configurations-ref.rst
+++ b/docs/configurations-ref.rst
@@ -53,6 +53,10 @@ can set in ``airflow.cfg`` file or using environment variables.
     {{ option["description"] }}
     {% endif %}
 
+    {% if option.get("see_also") %}
+    .. seealso:: {{ option["see_also"] }}
+    {% endif %}
+
     :Type: {{ option["type"] }}
     :Default: ``{{ "''" if option["default"] == "" else option["default"] }}``
     :Environment Variable: ``AIRFLOW__{{ section["name"] | upper }}__{{ option["name"] | upper }}``
diff --git a/docs/modules_management.rst b/docs/modules_management.rst
index 3130e51..3632c74 100644
--- a/docs/modules_management.rst
+++ b/docs/modules_management.rst
@@ -228,6 +228,8 @@ Python PATH: [/home/rootcss/venvs/airflow/bin:/usr/lib/python38.zip:/usr/lib/pyt
 
 Below is the sample output of the ``airflow info`` command:
 
+.. seealso:: :ref:`plugins:loading`
+
 .. code-block:: none
 
     Apache Airflow [1.10.11]
diff --git a/docs/plugins.rst b/docs/plugins.rst
index cf27804..bc33501 100644
--- a/docs/plugins.rst
+++ b/docs/plugins.rst
@@ -66,6 +66,25 @@ Airflow has many components that can be reused when building an application:
 * Airflow is deployed, you can just piggy back on its deployment logistics
 * Basic charting capabilities, underlying libraries and abstractions
 
+.. _plugins:loading:
+
+When are plugins (re)loaded?
+----------------------------
+
+Plugins are loaded once at the start of every Airflow process, and never reloaded.
+
+This means that if you make any changes to plugins and you want the webserver or scheduler to use that new
+code you will need to restart those processes.
+
+By default, task execution will use forking to avoid the slowdown of having to create a whole new python
+interpreter and re-parse all of the Airflow code and start-up routines -- this is a big benefit for
+shorter-running tasks. This does mean that if you use plugins in your tasks and want them to update, you will
+either need to restart the worker (if using CeleryExecutor) or the scheduler (Local or Sequential executors).
+The other option is to accept the speed hit at start-up and set the ``core.execute_tasks_new_python_interpreter``
+config setting to True, resulting in launching a whole new python interpreter for tasks.
+
+(Modules only imported by DAG files, on the other hand, do not suffer this problem, as DAG files are not
+loaded/parsed in any long-running Airflow process.)
 
 Interface
 ---------
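
To opt back into the old behaviour described in the plugins doc above, the option can be set under [core]
in airflow.cfg, or through Airflow's AIRFLOW__<SECTION>__<OPTION> environment-variable convention shown in
configurations-ref.rst. A minimal sketch, assuming an installed Airflow (the value shown is illustrative):

    import os

    # Airflow maps AIRFLOW__CORE__EXECUTE_TASKS_NEW_PYTHON_INTERPRETER onto the
    # [core] execute_tasks_new_python_interpreter option; exporting it in the
    # worker/scheduler environment is equivalent to editing airflow.cfg.
    os.environ["AIRFLOW__CORE__EXECUTE_TASKS_NEW_PYTHON_INTERPRETER"] = "True"

    from airflow.configuration import conf

    # Read the effective value back, mirroring the lookup added in settings.py.
    print(conf.getboolean("core", "execute_tasks_new_python_interpreter", fallback=False))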
diff --git a/tests/executors/test_local_executor.py b/tests/executors/test_local_executor.py
index 7fc909c..20722bf 100644
--- a/tests/executors/test_local_executor.py
+++ b/tests/executors/test_local_executor.py
@@ -20,6 +20,8 @@ import subprocess
 import unittest
 from unittest import mock
 
+from airflow import settings
+from airflow.exceptions import AirflowException
 from airflow.executors.local_executor import LocalExecutor
 from airflow.utils.state import State
 
@@ -29,9 +31,9 @@ class TestLocalExecutor(unittest.TestCase):
     TEST_SUCCESS_COMMANDS = 5
 
     @mock.patch('airflow.executors.local_executor.subprocess.check_call')
-    def execution_parallelism(self, mock_check_call, parallelism=0):
-        success_command = ['airflow', 'tasks', 'run', 'true', 'some_parameter']
-        fail_command = ['airflow', 'tasks', 'run', 'false']
+    def execution_parallelism_subprocess(self, mock_check_call, parallelism=0):
+        success_command = ['airflow', 'tasks', 'run', 'true', 'some_parameter', '2020-10-07']
+        fail_command = ['airflow', 'tasks', 'run', 'false', 'task_id', '2020-10-07']
 
         def fake_execute_command(command, close_fds=True):  # pylint: disable=unused-argument
             if command != success_command:
@@ -41,6 +43,23 @@ class TestLocalExecutor(unittest.TestCase):
 
         mock_check_call.side_effect = fake_execute_command
 
+        self._test_execute(parallelism, success_command, fail_command)
+
+    @mock.patch('airflow.cli.commands.task_command.task_run')
+    def execution_parallelism_fork(self, mock_run, parallelism=0):
+        success_command = ['airflow', 'tasks', 'run', 'success', 'some_parameter', '2020-10-07']
+        fail_command = ['airflow', 'tasks', 'run', 'failure', 'some_parameter', '2020-10-07']
+
+        def fake_task_run(args):
+            if args.dag_id != 'success':
+                raise AirflowException('Simulate failed task')
+
+        mock_run.side_effect = fake_task_run
+
+        self._test_execute(parallelism, success_command, fail_command)
+
+    def _test_execute(self, parallelism, success_command, fail_command):
+
         executor = LocalExecutor(parallelism=parallelism)
         executor.start()
 
@@ -71,12 +90,25 @@ class TestLocalExecutor(unittest.TestCase):
         expected = self.TEST_SUCCESS_COMMANDS + 1 if parallelism == 0 else parallelism
         self.assertEqual(executor.workers_used, expected)
 
-    def test_execution_unlimited_parallelism(self):
-        self.execution_parallelism(parallelism=0)  # pylint: disable=no-value-for-parameter
-
-    def test_execution_limited_parallelism(self):
-        test_parallelism = 2
-        self.execution_parallelism(parallelism=test_parallelism)  # pylint: disable=no-value-for-parameter
+    def test_execution_subprocess_unlimited_parallelism(self):
+        with mock.patch.object(settings, 'EXECUTE_TASKS_NEW_PYTHON_INTERPRETER',
+                               new_callable=mock.PropertyMock) as option:
+            option.return_value = True
+            self.execution_parallelism_subprocess(parallelism=0)  # pylint: disable=no-value-for-parameter
+
+    def test_execution_subprocess_limited_parallelism(self):
+        with mock.patch.object(settings, 'EXECUTE_TASKS_NEW_PYTHON_INTERPRETER',
+                               new_callable=mock.PropertyMock) as option:
+            option.return_value = True
+            self.execution_parallelism_subprocess(parallelism=2)  # pylint: disable=no-value-for-parameter
+
+    @mock.patch.object(settings, 'EXECUTE_TASKS_NEW_PYTHON_INTERPRETER', False)
+    def test_execution_unlimited_parallelism_fork(self):
+        self.execution_parallelism_fork(parallelism=0)  # pylint: disable=no-value-for-parameter
+
+    @mock.patch.object(settings, 'EXECUTE_TASKS_NEW_PYTHON_INTERPRETER', False)
+    def test_execution_limited_parallelism_fork(self):
+        self.execution_parallelism_fork(parallelism=2)  # pylint: disable=no-value-for-parameter
 
     @mock.patch('airflow.executors.local_executor.LocalExecutor.sync')
     @mock.patch('airflow.executors.base_executor.BaseExecutor.trigger_tasks')