You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2020/10/08 16:38:32 UTC
[airflow] branch master updated: Reduce "start-up" time for tasks
in LocalExecutor (#11327)
This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 4839a5b Reduce "start-up" time for tasks in LocalExecutor (#11327)
4839a5b is described below
commit 4839a5bc6ed7af7d0f836360e4ea3c6fd421e0fa
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Thu Oct 8 17:37:51 2020 +0100
Reduce "start-up" time for tasks in LocalExecutor (#11327)
Spawning a whole new python process and then re-loading all of Airflow
is expensive. Although this time fades to insignificance for long
running tasks, this delay gives a "bad" experience for new users when
they are just trying out Airflow for the first time.
For the LocalExecutor this cuts the "queued time" down from 1.5s to 0.1s
on average.
---
airflow/config_templates/config.yml | 10 ++++
airflow/config_templates/default_airflow.cfg | 5 ++
airflow/executors/local_executor.py | 70 ++++++++++++++++++++++--
airflow/settings.py | 8 +++
airflow/task/task_runner/standard_task_runner.py | 3 +-
docs/configurations-ref.rst | 4 ++
docs/modules_management.rst | 2 +
docs/plugins.rst | 19 +++++++
tests/executors/test_local_executor.py | 50 ++++++++++++++---
9 files changed, 154 insertions(+), 17 deletions(-)
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 8c3330f..f58ea7b 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -208,6 +208,16 @@
type: string
example: ~
default: "{AIRFLOW_HOME}/plugins"
+ - name: execute_tasks_new_python_interpreter
+ description: |
+ Should tasks be executed via forking of the parent process ("False",
+ the speedier option) or by spawning a new python process ("True": slower,
+ but plugin changes are picked up by tasks straight away)
+ default: "False"
+ example: ~
+ version_added: "2.0.0"
+ see_also: ":ref:`plugins:loading`"
+ type: boolean
- name: fernet_key
description: |
Secret key to save connection passwords in the db
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index f5adb43..2665749 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -133,6 +133,11 @@ load_default_connections = True
# Where your Airflow plugins are stored
plugins_folder = {AIRFLOW_HOME}/plugins
+# Should tasks be executed via forking of the parent process ("False",
+# the speedier option) or by spawning a new python process ("True": slower,
+# but plugin changes are picked up by tasks straight away)
+execute_tasks_new_python_interpreter = False
+
# Secret key to save connection passwords in the db
fernet_key = {FERNET_KEY}
diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index d9061f7..c3c8358 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -22,12 +22,17 @@ LocalExecutor
For more information on how the LocalExecutor works, take a look at the guide:
:ref:`executor:LocalExecutor`
"""
+import os
import subprocess
+from abc import abstractmethod
from multiprocessing import Manager, Process
from multiprocessing.managers import SyncManager
from queue import Empty, Queue # pylint: disable=unused-import # noqa: F401
from typing import Any, List, Optional, Tuple, Union # pylint: disable=unused-import # noqa: F401
+from setproctitle import setproctitle # pylint: disable=no-name-in-module
+
+from airflow import settings
from airflow.exceptions import AirflowException
from airflow.executors.base_executor import NOT_STARTED_MESSAGE, PARALLELISM, BaseExecutor, CommandType
from airflow.models.taskinstance import ( # pylint: disable=unused-import # noqa: F401
@@ -51,10 +56,16 @@ class LocalWorkerBase(Process, LoggingMixin):
"""
def __init__(self, result_queue: 'Queue[TaskInstanceStateType]'):
- super().__init__()
+ super().__init__(target=self.do_work)
self.daemon: bool = True
self.result_queue: 'Queue[TaskInstanceStateType]' = result_queue
+ def run(self):
+ # We know we've just started a new process, so lets disconnect from the metadata db now
+ settings.engine.pool.dispose()
+ settings.engine.dispose()
+ return super().run()
+
def execute_work(self, key: TaskInstanceKey, command: CommandType) -> None:
"""
Executes command received and stores result state in queue.
@@ -64,14 +75,61 @@ class LocalWorkerBase(Process, LoggingMixin):
"""
if key is None:
return
+
self.log.info("%s running %s", self.__class__.__name__, command)
+ if settings.EXECUTE_TASKS_NEW_PYTHON_INTERPRETER:
+ state = self._execute_work_in_subprocess(command)
+ else:
+ state = self._execute_work_in_fork(command)
+
+ self.result_queue.put((key, state))
+
+ def _execute_work_in_subprocess(self, command: CommandType) -> str:
try:
subprocess.check_call(command, close_fds=True)
- state = State.SUCCESS
+ return State.SUCCESS
except subprocess.CalledProcessError as e:
- state = State.FAILED
self.log.error("Failed to execute task %s.", str(e))
- self.result_queue.put((key, state))
+ return State.FAILED
+
+ def _execute_work_in_fork(self, command: CommandType) -> str:
+ pid = os.fork()
+ if pid:
+ # In parent, wait for the child
+ pid, ret = os.waitpid(pid, 0)
+ return State.SUCCESS if ret == 0 else State.FAILED
+
+ from airflow.sentry import Sentry
+ ret = 1
+ try:
+ import signal
+
+ from airflow.cli.cli_parser import get_parser
+
+ signal.signal(signal.SIGINT, signal.SIG_DFL)
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+
+ parser = get_parser()
+ # [1:] - remove "airflow" from the start of the command
+ args = parser.parse_args(command[1:])
+
+ setproctitle(f"airflow task supervisor: {command}")
+
+ args.func(args)
+ ret = 0
+ return State.SUCCESS
+ except Exception as e: # pylint: disable=broad-except
+ self.log.error("Failed to execute task %s.", str(e))
+ finally:
+ Sentry.flush()
+ os._exit(ret) # pylint: disable=protected-access
+
+ @abstractmethod
+ def do_work(self):
+ """
+ Called in the subprocess and should then execute tasks
+ """
+ raise NotImplementedError()
class LocalWorker(LocalWorkerBase):
@@ -91,7 +149,7 @@ class LocalWorker(LocalWorkerBase):
self.key: TaskInstanceKey = key
self.command: CommandType = command
- def run(self) -> None:
+ def do_work(self) -> None:
self.execute_work(key=self.key, command=self.command)
@@ -111,7 +169,7 @@ class QueuedLocalWorker(LocalWorkerBase):
super().__init__(result_queue=result_queue)
self.task_queue = task_queue
- def run(self) -> None:
+ def do_work(self) -> None:
while True:
key, command = self.task_queue.get()
try:
diff --git a/airflow/settings.py b/airflow/settings.py
index 1e5ca06..0fe1b93 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -365,3 +365,11 @@ STORE_DAG_CODE = conf.getboolean("core", "store_dag_code", fallback=STORE_SERIAL
# to get all the logs from the print & log statements in the DAG files before a task is run
# The handlers are restored after the task completes execution.
DONOT_MODIFY_HANDLERS = conf.getboolean('logging', 'donot_modify_handlers', fallback=False)
+
+CAN_FORK = hasattr(os, "fork")
+
+EXECUTE_TASKS_NEW_PYTHON_INTERPRETER = not CAN_FORK or conf.getboolean(
+ 'core',
+ 'execute_tasks_new_python_interpreter',
+ fallback=False,
+)
diff --git a/airflow/task/task_runner/standard_task_runner.py b/airflow/task/task_runner/standard_task_runner.py
index 32a0c84..daeb267 100644
--- a/airflow/task/task_runner/standard_task_runner.py
+++ b/airflow/task/task_runner/standard_task_runner.py
@@ -21,11 +21,10 @@ import os
import psutil
from setproctitle import setproctitle # pylint: disable=no-name-in-module
+from airflow.settings import CAN_FORK
from airflow.task.task_runner.base_task_runner import BaseTaskRunner
from airflow.utils.process_utils import reap_process_group
-CAN_FORK = hasattr(os, "fork")
-
class StandardTaskRunner(BaseTaskRunner):
"""
diff --git a/docs/configurations-ref.rst b/docs/configurations-ref.rst
index e5b2cda..1fba461 100644
--- a/docs/configurations-ref.rst
+++ b/docs/configurations-ref.rst
@@ -53,6 +53,10 @@ can set in ``airflow.cfg`` file or using environment variables.
{{ option["description"] }}
{% endif %}
+ {% if option.get("see_also") %}
+ .. seealso:: {{ option["see_also"] }}
+ {% endif %}
+
:Type: {{ option["type"] }}
:Default: ``{{ "''" if option["default"] == "" else option["default"] }}``
:Environment Variable: ``AIRFLOW__{{ section["name"] | upper }}__{{ option["name"] | upper }}``
diff --git a/docs/modules_management.rst b/docs/modules_management.rst
index 3130e51..3632c74 100644
--- a/docs/modules_management.rst
+++ b/docs/modules_management.rst
@@ -228,6 +228,8 @@ Python PATH: [/home/rootcss/venvs/airflow/bin:/usr/lib/python38.zip:/usr/lib/pyt
Below is the sample output of the ``airflow info`` command:
+.. seealso:: :ref:`plugins:loading`
+
.. code-block:: none
Apache Airflow [1.10.11]
diff --git a/docs/plugins.rst b/docs/plugins.rst
index cf27804..bc33501 100644
--- a/docs/plugins.rst
+++ b/docs/plugins.rst
@@ -66,6 +66,25 @@ Airflow has many components that can be reused when building an application:
* Airflow is deployed, you can just piggy back on its deployment logistics
* Basic charting capabilities, underlying libraries and abstractions
+.. _plugins:loading:
+
+When are plugins (re)loaded?
+----------------------------
+
+Plugins are loaded once at the start of every Airflow process, and never reloaded.
+
+This means that if you make any changes to plugins and you want the webserver or scheduler to use that new
+code you will need to restart those processes.
+
+By default, task execution will use forking to avoid the slow down of having to create a whole new python
+interpreter and re-parse all of the Airflow code and start up routines -- this is a big benefit for shorter
+running tasks. This does mean that if you use plugins in your tasks and want them to update, you will either
+need to restart the worker (if using CeleryExecutor) or scheduler (Local or Sequential executors). The other
+option is to accept the speed hit at start-up and set the ``core.execute_tasks_new_python_interpreter``
+config setting to True, which launches a whole new Python interpreter for tasks.
+
+(Modules only imported by DAG files on the other hand do not suffer this problem, as DAG files are not
+loaded/parsed in any long-running Airflow process.)
Interface
---------
diff --git a/tests/executors/test_local_executor.py b/tests/executors/test_local_executor.py
index 7fc909c..20722bf 100644
--- a/tests/executors/test_local_executor.py
+++ b/tests/executors/test_local_executor.py
@@ -20,6 +20,8 @@ import subprocess
import unittest
from unittest import mock
+from airflow import settings
+from airflow.exceptions import AirflowException
from airflow.executors.local_executor import LocalExecutor
from airflow.utils.state import State
@@ -29,9 +31,9 @@ class TestLocalExecutor(unittest.TestCase):
TEST_SUCCESS_COMMANDS = 5
@mock.patch('airflow.executors.local_executor.subprocess.check_call')
- def execution_parallelism(self, mock_check_call, parallelism=0):
- success_command = ['airflow', 'tasks', 'run', 'true', 'some_parameter']
- fail_command = ['airflow', 'tasks', 'run', 'false']
+ def execution_parallelism_subprocess(self, mock_check_call, parallelism=0):
+ success_command = ['airflow', 'tasks', 'run', 'true', 'some_parameter', '2020-10-07']
+ fail_command = ['airflow', 'tasks', 'run', 'false', 'task_id', '2020-10-07']
def fake_execute_command(command, close_fds=True): # pylint: disable=unused-argument
if command != success_command:
@@ -41,6 +43,23 @@ class TestLocalExecutor(unittest.TestCase):
mock_check_call.side_effect = fake_execute_command
+ self._test_execute(parallelism, success_command, fail_command)
+
+ @mock.patch('airflow.cli.commands.task_command.task_run')
+ def execution_parallelism_fork(self, mock_run, parallelism=0):
+ success_command = ['airflow', 'tasks', 'run', 'success', 'some_parameter', '2020-10-07']
+ fail_command = ['airflow', 'tasks', 'run', 'failure', 'some_parameter', '2020-10-07']
+
+ def fake_task_run(args):
+ if args.dag_id != 'success':
+ raise AirflowException('Simulate failed task')
+
+ mock_run.side_effect = fake_task_run
+
+ self._test_execute(parallelism, success_command, fail_command)
+
+ def _test_execute(self, parallelism, success_command, fail_command):
+
executor = LocalExecutor(parallelism=parallelism)
executor.start()
@@ -71,12 +90,25 @@ class TestLocalExecutor(unittest.TestCase):
expected = self.TEST_SUCCESS_COMMANDS + 1 if parallelism == 0 else parallelism
self.assertEqual(executor.workers_used, expected)
- def test_execution_unlimited_parallelism(self):
- self.execution_parallelism(parallelism=0) # pylint: disable=no-value-for-parameter
-
- def test_execution_limited_parallelism(self):
- test_parallelism = 2
- self.execution_parallelism(parallelism=test_parallelism) # pylint: disable=no-value-for-parameter
+ def test_execution_subprocess_unlimited_parallelism(self):
+ with mock.patch.object(settings, 'EXECUTE_TASKS_NEW_PYTHON_INTERPRETER',
+ new_callable=mock.PropertyMock) as option:
+ option.return_value = True
+ self.execution_parallelism_subprocess(parallelism=0) # pylint: disable=no-value-for-parameter
+
+ def test_execution_subprocess_limited_parallelism(self):
+ with mock.patch.object(settings, 'EXECUTE_TASKS_NEW_PYTHON_INTERPRETER',
+ new_callable=mock.PropertyMock) as option:
+ option.return_value = True
+ self.execution_parallelism_subprocess(parallelism=2) # pylint: disable=no-value-for-parameter
+
+ @mock.patch.object(settings, 'EXECUTE_TASKS_NEW_PYTHON_INTERPRETER', False)
+ def test_execution_unlimited_parallelism_fork(self):
+ self.execution_parallelism_fork(parallelism=0) # pylint: disable=no-value-for-parameter
+
+ @mock.patch.object(settings, 'EXECUTE_TASKS_NEW_PYTHON_INTERPRETER', False)
+ def test_execution_limited_parallelism_fork(self):
+ self.execution_parallelism_fork(parallelism=2) # pylint: disable=no-value-for-parameter
@mock.patch('airflow.executors.local_executor.LocalExecutor.sync')
@mock.patch('airflow.executors.base_executor.BaseExecutor.trigger_tasks')