You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/09/06 09:55:11 UTC

[airflow] branch v2-1-test updated (d14d0b0 -> 2690c79)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a change to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


 discard d14d0b0  Improve the description of how to handle dynamic task generation (#17963)
     new 2690c79  Improve the description of how to handle dynamic task generation (#17963)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (d14d0b0)
            \
             N -- N -- N   refs/heads/v2-1-test (2690c79)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:

[airflow] 01/01: Improve the description of how to handle dynamic task generation (#17963)

Posted by po...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 2690c79a5d141e869437e51ea98249812837a963
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Mon Sep 6 10:40:17 2021 +0200

    Improve the description of how to handle dynamic task generation (#17963)
    
    The Top-Level best practices were a little misleading. They
    suggested that no code should be written at the top-level DAG other
    than just creating operators, but the story is a little more nuanced.
    
    Better explanation is give and also examples on how you can deal
    with the situation when you need to generate your data based on
    some meta-data. From Slack discussion it seems that it is not
    obvious at all what are the best ways to handle that so two
    alternatives were presented with generating a meta-data file
    and generating an importable python code containing the meta-data.
    
    During that change, I noticed also, that config sections and
    config variables were not sorted - which made it very difficult to
    search for them in the index. All the config variables are now
    sorted so the references to the righ sections/variables make much
    more sense now.
    
    (cherry picked from commit 1be3ef635fab635f741b775c52e0da7fe0871567)
---
 airflow/configuration.py               |   4 +-
 docs/apache-airflow/best-practices.rst | 150 ++++++++++++++++++++++++++++-----
 docs/conf.py                           |   6 ++
 3 files changed, 139 insertions(+), 21 deletions(-)

diff --git a/airflow/configuration.py b/airflow/configuration.py
index 452f127..16151dc 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -32,7 +32,7 @@ from collections import OrderedDict
 # Ignored Mypy on configparser because it thinks the configparser module has no _UNSET attribute
 from configparser import _UNSET, ConfigParser, NoOptionError, NoSectionError  # type: ignore
 from json.decoder import JSONDecodeError
-from typing import Dict, List, Optional, Union
+from typing import Any, Dict, List, Optional, Union
 
 from airflow.exceptions import AirflowConfigException
 from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH, BaseSecretsBackend
@@ -92,7 +92,7 @@ def _default_config_file_path(file_name: str):
     return os.path.join(templates_dir, file_name)
 
 
-def default_config_yaml() -> List[dict]:
+def default_config_yaml() -> List[Dict[str, Any]]:
     """
     Read Airflow configs from YAML file
 
diff --git a/docs/apache-airflow/best-practices.rst b/docs/apache-airflow/best-practices.rst
index 7d06192..6b88776 100644
--- a/docs/apache-airflow/best-practices.rst
+++ b/docs/apache-airflow/best-practices.rst
@@ -55,7 +55,7 @@ Some of the ways you can avoid producing a different result -
   Someone may update the input data between re-runs, which results in different outputs.
   A better way is to read the input data from a specific partition. You can use ``execution_date`` as a partition.
   You should follow this partitioning method while writing data in S3/HDFS, as well.
-* The python datetime ``now()`` function gives the current datetime object.
+* The Python datetime ``now()`` function gives the current datetime object.
   This function should never be used inside a task, especially to do the critical computation, as it leads to different outcomes on each run.
   It's fine to use it, for example, to generate a temporary log.
 
@@ -88,20 +88,121 @@ and the downstream tasks can pull the path from XCom and use it to read the data
 The tasks should also not store any authentication parameters such as passwords or token inside them.
 Where at all possible, use :doc:`Connections </concepts/connections>` to store data securely in Airflow backend and retrieve them using a unique connection id.
 
+Top level Python Code and Dynamic DAGs
+--------------------------------------
 
-Variables
----------
+You should avoid writing the top level code which is not necessary to create Operators
+and build DAG relations between them. This is because of the design decision for the scheduler of Airflow
+and the impact the top-level code parsing speed on both performance and scalability of Airflow.
 
-You should avoid usage of Variables outside an operator's ``execute()`` method or Jinja templates if possible,
-as Variables create a connection to metadata DB of Airflow to fetch the value, which can slow down parsing and
-place extra load on the DB.
+Airflow scheduler executes the code outside the Operator's ``execute`` methods with the minimum interval of
+:ref:`min_file_process_interval<config:scheduler__min_file_process_interval>` seconds. This is done in order
+to allow dynamic scheduling of the DAGs - where scheduling and dependencies might change over time and
+impact the next schedule of the DAG. Airflow scheduler tries to continuously make sure that what you have
+in DAGs is correctly reflected in scheduled tasks.
 
-Airflow parses all the DAGs in the background at a specific period.
-The default period is set using the ``processor_poll_interval`` config, which is 1 second by default.
-During parsing, Airflow creates a new connection to the metadata DB for each DAG.
-This can result in a lot of open connections.
+Specifically you should not run any database access, heavy computations and networking operations.
+
+This limitation is especially important in case of dynamic DAG configuration, which can be configured
+essentially in one of those ways:
+
+* via `environment variables <https://wiki.archlinux.org/title/environment_variables>`_ (not to be mistaken
+  with the :doc:`Airflow Variables </concepts/variables>`)
+* via externally provided, generated Python code, containing meta-data in the DAG folder
+* via externally provided, generated configuration meta-data file in the DAG folder
+
+All cases are described in the following chapters.
+
+Dynamic DAGs with environment variables
+.......................................
+
+If you want to use variables to configure your code, you should always use
+`environment variables <https://wiki.archlinux.org/title/environment_variables>`_ in your
+top-level code rather than :doc:`Airflow Variables </concepts/variables>`. Using Airflow Variables
+at top-level code creates a connection to metadata DB of Airflow to fetch the value, which can slow
+down parsing and place extra load on the DB. See the `Airflow Variables <_best_practices/airflow_variables>`_
+on how to make best use of Airflow Variables in your DAGs using Jinja templates .
+
+For example you could set ``DEPLOYMENT`` variable differently for your production and development
+environments. The variable ``DEPLOYMENT`` could be set to ``PROD`` in your production environment and to
+``DEV`` in your development environment. Then you could build your dag differently in production and
+development environment, depending on the value of the environment variable.
+
+.. code-block:: python
+
+    deployment = os.environ.get("DEPLOYMENT", "PROD")
+    if deployment == "PROD":
+        task = Operator(param="prod-param")
+    elif deployment == "DEV":
+        task = Operator(param="dev-param")
+
+
+Generating Python code with embedded meta-data
+..............................................
+
+You can externally generate Python code containing the meta-data as importable constants.
+Such constant can then be imported directly by your DAG and used to construct the object and build
+the dependencies. This makes it easy to import such code from multiple DAGs without the need to find,
+load and parse the meta-data stored in the constant - this is done automatically by Python interpreter
+when it processes the "import" statement. This sounds strange at first, but it is surprisingly easy
+to generate such code and make sure this is a valid Python code that you can import from your DAGs.
+
+For example assume you dynamically generate (in your DAG folder), the ``my_company_utils/common.py`` file:
+
+.. code-block:: python
+
+    # This file is generated automatically !
+    ALL_TASKS = ["task1", "task2", "task3"]
+
+Then you can import and use the ``ALL_TASKS`` constant in all your DAGs like that:
+
+.. code-block:: python
+
+    from my_company_utils.common import ALL_TASKS
+
+    with DAG(dag_id="my_dag", schedule_interval=None, start_date=days_ago(2)) as dag:
+        for task in ALL_TASKS:
+            # create your operators and relations here
+            pass
+
+Don't forget that in this case you need to add empty ``__init__.py`` file in the ``my_company_utils`` folder
+and you should add the ``my_company_utils/.*`` line to ``.airflowignore`` file, so that the whole folder is
+ignored by the scheduler when it looks for DAGs.
+
+
+Dynamic DAGs with external configuration from a structured data file
+....................................................................
+
+If you need to use a more complex meta-data to prepare your DAG structure and you would prefer to keep the
+data in a structured non-python format, you should export the data to the DAG folder in a file and push
+it to the DAG folder, rather than try to pull the data by the DAG's top-level code - for the reasons
+explained in the parent `Top level Python code <_top-level-python-code>`_.
+
+The meta-data should be exported and stored together with the DAGs in a convenient file format (JSON, YAML
+formats are good candidates) in DAG folder. Ideally, the meta-data should be published in the same
+package/folder as the module of the DAG file you load it from, because then you can find location of
+the meta-data file in your DAG easily. The location of the file to read can be found using the
+``__file__`` attribute of the module containing the DAG:
+
+.. code-block:: python
+
+    my_dir = os.path.dirname(os.path.abspath(__file__))
+    configuration_file_path = os.path.join(my_dir, "config.yaml")
+    with open(configuration_file_path) as yaml_file:
+        configuration = yaml.safe_load(yaml_file)
+    # Configuration dict is available here
+
+
+.. _best_practices/airflow_variables:
+
+Airflow Variables
+-----------------
+
+As mentioned in the previous chapter, `Top level Python code <_top-level-python-code>`_. you should avoid
+using Airflow Variables at top level Python code of DAGs. You can use the Airflow Variables freely inside the
+``execute()`` methods of the operators, but you can also pass the Airflow Variables to the existing operators
+via Jinja template, which will delay reading the value until the task execution.
 
-The best way of using variables is via a Jinja template, which will delay reading the value until the task execution.
 The template syntax to do this is:
 
 .. code-block::
@@ -117,17 +218,28 @@ or if you need to deserialize a json object from the variable :
 For security purpose, you're recommended to use the :ref:`Secrets Backend<secrets_backend_configuration>`
 for any variable that contains sensitive data.
 
-An alternative option is to use environment variables in the top-level Python code or use environment variables to
-create and manage Airflow variables. This will avoid new connections to Airflow metadata DB every time
-Airflow parses the Python file. For more information, see: :ref:`managing_variables`.
+Triggering DAGs after changes
+-----------------------------
+
+Avoid triggering DAGs immediately after changing them or any other accompanying files that you change in the
+DAG folder.
 
-Top level Python Code
----------------------
+You should give the system sufficient time to process the changed files. This takes several steps.
+First the files have to be distributed to scheduler - usually via distributed filesystem or Git-Sync, then
+scheduler has to parse the Python files and store them in the database. Depending on your configuration,
+speed of your distributed filesystem, number of files, number of DAGs, number of changes in the files,
+sizes of the files, number of schedulers, speed of CPUS, this can take from seconds to minutes, in extreme
+cases many minutes. You should wait for your DAG to appear in the UI to be able to trigger it.
 
-In general, you should not write any code outside of defining Airflow constructs like Operators. The code outside the
-tasks runs every time Airflow parses an eligible python file, which happens at the minimum frequency of
-:ref:`min_file_process_interval<config:scheduler__min_file_process_interval>` seconds.
+In case you see long delays between updating it and the time it is ready to be triggered, you can look
+at the following configuration parameters and fine tune them according your needs (see details of
+each parameter by following the links):
 
+* :ref:`config:scheduler__processor_poll_interval`
+* :ref:`config:scheduler__min_file_process_interval`
+* :ref:`config:scheduler__dag_dir_list_interval`
+* :ref:`config:scheduler__parsing_processes`
+* :ref:`config:scheduler__file_parsing_sort_mode`
 
 Testing a DAG
 ^^^^^^^^^^^^^
diff --git a/docs/conf.py b/docs/conf.py
index 975ed16..0197e42 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -352,6 +352,12 @@ if PACKAGE_NAME == 'apache-airflow':
             for key in keys_to_format:
                 if option[key] and "{{" in option[key]:
                     option[key] = option[key].replace("{{", "{").replace("}}", "}")
+    # Sort options, config and deprecated options for JINJA variables to display
+    for config in configs:
+        config["options"] = sorted(config["options"], key=lambda o: o["name"])
+    configs = sorted(configs, key=lambda l: l["name"])
+    for section in deprecated_options:
+        deprecated_options[section] = {k: v for k, v in sorted(deprecated_options[section].items())}
 
     jinja_contexts = {
         'config_ctx': {"configs": configs, "deprecated_options": deprecated_options},