You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/04 19:12:00 UTC

[GitHub] [airflow] ephraimbuddy commented on a change in pull request #19355: Ensure the example DAGs are all working

ephraimbuddy commented on a change in pull request #19355:
URL: https://github.com/apache/airflow/pull/19355#discussion_r743131138



##########
File path: airflow/example_dags/example_kubernetes_executor_config.py
##########
@@ -23,149 +23,156 @@
 from datetime import datetime
 
 from airflow import DAG
+from airflow.configuration import AIRFLOW_HOME
 from airflow.decorators import task
 from airflow.example_dags.libs.helper import print_stuff
-from airflow.settings import AIRFLOW_HOME
 
 log = logging.getLogger(__name__)
 
 try:
     from kubernetes.client import models as k8s
-
-    with DAG(
-        dag_id='example_kubernetes_executor_config',
-        schedule_interval=None,
-        start_date=datetime(2021, 1, 1),
-        catchup=False,
-        tags=['example3'],
-    ) as dag:
-        # You can use annotations on your kubernetes pods!
-        start_task_executor_config = {
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
-        }
-
-        @task(executor_config=start_task_executor_config)
-        def start_task():
-            print_stuff()
-
-        start_task = start_task()
-
-        # [START task_with_volume]
-        executor_config_volume_mount = {
-            "pod_override": k8s.V1Pod(
-                spec=k8s.V1PodSpec(
-                    containers=[
-                        k8s.V1Container(
-                            name="base",
-                            volume_mounts=[
-                                k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
-                            ],
-                        )
-                    ],
-                    volumes=[
-                        k8s.V1Volume(
-                            name="example-kubernetes-test-volume",
-                            host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
-                        )
-                    ],
-                )
-            ),
-        }
-
-        @task(executor_config=executor_config_volume_mount)
-        def test_volume_mount():
-            """
-            Tests whether the volume has been mounted.
-            """
-            with open('/foo/volume_mount_test.txt', 'w') as foo:
-                foo.write('Hello')
-
-            return_code = os.system("cat /foo/volume_mount_test.txt")
-            if return_code != 0:
-                raise ValueError(f"Error when checking volume mount. Return code {return_code}")
-
-        volume_task = test_volume_mount()
-        # [END task_with_volume]
-
-        # [START task_with_template]
-        executor_config_template = {
-            "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/basic_template.yaml"),
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
-        }
-
-        @task(executor_config=executor_config_template)
-        def task_with_template():
-            print_stuff()
-
-        task_with_template = task_with_template()
-        # [END task_with_template]
-
-        # [START task_with_sidecar]
-        executor_config_sidecar = {
-            "pod_override": k8s.V1Pod(
-                spec=k8s.V1PodSpec(
-                    containers=[
-                        k8s.V1Container(
-                            name="base",
-                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                        ),
-                        k8s.V1Container(
-                            name="sidecar",
-                            image="ubuntu",
-                            args=["echo \"retrieved from mount\" > /shared/test.txt"],
-                            command=["bash", "-cx"],
-                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                        ),
-                    ],
-                    volumes=[
-                        k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
-                    ],
-                )
-            ),
-        }
-
-        @task(executor_config=executor_config_sidecar)
-        def test_sharedvolume_mount():
-            """
-            Tests whether the volume has been mounted.
-            """
-            for i in range(5):
-                try:
-                    return_code = os.system("cat /shared/test.txt")
-                    if return_code != 0:
-                        raise ValueError(f"Error when checking volume mount. Return code {return_code}")
-                except ValueError as e:
-                    if i > 4:
-                        raise e
-
-        sidecar_task = test_sharedvolume_mount()
-        # [END task_with_sidecar]
-
-        # Test that we can add labels to pods
-        executor_config_non_root = {
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))
-        }
-
-        @task(executor_config=executor_config_non_root)
-        def non_root_task():
-            print_stuff()
-
-        third_task = non_root_task()
-
-        executor_config_other_ns = {
-            "KubernetesExecutor": {"namespace": "test-namespace", "labels": {"release": "stable"}}
-        }
-
-        @task(executor_config=executor_config_other_ns)
-        def other_namespace_task():
-            print_stuff()
-
-        other_ns_task = other_namespace_task()
-
-        start_task >> volume_task >> third_task
-        start_task >> other_ns_task
-        start_task >> sidecar_task
-        start_task >> task_with_template
 except ImportError as e:
     log.warning("Could not import DAGs in example_kubernetes_executor_config.py: %s", str(e))
-    log.warning("Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']")
+    log.warning("Install kubernetes dependencies with: pip install apache-airflow[cncf.kubernetes]")
+
+with DAG(
+    dag_id='example_kubernetes_executor_config',
+    schedule_interval=None,
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=['example3'],
+) as dag:
+    # You can use annotations on your kubernetes pods!
+    start_task_executor_config = {
+        "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
+    }
+
+    @task(executor_config=start_task_executor_config)
+    def start_task():
+        print_stuff()
+
+    start_task = start_task()
+
+    # [START task_with_volume]
+    executor_config_volume_mount = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        volume_mounts=[
+                            k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
+                        ],
+                    )
+                ],
+                volumes=[
+                    k8s.V1Volume(
+                        name="example-kubernetes-test-volume",
+                        host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
+                    )
+                ],
+            )
+        ),
+    }
+
+    @task(executor_config=executor_config_volume_mount)
+    def test_volume_mount():
+        """
+        Tests whether the volume has been mounted.
+        """
+
+        with open('/foo/volume_mount_test.txt', 'w') as foo:
+            foo.write('Hello')
+
+        return_code = os.system("cat /foo/volume_mount_test.txt")
+        if return_code != 0:
+            raise ValueError(f"Error when checking volume mount. Return code {return_code}")
+
+    volume_task = test_volume_mount()
+    # [END task_with_volume]
+
+    # [START task_with_template]
+    # Be careful when changing the base image in the pod_template_file.
+    # Using, for example, ubuntu:latest as base image
+    # here would fail because it doesn't have `airflow` commands.
+    # A scenario where changing the base image could be needed is if you have a special library that
+    # you want to run only in this task. In that case, you build the image
+    # with the special library, on top of the image in your Dockerfile.
+    executor_config_template = {
+        "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/pod_template_file.yaml"),

Review comment:
       > I think we should move this example into the docs so we don't actually try and run it.
   
   That's also what I think. From my troubleshooting, there's nothing wrong with the examples; what is missing is documentation on how to run them.
   cc: @kaxil 
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org