Posted to commits@airflow.apache.org by je...@apache.org on 2022/09/23 03:19:22 UTC

[airflow] 01/13: changing to task decorator in docs from classic operator use (#25711)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 8a7ee215229316abf48eac5cee3b05b556ad3fed
Author: Bowrna <ma...@gmail.com>
AuthorDate: Fri Sep 16 16:29:21 2022 +0530

    changing to task decorator in docs from classic operator use (#25711)
    
    Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
    (cherry picked from commit 3d7673346f005732e62ef64db2c9d0089852d67f)
---
 docs/apache-airflow/concepts/dags.rst      | 32 ++++++++++++++++++------------
 docs/apache-airflow/concepts/operators.rst | 12 +++++++++--
 2 files changed, 29 insertions(+), 15 deletions(-)
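
Reader's note (not part of the commit): the branching rule documented in dags.rst below can be modelled without Airflow at all. The following is a minimal sketch in plain Python, not Airflow's implementation. The names EXAMPLE_GRAPH, downstream_closure and tasks_skipped_by_branch are hypothetical, the graph is modelled on the branch_a/join/branch_b note in the patch, and the ValueError is only this sketch's way of expressing the documented requirement that a returned ID must be directly downstream of the branching task.

```python
from typing import Dict, Iterable, List, Optional, Set, Union

# Hypothetical task graph: task_id -> list of directly downstream task_ids.
EXAMPLE_GRAPH: Dict[str, List[str]] = {
    "branching": ["branch_a", "join", "branch_b"],
    "branch_a": ["join"],
    "branch_b": [],
    "join": [],
}


def downstream_closure(graph: Dict[str, List[str]], start: str) -> Set[str]:
    """Return every task reachable from `start`, excluding `start` itself."""
    seen: Set[str] = set()
    stack = list(graph.get(start, []))
    while stack:
        task_id = stack.pop()
        if task_id not in seen:
            seen.add(task_id)
            stack.extend(graph.get(task_id, []))
    return seen


def tasks_skipped_by_branch(
    graph: Dict[str, List[str]],
    branch_task: str,
    chosen: Optional[Union[str, Iterable[str]]],
) -> Set[str]:
    """Model which direct downstream tasks a branching task would skip."""
    # Normalise the return value of the branch function:
    # None (skip everything), a single task_id, or several task_ids.
    if chosen is None:
        followed: Set[str] = set()
    elif isinstance(chosen, str):
        followed = {chosen}
    else:
        followed = set(chosen)

    direct = set(graph.get(branch_task, []))
    not_direct = followed - direct
    if not_direct:
        # Assumption of this sketch, mirroring the documented requirement.
        raise ValueError(f"not directly downstream: {sorted(not_direct)}")

    # A task that is also downstream of a followed task is kept,
    # which is why "join" still runs when "branch_a" is chosen.
    keep = set(followed)
    for task_id in followed:
        keep |= downstream_closure(graph, task_id)
    return direct - keep


print(sorted(tasks_skipped_by_branch(EXAMPLE_GRAPH, "branching", "branch_a")))
```

With the example graph, choosing branch_a leaves only branch_b skipped, while returning None skips all three direct downstream tasks. The sketch covers direct downstream tasks only; how skips propagate further down a DAG depends on trigger rules and is out of scope here.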

diff --git a/docs/apache-airflow/concepts/dags.rst b/docs/apache-airflow/concepts/dags.rst
index 4e52c1532a..541a2adad5 100644
--- a/docs/apache-airflow/concepts/dags.rst
+++ b/docs/apache-airflow/concepts/dags.rst
@@ -269,11 +269,11 @@ By default, a DAG will only run a Task when all the Tasks it depends on are succ
 Branching
 ~~~~~~~~~
 
-You can make use of branching in order to tell the DAG *not* to run all dependent tasks, but instead to pick and choose one or more paths to go down. This is where the branching Operators come in.
+You can make use of branching in order to tell the DAG *not* to run all dependent tasks, but instead to pick and choose one or more paths to go down. This is where the ``@task.branch`` decorator comes in.
 
-The ``BranchPythonOperator`` is much like the PythonOperator except that it expects a ``python_callable`` that returns a task_id (or list of task_ids). The task_id returned is followed, and all of the other paths are skipped. It can also return None to skip all downstream task.
+The ``@task.branch`` decorator is much like ``@task``, except that it expects the decorated function to return the ID of a task (or a list of IDs). The specified task is followed, while all other paths are skipped. It can also return *None* to skip all downstream tasks.
 
-The task_id returned by the Python function has to reference a task directly downstream from the BranchPythonOperator task.
+The task_id returned by the Python function has to reference a task directly downstream from the ``@task.branch`` decorated task.
 
 .. note::
     When a Task is downstream of both the branching operator *and* downstream of one or more of the selected tasks, it will not be skipped:
@@ -282,10 +282,11 @@ The task_id returned by the Python function has to reference a task directly dow
 
     The paths of the branching task are ``branch_a``, ``join`` and ``branch_b``. Since ``join`` is a downstream task of ``branch_a``, it will still be run, even though it was not returned as part of the branch decision.
 
-The ``BranchPythonOperator`` can also be used with XComs allowing branching context to dynamically decide what branch to follow based on upstream tasks. For example:
+The ``@task.branch`` decorator can also be used with XComs, allowing the branching function to dynamically decide which branch to follow based on upstream tasks. For example:
 
 .. code-block:: python
 
+    @task.branch(task_id="branch_task")
     def branch_func(ti):
         xcom_value = int(ti.xcom_pull(task_ids="start_task"))
         if xcom_value >= 5:
@@ -303,20 +304,19 @@ The ``BranchPythonOperator`` can also be used with XComs allowing branching cont
         dag=dag,
     )
 
-    branch_op = BranchPythonOperator(
-        task_id="branch_task",
-        python_callable=branch_func,
-        dag=dag,
-    )
+    branch_op = branch_func()
 
     continue_op = EmptyOperator(task_id="continue_task", dag=dag)
     stop_op = EmptyOperator(task_id="stop_task", dag=dag)
 
     start_op >> branch_op >> [continue_op, stop_op]
 
-If you wish to implement your own operators with branching functionality, you can inherit from :class:`~airflow.operators.branch.BaseBranchOperator`, which behaves similarly to ``BranchPythonOperator`` but expects you to provide an implementation of the method ``choose_branch``.
+If you wish to implement your own operators with branching functionality, you can inherit from :class:`~airflow.operators.branch.BaseBranchOperator`, which behaves similarly to the ``@task.branch`` decorator but expects you to provide an implementation of the method ``choose_branch``.
+
+.. note::
+    The ``@task.branch`` decorator is recommended over directly instantiating :class:`~airflow.operators.python.BranchPythonOperator` in a DAG. The latter should generally only be subclassed to implement a custom operator.
 
-As with the callable for ``BranchPythonOperator``, this method can return the ID of a downstream task, or a list of task IDs, which will be run, and all others will be skipped. It can also return None to skip all downstream task::
+As with the callable for ``@task.branch``, this method can return the ID of a downstream task, or a list of task IDs, which will be run, and all others will be skipped. It can also return None to skip all downstream tasks::
 
     class MyBranchOperator(BaseBranchOperator):
         def choose_branch(self, context):
@@ -404,7 +404,6 @@ You can also combine this with the :ref:`concepts:depends-on-past` functionality
 
         from airflow.models import DAG
         from airflow.operators.empty import EmptyOperator
-        from airflow.operators.python import BranchPythonOperator
 
         dag = DAG(
             dag_id="branch_without_trigger",
@@ -413,7 +412,14 @@ You can also combine this with the :ref:`concepts:depends-on-past` functionality
         )
 
         run_this_first = EmptyOperator(task_id="run_this_first", dag=dag)
-        branching = BranchPythonOperator(task_id="branching", dag=dag, python_callable=lambda: "branch_a")
+
+
+        @task.branch(task_id="branching")
+        def do_branching():
+            return "branch_a"
+
+
+        branching = do_branching()
 
         branch_a = EmptyOperator(task_id="branch_a", dag=dag)
         follow_branch_a = EmptyOperator(task_id="follow_branch_a", dag=dag)
diff --git a/docs/apache-airflow/concepts/operators.rst b/docs/apache-airflow/concepts/operators.rst
index 944ff64a52..cb278402de 100644
--- a/docs/apache-airflow/concepts/operators.rst
+++ b/docs/apache-airflow/concepts/operators.rst
@@ -31,6 +31,11 @@ Airflow has a very extensive set of operators available, with some built-in to t
 - :class:`~airflow.operators.bash.BashOperator` - executes a bash command
 - :class:`~airflow.operators.python.PythonOperator` - calls an arbitrary Python function
 - :class:`~airflow.operators.email.EmailOperator` - sends an email
+- Use the ``@task`` decorator to execute an arbitrary Python function. It doesn't support rendering Jinja templates passed as arguments.
+
+.. note::
+    The ``@task`` decorator is recommended over the classic :class:`~airflow.operators.python.PythonOperator`
+    to execute Python callables with no template rendering in their arguments.
 
 For a list of all core operators, see: :doc:`Core Operators and Hooks Reference </operators-and-hooks-ref>`.
 
@@ -103,6 +108,7 @@ You can also use Jinja templating with nested fields, as long as these nested fi
         dag=dag,
     )
 
+
 .. note:: The ``template_fields`` property can equally be a class variable or an instance variable.
 
 Deep nested fields can also be substituted, as long as all intermediate fields are marked as template fields:
@@ -134,6 +140,7 @@ Deep nested fields can also be substituted, as long as all intermediate fields a
         dag=dag,
     )
 
+
 You can pass custom options to the Jinja ``Environment`` when creating your DAG. One common usage is to avoid Jinja from dropping a trailing newline from a template string:
 
 .. code-block:: python
@@ -168,7 +175,6 @@ Now, when the following task is run, ``order_data`` argument is passed a string,
         python_callable=transform,
     )
 
-
 If you instead want the rendered template field to return a Native Python object (``dict`` in our example),
 you can pass ``render_template_as_native_obj=True`` to the DAG as follows:
 
@@ -183,11 +189,13 @@ you can pass ``render_template_as_native_obj=True`` to the DAG as follows:
     )
 
 
+    @task(task_id="extract")
     def extract():
         data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
         return json.loads(data_string)
 
 
+    @task(task_id="transform")
     def transform(order_data):
         print(type(order_data))
         for value in order_data.values():
@@ -195,7 +203,7 @@ you can pass ``render_template_as_native_obj=True`` to the DAG as follows:
         return {"total_order_value": total_order_value}
 
 
-    extract_task = PythonOperator(task_id="extract", python_callable=extract)
+    extract_task = extract()
 
     transform_task = PythonOperator(
         task_id="transform",