Posted to commits@airflow.apache.org by po...@apache.org on 2022/08/02 15:54:18 UTC

[airflow] branch main updated: Update core example DAGs to use `@task.branch` decorator (#25242)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 472f4e2906 Update core example DAGs to use `@task.branch` decorator (#25242)
472f4e2906 is described below

commit 472f4e290678ccc5762b12b84613ba7ce839e9f4
Author: Josh Fell <48...@users.noreply.github.com>
AuthorDate: Tue Aug 2 11:54:07 2022 -0400

    Update core example DAGs to use `@task.branch` decorator (#25242)
    
    * Update core example DAGs to use `@task.branch` decorator
    
    * fixup! Update core example DAGs to use `@task.branch` decorator
    
    fixup! fixup! Update core example DAGs to use `@task.branch` decorator
    
    * fixup! fixup! Update core example DAGs to use `@task.branch` decorator
    
    * Update `override` return type anno + use DAG context manager
    
    * Update dedent for dataset docs
    
    * Use `__future__.annotations` where applicable
    
    * fixup! Use `__future__.annotations` where applicable
---
 airflow/decorators/base.py                         |  6 +++
 .../example_branch_operator_decorator.py           | 32 ++++++--------
 .../example_branch_python_dop_operator_3.py        | 12 +++---
 airflow/example_dags/example_datasets.py           | 49 ++++++++++------------
 airflow/example_dags/example_nested_branch_dag.py  | 14 +++++--
 airflow/providers_manager.py                       |  6 ++-
 docs/apache-airflow/concepts/datasets.rst          |  1 +
 7 files changed, 61 insertions(+), 59 deletions(-)
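
For context when skimming the diff below: the core pattern these example DAGs
move to is the ``@task.branch`` TaskFlow decorator, which replaces an explicit
``BranchPythonOperator``. A minimal sketch, not taken from the commit (DAG and
task names are illustrative):

    import pendulum

    from airflow import DAG
    from airflow.decorators import task
    from airflow.operators.empty import EmptyOperator

    with DAG(
        dag_id="branch_sketch",
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        schedule_interval="@daily",
        catchup=False,
    ) as dag:

        @task.branch(task_id="choose")
        def choose() -> str:
            # The returned value is the task_id (or list of task_ids) to
            # follow; the other direct downstream tasks are skipped.
            return "path_a"

        path_a = EmptyOperator(task_id="path_a")
        path_b = EmptyOperator(task_id="path_b")
        choose() >> [path_a, path_b]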

diff --git a/airflow/decorators/base.py b/airflow/decorators/base.py
index 49b8f055dc..448b7e449d 100644
--- a/airflow/decorators/base.py
+++ b/airflow/decorators/base.py
@@ -533,6 +533,9 @@ class Task(Generic[FParams, FReturn]):
     def expand_kwargs(self, kwargs: XComArg, *, strict: bool = True) -> XComArg:
         ...
 
+    def override(self, **kwargs: Any) -> "Task[FParams, FReturn]":
+        ...
+
 
 class TaskDecorator(Protocol):
     """Type declaration for ``task_decorator_factory`` return type."""
@@ -553,6 +556,9 @@ class TaskDecorator(Protocol):
     ) -> Callable[[Callable[FParams, FReturn]], Task[FParams, FReturn]]:
         """For the decorator factory ``@task()`` case."""
 
+    def override(self, **kwargs: Any) -> "Task[FParams, FReturn]":
+        ...
+
 
 def task_decorator_factory(
     python_callable: Optional[Callable] = None,
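
The ``override`` stubs added above describe, for type checkers, behavior the
decorated tasks already have at runtime: ``override(...)`` returns a copy of
the task with the given operator arguments replaced, as the nested-branch
example further down uses it. A rough usage sketch (the task itself is
hypothetical):

    import pendulum

    from airflow import DAG
    from airflow.decorators import task

    with DAG(
        dag_id="override_sketch",
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        schedule_interval=None,
    ) as dag:

        @task()
        def add(x: int, y: int) -> int:
            return x + y

        # One decorated callable reused as two distinct tasks by overriding
        # task_id; each call returns its own XComArg.
        add.override(task_id="add_once")(1, 2)
        add.override(task_id="add_twice")(2, 4)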
diff --git a/airflow/example_dags/example_branch_operator_decorator.py b/airflow/example_dags/example_branch_operator_decorator.py
index 0ab4f76caf..864029349c 100644
--- a/airflow/example_dags/example_branch_operator_decorator.py
+++ b/airflow/example_dags/example_branch_operator_decorator.py
@@ -16,10 +16,13 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Example DAG demonstrating the usage of the BranchPythonOperator."""
+"""Example DAG demonstrating the usage of the ``@task.branch`` TaskFlow API decorator."""
+
+from __future__ import annotations
 
 import random
-from datetime import datetime
+
+import pendulum
 
 from airflow import DAG
 from airflow.decorators import task
@@ -29,38 +32,29 @@ from airflow.utils.trigger_rule import TriggerRule
 
 with DAG(
     dag_id='example_branch_python_operator_decorator',
-    start_date=datetime(2021, 1, 1),
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     catchup=False,
     schedule_interval="@daily",
     tags=['example', 'example2'],
 ) as dag:
-    run_this_first = EmptyOperator(
-        task_id='run_this_first',
-    )
+    run_this_first = EmptyOperator(task_id='run_this_first')
 
     options = ['branch_a', 'branch_b', 'branch_c', 'branch_d']
 
     @task.branch(task_id="branching")
-    def random_choice():
-        return random.choice(options)
+    def random_choice(choices: list[str]) -> str:
+        return random.choice(choices)
 
-    random_choice_instance = random_choice()
+    random_choice_instance = random_choice(choices=options)
 
     run_this_first >> random_choice_instance
 
-    join = EmptyOperator(
-        task_id='join',
-        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
-    )
+    join = EmptyOperator(task_id='join', trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
 
     for option in options:
-        t = EmptyOperator(
-            task_id=option,
-        )
+        t = EmptyOperator(task_id=option)
 
-        empty_follow = EmptyOperator(
-            task_id='follow_' + option,
-        )
+        empty_follow = EmptyOperator(task_id='follow_' + option)
 
         # Label is optional here, but it can help identify more complex branches
         random_choice_instance >> Label(option) >> t >> empty_follow >> join
diff --git a/airflow/example_dags/example_branch_python_dop_operator_3.py b/airflow/example_dags/example_branch_python_dop_operator_3.py
index a8e0ce2c1c..acb957f72e 100644
--- a/airflow/example_dags/example_branch_python_dop_operator_3.py
+++ b/airflow/example_dags/example_branch_python_dop_operator_3.py
@@ -17,16 +17,17 @@
 # under the License.
 
 """
-Example DAG demonstrating the usage of BranchPythonOperator with depends_on_past=True, where tasks may be run
-or skipped on alternating runs.
+Example DAG demonstrating the usage of ``@task.branch`` TaskFlow API decorator with depends_on_past=True,
+where tasks may be run or skipped on alternating runs.
 """
 import pendulum
 
 from airflow import DAG
+from airflow.decorators import task
 from airflow.operators.empty import EmptyOperator
-from airflow.operators.python import BranchPythonOperator
 
 
+@task.branch()
 def should_run(**kwargs):
     """
     Determine which empty_task should be run based on if the execution date minute is even or odd.
@@ -52,10 +53,7 @@ with DAG(
     default_args={'depends_on_past': True},
     tags=['example'],
 ) as dag:
-    cond = BranchPythonOperator(
-        task_id='condition',
-        python_callable=should_run,
-    )
+    cond = should_run()
 
     empty_task_1 = EmptyOperator(task_id='empty_task_1')
     empty_task_2 = EmptyOperator(task_id='empty_task_2')
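
The body of ``should_run`` sits outside this hunk; a hedged sketch of the
selection its docstring describes (the actual implementation lives in the
example file):

    from airflow.decorators import task

    @task.branch()
    def should_run(**kwargs) -> str:
        # Route on whether the execution date's minute is even or odd; with
        # depends_on_past=True, each branch may then be run or skipped on
        # alternating runs, per the DAG's docstring.
        if kwargs["execution_date"].minute % 2 == 0:
            return "empty_task_1"
        return "empty_task_2"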
diff --git a/airflow/example_dags/example_datasets.py b/airflow/example_dags/example_datasets.py
index 2068d66a5c..41351bad96 100644
--- a/airflow/example_dags/example_datasets.py
+++ b/airflow/example_dags/example_datasets.py
@@ -36,7 +36,7 @@ example_dataset_dag4_req_dag1_dag2 should run.
 Dags example_dataset_dag5_req_dag1_D and example_dataset_dag6_req_DD should not run because they depend on
 datasets that never get updated.
 """
-from datetime import datetime
+import pendulum
 
 from airflow.models import DAG, Dataset
 from airflow.operators.bash import BashOperator
@@ -46,52 +46,45 @@ dag1_dataset = Dataset('s3://dag1/output_1.txt', extra={'hi': 'bye'})
 # [END dataset_def]
 dag2_dataset = Dataset('s3://dag2/output_1.txt', extra={'hi': 'bye'})
 
-dag1 = DAG(
+with DAG(
     dag_id='example_dataset_dag1',
     catchup=False,
-    start_date=datetime(2020, 1, 1),
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     schedule_interval='@daily',
     tags=['upstream'],
-)
-
-# [START task_outlet]
-BashOperator(outlets=[dag1_dataset], task_id='upstream_task_1', bash_command="sleep 5", dag=dag1)
-# [END task_outlet]
+) as dag1:
+    # [START task_outlet]
+    BashOperator(outlets=[dag1_dataset], task_id='upstream_task_1', bash_command="sleep 5")
+    # [END task_outlet]
 
 with DAG(
     dag_id='example_dataset_dag2',
     catchup=False,
-    start_date=datetime(2020, 1, 1),
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     schedule_interval=None,
     tags=['upstream'],
 ) as dag2:
-    BashOperator(
-        outlets=[dag2_dataset],
-        task_id='upstream_task_2',
-        bash_command="sleep 5",
-    )
+    BashOperator(outlets=[dag2_dataset], task_id='upstream_task_2', bash_command="sleep 5")
 
 # [START dag_dep]
-dag3 = DAG(
+with DAG(
     dag_id='example_dataset_dag3_req_dag1',
     catchup=False,
-    start_date=datetime(2020, 1, 1),
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     schedule_on=[dag1_dataset],
     tags=['downstream'],
-)
-# [END dag_dep]
-
-BashOperator(
-    outlets=[Dataset('s3://downstream_1_task/dataset_other.txt')],
-    task_id='downstream_1',
-    bash_command="sleep 5",
-    dag=dag3,
-)
+) as dag3:
+    # [END dag_dep]
+    BashOperator(
+        outlets=[Dataset('s3://downstream_1_task/dataset_other.txt')],
+        task_id='downstream_1',
+        bash_command="sleep 5",
+    )
 
 with DAG(
     dag_id='example_dataset_dag4_req_dag1_dag2',
     catchup=False,
-    start_date=datetime(2020, 1, 1),
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     schedule_on=[dag1_dataset, dag2_dataset],
     tags=['downstream'],
 ) as dag4:
@@ -104,7 +97,7 @@ with DAG(
 with DAG(
     dag_id='example_dataset_dag5_req_dag1_D',
     catchup=False,
-    start_date=datetime(2020, 1, 1),
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     schedule_on=[
         dag1_dataset,
         Dataset('s3://this-dataset-doesnt-get-triggered'),
@@ -120,7 +113,7 @@ with DAG(
 with DAG(
     dag_id='example_dataset_dag6_req_DD',
     catchup=False,
-    start_date=datetime(2020, 1, 1),
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     schedule_on=[
         Dataset('s3://unrelated/dataset3.txt'),
         Dataset('s3://unrelated/dataset_other_unknown.txt'),
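
Taken together, these examples pair a producer task that declares a Dataset
via ``outlets=[...]`` with consumer DAGs scheduled on it via
``schedule_on=[...]`` (the parameter name as of this commit). A condensed,
illustrative sketch of one producer/consumer pair (DAG ids are not from the
commit):

    import pendulum

    from airflow.models import DAG, Dataset
    from airflow.operators.bash import BashOperator

    example = Dataset("s3://example/output_1.txt")

    with DAG(
        dag_id="producer",
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        schedule_interval="@daily",
        catchup=False,
    ) as producer:
        # Updating the dataset is what triggers the consumer below.
        BashOperator(task_id="write", bash_command="sleep 5", outlets=[example])

    with DAG(
        dag_id="consumer",
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        schedule_on=[example],
        catchup=False,
    ) as consumer:
        BashOperator(task_id="read", bash_command="sleep 5")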
diff --git a/airflow/example_dags/example_nested_branch_dag.py b/airflow/example_dags/example_nested_branch_dag.py
index 14ac0a43ce..1eb41d3fda 100644
--- a/airflow/example_dags/example_nested_branch_dag.py
+++ b/airflow/example_dags/example_nested_branch_dag.py
@@ -19,13 +19,13 @@
 """
 Example DAG demonstrating a workflow with nested branching. The join tasks are created with
 ``none_failed_min_one_success`` trigger rule such that they are skipped whenever their corresponding
-``BranchPythonOperator`` are skipped.
+branching tasks are skipped.
 """
 import pendulum
 
+from airflow.decorators import task
 from airflow.models import DAG
 from airflow.operators.empty import EmptyOperator
-from airflow.operators.python import BranchPythonOperator
 from airflow.utils.trigger_rule import TriggerRule
 
 with DAG(
@@ -35,11 +35,17 @@ with DAG(
     schedule_interval="@daily",
     tags=["example"],
 ) as dag:
-    branch_1 = BranchPythonOperator(task_id="branch_1", python_callable=lambda: "true_1")
+
+    @task.branch()
+    def branch(task_id_to_return: str) -> str:
+        return task_id_to_return
+
+    branch_1 = branch.override(task_id="branch_1")(task_id_to_return="true_1")
     join_1 = EmptyOperator(task_id="join_1", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
     true_1 = EmptyOperator(task_id="true_1")
     false_1 = EmptyOperator(task_id="false_1")
-    branch_2 = BranchPythonOperator(task_id="branch_2", python_callable=lambda: "true_2")
+
+    branch_2 = branch.override(task_id="branch_2")(task_id_to_return="true_2")
     join_2 = EmptyOperator(task_id="join_2", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
     true_2 = EmptyOperator(task_id="true_2")
     false_2 = EmptyOperator(task_id="false_2")
diff --git a/airflow/providers_manager.py b/airflow/providers_manager.py
index eb7bd9676a..aca6d98c0e 100644
--- a/airflow/providers_manager.py
+++ b/airflow/providers_manager.py
@@ -28,6 +28,7 @@ from dataclasses import dataclass
 from functools import wraps
 from time import perf_counter
 from typing import (
+    TYPE_CHECKING,
     Any,
     Callable,
     Dict,
@@ -64,6 +65,9 @@ MIN_PROVIDER_VERSIONS = {
     "apache-airflow-providers-celery": "2.1.0",
 }
 
+if TYPE_CHECKING:
+    from airflow.decorators.base import TaskDecorator
+
 
 class LazyDictWithCache(MutableMapping):
     """
@@ -894,7 +898,7 @@ class ProvidersManager(LoggingMixin):
         return self._hooks_lazy_dict
 
     @property
-    def taskflow_decorators(self) -> Dict[str, Callable]:
+    def taskflow_decorators(self) -> Dict[str, "TaskDecorator"]:
         self.initialize_providers_taskflow_decorator()
         return self._taskflow_decorators
 
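The ``TYPE_CHECKING`` guard above makes ``TaskDecorator`` available for the
annotation without importing ``airflow.decorators.base`` at runtime,
presumably to avoid a runtime import cycle between the modules. The general
pattern (the function name here is illustrative):

    from typing import TYPE_CHECKING, Dict

    if TYPE_CHECKING:
        # Only evaluated by static type checkers, never at runtime.
        from airflow.decorators.base import TaskDecorator

    def taskflow_decorators() -> Dict[str, "TaskDecorator"]:
        # The quoted annotation defers evaluation, so the runtime import
        # of TaskDecorator is never needed here.
        ...
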
diff --git a/docs/apache-airflow/concepts/datasets.rst b/docs/apache-airflow/concepts/datasets.rst
index 211349db56..74735e36ee 100644
--- a/docs/apache-airflow/concepts/datasets.rst
+++ b/docs/apache-airflow/concepts/datasets.rst
@@ -33,6 +33,7 @@ Then reference the dataset as a task outlet:
 
 .. exampleinclude:: /../../airflow/example_dags/example_datasets.py
     :language: python
+    :dedent: 4
     :start-after: [START task_outlet]
     :end-before: [END task_outlet]
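
The ``:dedent: 4`` option strips the four spaces of indentation the snippet
gained earlier in this commit, when the ``[START task_outlet]`` /
``[END task_outlet]`` block moved inside a ``with DAG(...) as dag1:`` context
manager; without it the rendered example would be indented one level too deep.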