You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/01 23:46:56 UTC

[GitHub] [airflow] mik-laj commented on a change in pull request #19355: Update the example DAGs

mik-laj commented on a change in pull request #19355:
URL: https://github.com/apache/airflow/pull/19355#discussion_r740628714



##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -18,86 +18,114 @@
 """
 This is an example dag for using the Kubernetes Executor.
 """
-import os
+import logging
 from datetime import datetime
 
 from airflow import DAG
 from airflow.decorators import task
 from airflow.example_dags.libs.helper import print_stuff
 
-with DAG(
-    dag_id='example_kubernetes_executor',
-    schedule_interval=None,
-    start_date=datetime(2021, 1, 1),
-    catchup=False,
-    tags=['example', 'example2'],
-) as dag:
-    # You don't have to use any special KubernetesExecutor configuration if you don't want to
-    @task
-    def start_task():
-        print_stuff()
+log = logging.getLogger(__name__)
 
-    # But you can if you want to
-    kube_exec_config_special = {"KubernetesExecutor": {"image": "airflow/ci:latest"}}
+try:
+    from kubernetes.client import models as k8s
 
-    @task(executor_config=kube_exec_config_special)
-    def one_task():
-        print_stuff()
+    with DAG(
+        dag_id='example_kubernetes_executor',
+        schedule_interval=None,
+        start_date=datetime(2021, 1, 1),
+        catchup=False,
+        tags=['example', 'example2'],
+    ) as dag:
+        # You don't have to use any special KubernetesExecutor configuration if you don't want to
+        @task
+        def start_task():
+            print_stuff()
 
-    # Use the zip binary, which is only found in this special docker image
-    kube_exec_config_zip_binary = {"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}}
-
-    @task(task_id="two_task", executor_config=kube_exec_config_zip_binary)
-    def assert_zip_binary():
-        """
-        Checks whether Zip is installed.
+        # But you can if you want to
+        kube_exec_config_special = {
+            "pod_override": k8s.V1Pod(
+                spec=k8s.V1PodSpec(
+                    containers=[
+                        k8s.V1Container(
+                            name="base",
+                        ),
+                        k8s.V1Container(
+                            name="sidecar",
+                            image="ubuntu",
+                            command=["/bin/sh", "-c"],
+                            args=["echo 'Hello world'"],
+                        ),
+                    ]
+                )
+            )
+        }
 
-        :raises SystemError: if zip is not installed
-        """
-        return_code = os.system("zip")
-        if return_code != 0:
-            raise SystemError("The zip binary is not found")
+        @task(executor_config=kube_exec_config_special)
+        def one_task():
+            print_stuff()
 
-    # Limit resources on this operator/task with node affinity & tolerations
-    affinity = {
-        'podAntiAffinity': {
-            'requiredDuringSchedulingIgnoredDuringExecution': [
-                {
-                    'topologyKey': 'kubernetes.io/hostname',
-                    'labelSelector': {
-                        'matchExpressions': [{'key': 'app', 'operator': 'In', 'values': ['airflow']}]
-                    },
-                }
-            ]
-        }
-    }
+        # Limit resources on this operator/task with node affinity & tolerations
+        # Use k8s_client.V1Affinity to define node affinity & tolerations
+        k8s_affinity = k8s.V1Affinity(
+            pod_anti_affinity=k8s.V1PodAntiAffinity(
+                required_during_scheduling_ignored_during_execution=[
+                    k8s.V1PodAffinityTerm(
+                        label_selector=k8s.V1LabelSelector(
+                            match_expressions=[
+                                k8s.V1LabelSelectorRequirement(key='app', operator='In', values=['airflow'])
+                            ]
+                        ),
+                        topology_key='kubernetes.io/hostname',
+                    )
+                ]
+            )
+        )
 
-    tolerations = [{'key': 'dedicated', 'operator': 'Equal', 'value': 'airflow'}]
+        # Use k8s_client.V1Toleration to define node tolerations
+        k8s_tolerations = [k8s.V1Toleration(key='dedicated', operator='Equal', value='airflow')]
 
-    kube_exec_config_resource_limits = {
-        "KubernetesExecutor": {
-            "request_memory": "128Mi",
-            "limit_memory": "128Mi",
-            "tolerations": tolerations,
-            "affinity": affinity,
+        # Use k8s_client.V1ResourceRequirements to define resource limits
+        k8s_resource_limits = k8s.V1ResourceRequirements(
+            requests={'memory': '128Mi'}, limits={'memory': '128Mi'}
+        )
+        # use k8s_client.V1PodSpec to define pod spec with affinity, tolerations, and resource limits
+        k8s_exec_config_resources = {
+            "pod_override": k8s.V1Pod(
+                spec=k8s.V1PodSpec(
+                    containers=[
+                        k8s.V1Container(
+                            name="base",
+                            resources=k8s_resource_limits,
+                        )
+                    ],
+                    affinity=k8s_affinity,
+                    tolerations=k8s_tolerations,
+                )
+            )
         }
-    }
 
-    @task(executor_config=kube_exec_config_resource_limits)
-    def three_task():
-        print_stuff()
+        @task(executor_config=k8s_exec_config_resources)
+        def two_task():
+            print_stuff()
 
-    # Add arbitrary labels to worker pods
-    kube_exec_config_pod_labels = {"KubernetesExecutor": {"labels": {"foo": "bar"}}}
+        # Use k8s_client.V1PodSpec to define pod spec with labels
+        kube_exec_config_pod_labels = {
+            "pod_override": k8s.V1Pod(
+                metadata=k8s.V1ObjectMeta(labels={"foo": "bar"}),
+            )
+        }
 
-    @task(executor_config=kube_exec_config_pod_labels)
-    def four_task():
-        print_stuff()
+        @task(executor_config=kube_exec_config_pod_labels)
+        def three_task():
+            print_stuff()
 
-    start_task = start_task()
-    one_task = one_task()
-    two_task = assert_zip_binary()
-    three_task = three_task()
-    four_task = four_task()
+        start_task = start_task()
+        one_task = one_task()
+        two_task = two_task()
+        three_task = three_task()
 
-    start_task >> [one_task, two_task, three_task, four_task]
+        start_task >> [one_task, two_task, three_task]
+except ImportError as e:
+    log.warning("Could not import DAGs in example_kubernetes_executor_config.py: %s", str(e))
+    log.warning("Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']")

Review comment:
```suggestion
    log.warning("Install kubernetes dependencies with: pip install 'apache-airflow[cncf.kubernetes]'")
```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org