Posted to commits@airflow.apache.org by je...@apache.org on 2021/12/14 21:46:31 UTC

[airflow] 02/02: Warn without tracebacks when example_dags are missing deps (#20295)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 4989eea84a86951d218587d7b8061cdd7bfd7ce4
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Tue Dec 14 14:28:17 2021 -0700

    Warn without tracebacks when example_dags are missing deps (#20295)
    
    (cherry picked from commit 5a6c022f946d1be2bd68a42a7a920fdf932932e5)
---
 .../example_dags/example_kubernetes_executor.py    | 363 +++++++++++----------
 airflow/example_dags/example_python_operator.py    |  51 +--
 .../tutorial_taskflow_api_etl_virtualenv.py        | 112 ++++---
 3 files changed, 272 insertions(+), 254 deletions(-)
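
All three example DAGs below get the same guard: probe for the optional dependency at import time, log a plain warning (no exc_info, so no traceback) when it is missing, and only define the DAG when the probe succeeds. A minimal sketch of that pattern follows, with the hypothetical package some_optional_dep standing in for the real dependency (kubernetes in the first file; the other two probe for the virtualenv executable with shutil.which instead):

    import logging
    from datetime import datetime

    from airflow import DAG
    from airflow.decorators import task

    log = logging.getLogger(__name__)

    try:
        import some_optional_dep  # hypothetical optional dependency
    except ImportError:
        # Plain warning, no exc_info=True, so DAG-processor logs stay traceback-free.
        log.warning(
            "This example DAG requires some_optional_dep."
            " Please install it with: pip install some-optional-dep"
        )
        some_optional_dep = None

    if some_optional_dep:
        with DAG(
            dag_id='example_guarded_import',
            schedule_interval=None,
            start_date=datetime(2021, 1, 1),
            catchup=False,
        ) as dag:

            @task
            def guarded_task():
                print(some_optional_dep.__name__)

            guarded_task()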

diff --git a/airflow/example_dags/example_kubernetes_executor.py b/airflow/example_dags/example_kubernetes_executor.py
index 95a0f92..f984909 100644
--- a/airflow/example_dags/example_kubernetes_executor.py
+++ b/airflow/example_dags/example_kubernetes_executor.py
@@ -35,195 +35,198 @@ worker_container_tag = conf.get('kubernetes', 'worker_container_tag')
 try:
     from kubernetes.client import models as k8s
 except ImportError:
-    log.warning("Could not import DAGs in example_kubernetes_executor.py", exc_info=True)
-    log.warning("Install Kubernetes dependencies with: pip install apache-airflow[cncf.kubernetes]")
-
-
-with DAG(
-    dag_id='example_kubernetes_executor',
-    schedule_interval=None,
-    start_date=datetime(2021, 1, 1),
-    catchup=False,
-    tags=['example3'],
-) as dag:
-    # You can use annotations on your kubernetes pods!
-    start_task_executor_config = {
-        "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
-    }
-
-    @task(executor_config=start_task_executor_config)
-    def start_task():
-        print_stuff()
-
-    start_task = start_task()
-
-    # [START task_with_volume]
-    executor_config_volume_mount = {
-        "pod_override": k8s.V1Pod(
-            spec=k8s.V1PodSpec(
-                containers=[
-                    k8s.V1Container(
-                        name="base",
-                        volume_mounts=[
-                            k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
-                        ],
-                    )
-                ],
-                volumes=[
-                    k8s.V1Volume(
-                        name="example-kubernetes-test-volume",
-                        host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
-                    )
-                ],
+    log.warning(
+        "The example_kubernetes_executor example DAG requires the kubernetes provider."
+        " Please install it with: pip install apache-airflow[cncf.kubernetes]"
+    )
+    k8s = None
+
+if k8s:
+    with DAG(
+        dag_id='example_kubernetes_executor',
+        schedule_interval=None,
+        start_date=datetime(2021, 1, 1),
+        catchup=False,
+        tags=['example3'],
+    ) as dag:
+        # You can use annotations on your kubernetes pods!
+        start_task_executor_config = {
+            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
+        }
+
+        @task(executor_config=start_task_executor_config)
+        def start_task():
+            print_stuff()
+
+        start_task = start_task()
+
+        # [START task_with_volume]
+        executor_config_volume_mount = {
+            "pod_override": k8s.V1Pod(
+                spec=k8s.V1PodSpec(
+                    containers=[
+                        k8s.V1Container(
+                            name="base",
+                            volume_mounts=[
+                                k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
+                            ],
+                        )
+                    ],
+                    volumes=[
+                        k8s.V1Volume(
+                            name="example-kubernetes-test-volume",
+                            host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
+                        )
+                    ],
+                )
+            ),
+        }
+
+        @task(executor_config=executor_config_volume_mount)
+        def test_volume_mount():
+            """
+            Tests whether the volume has been mounted.
+            """
+
+            with open('/foo/volume_mount_test.txt', 'w') as foo:
+                foo.write('Hello')
+
+            return_code = os.system("cat /foo/volume_mount_test.txt")
+            if return_code != 0:
+                raise ValueError(f"Error when checking volume mount. Return code {return_code}")
+
+        volume_task = test_volume_mount()
+        # [END task_with_volume]
+
+        # [START task_with_sidecar]
+        executor_config_sidecar = {
+            "pod_override": k8s.V1Pod(
+                spec=k8s.V1PodSpec(
+                    containers=[
+                        k8s.V1Container(
+                            name="base",
+                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
+                        ),
+                        k8s.V1Container(
+                            name="sidecar",
+                            image="ubuntu",
+                            args=["echo \"retrieved from mount\" > /shared/test.txt"],
+                            command=["bash", "-cx"],
+                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
+                        ),
+                    ],
+                    volumes=[
+                        k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
+                    ],
+                )
+            ),
+        }
+
+        @task(executor_config=executor_config_sidecar)
+        def test_sharedvolume_mount():
+            """
+            Tests whether the volume has been mounted.
+            """
+            for i in range(5):
+                try:
+                    return_code = os.system("cat /shared/test.txt")
+                    if return_code != 0:
+                        raise ValueError(f"Error when checking volume mount. Return code {return_code}")
+                except ValueError as e:
+                    if i == 4:  # re-raise only on the final attempt
+                        raise e
+
+        sidecar_task = test_sharedvolume_mount()
+        # [END task_with_sidecar]
+
+        # You can add labels to pods
+        executor_config_non_root = {
+            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))
+        }
+
+        @task(executor_config=executor_config_non_root)
+        def non_root_task():
+            print_stuff()
+
+        third_task = non_root_task()
+
+        executor_config_other_ns = {
+            "pod_override": k8s.V1Pod(
+                metadata=k8s.V1ObjectMeta(namespace="test-namespace", labels={'release': 'stable'})
             )
-        ),
-    }
-
-    @task(executor_config=executor_config_volume_mount)
-    def test_volume_mount():
-        """
-        Tests whether the volume has been mounted.
-        """
-
-        with open('/foo/volume_mount_test.txt', 'w') as foo:
-            foo.write('Hello')
-
-        return_code = os.system("cat /foo/volume_mount_test.txt")
-        if return_code != 0:
-            raise ValueError(f"Error when checking volume mount. Return code {return_code}")
-
-    volume_task = test_volume_mount()
-    # [END task_with_volume]
-
-    # [START task_with_sidecar]
-    executor_config_sidecar = {
-        "pod_override": k8s.V1Pod(
-            spec=k8s.V1PodSpec(
-                containers=[
-                    k8s.V1Container(
-                        name="base",
-                        volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                    ),
-                    k8s.V1Container(
-                        name="sidecar",
-                        image="ubuntu",
-                        args=["echo \"retrieved from mount\" > /shared/test.txt"],
-                        command=["bash", "-cx"],
-                        volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                    ),
-                ],
-                volumes=[
-                    k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
-                ],
+        }
+
+        @task(executor_config=executor_config_other_ns)
+        def other_namespace_task():
+            print_stuff()
+
+        other_ns_task = other_namespace_task()
+
+        # You can also change the base image; here we use the worker image for demonstration.
+        # Note that the image must have the same configuration as the worker image. For
+        # example, you might want to run this task in a special Docker image that has a zip
+        # library built in. Build that special image on top of your worker image.
+        kube_exec_config_special = {
+            "pod_override": k8s.V1Pod(
+                spec=k8s.V1PodSpec(
+                    containers=[
+                        k8s.V1Container(
+                            name="base", image=f"{worker_container_repository}:{worker_container_tag}"
+                        ),
+                    ]
+                )
             )
-        ),
-    }
-
-    @task(executor_config=executor_config_sidecar)
-    def test_sharedvolume_mount():
-        """
-        Tests whether the volume has been mounted.
-        """
-        for i in range(5):
-            try:
-                return_code = os.system("cat /shared/test.txt")
-                if return_code != 0:
-                    raise ValueError(f"Error when checking volume mount. Return code {return_code}")
-            except ValueError as e:
-                if i > 4:
-                    raise e
-
-    sidecar_task = test_sharedvolume_mount()
-    # [END task_with_sidecar]
-
-    # You can add labels to pods
-    executor_config_non_root = {
-        "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))
-    }
-
-    @task(executor_config=executor_config_non_root)
-    def non_root_task():
-        print_stuff()
-
-    third_task = non_root_task()
-
-    executor_config_other_ns = {
-        "pod_override": k8s.V1Pod(
-            metadata=k8s.V1ObjectMeta(namespace="test-namespace", labels={'release': 'stable'})
-        )
-    }
-
-    @task(executor_config=executor_config_other_ns)
-    def other_namespace_task():
-        print_stuff()
-
-    other_ns_task = other_namespace_task()
-
-    # You can also change the base image, here we used the worker image for demonstration.
-    # Note that the image must have the same configuration as the
-    # worker image. Could be that you want to run this task in a special docker image that has a zip
-    # library built-in. You build the special docker image on top your worker image.
-    kube_exec_config_special = {
-        "pod_override": k8s.V1Pod(
-            spec=k8s.V1PodSpec(
-                containers=[
-                    k8s.V1Container(
-                        name="base", image=f"{worker_container_repository}:{worker_container_tag}"
-                    ),
+        }
+
+        @task(executor_config=kube_exec_config_special)
+        def base_image_override_task():
+            print_stuff()
+
+        base_image_task = base_image_override_task()
+
+        # Use k8s_client.V1Affinity to define node affinity
+        k8s_affinity = k8s.V1Affinity(
+            pod_anti_affinity=k8s.V1PodAntiAffinity(
+                required_during_scheduling_ignored_during_execution=[
+                    k8s.V1PodAffinityTerm(
+                        label_selector=k8s.V1LabelSelector(
+                            match_expressions=[
+                                k8s.V1LabelSelectorRequirement(key='app', operator='In', values=['airflow'])
+                            ]
+                        ),
+                        topology_key='kubernetes.io/hostname',
+                    )
                 ]
             )
         )
-    }
-
-    @task(executor_config=kube_exec_config_special)
-    def base_image_override_task():
-        print_stuff()
-
-    base_image_task = base_image_override_task()
-
-    # Use k8s_client.V1Affinity to define node affinity
-    k8s_affinity = k8s.V1Affinity(
-        pod_anti_affinity=k8s.V1PodAntiAffinity(
-            required_during_scheduling_ignored_during_execution=[
-                k8s.V1PodAffinityTerm(
-                    label_selector=k8s.V1LabelSelector(
-                        match_expressions=[
-                            k8s.V1LabelSelectorRequirement(key='app', operator='In', values=['airflow'])
-                        ]
-                    ),
-                    topology_key='kubernetes.io/hostname',
-                )
-            ]
-        )
-    )
 
-    # Use k8s_client.V1Toleration to define node tolerations
-    k8s_tolerations = [k8s.V1Toleration(key='dedicated', operator='Equal', value='airflow')]
+        # Use k8s_client.V1Toleration to define node tolerations
+        k8s_tolerations = [k8s.V1Toleration(key='dedicated', operator='Equal', value='airflow')]
 
-    # Use k8s_client.V1ResourceRequirements to define resource limits
-    k8s_resource_requirements = k8s.V1ResourceRequirements(
-        requests={'memory': '512Mi'}, limits={'memory': '512Mi'}
-    )
+        # Use k8s_client.V1ResourceRequirements to define resource limits
+        k8s_resource_requirements = k8s.V1ResourceRequirements(
+            requests={'memory': '512Mi'}, limits={'memory': '512Mi'}
+        )
 
-    kube_exec_config_resource_limits = {
-        "pod_override": k8s.V1Pod(
-            spec=k8s.V1PodSpec(
-                containers=[
-                    k8s.V1Container(
-                        name="base",
-                        resources=k8s_resource_requirements,
-                    )
-                ],
-                affinity=k8s_affinity,
-                tolerations=k8s_tolerations,
+        kube_exec_config_resource_limits = {
+            "pod_override": k8s.V1Pod(
+                spec=k8s.V1PodSpec(
+                    containers=[
+                        k8s.V1Container(
+                            name="base",
+                            resources=k8s_resource_requirements,
+                        )
+                    ],
+                    affinity=k8s_affinity,
+                    tolerations=k8s_tolerations,
+                )
             )
-        )
-    }
+        }
 
-    @task(executor_config=kube_exec_config_resource_limits)
-    def task_with_resource_limits():
-        print_stuff()
+        @task(executor_config=kube_exec_config_resource_limits)
+        def task_with_resource_limits():
+            print_stuff()
 
-    four_task = task_with_resource_limits()
+        four_task = task_with_resource_limits()
 
-    start_task >> [volume_task, other_ns_task, sidecar_task] >> third_task >> [base_image_task, four_task]
+        start_task >> [volume_task, other_ns_task, sidecar_task] >> third_task >> [base_image_task, four_task]
diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py
index 8d5ce59..d533d84 100644
--- a/airflow/example_dags/example_python_operator.py
+++ b/airflow/example_dags/example_python_operator.py
@@ -20,6 +20,8 @@
 Example DAG demonstrating the usage of the TaskFlow API to execute Python functions natively and within a
 virtual environment.
 """
+import logging
+import shutil
 import time
 from datetime import datetime
 from pprint import pprint
@@ -27,6 +29,8 @@ from pprint import pprint
 from airflow import DAG
 from airflow.decorators import task
 
+log = logging.getLogger(__name__)
+
 with DAG(
     dag_id='example_python_operator',
     schedule_interval=None,
@@ -59,29 +63,32 @@ with DAG(
         run_this >> sleeping_task
     # [END howto_operator_python_kwargs]
 
-    # [START howto_operator_python_venv]
-    @task.virtualenv(
-        task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False
-    )
-    def callable_virtualenv():
-        """
-        Example function that will be performed in a virtual environment.
+    if not shutil.which("virtualenv"):
+        log.warning("The virtalenv_python example task requires virtualenv, please install it.")
+    else:
+        # [START howto_operator_python_venv]
+        @task.virtualenv(
+            task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False
+        )
+        def callable_virtualenv():
+            """
+            Example function that will be performed in a virtual environment.
 
-        Importing at the module level ensures that it will not attempt to import the
-        library before it is installed.
-        """
-        from time import sleep
+            Importing at the module level ensures that it will not attempt to import the
+            library before it is installed.
+            """
+            from time import sleep
 
-        from colorama import Back, Fore, Style
+            from colorama import Back, Fore, Style
 
-        print(Fore.RED + 'some red text')
-        print(Back.GREEN + 'and with a green background')
-        print(Style.DIM + 'and in dim text')
-        print(Style.RESET_ALL)
-        for _ in range(10):
-            print(Style.DIM + 'Please wait...', flush=True)
-            sleep(10)
-        print('Finished')
+            print(Fore.RED + 'some red text')
+            print(Back.GREEN + 'and with a green background')
+            print(Style.DIM + 'and in dim text')
+            print(Style.RESET_ALL)
+            for _ in range(10):
+                print(Style.DIM + 'Please wait...', flush=True)
+                sleep(10)
+            print('Finished')
 
-    virtualenv_task = callable_virtualenv()
-    # [END howto_operator_python_venv]
+        virtualenv_task = callable_virtualenv()
+        # [END howto_operator_python_venv]
diff --git a/airflow/example_dags/tutorial_taskflow_api_etl_virtualenv.py b/airflow/example_dags/tutorial_taskflow_api_etl_virtualenv.py
index 09aefcb..ac28095 100644
--- a/airflow/example_dags/tutorial_taskflow_api_etl_virtualenv.py
+++ b/airflow/example_dags/tutorial_taskflow_api_etl_virtualenv.py
@@ -17,65 +17,73 @@
 # under the License.
 
 
+import logging
+import shutil
 from datetime import datetime
 
 from airflow.decorators import dag, task
 
+log = logging.getLogger(__name__)
 
-@dag(schedule_interval=None, start_date=datetime(2021, 1, 1), catchup=False, tags=['example'])
-def tutorial_taskflow_api_etl_virtualenv():
-    """
-    ### TaskFlow API example using virtualenv
-    This is a simple ETL data pipeline example which demonstrates the use of
-    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
-    """
-
-    @task.virtualenv(
-        use_dill=True,
-        system_site_packages=False,
-        requirements=['funcsigs'],
+if not shutil.which("virtualenv"):
+    log.warning(
+        "The tutorial_taskflow_api_etl_virtualenv example DAG requires virtualenv, please install it."
     )
-    def extract():
-        """
-        #### Extract task
-        A simple Extract task to get data ready for the rest of the data
-        pipeline. In this case, getting data is simulated by reading from a
-        hardcoded JSON string.
-        """
-        import json
-
-        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
+else:
 
-        order_data_dict = json.loads(data_string)
-        return order_data_dict
-
-    @task(multiple_outputs=True)
-    def transform(order_data_dict: dict):
+    @dag(schedule_interval=None, start_date=datetime(2021, 1, 1), catchup=False, tags=['example'])
+    def tutorial_taskflow_api_etl_virtualenv():
         """
-        #### Transform task
-        A simple Transform task which takes in the collection of order data and
-        computes the total order value.
+        ### TaskFlow API example using virtualenv
+        This is a simple ETL data pipeline example which demonstrates the use of
+        the TaskFlow API using three simple tasks for Extract, Transform, and Load.
         """
-        total_order_value = 0
-
-        for value in order_data_dict.values():
-            total_order_value += value
-
-        return {"total_order_value": total_order_value}
-
-    @task()
-    def load(total_order_value: float):
-        """
-        #### Load task
-        A simple Load task which takes in the result of the Transform task and
-        instead of saving it to end user review, just prints it out.
-        """
-
-        print(f"Total order value is: {total_order_value:.2f}")
-
-    order_data = extract()
-    order_summary = transform(order_data)
-    load(order_summary["total_order_value"])
-
 
-tutorial_etl_dag = tutorial_taskflow_api_etl_virtualenv()
+        @task.virtualenv(
+            use_dill=True,
+            system_site_packages=False,
+            requirements=['funcsigs'],
+        )
+        def extract():
+            """
+            #### Extract task
+            A simple Extract task to get data ready for the rest of the data
+            pipeline. In this case, getting data is simulated by reading from a
+            hardcoded JSON string.
+            """
+            import json
+
+            data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
+
+            order_data_dict = json.loads(data_string)
+            return order_data_dict
+
+        @task(multiple_outputs=True)
+        def transform(order_data_dict: dict):
+            """
+            #### Transform task
+            A simple Transform task which takes in the collection of order data and
+            computes the total order value.
+            """
+            total_order_value = 0
+
+            for value in order_data_dict.values():
+                total_order_value += value
+
+            return {"total_order_value": total_order_value}
+
+        @task()
+        def load(total_order_value: float):
+            """
+            #### Load task
+            A simple Load task which takes in the result of the Transform task and
+            instead of saving it for end user review, just prints it out.
+            """
+
+            print(f"Total order value is: {total_order_value:.2f}")
+
+        order_data = extract()
+        order_summary = transform(order_data)
+        load(order_summary["total_order_value"])
+
+    tutorial_etl_dag = tutorial_taskflow_api_etl_virtualenv()