You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/08/02 15:54:18 UTC
[airflow] branch main updated: Update core example DAGs to use `@task.branch` decorator (#25242)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 472f4e2906 Update core example DAGs to use `@task.branch` decorator (#25242)
472f4e2906 is described below
commit 472f4e290678ccc5762b12b84613ba7ce839e9f4
Author: Josh Fell <48...@users.noreply.github.com>
AuthorDate: Tue Aug 2 11:54:07 2022 -0400
Update core example DAGs to use `@task.branch` decorator (#25242)
* Update core example DAGs to use `@task.branch` decorator
* fixup! Update core example DAGs to use `@task.branch` decorator
fixup! fixup! Update core example DAGs to use `@task.branch` decorator
* fixup! fixup! Update core example DAGs to use `@task.branch` decorator
* Update `override` return type annotation + use DAG context manager
* Update dedent for dataset docs
* Use `__future__.annotations` where applicable
* fixup! Use `__future__.annotations` where applicable
---
airflow/decorators/base.py | 6 +++
.../example_branch_operator_decorator.py | 32 ++++++--------
.../example_branch_python_dop_operator_3.py | 12 +++---
airflow/example_dags/example_datasets.py | 49 ++++++++++------------
airflow/example_dags/example_nested_branch_dag.py | 14 +++++--
airflow/providers_manager.py | 6 ++-
docs/apache-airflow/concepts/datasets.rst | 1 +
7 files changed, 61 insertions(+), 59 deletions(-)
diff --git a/airflow/decorators/base.py b/airflow/decorators/base.py
index 49b8f055dc..448b7e449d 100644
--- a/airflow/decorators/base.py
+++ b/airflow/decorators/base.py
@@ -533,6 +533,9 @@ class Task(Generic[FParams, FReturn]):
def expand_kwargs(self, kwargs: XComArg, *, strict: bool = True) -> XComArg:
...
+ def override(self, **kwargs: Any) -> "Task[FParams, FReturn]":
+ ...
+
class TaskDecorator(Protocol):
"""Type declaration for ``task_decorator_factory`` return type."""
@@ -553,6 +556,9 @@ class TaskDecorator(Protocol):
) -> Callable[[Callable[FParams, FReturn]], Task[FParams, FReturn]]:
"""For the decorator factory ``@task()`` case."""
+ def override(self, **kwargs: Any) -> "Task[FParams, FReturn]":
+ ...
+
def task_decorator_factory(
python_callable: Optional[Callable] = None,
diff --git a/airflow/example_dags/example_branch_operator_decorator.py b/airflow/example_dags/example_branch_operator_decorator.py
index 0ab4f76caf..864029349c 100644
--- a/airflow/example_dags/example_branch_operator_decorator.py
+++ b/airflow/example_dags/example_branch_operator_decorator.py
@@ -16,10 +16,13 @@
# specific language governing permissions and limitations
# under the License.
-"""Example DAG demonstrating the usage of the BranchPythonOperator."""
+"""Example DAG demonstrating the usage of the ``@task.branch`` TaskFlow API decorator."""
+
+from __future__ import annotations
import random
-from datetime import datetime
+
+import pendulum
from airflow import DAG
from airflow.decorators import task
@@ -29,38 +32,29 @@ from airflow.utils.trigger_rule import TriggerRule
with DAG(
dag_id='example_branch_python_operator_decorator',
- start_date=datetime(2021, 1, 1),
+ start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
schedule_interval="@daily",
tags=['example', 'example2'],
) as dag:
- run_this_first = EmptyOperator(
- task_id='run_this_first',
- )
+ run_this_first = EmptyOperator(task_id='run_this_first')
options = ['branch_a', 'branch_b', 'branch_c', 'branch_d']
@task.branch(task_id="branching")
- def random_choice():
- return random.choice(options)
+ def random_choice(choices: list[str]) -> str:
+ return random.choice(choices)
- random_choice_instance = random_choice()
+ random_choice_instance = random_choice(choices=options)
run_this_first >> random_choice_instance
- join = EmptyOperator(
- task_id='join',
- trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
- )
+ join = EmptyOperator(task_id='join', trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
for option in options:
- t = EmptyOperator(
- task_id=option,
- )
+ t = EmptyOperator(task_id=option)
- empty_follow = EmptyOperator(
- task_id='follow_' + option,
- )
+ empty_follow = EmptyOperator(task_id='follow_' + option)
# Label is optional here, but it can help identify more complex branches
random_choice_instance >> Label(option) >> t >> empty_follow >> join
diff --git a/airflow/example_dags/example_branch_python_dop_operator_3.py b/airflow/example_dags/example_branch_python_dop_operator_3.py
index a8e0ce2c1c..acb957f72e 100644
--- a/airflow/example_dags/example_branch_python_dop_operator_3.py
+++ b/airflow/example_dags/example_branch_python_dop_operator_3.py
@@ -17,16 +17,17 @@
# under the License.
"""
-Example DAG demonstrating the usage of BranchPythonOperator with depends_on_past=True, where tasks may be run
-or skipped on alternating runs.
+Example DAG demonstrating the usage of ``@task.branch`` TaskFlow API decorator with depends_on_past=True,
+where tasks may be run or skipped on alternating runs.
"""
import pendulum
from airflow import DAG
+from airflow.decorators import task
from airflow.operators.empty import EmptyOperator
-from airflow.operators.python import BranchPythonOperator
+@task.branch()
def should_run(**kwargs):
"""
Determine which empty_task should be run based on if the execution date minute is even or odd.
@@ -52,10 +53,7 @@ with DAG(
default_args={'depends_on_past': True},
tags=['example'],
) as dag:
- cond = BranchPythonOperator(
- task_id='condition',
- python_callable=should_run,
- )
+ cond = should_run()
empty_task_1 = EmptyOperator(task_id='empty_task_1')
empty_task_2 = EmptyOperator(task_id='empty_task_2')
diff --git a/airflow/example_dags/example_datasets.py b/airflow/example_dags/example_datasets.py
index 2068d66a5c..41351bad96 100644
--- a/airflow/example_dags/example_datasets.py
+++ b/airflow/example_dags/example_datasets.py
@@ -36,7 +36,7 @@ example_dataset_dag4_req_dag1_dag2 should run.
Dags example_dataset_dag5_req_dag1_D and example_dataset_dag6_req_DD should not run because they depend on
datasets that never get updated.
"""
-from datetime import datetime
+import pendulum
from airflow.models import DAG, Dataset
from airflow.operators.bash import BashOperator
@@ -46,52 +46,45 @@ dag1_dataset = Dataset('s3://dag1/output_1.txt', extra={'hi': 'bye'})
# [END dataset_def]
dag2_dataset = Dataset('s3://dag2/output_1.txt', extra={'hi': 'bye'})
-dag1 = DAG(
+with DAG(
dag_id='example_dataset_dag1',
catchup=False,
- start_date=datetime(2020, 1, 1),
+ start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule_interval='@daily',
tags=['upstream'],
-)
-
-# [START task_outlet]
-BashOperator(outlets=[dag1_dataset], task_id='upstream_task_1', bash_command="sleep 5", dag=dag1)
-# [END task_outlet]
+) as dag1:
+ # [START task_outlet]
+ BashOperator(outlets=[dag1_dataset], task_id='upstream_task_1', bash_command="sleep 5")
+ # [END task_outlet]
with DAG(
dag_id='example_dataset_dag2',
catchup=False,
- start_date=datetime(2020, 1, 1),
+ start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule_interval=None,
tags=['upstream'],
) as dag2:
- BashOperator(
- outlets=[dag2_dataset],
- task_id='upstream_task_2',
- bash_command="sleep 5",
- )
+ BashOperator(outlets=[dag2_dataset], task_id='upstream_task_2', bash_command="sleep 5")
# [START dag_dep]
-dag3 = DAG(
+with DAG(
dag_id='example_dataset_dag3_req_dag1',
catchup=False,
- start_date=datetime(2020, 1, 1),
+ start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule_on=[dag1_dataset],
tags=['downstream'],
-)
-# [END dag_dep]
-
-BashOperator(
- outlets=[Dataset('s3://downstream_1_task/dataset_other.txt')],
- task_id='downstream_1',
- bash_command="sleep 5",
- dag=dag3,
-)
+) as dag3:
+ # [END dag_dep]
+ BashOperator(
+ outlets=[Dataset('s3://downstream_1_task/dataset_other.txt')],
+ task_id='downstream_1',
+ bash_command="sleep 5",
+ )
with DAG(
dag_id='example_dataset_dag4_req_dag1_dag2',
catchup=False,
- start_date=datetime(2020, 1, 1),
+ start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule_on=[dag1_dataset, dag2_dataset],
tags=['downstream'],
) as dag4:
@@ -104,7 +97,7 @@ with DAG(
with DAG(
dag_id='example_dataset_dag5_req_dag1_D',
catchup=False,
- start_date=datetime(2020, 1, 1),
+ start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule_on=[
dag1_dataset,
Dataset('s3://this-dataset-doesnt-get-triggered'),
@@ -120,7 +113,7 @@ with DAG(
with DAG(
dag_id='example_dataset_dag6_req_DD',
catchup=False,
- start_date=datetime(2020, 1, 1),
+ start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule_on=[
Dataset('s3://unrelated/dataset3.txt'),
Dataset('s3://unrelated/dataset_other_unknown.txt'),
diff --git a/airflow/example_dags/example_nested_branch_dag.py b/airflow/example_dags/example_nested_branch_dag.py
index 14ac0a43ce..1eb41d3fda 100644
--- a/airflow/example_dags/example_nested_branch_dag.py
+++ b/airflow/example_dags/example_nested_branch_dag.py
@@ -19,13 +19,13 @@
"""
Example DAG demonstrating a workflow with nested branching. The join tasks are created with
``none_failed_min_one_success`` trigger rule such that they are skipped whenever their corresponding
-``BranchPythonOperator`` are skipped.
+branching tasks are skipped.
"""
import pendulum
+from airflow.decorators import task
from airflow.models import DAG
from airflow.operators.empty import EmptyOperator
-from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule
with DAG(
@@ -35,11 +35,17 @@ with DAG(
schedule_interval="@daily",
tags=["example"],
) as dag:
- branch_1 = BranchPythonOperator(task_id="branch_1", python_callable=lambda: "true_1")
+
+ @task.branch()
+ def branch(task_id_to_return: str) -> str:
+ return task_id_to_return
+
+ branch_1 = branch.override(task_id="branch_1")(task_id_to_return="true_1")
join_1 = EmptyOperator(task_id="join_1", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
true_1 = EmptyOperator(task_id="true_1")
false_1 = EmptyOperator(task_id="false_1")
- branch_2 = BranchPythonOperator(task_id="branch_2", python_callable=lambda: "true_2")
+
+ branch_2 = branch.override(task_id="branch_2")(task_id_to_return="true_2")
join_2 = EmptyOperator(task_id="join_2", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
true_2 = EmptyOperator(task_id="true_2")
false_2 = EmptyOperator(task_id="false_2")
diff --git a/airflow/providers_manager.py b/airflow/providers_manager.py
index eb7bd9676a..aca6d98c0e 100644
--- a/airflow/providers_manager.py
+++ b/airflow/providers_manager.py
@@ -28,6 +28,7 @@ from dataclasses import dataclass
from functools import wraps
from time import perf_counter
from typing import (
+ TYPE_CHECKING,
Any,
Callable,
Dict,
@@ -64,6 +65,9 @@ MIN_PROVIDER_VERSIONS = {
"apache-airflow-providers-celery": "2.1.0",
}
+if TYPE_CHECKING:
+ from airflow.decorators.base import TaskDecorator
+
class LazyDictWithCache(MutableMapping):
"""
@@ -894,7 +898,7 @@ class ProvidersManager(LoggingMixin):
return self._hooks_lazy_dict
@property
- def taskflow_decorators(self) -> Dict[str, Callable]:
+ def taskflow_decorators(self) -> Dict[str, "TaskDecorator"]:
self.initialize_providers_taskflow_decorator()
return self._taskflow_decorators
diff --git a/docs/apache-airflow/concepts/datasets.rst b/docs/apache-airflow/concepts/datasets.rst
index 211349db56..74735e36ee 100644
--- a/docs/apache-airflow/concepts/datasets.rst
+++ b/docs/apache-airflow/concepts/datasets.rst
@@ -33,6 +33,7 @@ Then reference the dataset as a task outlet:
.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
:language: python
+ :dedent: 4
:start-after: [START task_outlet]
:end-before: [END task_outlet]