You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2023/10/20 22:33:09 UTC

[airflow] branch main updated: Add decorators for external and venv python branching operators (#35043)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c2a733aa95 Add decorators for external and venv python branching operators (#35043)
c2a733aa95 is described below

commit c2a733aa95d45c8b6af4e5b05621aca28515946e
Author: Jens Scheffler <95...@users.noreply.github.com>
AuthorDate: Sat Oct 21 00:33:01 2023 +0200

    Add decorators for external and venv python branching operators (#35043)
---
 airflow/decorators/__init__.py                     |   6 ++
 airflow/decorators/__init__.pyi                    |  97 ++++++++++++++++++
 airflow/decorators/branch_external_python.py       |  56 ++++++++++
 airflow/decorators/branch_virtualenv.py            |  56 ++++++++++
 airflow/example_dags/example_branch_operator.py    | 114 +++++++++++++++++++--
 .../example_branch_operator_decorator.py           | 104 +++++++++++++++++--
 airflow/operators/python.py                        |   8 ++
 docs/apache-airflow/core-concepts/dags.rst         |   2 +
 docs/apache-airflow/howto/operator/python.rst      | 106 ++++++++++++++++++-
 tests/decorators/test_branch_external_python.py    |  90 ++++++++++++++++
 tests/decorators/test_branch_virtualenv.py         |  94 +++++++++++++++++
 tests/jobs/test_backfill_job.py                    |  20 +++-
 12 files changed, 733 insertions(+), 20 deletions(-)

diff --git a/airflow/decorators/__init__.py b/airflow/decorators/__init__.py
index 2b2fccf8fa..31bcfb263c 100644
--- a/airflow/decorators/__init__.py
+++ b/airflow/decorators/__init__.py
@@ -19,7 +19,9 @@ from __future__ import annotations
 from typing import Any, Callable
 
 from airflow.decorators.base import TaskDecorator
+from airflow.decorators.branch_external_python import branch_external_python_task
 from airflow.decorators.branch_python import branch_task
+from airflow.decorators.branch_virtualenv import branch_virtualenv_task
 from airflow.decorators.external_python import external_python_task
 from airflow.decorators.python import python_task
 from airflow.decorators.python_virtualenv import virtualenv_task
@@ -41,6 +43,8 @@ __all__ = [
     "virtualenv_task",
     "external_python_task",
     "branch_task",
+    "branch_virtualenv_task",
+    "branch_external_python_task",
     "short_circuit_task",
     "sensor_task",
     "setup",
@@ -55,6 +59,8 @@ class TaskDecoratorCollection:
     virtualenv = staticmethod(virtualenv_task)
     external_python = staticmethod(external_python_task)
     branch = staticmethod(branch_task)
+    branch_virtualenv = staticmethod(branch_virtualenv_task)
+    branch_external_python = staticmethod(branch_external_python_task)
     short_circuit = staticmethod(short_circuit_task)
     sensor = staticmethod(sensor_task)
 
diff --git a/airflow/decorators/__init__.pyi b/airflow/decorators/__init__.pyi
index e41ae930e1..0c3e94bf5c 100644
--- a/airflow/decorators/__init__.pyi
+++ b/airflow/decorators/__init__.pyi
@@ -26,7 +26,9 @@ from typing import Any, Callable, Collection, Container, Iterable, Mapping, over
 from kubernetes.client import models as k8s
 
 from airflow.decorators.base import FParams, FReturn, Task, TaskDecorator
+from airflow.decorators.branch_external_python import branch_external_python_task
 from airflow.decorators.branch_python import branch_task
+from airflow.decorators.branch_virtualenv import branch_virtualenv_task
 from airflow.decorators.external_python import external_python_task
 from airflow.decorators.python import python_task
 from airflow.decorators.python_virtualenv import virtualenv_task
@@ -47,6 +49,8 @@ __all__ = [
     "virtualenv_task",
     "external_python_task",
     "branch_task",
+    "branch_virtualenv_task",
+    "branch_external_python_task",
     "short_circuit_task",
     "sensor_task",
     "setup",
@@ -194,6 +198,99 @@ class TaskDecoratorCollection:
     @overload
     def branch(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ...
     @overload
+    def branch_virtualenv(
+        self,
+        *,
+        multiple_outputs: bool | None = None,
+        # Skip 'python_callable', 'op_args' and 'op_kwargs' since they are filled by
+        # _BranchPythonVirtualenvDecoratedOperator.
+        requirements: None | Iterable[str] | str = None,
+        python_version: None | str | int | float = None,
+        use_dill: bool = False,
+        system_site_packages: bool = True,
+        templates_dict: Mapping[str, Any] | None = None,
+        pip_install_options: list[str] | None = None,
+        skip_on_exit_code: int | Container[int] | None = None,
+        index_urls: None | Collection[str] | str = None,
+        venv_cache_path: None | str = None,
+        show_return_value_in_logs: bool = True,
+        **kwargs,
+    ) -> TaskDecorator:
+        """Create a decorator to wrap the decorated callable into a BranchPythonVirtualenvOperator.
+
+        For more information on how to use this decorator, see :ref:`concepts:branching`.
+        Accepts arbitrary kwargs for the operator. Can be reused in a single DAG.
+
+        :param multiple_outputs: If set, function return value will be unrolled to multiple XCom values.
+            Dict will unroll to XCom values with keys as XCom keys. Defaults to False.
+        :param requirements: Either a list of requirement strings, or a (templated)
+            "requirements file" as specified by pip.
+        :param python_version: The Python version to run the virtual environment with. Note that
+            both 3 and 3.9 are acceptable forms.
+        :param use_dill: Whether to use dill to serialize
+            the args and result (pickle is default). This allows more complex types
+            but requires you to include dill in your requirements.
+        :param system_site_packages: Whether to include
+            system_site_packages in your virtual environment.
+            See virtualenv documentation for more information.
+        :param pip_install_options: a list of pip install options when installing requirements
+            See 'pip install -h' for available options
+        :param skip_on_exit_code: If python_callable exits with this exit code, leave the task
+            in ``skipped`` state (default: None). If set to ``None``, any non-zero
+            exit code will be treated as a failure.
+        :param index_urls: an optional list of index urls to load Python packages from.
+            If not provided the system pip conf will be used to source packages from.
+        :param venv_cache_path: Optional path to the virtual environment parent folder in which the
+            virtual environment will be cached; creates a sub-folder venv-{hash} where hash will be replaced
+            with a checksum of requirements. If not provided the virtual environment will be created and
+            deleted in a temp folder for every execution.
+        :param show_return_value_in_logs: a bool value whether to show return_value
+            logs. Defaults to True, which allows return value log output.
+            It can be set to False to prevent log output of return value when you return huge data
+            such as when transmitting a large amount of XCom to TaskAPI.
+        """
+    @overload
+    def branch_virtualenv(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ...
+    @overload
+    def branch_external_python(
+        self,
+        *,
+        python: str,
+        multiple_outputs: bool | None = None,
+        # Skip 'python_callable', 'op_args' and 'op_kwargs' since they are filled by
+        # _BranchExternalPythonDecoratedOperator.
+        use_dill: bool = False,
+        templates_dict: Mapping[str, Any] | None = None,
+        show_return_value_in_logs: bool = True,
+        **kwargs,
+    ) -> TaskDecorator:
+        """Create a decorator to wrap the decorated callable into a BranchExternalPythonOperator.
+
+        For more information on how to use this decorator, see :ref:`concepts:branching`.
+        Accepts arbitrary kwargs for the operator. Can be reused in a single DAG.
+
+        :param python: Full path string (file-system specific) that points to a Python binary inside
+            a virtual environment that should be used (in ``VENV/bin`` folder). Should be an absolute path
+            (so usually start with "/" or "X:/" depending on the filesystem/os used).
+        :param multiple_outputs: If set, function return value will be unrolled to multiple XCom values.
+            Dict will unroll to XCom values with keys as XCom keys. Defaults to False.
+        :param use_dill: Whether to use dill to serialize
+            the args and result (pickle is default). This allows more complex types
+            but requires you to include dill in your requirements.
+        :param templates_dict: a dictionary where the values are templates that
+            will get templated by the Airflow engine sometime between
+            ``__init__`` and ``execute`` takes place and are made available
+            in your callable's context after the template has been applied.
+        :param show_return_value_in_logs: a bool value whether to show return_value
+            logs. Defaults to True, which allows return value log output.
+            It can be set to False to prevent log output of return value when you return huge data
+            such as when transmitting a large amount of XCom to TaskAPI.
+        """
+    @overload
+    def branch_external_python(
+        self, python_callable: Callable[FParams, FReturn]
+    ) -> Task[FParams, FReturn]: ...
+    @overload
     def short_circuit(
         self,
         *,
diff --git a/airflow/decorators/branch_external_python.py b/airflow/decorators/branch_external_python.py
new file mode 100644
index 0000000000..8e945541c5
--- /dev/null
+++ b/airflow/decorators/branch_external_python.py
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Callable
+
+from airflow.decorators.base import task_decorator_factory
+from airflow.decorators.python import _PythonDecoratedOperator
+from airflow.operators.python import BranchExternalPythonOperator
+
+if TYPE_CHECKING:
+    from airflow.decorators.base import TaskDecorator
+
+
+class _BranchExternalPythonDecoratedOperator(_PythonDecoratedOperator, BranchExternalPythonOperator):
+    """Wraps a Python callable and captures args/kwargs when called for execution."""
+
+    custom_operator_name: str = "@task.branch_external_python"
+
+
+def branch_external_python_task(
+    python_callable: Callable | None = None, multiple_outputs: bool | None = None, **kwargs
+) -> TaskDecorator:
+    """
+    Wrap a python function into a BranchExternalPythonOperator.
+
+    For more information on how to use this operator, take a look at the guide:
+    :ref:`concepts:branching`
+
+    Accepts kwargs for operator kwarg. Can be reused in a single DAG.
+
+    :param python_callable: Function to decorate
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as XCom keys.
+        Defaults to False.
+    """
+    return task_decorator_factory(
+        python_callable=python_callable,
+        multiple_outputs=multiple_outputs,
+        decorated_operator_class=_BranchExternalPythonDecoratedOperator,
+        **kwargs,
+    )
diff --git a/airflow/decorators/branch_virtualenv.py b/airflow/decorators/branch_virtualenv.py
new file mode 100644
index 0000000000..3e4c3fcaf1
--- /dev/null
+++ b/airflow/decorators/branch_virtualenv.py
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Callable
+
+from airflow.decorators.base import task_decorator_factory
+from airflow.decorators.python import _PythonDecoratedOperator
+from airflow.operators.python import BranchPythonVirtualenvOperator
+
+if TYPE_CHECKING:
+    from airflow.decorators.base import TaskDecorator
+
+
+class _BranchPythonVirtualenvDecoratedOperator(_PythonDecoratedOperator, BranchPythonVirtualenvOperator):
+    """Wraps a Python callable and captures args/kwargs when called for execution."""
+
+    custom_operator_name: str = "@task.branch_virtualenv"
+
+
+def branch_virtualenv_task(
+    python_callable: Callable | None = None, multiple_outputs: bool | None = None, **kwargs
+) -> TaskDecorator:
+    """
+    Wrap a python function into a BranchPythonVirtualenvOperator.
+
+    For more information on how to use this operator, take a look at the guide:
+    :ref:`concepts:branching`
+
+    Accepts kwargs for operator kwarg. Can be reused in a single DAG.
+
+    :param python_callable: Function to decorate
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as XCom keys.
+        Defaults to False.
+    """
+    return task_decorator_factory(
+        python_callable=python_callable,
+        multiple_outputs=multiple_outputs,
+        decorated_operator_class=_BranchPythonVirtualenvDecoratedOperator,
+        **kwargs,
+    )
diff --git a/airflow/example_dags/example_branch_operator.py b/airflow/example_dags/example_branch_operator.py
index 92fdf3b250..594c6a4cb1 100644
--- a/airflow/example_dags/example_branch_operator.py
+++ b/airflow/example_dags/example_branch_operator.py
@@ -15,36 +15,56 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Example DAG demonstrating the usage of the BranchPythonOperator."""
+"""Example DAG demonstrating the usage of the Classic branching Python operators.
+
+It showcases the basic BranchPythonOperator and its siblings BranchExternalPythonOperator
+and BranchPythonVirtualenvOperator."""
 from __future__ import annotations
 
 import random
+import sys
+import tempfile
+from pathlib import Path
 
 import pendulum
 
 from airflow.models.dag import DAG
 from airflow.operators.empty import EmptyOperator
-from airflow.operators.python import BranchPythonOperator
+from airflow.operators.python import (
+    BranchExternalPythonOperator,
+    BranchPythonOperator,
+    BranchPythonVirtualenvOperator,
+    ExternalPythonOperator,
+    PythonOperator,
+    PythonVirtualenvOperator,
+)
 from airflow.utils.edgemodifier import Label
 from airflow.utils.trigger_rule import TriggerRule
 
+PATH_TO_PYTHON_BINARY = sys.executable
+
 with DAG(
     dag_id="example_branch_operator",
     start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     catchup=False,
     schedule="@daily",
     tags=["example", "example2"],
+    orientation="TB",
 ) as dag:
     run_this_first = EmptyOperator(
         task_id="run_this_first",
     )
 
-    options = ["branch_a", "branch_b", "branch_c", "branch_d"]
+    options = ["a", "b", "c", "d"]
+
+    # Example branching on standard Python tasks
 
+    # [START howto_operator_branch_python]
     branching = BranchPythonOperator(
         task_id="branching",
-        python_callable=lambda: random.choice(options),
+        python_callable=lambda: f"branch_{random.choice(options)}",
     )
+    # [END howto_operator_branch_python]
     run_this_first >> branching
 
     join = EmptyOperator(
@@ -53,8 +73,9 @@ with DAG(
     )
 
     for option in options:
-        t = EmptyOperator(
-            task_id=option,
+        t = PythonOperator(
+            task_id=f"branch_{option}",
+            python_callable=lambda: print("Hello World"),
         )
 
         empty_follow = EmptyOperator(
@@ -63,3 +84,84 @@ with DAG(
 
         # Label is optional here, but it can help identify more complex branches
         branching >> Label(option) >> t >> empty_follow >> join
+
+    # The same example, using external Python calls
+
+    # [START howto_operator_branch_ext_py]
+    def branch_with_external_python(choices):
+        import random
+
+        return f"ext_py_{random.choice(choices)}"
+
+    branching_ext_py = BranchExternalPythonOperator(
+        task_id="branching_ext_python",
+        python=PATH_TO_PYTHON_BINARY,
+        python_callable=branch_with_external_python,
+        op_args=[options],
+    )
+    # [END howto_operator_branch_ext_py]
+    join >> branching_ext_py
+
+    join_ext_py = EmptyOperator(
+        task_id="join_ext_python",
+        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
+    )
+
+    def hello_world_with_external_python():
+        print("Hello World from external Python")
+
+    for option in options:
+        t = ExternalPythonOperator(
+            task_id=f"ext_py_{option}",
+            python=PATH_TO_PYTHON_BINARY,
+            python_callable=hello_world_with_external_python,
+        )
+
+        # Label is optional here, but it can help identify more complex branches
+        branching_ext_py >> Label(option) >> t >> join_ext_py
+
+    # The same example, using Python virtual environments
+
+    # [START howto_operator_branch_virtualenv]
+    # Note: Passing a caching dir allows keeping the virtual environment across multiple runs.
+    #       Run the example a second time and see that it re-uses it and is faster.
+    VENV_CACHE_PATH = Path(tempfile.gettempdir())
+
+    def branch_with_venv(choices):
+        import random
+
+        import numpy as np
+
+        print(f"Some numpy stuff: {np.arange(6)}")
+        return f"venv_{random.choice(choices)}"
+
+    branching_venv = BranchPythonVirtualenvOperator(
+        task_id="branching_venv",
+        requirements=["numpy~=1.24.4"],
+        venv_cache_path=VENV_CACHE_PATH,
+        python_callable=branch_with_venv,
+        op_args=[options],
+    )
+    # [END howto_operator_branch_virtualenv]
+    join_ext_py >> branching_venv
+
+    join_venv = EmptyOperator(
+        task_id="join_venv",
+        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
+    )
+
+    def hello_world_with_venv():
+        import numpy as np
+
+        print(f"Hello World with some numpy stuff: {np.arange(6)}")
+
+    for option in options:
+        t = PythonVirtualenvOperator(
+            task_id=f"venv_{option}",
+            requirements=["numpy~=1.24.4"],
+            venv_cache_path=VENV_CACHE_PATH,
+            python_callable=hello_world_with_venv,
+        )
+
+        # Label is optional here, but it can help identify more complex branches
+        branching_venv >> Label(option) >> t >> join_venv
diff --git a/airflow/example_dags/example_branch_operator_decorator.py b/airflow/example_dags/example_branch_operator_decorator.py
index b250c12073..5d42ff6b27 100644
--- a/airflow/example_dags/example_branch_operator_decorator.py
+++ b/airflow/example_dags/example_branch_operator_decorator.py
@@ -15,10 +15,17 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Example DAG demonstrating the usage of the ``@task.branch`` TaskFlow API decorator."""
+"""Example DAG demonstrating the usage of the branching TaskFlow API decorators.
+
+It shows how to use standard Python ``@task.branch`` as well as the external Python
+version ``@task.branch_external_python`` which calls an external Python interpreter and
+the ``@task.branch_virtualenv`` which builds a temporary Python virtual environment.
+"""
 from __future__ import annotations
 
 import random
+import sys
+import tempfile
 
 import pendulum
 
@@ -28,31 +35,110 @@ from airflow.operators.empty import EmptyOperator
 from airflow.utils.edgemodifier import Label
 from airflow.utils.trigger_rule import TriggerRule
 
+PATH_TO_PYTHON_BINARY = sys.executable
+
 with DAG(
     dag_id="example_branch_python_operator_decorator",
     start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     catchup=False,
     schedule="@daily",
     tags=["example", "example2"],
+    orientation="TB",
 ) as dag:
     run_this_first = EmptyOperator(task_id="run_this_first")
 
-    options = ["branch_a", "branch_b", "branch_c", "branch_d"]
+    options = ["a", "b", "c", "d"]
+
+    # Example branching on standard Python tasks
 
-    @task.branch(task_id="branching")
-    def random_choice(choices: list[str]) -> str:
-        return random.choice(choices)
+    # [START howto_operator_branch_python]
+    @task.branch()
+    def branching(choices: list[str]) -> str:
+        return f"branch_{random.choice(choices)}"
 
-    random_choice_instance = random_choice(choices=options)
+    # [END howto_operator_branch_python]
+
+    random_choice_instance = branching(choices=options)
 
     run_this_first >> random_choice_instance
 
     join = EmptyOperator(task_id="join", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
 
     for option in options:
-        t = EmptyOperator(task_id=option)
 
-        empty_follow = EmptyOperator(task_id="follow_" + option)
+        @task(task_id=f"branch_{option}")
+        def some_task():
+            print("doing something in Python")
+
+        t = some_task()
+        empty = EmptyOperator(task_id=f"follow_{option}")
+
+        # Label is optional here, but it can help identify more complex branches
+        random_choice_instance >> Label(option) >> t >> empty >> join
+
+    # The same example, using external Python calls
+
+    # [START howto_operator_branch_ext_py]
+    @task.branch_external_python(python=PATH_TO_PYTHON_BINARY)
+    def branching_ext_python(choices) -> str:
+        import random
+
+        return f"ext_py_{random.choice(choices)}"
+
+    # [END howto_operator_branch_ext_py]
+
+    random_choice_ext_py = branching_ext_python(choices=options)
+
+    join >> random_choice_ext_py
+
+    join_ext_py = EmptyOperator(task_id="join_ext_py", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
+
+    for option in options:
+
+        @task.external_python(task_id=f"ext_py_{option}", python=PATH_TO_PYTHON_BINARY)
+        def some_ext_py_task():
+            print("doing something in external Python")
+
+        t = some_ext_py_task()
+
+        # Label is optional here, but it can help identify more complex branches
+        random_choice_ext_py >> Label(option) >> t >> join_ext_py
+
+    # The same example, using Python virtual environments
+
+    # [START howto_operator_branch_virtualenv]
+    # Note: Passing a caching dir allows keeping the virtual environment across multiple runs.
+    #       Run the example a second time and see that it re-uses it and is faster.
+    VENV_CACHE_PATH = tempfile.gettempdir()
+
+    @task.branch_virtualenv(requirements=["numpy~=1.24.4"], venv_cache_path=VENV_CACHE_PATH)
+    def branching_virtualenv(choices) -> str:
+        import random
+
+        import numpy as np
+
+        print(f"Some numpy stuff: {np.arange(6)}")
+        return f"venv_{random.choice(choices)}"
+
+    # [END howto_operator_branch_virtualenv]
+
+    random_choice_venv = branching_virtualenv(choices=options)
+
+    join_ext_py >> random_choice_venv
+
+    join_venv = EmptyOperator(task_id="join_venv", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
+
+    for option in options:
+
+        @task.virtualenv(
+            task_id=f"venv_{option}", requirements=["numpy~=1.24.4"], venv_cache_path=VENV_CACHE_PATH
+        )
+        def some_venv_task():
+            import numpy as np
+
+            print(f"Some numpy stuff: {np.arange(6)}")
+
+        t = some_venv_task()
 
         # Label is optional here, but it can help identify more complex branches
-        random_choice_instance >> Label(option) >> t >> empty_follow >> join
+        random_choice_venv >> Label(option) >> t >> join_venv
diff --git a/airflow/operators/python.py b/airflow/operators/python.py
index 118d1d4bdb..0eadb5441a 100644
--- a/airflow/operators/python.py
+++ b/airflow/operators/python.py
@@ -729,6 +729,10 @@ class BranchPythonVirtualenvOperator(PythonVirtualenvOperator, BranchMixIn):
     these paths can't move forward. The ``skipped`` states are propagated
     downstream to allow for the DAG state to fill up and the DAG run's state
     to be inferred.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BranchPythonVirtualenvOperator`
     """
 
     def execute(self, context: Context) -> Any:
@@ -910,6 +914,10 @@ class BranchExternalPythonOperator(ExternalPythonOperator, BranchMixIn):
     Extends ExternalPythonOperator, so expects to get Python:
     virtual environment that should be used (in ``VENV/bin`` folder). Should be absolute path,
     so it can run on separate virtual environment similarly to ExternalPythonOperator.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BranchExternalPythonOperator`
     """
 
     def execute(self, context: Context) -> Any:
diff --git a/docs/apache-airflow/core-concepts/dags.rst b/docs/apache-airflow/core-concepts/dags.rst
index 8ca2689320..86f4aaad2e 100644
--- a/docs/apache-airflow/core-concepts/dags.rst
+++ b/docs/apache-airflow/core-concepts/dags.rst
@@ -371,6 +371,8 @@ As with the callable for ``@task.branch``, this method can return the ID of a do
             else:
                 return None
 
+Similar to the ``@task.branch`` decorator for regular Python code, there are also branch decorators that use a virtual environment (``@task.branch_virtualenv``) or an external Python interpreter (``@task.branch_external_python``).
+
 
 .. _concepts:latest-only:
 
diff --git a/docs/apache-airflow/howto/operator/python.rst b/docs/apache-airflow/howto/operator/python.rst
index 6adbbe8b9e..6cfb5a335d 100644
--- a/docs/apache-airflow/howto/operator/python.rst
+++ b/docs/apache-airflow/howto/operator/python.rst
@@ -103,7 +103,15 @@ Otherwise you won't have access to the most context variables of Airflow in ``op
 If you want the context related to datetime objects like ``data_interval_start`` you can add ``pendulum`` and
 ``lazy_object_proxy``.
 
-If additional parameters for package installation are needed pass them in ``requirements.txt`` as in the example below:
+.. warning::
+    The Python function body defined to be executed is cut out of the DAG into a temporary file without the surrounding code.
+    As in the examples, you need to add all imports again, and you cannot rely on variables from the global Python context.
+
+    If you want to pass variables into the classic :class:`~airflow.operators.python.PythonVirtualenvOperator` use
+    ``op_args`` and ``op_kwargs``.
+
+If additional parameters for package installation are needed, pass them in via the ``pip_install_options`` parameter or use a
+``requirements.txt`` as in the example below:
 
 .. code-block::
 
@@ -196,6 +204,102 @@ Otherwise you won't have access to the most context variables of Airflow in ``op
 If you want the context related to datetime objects like ``data_interval_start`` you can add ``pendulum`` and
 ``lazy_object_proxy`` to your virtual environment.
 
+.. warning::
+    The Python function body defined to be executed is cut out of the DAG into a temporary file without the surrounding code.
+    As in the examples, you need to add all imports again, and you cannot rely on variables from the global Python context.
+
+    If you want to pass variables into the classic :class:`~airflow.operators.python.ExternalPythonOperator` use
+    ``op_args`` and ``op_kwargs``.
+
+.. _howto/operator:BranchPythonOperator:
+
+BranchPythonOperator
+====================
+
+Use the ``@task.branch`` decorator to execute Python :ref:`branching <concepts:branching>` tasks.
+
+.. warning::
+    The ``@task.branch`` decorator is recommended over the classic :class:`~airflow.operators.python.BranchPythonOperator`
+    to execute Python code.
+
+TaskFlow example of using the operator:
+
+.. exampleinclude:: /../../airflow/example_dags/example_branch_operator_decorator.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_branch_python]
+    :end-before: [END howto_operator_branch_python]
+
+Classic example of using the operator:
+
+.. exampleinclude:: /../../airflow/example_dags/example_branch_operator.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_branch_python]
+    :end-before: [END howto_operator_branch_python]
+
+Argument passing and templating options are the same as with :ref:`howto/operator:PythonOperator`.
+
+.. _howto/operator:BranchPythonVirtualenvOperator:
+
+BranchPythonVirtualenvOperator
+==============================
+
+Use the ``@task.branch_virtualenv`` decorator to execute Python :ref:`branching <concepts:branching>` tasks. It is a hybrid of
+the branch decorator with execution in a virtual environment.
+
+.. warning::
+    The ``@task.branch_virtualenv`` decorator is recommended over the classic
+    :class:`~airflow.operators.python.BranchPythonVirtualenvOperator` to execute Python code.
+
+TaskFlow example of using the operator:
+
+.. exampleinclude:: /../../airflow/example_dags/example_branch_operator_decorator.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_branch_virtualenv]
+    :end-before: [END howto_operator_branch_virtualenv]
+
+Classic example of using the operator:
+
+.. exampleinclude:: /../../airflow/example_dags/example_branch_operator.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_branch_virtualenv]
+    :end-before: [END howto_operator_branch_virtualenv]
+
+Argument passing and templating options are the same as with :ref:`howto/operator:PythonVirtualenvOperator`.
+
+.. _howto/operator:BranchExternalPythonOperator:
+
+BranchExternalPythonOperator
+============================
+
+Use the ``@task.branch_external_python`` decorator to execute Python :ref:`branching <concepts:branching>` tasks. It is a hybrid of
+the branch decorator with execution in an external Python environment.
+
+.. warning::
+    The ``@task.branch_external_python`` decorator is recommended over the classic
+    :class:`~airflow.operators.python.BranchExternalPythonOperator` to execute Python code.
+
+TaskFlow example of using the operator:
+
+.. exampleinclude:: /../../airflow/example_dags/example_branch_operator_decorator.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_branch_ext_py]
+    :end-before: [END howto_operator_branch_ext_py]
+
+Classic example of using the operator:
+
+.. exampleinclude:: /../../airflow/example_dags/example_branch_operator.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_branch_ext_py]
+    :end-before: [END howto_operator_branch_ext_py]
+
+Argument passing and templating options are the same as with :ref:`howto/operator:ExternalPythonOperator`.
+
 .. _howto/operator:ShortCircuitOperator:
 
 ShortCircuitOperator
diff --git a/tests/decorators/test_branch_external_python.py b/tests/decorators/test_branch_external_python.py
new file mode 100644
index 0000000000..01e1f9b968
--- /dev/null
+++ b/tests/decorators/test_branch_external_python.py
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import sys
+
+import pytest
+
+from airflow.decorators import task
+from airflow.utils.state import State
+
+
+class Test_BranchPythonDecoratedOperator:  # covers @task.branch_external_python
+    @pytest.mark.parametrize("branch_task_name", ["task_1", "task_2"])  # branch expected to run
+    def test_branch_one(self, dag_maker, branch_task_name):
+        @task
+        def dummy_f():
+            pass
+
+        @task
+        def task_1():
+            pass
+
+        @task
+        def task_2():
+            pass
+
+        if (
+            branch_task_name == "task_1"
+        ):  # Hard-coded per branch: closure values don't reach the external interpreter
+
+            @task.branch_external_python(task_id="branching", python=sys.executable)
+            def branch_operator():
+                return "task_1"
+
+        else:
+
+            @task.branch_external_python(task_id="branching", python=sys.executable)
+            def branch_operator():
+                return "task_2"
+
+        with dag_maker():  # wires dummy_f >> branching >> [task_1, task_2]
+            branchoperator = branch_operator()
+            df = dummy_f()
+            task_1 = task_1()
+            task_2 = task_2()
+
+            df.set_downstream(branchoperator)
+            branchoperator.set_downstream(task_1)
+            branchoperator.set_downstream(task_2)
+
+        dr = dag_maker.create_dagrun()  # tasks are then run directly, in dependency order
+        df.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True)
+        branchoperator.operator.run(
+            start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True
+        )
+        task_1.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True)
+        task_2.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True)
+        tis = dr.get_task_instances()
+
+        for ti in tis:  # NOTE(review): a task missing from tis is silently not asserted on
+            if ti.task_id == "dummy_f":
+                assert ti.state == State.SUCCESS
+            if ti.task_id == "branching":
+                assert ti.state == State.SUCCESS
+
+            if ti.task_id == "task_1" and branch_task_name == "task_1":
+                assert ti.state == State.SUCCESS
+            elif ti.task_id == "task_1":
+                assert ti.state == State.SKIPPED
+
+            if ti.task_id == "task_2" and branch_task_name == "task_2":
+                assert ti.state == State.SUCCESS
+            elif ti.task_id == "task_2":
+                assert ti.state == State.SKIPPED
diff --git a/tests/decorators/test_branch_virtualenv.py b/tests/decorators/test_branch_virtualenv.py
new file mode 100644
index 0000000000..861ba154af
--- /dev/null
+++ b/tests/decorators/test_branch_virtualenv.py
@@ -0,0 +1,94 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+
+from airflow.decorators import task
+from airflow.utils.state import State
+
+
+class Test_BranchPythonDecoratedOperator:  # covers @task.branch_virtualenv
+    @pytest.mark.parametrize("branch_task_name", ["task_1", "task_2"])  # branch expected to run
+    def test_branch_one(self, dag_maker, branch_task_name):
+        @task
+        def dummy_f():
+            pass
+
+        @task
+        def task_1():
+            pass
+
+        @task
+        def task_2():
+            pass
+
+        if (
+            branch_task_name == "task_1"
+        ):  # Hard-coded per branch: closure values don't reach the venv interpreter
+
+            @task.branch_virtualenv(task_id="branching", requirements=["funcsigs"])
+            def branch_operator():
+                import funcsigs  # resolvable via requirements=["funcsigs"] above
+
+                print(f"We successfully imported funcsigs version {funcsigs.__version__}")
+                return "task_1"
+
+        else:
+
+            @task.branch_virtualenv(task_id="branching", requirements=["funcsigs"])
+            def branch_operator():
+                import funcsigs  # resolvable via requirements=["funcsigs"] above
+
+                print(f"We successfully imported funcsigs version {funcsigs.__version__}")
+                return "task_2"
+
+        with dag_maker():  # wires dummy_f >> branching >> [task_1, task_2]
+            branchoperator = branch_operator()
+            df = dummy_f()
+            task_1 = task_1()
+            task_2 = task_2()
+
+            df.set_downstream(branchoperator)
+            branchoperator.set_downstream(task_1)
+            branchoperator.set_downstream(task_2)
+
+        dr = dag_maker.create_dagrun()  # tasks are then run directly, in dependency order
+        df.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True)
+        branchoperator.operator.run(
+            start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True
+        )
+        task_1.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True)
+        task_2.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True)
+        tis = dr.get_task_instances()
+
+        for ti in tis:  # NOTE(review): a task missing from tis is silently not asserted on
+            if ti.task_id == "dummy_f":
+                assert ti.state == State.SUCCESS
+            if ti.task_id == "branching":
+                assert ti.state == State.SUCCESS
+
+            if ti.task_id == "task_1" and branch_task_name == "task_1":
+                assert ti.state == State.SUCCESS
+            elif ti.task_id == "task_1":
+                assert ti.state == State.SKIPPED
+
+            if ti.task_id == "task_2" and branch_task_name == "task_2":
+                assert ti.state == State.SUCCESS
+            elif ti.task_id == "task_2":
+                assert ti.state == State.SKIPPED
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index 8d592dfe65..195a959ee9 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -244,11 +244,23 @@ class TestBackfillJob:
                     "branch_b",
                     "branch_c",
                     "branch_d",
-                    "follow_branch_a",
-                    "follow_branch_b",
-                    "follow_branch_c",
-                    "follow_branch_d",
+                    "follow_a",
+                    "follow_b",
+                    "follow_c",
+                    "follow_d",
                     "join",
+                    "branching_ext_python",
+                    "ext_py_a",
+                    "ext_py_b",
+                    "ext_py_c",
+                    "ext_py_d",
+                    "join_ext_python",
+                    "branching_venv",
+                    "venv_a",
+                    "venv_b",
+                    "venv_c",
+                    "venv_d",
+                    "join_venv",
                 ),
             ],
             [