You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2023/10/20 22:33:09 UTC
[airflow] branch main updated: Add decorators for external and venv python branching operators (#35043)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c2a733aa95 Add decorators for external and venv python branching operators (#35043)
c2a733aa95 is described below
commit c2a733aa95d45c8b6af4e5b05621aca28515946e
Author: Jens Scheffler <95...@users.noreply.github.com>
AuthorDate: Sat Oct 21 00:33:01 2023 +0200
Add decorators for external and venv python branching operators (#35043)
---
airflow/decorators/__init__.py | 6 ++
airflow/decorators/__init__.pyi | 97 ++++++++++++++++++
airflow/decorators/branch_external_python.py | 56 ++++++++++
airflow/decorators/branch_virtualenv.py | 56 ++++++++++
airflow/example_dags/example_branch_operator.py | 114 +++++++++++++++++++--
.../example_branch_operator_decorator.py | 104 +++++++++++++++++--
airflow/operators/python.py | 8 ++
docs/apache-airflow/core-concepts/dags.rst | 2 +
docs/apache-airflow/howto/operator/python.rst | 106 ++++++++++++++++++-
tests/decorators/test_branch_external_python.py | 90 ++++++++++++++++
tests/decorators/test_branch_virtualenv.py | 94 +++++++++++++++++
tests/jobs/test_backfill_job.py | 20 +++-
12 files changed, 733 insertions(+), 20 deletions(-)
diff --git a/airflow/decorators/__init__.py b/airflow/decorators/__init__.py
index 2b2fccf8fa..31bcfb263c 100644
--- a/airflow/decorators/__init__.py
+++ b/airflow/decorators/__init__.py
@@ -19,7 +19,9 @@ from __future__ import annotations
from typing import Any, Callable
from airflow.decorators.base import TaskDecorator
+from airflow.decorators.branch_external_python import branch_external_python_task
from airflow.decorators.branch_python import branch_task
+from airflow.decorators.branch_virtualenv import branch_virtualenv_task
from airflow.decorators.external_python import external_python_task
from airflow.decorators.python import python_task
from airflow.decorators.python_virtualenv import virtualenv_task
@@ -41,6 +43,8 @@ __all__ = [
"virtualenv_task",
"external_python_task",
"branch_task",
+ "branch_virtualenv_task",
+ "branch_external_python_task",
"short_circuit_task",
"sensor_task",
"setup",
@@ -55,6 +59,8 @@ class TaskDecoratorCollection:
virtualenv = staticmethod(virtualenv_task)
external_python = staticmethod(external_python_task)
branch = staticmethod(branch_task)
+ branch_virtualenv = staticmethod(branch_virtualenv_task)
+ branch_external_python = staticmethod(branch_external_python_task)
short_circuit = staticmethod(short_circuit_task)
sensor = staticmethod(sensor_task)
diff --git a/airflow/decorators/__init__.pyi b/airflow/decorators/__init__.pyi
index e41ae930e1..0c3e94bf5c 100644
--- a/airflow/decorators/__init__.pyi
+++ b/airflow/decorators/__init__.pyi
@@ -26,7 +26,9 @@ from typing import Any, Callable, Collection, Container, Iterable, Mapping, over
from kubernetes.client import models as k8s
from airflow.decorators.base import FParams, FReturn, Task, TaskDecorator
+from airflow.decorators.branch_external_python import branch_external_python_task
from airflow.decorators.branch_python import branch_task
+from airflow.decorators.branch_virtualenv import branch_virtualenv_task
from airflow.decorators.external_python import external_python_task
from airflow.decorators.python import python_task
from airflow.decorators.python_virtualenv import virtualenv_task
@@ -47,6 +49,8 @@ __all__ = [
"virtualenv_task",
"external_python_task",
"branch_task",
+ "branch_virtualenv_task",
+ "branch_external_python_task",
"short_circuit_task",
"sensor_task",
"setup",
@@ -194,6 +198,99 @@ class TaskDecoratorCollection:
@overload
def branch(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ...
@overload
+ def branch_virtualenv(
+ self,
+ *,
+ multiple_outputs: bool | None = None,
+ # Deliberately does not list 'python_callable', 'op_args' and 'op_kwargs' since they are filled by
+ # _BranchPythonVirtualenvDecoratedOperator.
+ requirements: None | Iterable[str] | str = None,
+ python_version: None | str | int | float = None,
+ use_dill: bool = False,
+ system_site_packages: bool = True,
+ templates_dict: Mapping[str, Any] | None = None,
+ pip_install_options: list[str] | None = None,
+ skip_on_exit_code: int | Container[int] | None = None,
+ index_urls: None | Collection[str] | str = None,
+ venv_cache_path: None | str = None,
+ show_return_value_in_logs: bool = True,
+ **kwargs,
+ ) -> TaskDecorator:
+ """Create a decorator to wrap the decorated callable into a BranchPythonVirtualenvOperator.
+
+ For more information on how to use this decorator, see :ref:`concepts:branching`.
+ Accepts arbitrary operator kwargs. Can be reused in a single DAG.
+
+ :param multiple_outputs: If set, function return value will be unrolled to multiple XCom values.
+ Dict will unroll to XCom values with keys as XCom keys. Defaults to False.
+ :param requirements: Either a list of requirement strings, or a (templated)
+ "requirements file" as specified by pip.
+ :param python_version: The Python version to run the virtual environment with. Note that
+ both 2 and 2.7 are acceptable forms.
+ :param use_dill: Whether to use dill to serialize
+ the args and result (pickle is default). This allows more complex types
+ but requires you to include dill in your requirements.
+ :param system_site_packages: Whether to include
+ system_site_packages in your virtual environment.
+ See virtualenv documentation for more information.
+ :param pip_install_options: a list of pip install options when installing requirements
+ See 'pip install -h' for available options
+ :param skip_on_exit_code: If python_callable exits with this exit code, leave the task
+ in ``skipped`` state (default: None). If set to ``None``, any non-zero
+ exit code will be treated as a failure.
+ :param index_urls: an optional list of index urls to load Python packages from.
+ If not provided the system pip conf will be used to source packages from.
+ :param venv_cache_path: Optional path to the virtual environment parent folder in which the
+ virtual environment will be cached. A sub-folder venv-{hash} is created, where {hash} is replaced
+ with a checksum of the requirements. If not provided, the virtual environment will be created and
+ deleted in a temp folder for every execution.
+ :param show_return_value_in_logs: a bool value whether to show return_value
+ logs. Defaults to True, which allows return value log output.
+ It can be set to False to prevent log output of return value when you return huge data
+ such as when transmitting a large amount of XCom to TaskAPI.
+ """
+ @overload
+ def branch_virtualenv(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ...
+ @overload
+ def branch_external_python(
+ self,
+ *,
+ python: str,
+ multiple_outputs: bool | None = None,
+ # Deliberately does not list 'python_callable', 'op_args' and 'op_kwargs' since they are filled by
+ # _BranchExternalPythonDecoratedOperator.
+ use_dill: bool = False,
+ templates_dict: Mapping[str, Any] | None = None,
+ show_return_value_in_logs: bool = True,
+ **kwargs,
+ ) -> TaskDecorator:
+ """Create a decorator to wrap the decorated callable into a BranchExternalPythonOperator.
+
+ For more information on how to use this decorator, see :ref:`concepts:branching`.
+ Accepts arbitrary operator kwargs. Can be reused in a single DAG.
+
+ :param python: Full path string (file-system specific) that points to a Python binary inside
+ a virtual environment that should be used (in ``VENV/bin`` folder). Should be absolute path
+ (so usually start with "/" or "X:/" depending on the filesystem/os used).
+ :param multiple_outputs: If set, function return value will be unrolled to multiple XCom values.
+ Dict will unroll to XCom values with keys as XCom keys. Defaults to False.
+ :param use_dill: Whether to use dill to serialize
+ the args and result (pickle is default). This allows more complex types
+ but requires you to include dill in your requirements.
+ :param templates_dict: a dictionary where the values are templates that
+ will get templated by the Airflow engine sometime between
+ ``__init__`` and ``execute`` takes place and are made available
+ in your callable's context after the template has been applied.
+ :param show_return_value_in_logs: a bool value whether to show return_value
+ logs. Defaults to True, which allows return value log output.
+ It can be set to False to prevent log output of return value when you return huge data
+ such as when transmitting a large amount of XCom to TaskAPI.
+ """
+ @overload
+ def branch_external_python(
+ self, python_callable: Callable[FParams, FReturn]
+ ) -> Task[FParams, FReturn]: ...
+ @overload
def short_circuit(
self,
*,
diff --git a/airflow/decorators/branch_external_python.py b/airflow/decorators/branch_external_python.py
new file mode 100644
index 0000000000..8e945541c5
--- /dev/null
+++ b/airflow/decorators/branch_external_python.py
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Callable
+
+from airflow.decorators.base import task_decorator_factory
+from airflow.decorators.python import _PythonDecoratedOperator
+from airflow.operators.python import BranchExternalPythonOperator
+
+if TYPE_CHECKING:
+ from airflow.decorators.base import TaskDecorator
+
+
+class _BranchExternalPythonDecoratedOperator(_PythonDecoratedOperator, BranchExternalPythonOperator):
+ """Wraps a Python callable and captures args/kwargs when called for execution."""
+
+ custom_operator_name: str = "@task.branch_external_python"
+
+
+def branch_external_python_task(
+ python_callable: Callable | None = None, multiple_outputs: bool | None = None, **kwargs
+) -> TaskDecorator:
+ """
+ Wrap a python function into a BranchExternalPythonOperator.
+
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`concepts:branching`
+
+ Accepts kwargs for operator kwarg. Can be reused in a single DAG.
+
+ :param python_callable: Function to decorate
+ :param multiple_outputs: if set, function return value will be
+ unrolled to multiple XCom values. Dict will unroll to xcom values with keys as XCom keys.
+ Defaults to False.
+ """
+ return task_decorator_factory(
+ python_callable=python_callable,
+ multiple_outputs=multiple_outputs,
+ decorated_operator_class=_BranchExternalPythonDecoratedOperator,
+ **kwargs,
+ )
diff --git a/airflow/decorators/branch_virtualenv.py b/airflow/decorators/branch_virtualenv.py
new file mode 100644
index 0000000000..3e4c3fcaf1
--- /dev/null
+++ b/airflow/decorators/branch_virtualenv.py
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Callable
+
+from airflow.decorators.base import task_decorator_factory
+from airflow.decorators.python import _PythonDecoratedOperator
+from airflow.operators.python import BranchPythonVirtualenvOperator
+
+if TYPE_CHECKING:
+ from airflow.decorators.base import TaskDecorator
+
+
+class _BranchPythonVirtualenvDecoratedOperator(_PythonDecoratedOperator, BranchPythonVirtualenvOperator):
+ """Wraps a Python callable and captures args/kwargs when called for execution."""
+
+ custom_operator_name: str = "@task.branch_virtualenv"
+
+
+def branch_virtualenv_task(
+ python_callable: Callable | None = None, multiple_outputs: bool | None = None, **kwargs
+) -> TaskDecorator:
+ """
+ Wrap a python function into a BranchPythonVirtualenvOperator.
+
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`concepts:branching`
+
+ Accepts kwargs for operator kwarg. Can be reused in a single DAG.
+
+ :param python_callable: Function to decorate
+ :param multiple_outputs: if set, function return value will be
+ unrolled to multiple XCom values. Dict will unroll to xcom values with keys as XCom keys.
+ Defaults to False.
+ """
+ return task_decorator_factory(
+ python_callable=python_callable,
+ multiple_outputs=multiple_outputs,
+ decorated_operator_class=_BranchPythonVirtualenvDecoratedOperator,
+ **kwargs,
+ )
diff --git a/airflow/example_dags/example_branch_operator.py b/airflow/example_dags/example_branch_operator.py
index 92fdf3b250..594c6a4cb1 100644
--- a/airflow/example_dags/example_branch_operator.py
+++ b/airflow/example_dags/example_branch_operator.py
@@ -15,36 +15,56 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""Example DAG demonstrating the usage of the BranchPythonOperator."""
+"""Example DAG demonstrating the usage of the Classic branching Python operators.
+
+It is showcasing the basic BranchPythonOperator and its sisters BranchExternalPythonOperator
+and BranchPythonVirtualenvOperator."""
from __future__ import annotations
import random
+import sys
+import tempfile
+from pathlib import Path
import pendulum
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
-from airflow.operators.python import BranchPythonOperator
+from airflow.operators.python import (
+ BranchExternalPythonOperator,
+ BranchPythonOperator,
+ BranchPythonVirtualenvOperator,
+ ExternalPythonOperator,
+ PythonOperator,
+ PythonVirtualenvOperator,
+)
from airflow.utils.edgemodifier import Label
from airflow.utils.trigger_rule import TriggerRule
+PATH_TO_PYTHON_BINARY = sys.executable
+
with DAG(
dag_id="example_branch_operator",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
schedule="@daily",
tags=["example", "example2"],
+ orientation="TB",
) as dag:
run_this_first = EmptyOperator(
task_id="run_this_first",
)
- options = ["branch_a", "branch_b", "branch_c", "branch_d"]
+ options = ["a", "b", "c", "d"]
+
+ # Example branching on standard Python tasks
+ # [START howto_operator_branch_python]
branching = BranchPythonOperator(
task_id="branching",
- python_callable=lambda: random.choice(options),
+ python_callable=lambda: f"branch_{random.choice(options)}",
)
+ # [END howto_operator_branch_python]
run_this_first >> branching
join = EmptyOperator(
@@ -53,8 +73,9 @@ with DAG(
)
for option in options:
- t = EmptyOperator(
- task_id=option,
+ t = PythonOperator(
+ task_id=f"branch_{option}",
+ python_callable=lambda: print("Hello World"),
)
empty_follow = EmptyOperator(
@@ -63,3 +84,84 @@ with DAG(
# Label is optional here, but it can help identify more complex branches
branching >> Label(option) >> t >> empty_follow >> join
+
+ # The same example with external Python calls
+
+ # [START howto_operator_branch_ext_py]
+ def branch_with_external_python(choices):
+ import random
+
+ return f"ext_py_{random.choice(choices)}"
+
+ branching_ext_py = BranchExternalPythonOperator(
+ task_id="branching_ext_python",
+ python=PATH_TO_PYTHON_BINARY,
+ python_callable=branch_with_external_python,
+ op_args=[options],
+ )
+ # [END howto_operator_branch_ext_py]
+ join >> branching_ext_py
+
+ join_ext_py = EmptyOperator(
+ task_id="join_ext_python",
+ trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
+ )
+
+ def hello_world_with_external_python():
+ print("Hello World from external Python")
+
+ for option in options:
+ t = ExternalPythonOperator(
+ task_id=f"ext_py_{option}",
+ python=PATH_TO_PYTHON_BINARY,
+ python_callable=hello_world_with_external_python,
+ )
+
+ # Label is optional here, but it can help identify more complex branches
+ branching_ext_py >> Label(option) >> t >> join_ext_py
+
+ # The same example with Python virtual environments
+
+ # [START howto_operator_branch_virtualenv]
+ # Note: Passing a caching dir allows keeping the virtual environment across multiple runs.
+ # Run the example a second time and see that it re-uses it and is faster.
+ VENV_CACHE_PATH = Path(tempfile.gettempdir())
+
+ def branch_with_venv(choices):
+ import random
+
+ import numpy as np
+
+ print(f"Some numpy stuff: {np.arange(6)}")
+ return f"venv_{random.choice(choices)}"
+
+ branching_venv = BranchPythonVirtualenvOperator(
+ task_id="branching_venv",
+ requirements=["numpy~=1.24.4"],
+ venv_cache_path=VENV_CACHE_PATH,
+ python_callable=branch_with_venv,
+ op_args=[options],
+ )
+ # [END howto_operator_branch_virtualenv]
+ join_ext_py >> branching_venv
+
+ join_venv = EmptyOperator(
+ task_id="join_venv",
+ trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
+ )
+
+ def hello_world_with_venv():
+ import numpy as np
+
+ print(f"Hello World with some numpy stuff: {np.arange(6)}")
+
+ for option in options:
+ t = PythonVirtualenvOperator(
+ task_id=f"venv_{option}",
+ requirements=["numpy~=1.24.4"],
+ venv_cache_path=VENV_CACHE_PATH,
+ python_callable=hello_world_with_venv,
+ )
+
+ # Label is optional here, but it can help identify more complex branches
+ branching_venv >> Label(option) >> t >> join_venv
diff --git a/airflow/example_dags/example_branch_operator_decorator.py b/airflow/example_dags/example_branch_operator_decorator.py
index b250c12073..5d42ff6b27 100644
--- a/airflow/example_dags/example_branch_operator_decorator.py
+++ b/airflow/example_dags/example_branch_operator_decorator.py
@@ -15,10 +15,17 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""Example DAG demonstrating the usage of the ``@task.branch`` TaskFlow API decorator."""
+"""Example DAG demonstrating the usage of the branching TaskFlow API decorators.
+
+It shows how to use standard Python ``@task.branch`` as well as the external Python
+version ``@task.branch_external_python`` which calls an external Python interpreter and
+the ``@task.branch_virtualenv`` which builds a temporary Python virtual environment.
+"""
from __future__ import annotations
import random
+import sys
+import tempfile
import pendulum
@@ -28,31 +35,110 @@ from airflow.operators.empty import EmptyOperator
from airflow.utils.edgemodifier import Label
from airflow.utils.trigger_rule import TriggerRule
+PATH_TO_PYTHON_BINARY = sys.executable
+
with DAG(
dag_id="example_branch_python_operator_decorator",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
schedule="@daily",
tags=["example", "example2"],
+ orientation="TB",
) as dag:
run_this_first = EmptyOperator(task_id="run_this_first")
- options = ["branch_a", "branch_b", "branch_c", "branch_d"]
+ options = ["a", "b", "c", "d"]
+
+ # Example branching on standard Python tasks
- @task.branch(task_id="branching")
- def random_choice(choices: list[str]) -> str:
- return random.choice(choices)
+ # [START howto_operator_branch_python]
+ @task.branch()
+ def branching(choices: list[str]) -> str:
+ return f"branch_{random.choice(choices)}"
- random_choice_instance = random_choice(choices=options)
+ # [END howto_operator_branch_python]
+
+ random_choice_instance = branching(choices=options)
run_this_first >> random_choice_instance
join = EmptyOperator(task_id="join", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
for option in options:
- t = EmptyOperator(task_id=option)
- empty_follow = EmptyOperator(task_id="follow_" + option)
+ @task(task_id=f"branch_{option}")
+ def some_task():
+ print("doing something in Python")
+
+ t = some_task()
+ empty = EmptyOperator(task_id=f"follow_{option}")
+
+ # Label is optional here, but it can help identify more complex branches
+ random_choice_instance >> Label(option) >> t >> empty >> join
+
+ # The same example with external Python calls
+
+ # [START howto_operator_branch_ext_py]
+ @task.branch_external_python(python=PATH_TO_PYTHON_BINARY)
+ def branching_ext_python(choices) -> str:
+ import random
+
+ return f"ext_py_{random.choice(choices)}"
+
+ # [END howto_operator_branch_ext_py]
+
+ random_choice_ext_py = branching_ext_python(choices=options)
+
+ join >> random_choice_ext_py
+
+ join_ext_py = EmptyOperator(task_id="join_ext_py", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
+
+ for option in options:
+
+ @task.external_python(task_id=f"ext_py_{option}", python=PATH_TO_PYTHON_BINARY)
+ def some_ext_py_task():
+ print("doing something in external Python")
+
+ t = some_ext_py_task()
+
+ # Label is optional here, but it can help identify more complex branches
+ random_choice_ext_py >> Label(option) >> t >> join_ext_py
+
+ # The same example with Python virtual environments
+
+ # [START howto_operator_branch_virtualenv]
+ # Note: Passing a caching dir allows keeping the virtual environment across multiple runs.
+ # Run the example a second time and see that it re-uses it and is faster.
+ VENV_CACHE_PATH = tempfile.gettempdir()
+
+ @task.branch_virtualenv(requirements=["numpy~=1.24.4"], venv_cache_path=VENV_CACHE_PATH)
+ def branching_virtualenv(choices) -> str:
+ import random
+
+ import numpy as np
+
+ print(f"Some numpy stuff: {np.arange(6)}")
+ return f"venv_{random.choice(choices)}"
+
+ # [END howto_operator_branch_virtualenv]
+
+ random_choice_venv = branching_virtualenv(choices=options)
+
+ join_ext_py >> random_choice_venv
+
+ join_venv = EmptyOperator(task_id="join_venv", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
+
+ for option in options:
+
+ @task.virtualenv(
+ task_id=f"venv_{option}", requirements=["numpy~=1.24.4"], venv_cache_path=VENV_CACHE_PATH
+ )
+ def some_venv_task():
+ import numpy as np
+
+ print(f"Some numpy stuff: {np.arange(6)}")
+
+ t = some_venv_task()
# Label is optional here, but it can help identify more complex branches
- random_choice_instance >> Label(option) >> t >> empty_follow >> join
+ random_choice_venv >> Label(option) >> t >> join_venv
diff --git a/airflow/operators/python.py b/airflow/operators/python.py
index 118d1d4bdb..0eadb5441a 100644
--- a/airflow/operators/python.py
+++ b/airflow/operators/python.py
@@ -729,6 +729,10 @@ class BranchPythonVirtualenvOperator(PythonVirtualenvOperator, BranchMixIn):
these paths can't move forward. The ``skipped`` states are propagated
downstream to allow for the DAG state to fill up and the DAG run's state
to be inferred.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:BranchPythonVirtualenvOperator`
"""
def execute(self, context: Context) -> Any:
@@ -910,6 +914,10 @@ class BranchExternalPythonOperator(ExternalPythonOperator, BranchMixIn):
Extends ExternalPythonOperator, so expects to get Python:
virtual environment that should be used (in ``VENV/bin`` folder). Should be absolute path,
so it can run on separate virtual environment similarly to ExternalPythonOperator.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:BranchExternalPythonOperator`
"""
def execute(self, context: Context) -> Any:
diff --git a/docs/apache-airflow/core-concepts/dags.rst b/docs/apache-airflow/core-concepts/dags.rst
index 8ca2689320..86f4aaad2e 100644
--- a/docs/apache-airflow/core-concepts/dags.rst
+++ b/docs/apache-airflow/core-concepts/dags.rst
@@ -371,6 +371,8 @@ As with the callable for ``@task.branch``, this method can return the ID of a do
else:
return None
+Similar to the ``@task.branch`` decorator for regular Python code, there are also branch decorators which use a virtual environment, called ``@task.branch_virtualenv``, or an external Python interpreter, called ``@task.branch_external_python``.
+
.. _concepts:latest-only:
diff --git a/docs/apache-airflow/howto/operator/python.rst b/docs/apache-airflow/howto/operator/python.rst
index 6adbbe8b9e..6cfb5a335d 100644
--- a/docs/apache-airflow/howto/operator/python.rst
+++ b/docs/apache-airflow/howto/operator/python.rst
@@ -103,7 +103,15 @@ Otherwise you won't have access to the most context variables of Airflow in ``op
If you want the context related to datetime objects like ``data_interval_start`` you can add ``pendulum`` and
``lazy_object_proxy``.
-If additional parameters for package installation are needed pass them in ``requirements.txt`` as in the example below:
+.. warning::
+ The Python function body defined to be executed is cut out of the DAG into a temporary file without surrounding code.
+ As in the examples, you need to add all imports again and you cannot rely on variables from the global Python context.
+
+ If you want to pass variables into the classic :class:`~airflow.operators.python.PythonVirtualenvOperator` use
+ ``op_args`` and ``op_kwargs``.
+
+If additional parameters for package installation are needed pass them in via the ``pip_install_options`` parameter or use a
+``requirements.txt`` as in the example below:
.. code-block::
@@ -196,6 +204,102 @@ Otherwise you won't have access to the most context variables of Airflow in ``op
If you want the context related to datetime objects like ``data_interval_start`` you can add ``pendulum`` and
``lazy_object_proxy`` to your virtual environment.
+.. warning::
+ The Python function body defined to be executed is cut out of the DAG into a temporary file without surrounding code.
+ As in the examples, you need to add all imports again and you cannot rely on variables from the global Python context.
+
+ If you want to pass variables into the classic :class:`~airflow.operators.python.ExternalPythonOperator` use
+ ``op_args`` and ``op_kwargs``.
+
+.. _howto/operator:PythonBranchOperator:
+
+BranchPythonOperator
+====================
+
+Use the ``@task.branch`` decorator to execute Python :ref:`branching <concepts:branching>` tasks.
+
+.. warning::
+ The ``@task.branch`` decorator is recommended over the classic :class:`~airflow.operators.python.BranchPythonOperator`
+ to execute Python code.
+
+TaskFlow example of using the operator:
+
+.. exampleinclude:: /../../airflow/example_dags/example_branch_operator_decorator.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_branch_python]
+ :end-before: [END howto_operator_branch_python]
+
+Classic example of using the operator:
+
+.. exampleinclude:: /../../airflow/example_dags/example_branch_operator.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_branch_python]
+ :end-before: [END howto_operator_branch_python]
+
+Argument passing and templating options are the same as with :ref:`howto/operator:PythonOperator`.
+
+.. _howto/operator:BranchPythonVirtualenvOperator:
+
+BranchPythonVirtualenvOperator
+==============================
+
+Use the ``@task.branch_virtualenv`` decorator to execute Python :ref:`branching <concepts:branching>` tasks. It is a hybrid of
+the branch decorator with execution in a virtual environment.
+
+.. warning::
+ The ``@task.branch_virtualenv`` decorator is recommended over the classic
+ :class:`~airflow.operators.python.BranchPythonVirtualenvOperator` to execute Python code.
+
+TaskFlow example of using the operator:
+
+.. exampleinclude:: /../../airflow/example_dags/example_branch_operator_decorator.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_branch_virtualenv]
+ :end-before: [END howto_operator_branch_virtualenv]
+
+Classic example of using the operator:
+
+.. exampleinclude:: /../../airflow/example_dags/example_branch_operator.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_branch_virtualenv]
+ :end-before: [END howto_operator_branch_virtualenv]
+
+Argument passing and templating options are the same as with :ref:`howto/operator:PythonVirtualenvOperator`.
+
+.. _howto/operator:BranchExternalPythonOperator:
+
+BranchExternalPythonOperator
+============================
+
+Use the ``@task.branch_external_python`` decorator to execute Python :ref:`branching <concepts:branching>` tasks. It is a hybrid of
+the branch decorator with execution in an external Python environment.
+
+.. warning::
+ The ``@task.branch_external_python`` decorator is recommended over the classic
+ :class:`~airflow.operators.python.BranchExternalPythonOperator` to execute Python code.
+
+TaskFlow example of using the operator:
+
+.. exampleinclude:: /../../airflow/example_dags/example_branch_operator_decorator.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_branch_ext_py]
+ :end-before: [END howto_operator_branch_ext_py]
+
+Classic example of using the operator:
+
+.. exampleinclude:: /../../airflow/example_dags/example_branch_operator.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_branch_ext_py]
+ :end-before: [END howto_operator_branch_ext_py]
+
+Argument passing and templating options are the same as with :ref:`howto/operator:ExternalPythonOperator`.
+
.. _howto/operator:ShortCircuitOperator:
ShortCircuitOperator
diff --git a/tests/decorators/test_branch_external_python.py b/tests/decorators/test_branch_external_python.py
new file mode 100644
index 0000000000..01e1f9b968
--- /dev/null
+++ b/tests/decorators/test_branch_external_python.py
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import sys
+
+import pytest
+
+from airflow.decorators import task
+from airflow.utils.state import State
+
+
+class Test_BranchPythonDecoratedOperator:
+ @pytest.mark.parametrize("branch_task_name", ["task_1", "task_2"])
+ def test_branch_one(self, dag_maker, branch_task_name):
+ @task
+ def dummy_f():
+ pass
+
+ @task
+ def task_1():
+ pass
+
+ @task
+ def task_2():
+ pass
+
+ if (
+ branch_task_name == "task_1"
+ ): # Note we manually need to carry the literal value into the externally executed code :-(
+
+ @task.branch_external_python(task_id="branching", python=sys.executable)
+ def branch_operator():
+ return "task_1"
+
+ else:
+
+ @task.branch_external_python(task_id="branching", python=sys.executable)
+ def branch_operator():
+ return "task_2"
+
+ with dag_maker():
+ branchoperator = branch_operator()
+ df = dummy_f()
+ task_1 = task_1()
+ task_2 = task_2()
+
+ df.set_downstream(branchoperator)
+ branchoperator.set_downstream(task_1)
+ branchoperator.set_downstream(task_2)
+
+ dr = dag_maker.create_dagrun()
+ df.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True)
+ branchoperator.operator.run(
+ start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True
+ )
+ task_1.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True)
+ task_2.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True)
+ tis = dr.get_task_instances()
+
+ for ti in tis:
+ if ti.task_id == "dummy_f":
+ assert ti.state == State.SUCCESS
+ if ti.task_id == "branching":
+ assert ti.state == State.SUCCESS
+
+ if ti.task_id == "task_1" and branch_task_name == "task_1":
+ assert ti.state == State.SUCCESS
+ elif ti.task_id == "task_1":
+ assert ti.state == State.SKIPPED
+
+ if ti.task_id == "task_2" and branch_task_name == "task_2":
+ assert ti.state == State.SUCCESS
+ elif ti.task_id == "task_2":
+ assert ti.state == State.SKIPPED
diff --git a/tests/decorators/test_branch_virtualenv.py b/tests/decorators/test_branch_virtualenv.py
new file mode 100644
index 0000000000..861ba154af
--- /dev/null
+++ b/tests/decorators/test_branch_virtualenv.py
@@ -0,0 +1,94 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+
+from airflow.decorators import task
+from airflow.utils.state import State
+
+
+class Test_BranchPythonDecoratedOperator:
+ @pytest.mark.parametrize("branch_task_name", ["task_1", "task_2"])
+ def test_branch_one(self, dag_maker, branch_task_name):
+ @task
+ def dummy_f():
+ pass
+
+ @task
+ def task_1():
+ pass
+
+ @task
+ def task_2():
+ pass
+
+ if (
+ branch_task_name == "task_1"
+ ): # Note: we need to manually carry the literal value into the venv code :-(
+
+ @task.branch_virtualenv(task_id="branching", requirements=["funcsigs"])
+ def branch_operator():
+ import funcsigs
+
+ print(f"We successfully imported funcsigs version {funcsigs.__version__}")
+ return "task_1"
+
+ else:
+
+ @task.branch_virtualenv(task_id="branching", requirements=["funcsigs"])
+ def branch_operator():
+ import funcsigs
+
+ print(f"We successfully imported funcsigs version {funcsigs.__version__}")
+ return "task_2"
+
+ with dag_maker():
+ branchoperator = branch_operator()
+ df = dummy_f()
+ task_1 = task_1()
+ task_2 = task_2()
+
+ df.set_downstream(branchoperator)
+ branchoperator.set_downstream(task_1)
+ branchoperator.set_downstream(task_2)
+
+ dr = dag_maker.create_dagrun()
+ df.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True)
+ branchoperator.operator.run(
+ start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True
+ )
+ task_1.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True)
+ task_2.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True)
+ tis = dr.get_task_instances()
+
+ for ti in tis:
+ if ti.task_id == "dummy_f":
+ assert ti.state == State.SUCCESS
+ if ti.task_id == "branching":
+ assert ti.state == State.SUCCESS
+
+ if ti.task_id == "task_1" and branch_task_name == "task_1":
+ assert ti.state == State.SUCCESS
+ elif ti.task_id == "task_1":
+ assert ti.state == State.SKIPPED
+
+ if ti.task_id == "task_2" and branch_task_name == "task_2":
+ assert ti.state == State.SUCCESS
+ elif ti.task_id == "task_2":
+ assert ti.state == State.SKIPPED
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index 8d592dfe65..195a959ee9 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -244,11 +244,23 @@ class TestBackfillJob:
"branch_b",
"branch_c",
"branch_d",
- "follow_branch_a",
- "follow_branch_b",
- "follow_branch_c",
- "follow_branch_d",
+ "follow_a",
+ "follow_b",
+ "follow_c",
+ "follow_d",
"join",
+ "branching_ext_python",
+ "ext_py_a",
+ "ext_py_b",
+ "ext_py_c",
+ "ext_py_d",
+ "join_ext_python",
+ "branching_venv",
+ "venv_a",
+ "venv_b",
+ "venv_c",
+ "venv_d",
+ "join_venv",
),
],
[