You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by je...@apache.org on 2021/12/14 21:46:31 UTC
[airflow] 02/02: Warn without tracebacks when example_dags are missing deps (#20295)
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 4989eea84a86951d218587d7b8061cdd7bfd7ce4
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Tue Dec 14 14:28:17 2021 -0700
Warn without tracebacks when example_dags are missing deps (#20295)
(cherry picked from commit 5a6c022f946d1be2bd68a42a7a920fdf932932e5)
---
.../example_dags/example_kubernetes_executor.py | 363 +++++++++++----------
airflow/example_dags/example_python_operator.py | 51 +--
.../tutorial_taskflow_api_etl_virtualenv.py | 112 ++++---
3 files changed, 272 insertions(+), 254 deletions(-)
diff --git a/airflow/example_dags/example_kubernetes_executor.py b/airflow/example_dags/example_kubernetes_executor.py
index 95a0f92..f984909 100644
--- a/airflow/example_dags/example_kubernetes_executor.py
+++ b/airflow/example_dags/example_kubernetes_executor.py
@@ -35,195 +35,198 @@ worker_container_tag = conf.get('kubernetes', 'worker_container_tag')
try:
from kubernetes.client import models as k8s
except ImportError:
- log.warning("Could not import DAGs in example_kubernetes_executor.py", exc_info=True)
- log.warning("Install Kubernetes dependencies with: pip install apache-airflow[cncf.kubernetes]")
-
-
-with DAG(
- dag_id='example_kubernetes_executor',
- schedule_interval=None,
- start_date=datetime(2021, 1, 1),
- catchup=False,
- tags=['example3'],
-) as dag:
- # You can use annotations on your kubernetes pods!
- start_task_executor_config = {
- "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
- }
-
- @task(executor_config=start_task_executor_config)
- def start_task():
- print_stuff()
-
- start_task = start_task()
-
- # [START task_with_volume]
- executor_config_volume_mount = {
- "pod_override": k8s.V1Pod(
- spec=k8s.V1PodSpec(
- containers=[
- k8s.V1Container(
- name="base",
- volume_mounts=[
- k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
- ],
- )
- ],
- volumes=[
- k8s.V1Volume(
- name="example-kubernetes-test-volume",
- host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
- )
- ],
+ log.warning(
+ "The example_kubernetes_executor example DAG requires the kubernetes provider."
+ " Please install it with: pip install apache-airflow[cncf.kubernetes]"
+ )
+ k8s = None
+
+if k8s:
+ with DAG(
+ dag_id='example_kubernetes_executor',
+ schedule_interval=None,
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
+ tags=['example3'],
+ ) as dag:
+ # You can use annotations on your kubernetes pods!
+ start_task_executor_config = {
+ "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
+ }
+
+ @task(executor_config=start_task_executor_config)
+ def start_task():
+ print_stuff()
+
+ start_task = start_task()
+
+ # [START task_with_volume]
+ executor_config_volume_mount = {
+ "pod_override": k8s.V1Pod(
+ spec=k8s.V1PodSpec(
+ containers=[
+ k8s.V1Container(
+ name="base",
+ volume_mounts=[
+ k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
+ ],
+ )
+ ],
+ volumes=[
+ k8s.V1Volume(
+ name="example-kubernetes-test-volume",
+ host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
+ )
+ ],
+ )
+ ),
+ }
+
+ @task(executor_config=executor_config_volume_mount)
+ def test_volume_mount():
+ """
+ Tests whether the volume has been mounted.
+ """
+
+ with open('/foo/volume_mount_test.txt', 'w') as foo:
+ foo.write('Hello')
+
+ return_code = os.system("cat /foo/volume_mount_test.txt")
+ if return_code != 0:
+ raise ValueError(f"Error when checking volume mount. Return code {return_code}")
+
+ volume_task = test_volume_mount()
+ # [END task_with_volume]
+
+ # [START task_with_sidecar]
+ executor_config_sidecar = {
+ "pod_override": k8s.V1Pod(
+ spec=k8s.V1PodSpec(
+ containers=[
+ k8s.V1Container(
+ name="base",
+ volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
+ ),
+ k8s.V1Container(
+ name="sidecar",
+ image="ubuntu",
+ args=["echo \"retrieved from mount\" > /shared/test.txt"],
+ command=["bash", "-cx"],
+ volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
+ ),
+ ],
+ volumes=[
+ k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
+ ],
+ )
+ ),
+ }
+
+ @task(executor_config=executor_config_sidecar)
+ def test_sharedvolume_mount():
+ """
+ Tests whether the volume has been mounted.
+ """
+ for i in range(5):
+ try:
+ return_code = os.system("cat /shared/test.txt")
+ if return_code != 0:
+ raise ValueError(f"Error when checking volume mount. Return code {return_code}")
+ except ValueError as e:
+ if i > 4:
+ raise e
+
+ sidecar_task = test_sharedvolume_mount()
+ # [END task_with_sidecar]
+
+ # You can add labels to pods
+ executor_config_non_root = {
+ "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))
+ }
+
+ @task(executor_config=executor_config_non_root)
+ def non_root_task():
+ print_stuff()
+
+ third_task = non_root_task()
+
+ executor_config_other_ns = {
+ "pod_override": k8s.V1Pod(
+ metadata=k8s.V1ObjectMeta(namespace="test-namespace", labels={'release': 'stable'})
)
- ),
- }
-
- @task(executor_config=executor_config_volume_mount)
- def test_volume_mount():
- """
- Tests whether the volume has been mounted.
- """
-
- with open('/foo/volume_mount_test.txt', 'w') as foo:
- foo.write('Hello')
-
- return_code = os.system("cat /foo/volume_mount_test.txt")
- if return_code != 0:
- raise ValueError(f"Error when checking volume mount. Return code {return_code}")
-
- volume_task = test_volume_mount()
- # [END task_with_volume]
-
- # [START task_with_sidecar]
- executor_config_sidecar = {
- "pod_override": k8s.V1Pod(
- spec=k8s.V1PodSpec(
- containers=[
- k8s.V1Container(
- name="base",
- volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
- ),
- k8s.V1Container(
- name="sidecar",
- image="ubuntu",
- args=["echo \"retrieved from mount\" > /shared/test.txt"],
- command=["bash", "-cx"],
- volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
- ),
- ],
- volumes=[
- k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
- ],
+ }
+
+ @task(executor_config=executor_config_other_ns)
+ def other_namespace_task():
+ print_stuff()
+
+ other_ns_task = other_namespace_task()
+
+ # You can also change the base image, here we used the worker image for demonstration.
+ # Note that the image must have the same configuration as the
+ # worker image. Could be that you want to run this task in a special docker image that has a zip
+ # library built-in. You build the special docker image on top your worker image.
+ kube_exec_config_special = {
+ "pod_override": k8s.V1Pod(
+ spec=k8s.V1PodSpec(
+ containers=[
+ k8s.V1Container(
+ name="base", image=f"{worker_container_repository}:{worker_container_tag}"
+ ),
+ ]
+ )
)
- ),
- }
-
- @task(executor_config=executor_config_sidecar)
- def test_sharedvolume_mount():
- """
- Tests whether the volume has been mounted.
- """
- for i in range(5):
- try:
- return_code = os.system("cat /shared/test.txt")
- if return_code != 0:
- raise ValueError(f"Error when checking volume mount. Return code {return_code}")
- except ValueError as e:
- if i > 4:
- raise e
-
- sidecar_task = test_sharedvolume_mount()
- # [END task_with_sidecar]
-
- # You can add labels to pods
- executor_config_non_root = {
- "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))
- }
-
- @task(executor_config=executor_config_non_root)
- def non_root_task():
- print_stuff()
-
- third_task = non_root_task()
-
- executor_config_other_ns = {
- "pod_override": k8s.V1Pod(
- metadata=k8s.V1ObjectMeta(namespace="test-namespace", labels={'release': 'stable'})
- )
- }
-
- @task(executor_config=executor_config_other_ns)
- def other_namespace_task():
- print_stuff()
-
- other_ns_task = other_namespace_task()
-
- # You can also change the base image, here we used the worker image for demonstration.
- # Note that the image must have the same configuration as the
- # worker image. Could be that you want to run this task in a special docker image that has a zip
- # library built-in. You build the special docker image on top your worker image.
- kube_exec_config_special = {
- "pod_override": k8s.V1Pod(
- spec=k8s.V1PodSpec(
- containers=[
- k8s.V1Container(
- name="base", image=f"{worker_container_repository}:{worker_container_tag}"
- ),
+ }
+
+ @task(executor_config=kube_exec_config_special)
+ def base_image_override_task():
+ print_stuff()
+
+ base_image_task = base_image_override_task()
+
+ # Use k8s_client.V1Affinity to define node affinity
+ k8s_affinity = k8s.V1Affinity(
+ pod_anti_affinity=k8s.V1PodAntiAffinity(
+ required_during_scheduling_ignored_during_execution=[
+ k8s.V1PodAffinityTerm(
+ label_selector=k8s.V1LabelSelector(
+ match_expressions=[
+ k8s.V1LabelSelectorRequirement(key='app', operator='In', values=['airflow'])
+ ]
+ ),
+ topology_key='kubernetes.io/hostname',
+ )
]
)
)
- }
-
- @task(executor_config=kube_exec_config_special)
- def base_image_override_task():
- print_stuff()
-
- base_image_task = base_image_override_task()
-
- # Use k8s_client.V1Affinity to define node affinity
- k8s_affinity = k8s.V1Affinity(
- pod_anti_affinity=k8s.V1PodAntiAffinity(
- required_during_scheduling_ignored_during_execution=[
- k8s.V1PodAffinityTerm(
- label_selector=k8s.V1LabelSelector(
- match_expressions=[
- k8s.V1LabelSelectorRequirement(key='app', operator='In', values=['airflow'])
- ]
- ),
- topology_key='kubernetes.io/hostname',
- )
- ]
- )
- )
- # Use k8s_client.V1Toleration to define node tolerations
- k8s_tolerations = [k8s.V1Toleration(key='dedicated', operator='Equal', value='airflow')]
+ # Use k8s_client.V1Toleration to define node tolerations
+ k8s_tolerations = [k8s.V1Toleration(key='dedicated', operator='Equal', value='airflow')]
- # Use k8s_client.V1ResourceRequirements to define resource limits
- k8s_resource_requirements = k8s.V1ResourceRequirements(
- requests={'memory': '512Mi'}, limits={'memory': '512Mi'}
- )
+ # Use k8s_client.V1ResourceRequirements to define resource limits
+ k8s_resource_requirements = k8s.V1ResourceRequirements(
+ requests={'memory': '512Mi'}, limits={'memory': '512Mi'}
+ )
- kube_exec_config_resource_limits = {
- "pod_override": k8s.V1Pod(
- spec=k8s.V1PodSpec(
- containers=[
- k8s.V1Container(
- name="base",
- resources=k8s_resource_requirements,
- )
- ],
- affinity=k8s_affinity,
- tolerations=k8s_tolerations,
+ kube_exec_config_resource_limits = {
+ "pod_override": k8s.V1Pod(
+ spec=k8s.V1PodSpec(
+ containers=[
+ k8s.V1Container(
+ name="base",
+ resources=k8s_resource_requirements,
+ )
+ ],
+ affinity=k8s_affinity,
+ tolerations=k8s_tolerations,
+ )
)
- )
- }
+ }
- @task(executor_config=kube_exec_config_resource_limits)
- def task_with_resource_limits():
- print_stuff()
+ @task(executor_config=kube_exec_config_resource_limits)
+ def task_with_resource_limits():
+ print_stuff()
- four_task = task_with_resource_limits()
+ four_task = task_with_resource_limits()
- start_task >> [volume_task, other_ns_task, sidecar_task] >> third_task >> [base_image_task, four_task]
+ start_task >> [volume_task, other_ns_task, sidecar_task] >> third_task >> [base_image_task, four_task]
diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py
index 8d5ce59..d533d84 100644
--- a/airflow/example_dags/example_python_operator.py
+++ b/airflow/example_dags/example_python_operator.py
@@ -20,6 +20,8 @@
Example DAG demonstrating the usage of the TaskFlow API to execute Python functions natively and within a
virtual environment.
"""
+import logging
+import shutil
import time
from datetime import datetime
from pprint import pprint
@@ -27,6 +29,8 @@ from pprint import pprint
from airflow import DAG
from airflow.decorators import task
+log = logging.getLogger(__name__)
+
with DAG(
dag_id='example_python_operator',
schedule_interval=None,
@@ -59,29 +63,32 @@ with DAG(
run_this >> sleeping_task
# [END howto_operator_python_kwargs]
- # [START howto_operator_python_venv]
- @task.virtualenv(
- task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False
- )
- def callable_virtualenv():
- """
- Example function that will be performed in a virtual environment.
+ if not shutil.which("virtualenv"):
+ log.warning("The virtualenv_python example task requires virtualenv, please install it.")
+ else:
+ # [START howto_operator_python_venv]
+ @task.virtualenv(
+ task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False
+ )
+ def callable_virtualenv():
+ """
+ Example function that will be performed in a virtual environment.
- Importing at the module level ensures that it will not attempt to import the
- library before it is installed.
- """
- from time import sleep
+ Importing at the module level ensures that it will not attempt to import the
+ library before it is installed.
+ """
+ from time import sleep
- from colorama import Back, Fore, Style
+ from colorama import Back, Fore, Style
- print(Fore.RED + 'some red text')
- print(Back.GREEN + 'and with a green background')
- print(Style.DIM + 'and in dim text')
- print(Style.RESET_ALL)
- for _ in range(10):
- print(Style.DIM + 'Please wait...', flush=True)
- sleep(10)
- print('Finished')
+ print(Fore.RED + 'some red text')
+ print(Back.GREEN + 'and with a green background')
+ print(Style.DIM + 'and in dim text')
+ print(Style.RESET_ALL)
+ for _ in range(10):
+ print(Style.DIM + 'Please wait...', flush=True)
+ sleep(10)
+ print('Finished')
- virtualenv_task = callable_virtualenv()
- # [END howto_operator_python_venv]
+ virtualenv_task = callable_virtualenv()
+ # [END howto_operator_python_venv]
diff --git a/airflow/example_dags/tutorial_taskflow_api_etl_virtualenv.py b/airflow/example_dags/tutorial_taskflow_api_etl_virtualenv.py
index 09aefcb..ac28095 100644
--- a/airflow/example_dags/tutorial_taskflow_api_etl_virtualenv.py
+++ b/airflow/example_dags/tutorial_taskflow_api_etl_virtualenv.py
@@ -17,65 +17,73 @@
# under the License.
+import logging
+import shutil
from datetime import datetime
from airflow.decorators import dag, task
+log = logging.getLogger(__name__)
-@dag(schedule_interval=None, start_date=datetime(2021, 1, 1), catchup=False, tags=['example'])
-def tutorial_taskflow_api_etl_virtualenv():
- """
- ### TaskFlow API example using virtualenv
- This is a simple ETL data pipeline example which demonstrates the use of
- the TaskFlow API using three simple tasks for Extract, Transform, and Load.
- """
-
- @task.virtualenv(
- use_dill=True,
- system_site_packages=False,
- requirements=['funcsigs'],
+if not shutil.which("virtualenv"):
+ log.warning(
+ "The tutorial_taskflow_api_etl_virtualenv example DAG requires virtualenv, please install it."
)
- def extract():
- """
- #### Extract task
- A simple Extract task to get data ready for the rest of the data
- pipeline. In this case, getting data is simulated by reading from a
- hardcoded JSON string.
- """
- import json
-
- data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
+else:
- order_data_dict = json.loads(data_string)
- return order_data_dict
-
- @task(multiple_outputs=True)
- def transform(order_data_dict: dict):
+ @dag(schedule_interval=None, start_date=datetime(2021, 1, 1), catchup=False, tags=['example'])
+ def tutorial_taskflow_api_etl_virtualenv():
"""
- #### Transform task
- A simple Transform task which takes in the collection of order data and
- computes the total order value.
+ ### TaskFlow API example using virtualenv
+ This is a simple ETL data pipeline example which demonstrates the use of
+ the TaskFlow API using three simple tasks for Extract, Transform, and Load.
"""
- total_order_value = 0
-
- for value in order_data_dict.values():
- total_order_value += value
-
- return {"total_order_value": total_order_value}
-
- @task()
- def load(total_order_value: float):
- """
- #### Load task
- A simple Load task which takes in the result of the Transform task and
- instead of saving it to end user review, just prints it out.
- """
-
- print(f"Total order value is: {total_order_value:.2f}")
-
- order_data = extract()
- order_summary = transform(order_data)
- load(order_summary["total_order_value"])
-
-tutorial_etl_dag = tutorial_taskflow_api_etl_virtualenv()
+ @task.virtualenv(
+ use_dill=True,
+ system_site_packages=False,
+ requirements=['funcsigs'],
+ )
+ def extract():
+ """
+ #### Extract task
+ A simple Extract task to get data ready for the rest of the data
+ pipeline. In this case, getting data is simulated by reading from a
+ hardcoded JSON string.
+ """
+ import json
+
+ data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
+
+ order_data_dict = json.loads(data_string)
+ return order_data_dict
+
+ @task(multiple_outputs=True)
+ def transform(order_data_dict: dict):
+ """
+ #### Transform task
+ A simple Transform task which takes in the collection of order data and
+ computes the total order value.
+ """
+ total_order_value = 0
+
+ for value in order_data_dict.values():
+ total_order_value += value
+
+ return {"total_order_value": total_order_value}
+
+ @task()
+ def load(total_order_value: float):
+ """
+ #### Load task
+ A simple Load task which takes in the result of the Transform task and
+ instead of saving it to end user review, just prints it out.
+ """
+
+ print(f"Total order value is: {total_order_value:.2f}")
+
+ order_data = extract()
+ order_summary = transform(order_data)
+ load(order_summary["total_order_value"])
+
+ tutorial_etl_dag = tutorial_taskflow_api_etl_virtualenv()