Posted to commits@airflow.apache.org by je...@apache.org on 2022/09/23 03:19:22 UTC
[airflow] 01/13: changing to task decorator in docs from classic operator use (#25711)
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 8a7ee215229316abf48eac5cee3b05b556ad3fed
Author: Bowrna <ma...@gmail.com>
AuthorDate: Fri Sep 16 16:29:21 2022 +0530
changing to task decorator in docs from classic operator use (#25711)
Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
(cherry picked from commit 3d7673346f005732e62ef64db2c9d0089852d67f)
---
docs/apache-airflow/concepts/dags.rst | 32 ++++++++++++++++++------------
docs/apache-airflow/concepts/operators.rst | 12 +++++++++--
2 files changed, 29 insertions(+), 15 deletions(-)
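The dags.rst hunks describe how the return value of a ``@task.branch`` function (a task ID, a list of IDs, or ``None``) decides which directly downstream tasks are skipped, including the ``join`` exception from the note. The following is a plain-Python model of that rule for illustration only, not Airflow's code. The graph dict and the ``descendants`` and ``tasks_to_skip`` names are hypothetical, and raising on an ID that is not directly downstream is an assumption of the model.

```python
from collections import deque
from typing import Dict, Iterable, List, Set, Union

# Hypothetical model: a DAG as {task_id: [direct downstream task_ids]}.
Graph = Dict[str, List[str]]
BranchResult = Union[str, Iterable[str], None]


def descendants(graph: Graph, start: str) -> Set[str]:
    """Return every task reachable downstream of ``start`` (excluding itself)."""
    seen: Set[str] = set()
    queue = deque(graph.get(start, []))
    while queue:
        task_id = queue.popleft()
        if task_id not in seen:
            seen.add(task_id)
            queue.extend(graph.get(task_id, []))
    return seen


def tasks_to_skip(graph: Graph, branch_task: str, result: BranchResult) -> Set[str]:
    """Model which direct downstream tasks of ``branch_task`` are skipped."""
    direct = set(graph.get(branch_task, []))
    # Normalise the branch function's return value to a set of task IDs.
    if result is None:
        chosen: Set[str] = set()  # None: skip every downstream task
    elif isinstance(result, str):
        chosen = {result}
    else:
        chosen = set(result)
    # The docs require returned IDs to be directly downstream; raising
    # ValueError here is this model's own assumption.
    invalid = chosen - direct
    if invalid:
        raise ValueError(f"not directly downstream of {branch_task}: {sorted(invalid)}")
    # A task that is also downstream of a chosen task (like ``join``) is kept.
    keep = set(chosen)
    for task_id in chosen:
        keep |= descendants(graph, task_id)
    return direct - keep


# The shape from the note: branching -> branch_a, join, branch_b; branch_a -> join.
graph: Graph = {
    "branching": ["branch_a", "join", "branch_b"],
    "branch_a": ["join"],
    "branch_b": [],
    "join": [],
}
print(sorted(tasks_to_skip(graph, "branching", "branch_a")))  # ['branch_b']
print(sorted(tasks_to_skip(graph, "branching", None)))  # ['branch_a', 'branch_b', 'join']
```

The model only covers the branch task's direct children. In Airflow, tasks further down a skipped path are then skipped through their trigger rules (with the default ``all_success``, a skipped upstream task causes a skip), not by the branch task itself.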
diff --git a/docs/apache-airflow/concepts/dags.rst b/docs/apache-airflow/concepts/dags.rst
index 4e52c1532a..541a2adad5 100644
--- a/docs/apache-airflow/concepts/dags.rst
+++ b/docs/apache-airflow/concepts/dags.rst
@@ -269,11 +269,11 @@ By default, a DAG will only run a Task when all the Tasks it depends on are succ
Branching
~~~~~~~~~
-You can make use of branching in order to tell the DAG *not* to run all dependent tasks, but instead to pick and choose one or more paths to go down. This is where the branching Operators come in.
+You can make use of branching in order to tell the DAG *not* to run all dependent tasks, but instead to pick and choose one or more paths to go down. This is where the ``@task.branch`` decorator comes in.
-The ``BranchPythonOperator`` is much like the PythonOperator except that it expects a ``python_callable`` that returns a task_id (or list of task_ids). The task_id returned is followed, and all of the other paths are skipped. It can also return None to skip all downstream task.
+The ``@task.branch`` decorator is much like ``@task``, except that it expects the decorated function to return the ID of a task (or a list of IDs). The specified task is followed, while all other paths are skipped. It can also return *None* to skip all downstream tasks.
-The task_id returned by the Python function has to reference a task directly downstream from the BranchPythonOperator task.
+The task_id returned by the Python function has to reference a task directly downstream from the ``@task.branch`` decorated task.
.. note::
When a Task is downstream of both the branching operator *and* downstream of one or more of the selected tasks, it will not be skipped:
@@ -282,10 +282,11 @@ The task_id returned by the Python function has to reference a task directly dow
The paths of the branching task are ``branch_a``, ``join`` and ``branch_b``. Since ``join`` is a downstream task of ``branch_a``, it will still be run, even though it was not returned as part of the branch decision.
-The ``BranchPythonOperator`` can also be used with XComs allowing branching context to dynamically decide what branch to follow based on upstream tasks. For example:
+The ``@task.branch`` decorator can also be used with XComs, allowing the branching context to dynamically decide which branch to follow based on upstream tasks. For example:
.. code-block:: python
+ @task.branch(task_id="branch_task")
def branch_func(ti):
xcom_value = int(ti.xcom_pull(task_ids="start_task"))
if xcom_value >= 5:
@@ -303,20 +304,19 @@ The ``BranchPythonOperator`` can also be used with XComs allowing branching cont
dag=dag,
)
- branch_op = BranchPythonOperator(
- task_id="branch_task",
- python_callable=branch_func,
- dag=dag,
- )
+ branch_op = branch_func()
continue_op = EmptyOperator(task_id="continue_task", dag=dag)
stop_op = EmptyOperator(task_id="stop_task", dag=dag)
start_op >> branch_op >> [continue_op, stop_op]
-If you wish to implement your own operators with branching functionality, you can inherit from :class:`~airflow.operators.branch.BaseBranchOperator`, which behaves similarly to ``BranchPythonOperator`` but expects you to provide an implementation of the method ``choose_branch``.
+If you wish to implement your own operators with branching functionality, you can inherit from :class:`~airflow.operators.branch.BaseBranchOperator`, which behaves similarly to the ``@task.branch`` decorator but expects you to provide an implementation of the method ``choose_branch``.
+
+.. note::
+ The ``@task.branch`` decorator is recommended over directly instantiating :class:`~airflow.operators.python.BranchPythonOperator` in a DAG. The latter should generally only be subclassed to implement a custom operator.
-As with the callable for ``BranchPythonOperator``, this method can return the ID of a downstream task, or a list of task IDs, which will be run, and all others will be skipped. It can also return None to skip all downstream task::
+As with the function decorated by ``@task.branch``, this method can return the ID of a downstream task, or a list of task IDs, which will be run, and all others will be skipped. It can also return None to skip all downstream tasks::
class MyBranchOperator(BaseBranchOperator):
def choose_branch(self, context):
@@ -404,7 +404,6 @@ You can also combine this with the :ref:`concepts:depends-on-past` functionality
from airflow.models import DAG
from airflow.operators.empty import EmptyOperator
- from airflow.operators.python import BranchPythonOperator
dag = DAG(
dag_id="branch_without_trigger",
@@ -413,7 +412,14 @@ You can also combine this with the :ref:`concepts:depends-on-past` functionality
)
run_this_first = EmptyOperator(task_id="run_this_first", dag=dag)
- branching = BranchPythonOperator(task_id="branching", dag=dag, python_callable=lambda: "branch_a")
+
+
+ @task.branch(task_id="branching")
+ def do_branching():
+ return "branch_a"
+
+
+ branching = do_branching()
branch_a = EmptyOperator(task_id="branch_a", dag=dag)
follow_branch_a = EmptyOperator(task_id="follow_branch_a", dag=dag)
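The operators.rst hunks convert the ``render_template_as_native_obj`` example to ``@task``. The difference that flag makes can be imitated with the standard library: by default a templated argument such as ``{{ ti.xcom_pull('extract') }}`` renders to a string, while with the flag Jinja's native rendering yields a Python object. The sketch below stands in for that outcome by applying ``ast.literal_eval`` to the string form; this is an illustration, not how Jinja necessarily produces the object. ``rendered`` and ``native`` are our own names, and ``transform`` is a hypothetical version of the docs' function, whose summing loop is truncated in this diff.

```python
import ast
import json
import math

# What ``extract`` returns in the docs example: a dict parsed from JSON.
order_data = json.loads('{"1001": 301.27, "1002": 433.21, "1003": 502.22}')

# Default rendering: the XCom value is interpolated into a template, so the
# templated argument arrives as the dict's string form.
rendered = str(order_data)

# Native rendering, approximated: turn the rendered text back into a literal.
native = ast.literal_eval(rendered)


def transform(order_data):
    """Hypothetical stand-in for the docs' ``transform``: sum the order values."""
    total_order_value = 0
    for value in order_data.values():
        total_order_value += value
    return {"total_order_value": total_order_value}


print(type(rendered).__name__, type(native).__name__)  # str dict
print(math.isclose(transform(native)["total_order_value"], 1236.7))  # True
# transform(rendered) raises AttributeError: a str has no .values() method.
```

This is why the docs' ``transform`` only works on ``order_data`` when the DAG sets ``render_template_as_native_obj=True``.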
diff --git a/docs/apache-airflow/concepts/operators.rst b/docs/apache-airflow/concepts/operators.rst
index 944ff64a52..cb278402de 100644
--- a/docs/apache-airflow/concepts/operators.rst
+++ b/docs/apache-airflow/concepts/operators.rst
@@ -31,6 +31,11 @@ Airflow has a very extensive set of operators available, with some built-in to t
- :class:`~airflow.operators.bash.BashOperator` - executes a bash command
- :class:`~airflow.operators.python.PythonOperator` - calls an arbitrary Python function
- :class:`~airflow.operators.email.EmailOperator` - sends an email
+- Use the ``@task`` decorator to execute an arbitrary Python function. It doesn't support rendering Jinja templates passed as arguments.
+
+.. note::
+ The ``@task`` decorator is recommended over the classic :class:`~airflow.operators.python.PythonOperator`
+ to execute Python callables with no template rendering in their arguments.
For a list of all core operators, see: :doc:`Core Operators and Hooks Reference </operators-and-hooks-ref>`.
@@ -103,6 +108,7 @@ You can also use Jinja templating with nested fields, as long as these nested fi
dag=dag,
)
+
.. note:: The ``template_fields`` property can equally be a class variable or an instance variable.
Deep nested fields can also be substituted, as long as all intermediate fields are marked as template fields:
@@ -134,6 +140,7 @@ Deep nested fields can also be substituted, as long as all intermediate fields a
dag=dag,
)
+
You can pass custom options to the Jinja ``Environment`` when creating your DAG. One common usage is to avoid Jinja from dropping a trailing newline from a template string:
.. code-block:: python
@@ -168,7 +175,6 @@ Now, when the following task is run, ``order_data`` argument is passed a string,
python_callable=transform,
)
-
If you instead want the rendered template field to return a Native Python object (``dict`` in our example),
you can pass ``render_template_as_native_obj=True`` to the DAG as follows:
@@ -183,11 +189,13 @@ you can pass ``render_template_as_native_obj=True`` to the DAG as follows:
)
+ @task(task_id="extract")
def extract():
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
return json.loads(data_string)
+ @task(task_id="transform")
def transform(order_data):
print(type(order_data))
for value in order_data.values():
@@ -195,7 +203,7 @@ you can pass ``render_template_as_native_obj=True`` to the DAG as follows:
return {"total_order_value": total_order_value}
- extract_task = PythonOperator(task_id="extract", python_callable=extract)
+ extract_task = extract()
transform_task = PythonOperator(
task_id="transform",