Posted to commits@airflow.apache.org by ur...@apache.org on 2022/08/18 03:54:16 UTC
[airflow] branch main updated: Update code examples from "classic" operators to taskflow (#25657)
This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 98aac5dc28 Update code examples from "classic" operators to taskflow (#25657)
98aac5dc28 is described below
commit 98aac5dc282b139f0e726aac512b04a6693ba83d
Author: Hank Ehly <he...@gmail.com>
AuthorDate: Thu Aug 18 12:54:06 2022 +0900
Update code examples from "classic" operators to taskflow (#25657)
Co-authored-by: Josh Fell <48...@users.noreply.github.com>
Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
---
docs/apache-airflow/best-practices.rst | 19 ++++++-------------
docs/apache-airflow/faq.rst | 16 +++++++++-------
docs/apache-airflow/index.rst | 11 +++++++----
3 files changed, 22 insertions(+), 24 deletions(-)
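Note on the best-practices.rst hunks: both the "Bad example" and the "Good example" keep their existing advice about where ``numpy`` is imported, and only the task definition style changes. The advice rests on plain Python behaviour: a module imported inside a function body is not loaded until that function is called, so merely parsing the file stays cheap. The following is a minimal sketch of that behaviour, not code from this commit. It uses only the standard library, and ``colorsys`` and ``convert_color`` are hypothetical stand-ins for an expensive import and a task body; no Airflow is involved.

```python
import sys

# Make the demonstration repeatable even if something imported colorsys earlier.
sys.modules.pop("colorsys", None)


def convert_color():
    """Hypothetical stand-in for a task body that needs a heavy library."""
    import colorsys  # loaded only when the function actually runs

    return colorsys.rgb_to_hsv(1.0, 0.0, 0.0)


# Defining the function does not execute the import statement inside it.
loaded_after_definition = "colorsys" in sys.modules

# Calling the function runs the import and then uses the module.
result = convert_color()
loaded_after_call = "colorsys" in sys.modules

print(loaded_after_definition, loaded_after_call, result)
```

Under these assumptions, code that only loads the file (as a DAG parser does) never pays for the import; only the code path that runs the function does.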
diff --git a/docs/apache-airflow/best-practices.rst b/docs/apache-airflow/best-practices.rst
index 3383dc7e6e..4b1bcb232d 100644
--- a/docs/apache-airflow/best-practices.rst
+++ b/docs/apache-airflow/best-practices.rst
@@ -124,7 +124,7 @@ Bad example:
import pendulum
from airflow import DAG
- from airflow.operators.python import PythonOperator
+ from airflow.decorators import task
import numpy as np # <-- THIS IS A VERY BAD IDEA! DON'T DO THAT!
@@ -136,16 +136,14 @@ Bad example:
tags=["example"],
) as dag:
+ @task()
def print_array():
"""Print Numpy array."""
a = np.arange(15).reshape(3, 5)
print(a)
return a
- run_this = PythonOperator(
- task_id="print_the_context",
- python_callable=print_array,
- )
+ print_array()
Good example:
@@ -154,7 +152,7 @@ Good example:
import pendulum
from airflow import DAG
- from airflow.operators.python import PythonOperator
+ from airflow.decorators import task
with DAG(
dag_id="example_python_operator",
@@ -164,6 +162,7 @@ Good example:
tags=["example"],
) as dag:
+ @task()
def print_array():
"""Print Numpy array."""
import numpy as np # <- THIS IS HOW NUMPY SHOULD BE IMPORTED IN THIS CASE
@@ -172,12 +171,7 @@ Good example:
print(a)
return a
- run_this = PythonOperator(
- task_id="print_the_context",
- python_callable=print_array,
- )
-
-
+ print_array()
Dynamic DAG Generation
----------------------
@@ -278,7 +272,6 @@ It's easier to grab the concept with an example. Let's say that we have the foll
from airflow.decorators import task
from airflow.exceptions import AirflowException
from airflow.operators.bash import BashOperator
- from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule
diff --git a/docs/apache-airflow/faq.rst b/docs/apache-airflow/faq.rst
index 829451d53a..fd91cca487 100644
--- a/docs/apache-airflow/faq.rst
+++ b/docs/apache-airflow/faq.rst
@@ -177,16 +177,12 @@ until ``min_file_process_interval`` is reached since DAG Parser will look for mo
:name: dag_loader.py
from airflow import DAG
- from airflow.operators.python_operator import PythonOperator
+ from airflow.decorators import task
import pendulum
def create_dag(dag_id, schedule, dag_number, default_args):
- def hello_world_py(*args):
- print("Hello World")
- print("This is DAG: {}".format(str(dag_number)))
-
dag = DAG(
dag_id,
schedule=schedule,
@@ -195,7 +191,13 @@ until ``min_file_process_interval`` is reached since DAG Parser will look for mo
)
with dag:
- t1 = PythonOperator(task_id="hello_world", python_callable=hello_world_py)
+
+ @task()
+ def hello_world():
+ print("Hello World")
+ print(f"This is DAG: {dag_number}")
+
+ hello_world()
return dag
@@ -410,7 +412,7 @@ upstream task.
from airflow.utils.trigger_rule import TriggerRule
- @task
+ @task()
def a_func():
raise AirflowException
diff --git a/docs/apache-airflow/index.rst b/docs/apache-airflow/index.rst
index 0743f5d7a6..7fae1362cb 100644
--- a/docs/apache-airflow/index.rst
+++ b/docs/apache-airflow/index.rst
@@ -40,24 +40,27 @@ Take a look at the following snippet of code:
from datetime import datetime
from airflow import DAG
+ from airflow.decorators import task
from airflow.operators.bash import BashOperator
- from airflow.operators.python import PythonOperator
# A DAG represents a workflow, a collection of tasks
with DAG(dag_id="demo", start_date=datetime(2022, 1, 1), schedule="0 0 * * *") as dag:
# Tasks are represented as operators
hello = BashOperator(task_id="hello", bash_command="echo hello")
- airflow = PythonOperator(task_id="airflow", python_callable=lambda: print("airflow"))
+
+ @task()
+ def airflow():
+ print("airflow")
# Set dependencies between tasks
- hello >> airflow
+ hello >> airflow()
Here you see:
- A DAG named "demo", starting on Jan 1st 2022 and running once a day. A DAG is Airflow's representation of a workflow.
-- Two tasks, a BashOperator running a Bash script and a PythonOperator running a Python script
+- Two tasks, a BashOperator running a Bash script and a Python function defined using the ``@task`` decorator
- ``>>`` between the tasks defines a dependency and controls in which order the tasks will be executed
Airflow evaluates this script and executes the tasks at the set interval and in the defined order. The status