Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/01 22:09:23 UTC

[GitHub] [airflow] ephraimbuddy opened a new pull request #19355: Update the example DAGs

ephraimbuddy opened a new pull request #19355:
URL: https://github.com/apache/airflow/pull/19355


   DETAILS LATER
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/main/UPDATING.md).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #19355: Ensure the example DAGs are all working

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #19355:
URL: https://github.com/apache/airflow/pull/19355#discussion_r740917312



##########
File path: airflow/example_dags/example_kubernetes_executor_config.py
##########
@@ -89,7 +89,6 @@ def test_volume_mount():
 
         # [START task_with_template]
         executor_config_template = {
-            "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/basic_template.yaml"),

Review comment:
       This is used in the docs on https://airflow.apache.org/docs/apache-airflow/stable/executor/kubernetes.html#kubernetesexecutor-architecture
   
   ![image](https://user-images.githubusercontent.com/34150/139829123-30594d27-0353-4cb8-b2a4-eef7aa7f37c8.png)
   










[GitHub] [airflow] mik-laj commented on a change in pull request #19355: Update the example DAGs

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #19355:
URL: https://github.com/apache/airflow/pull/19355#discussion_r740628714



##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -18,86 +18,114 @@
 """
 This is an example dag for using the Kubernetes Executor.
 """
-import os
+import logging
 from datetime import datetime
 
 from airflow import DAG
 from airflow.decorators import task
 from airflow.example_dags.libs.helper import print_stuff
 
-with DAG(
-    dag_id='example_kubernetes_executor',
-    schedule_interval=None,
-    start_date=datetime(2021, 1, 1),
-    catchup=False,
-    tags=['example', 'example2'],
-) as dag:
-    # You don't have to use any special KubernetesExecutor configuration if you don't want to
-    @task
-    def start_task():
-        print_stuff()
+log = logging.getLogger(__name__)
 
-    # But you can if you want to
-    kube_exec_config_special = {"KubernetesExecutor": {"image": "airflow/ci:latest"}}
+try:
+    from kubernetes.client import models as k8s
 
-    @task(executor_config=kube_exec_config_special)
-    def one_task():
-        print_stuff()
+    with DAG(
+        dag_id='example_kubernetes_executor',
+        schedule_interval=None,
+        start_date=datetime(2021, 1, 1),
+        catchup=False,
+        tags=['example', 'example2'],
+    ) as dag:
+        # You don't have to use any special KubernetesExecutor configuration if you don't want to
+        @task
+        def start_task():
+            print_stuff()
 
-    # Use the zip binary, which is only found in this special docker image
-    kube_exec_config_zip_binary = {"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}}
-
-    @task(task_id="two_task", executor_config=kube_exec_config_zip_binary)
-    def assert_zip_binary():
-        """
-        Checks whether Zip is installed.
+        # But you can if you want to
+        kube_exec_config_special = {
+            "pod_override": k8s.V1Pod(
+                spec=k8s.V1PodSpec(
+                    containers=[
+                        k8s.V1Container(
+                            name="base",
+                        ),
+                        k8s.V1Container(
+                            name="sidecar",
+                            image="ubuntu",
+                            command=["/bin/sh", "-c"],
+                            args=["echo 'Hello world'"],
+                        ),
+                    ]
+                )
+            )
+        }
 
-        :raises SystemError: if zip is not installed
-        """
-        return_code = os.system("zip")
-        if return_code != 0:
-            raise SystemError("The zip binary is not found")
+        @task(executor_config=kube_exec_config_special)
+        def one_task():
+            print_stuff()
 
-    # Limit resources on this operator/task with node affinity & tolerations
-    affinity = {
-        'podAntiAffinity': {
-            'requiredDuringSchedulingIgnoredDuringExecution': [
-                {
-                    'topologyKey': 'kubernetes.io/hostname',
-                    'labelSelector': {
-                        'matchExpressions': [{'key': 'app', 'operator': 'In', 'values': ['airflow']}]
-                    },
-                }
-            ]
-        }
-    }
+        # Limit resources on this operator/task with node affinity & tolerations
+        # Use k8s_client.V1Affinity to define node affinity & tolerations
+        k8s_affinity = k8s.V1Affinity(
+            pod_anti_affinity=k8s.V1PodAntiAffinity(
+                required_during_scheduling_ignored_during_execution=[
+                    k8s.V1PodAffinityTerm(
+                        label_selector=k8s.V1LabelSelector(
+                            match_expressions=[
+                                k8s.V1LabelSelectorRequirement(key='app', operator='In', values=['airflow'])
+                            ]
+                        ),
+                        topology_key='kubernetes.io/hostname',
+                    )
+                ]
+            )
+        )
 
-    tolerations = [{'key': 'dedicated', 'operator': 'Equal', 'value': 'airflow'}]
+        # Use k8s_client.V1Toleration to define node tolerations
+        k8s_tolerations = [k8s.V1Toleration(key='dedicated', operator='Equal', value='airflow')]
 
-    kube_exec_config_resource_limits = {
-        "KubernetesExecutor": {
-            "request_memory": "128Mi",
-            "limit_memory": "128Mi",
-            "tolerations": tolerations,
-            "affinity": affinity,
+        # Use k8s_client.V1ResourceRequirements to define resource limits
+        k8s_resource_limits = k8s.V1ResourceRequirements(
+            requests={'memory': '128Mi'}, limits={'memory': '128Mi'}
+        )
+        # use k8s_client.V1PodSpec to define pod spec with affinity, tolerations, and resource limits
+        k8s_exec_config_resources = {
+            "pod_override": k8s.V1Pod(
+                spec=k8s.V1PodSpec(
+                    containers=[
+                        k8s.V1Container(
+                            name="base",
+                            resources=k8s_resource_limits,
+                        )
+                    ],
+                    affinity=k8s_affinity,
+                    tolerations=k8s_tolerations,
+                )
+            )
         }
-    }
 
-    @task(executor_config=kube_exec_config_resource_limits)
-    def three_task():
-        print_stuff()
+        @task(executor_config=k8s_exec_config_resources)
+        def two_task():
+            print_stuff()
 
-    # Add arbitrary labels to worker pods
-    kube_exec_config_pod_labels = {"KubernetesExecutor": {"labels": {"foo": "bar"}}}
+        # Use k8s_client.V1PodSpec to define pod spec with labels
+        kube_exec_config_pod_labels = {
+            "pod_override": k8s.V1Pod(
+                metadata=k8s.V1ObjectMeta(labels={"foo": "bar"}),
+            )
+        }
 
-    @task(executor_config=kube_exec_config_pod_labels)
-    def four_task():
-        print_stuff()
+        @task(executor_config=kube_exec_config_pod_labels)
+        def three_task():
+            print_stuff()
 
-    start_task = start_task()
-    one_task = one_task()
-    two_task = assert_zip_binary()
-    three_task = three_task()
-    four_task = four_task()
+        start_task = start_task()
+        one_task = one_task()
+        two_task = two_task()
+        three_task = three_task()
 
-    start_task >> [one_task, two_task, three_task, four_task]
+        start_task >> [one_task, two_task, three_task]
+except ImportError as e:
+    log.warning("Could not import DAGs in example_kubernetes_executor_config.py: %s", str(e))
+    log.warning("Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']")

Review comment:
       ```suggestion
       log.warning("Install kubernetes dependencies with: pip install apache-'airflow[cncf.kubernetes]'")
   ```

Review comment:
       ```suggestion
       log.warning("Install kubernetes dependencies with: pip install 'apache-airflow[cncf.kubernetes]'")
   ```
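The suggestion above moves the quotes so that they wrap the whole requirement. Some shells (zsh, for example) treat unquoted square brackets as glob characters, so quoting the full string is the safer form to show users. As a minimal sketch, the same quoted form can be produced with the standard-library `shlex.quote`; the `requirement` and `command` names are illustrative and not part of the PR.

```python
import shlex

# The extra to install, written without any shell quoting.
requirement = "apache-airflow[cncf.kubernetes]"

# shlex.quote wraps the string in single quotes because '[' and ']'
# are not in its set of shell-safe characters.
command = "pip install " + shlex.quote(requirement)
print(command)

# Splitting the command the way a POSIX shell would recovers the
# requirement as one unmodified argument.
print(shlex.split(command))
```

This only illustrates the quoting; the log message in the DAG file would still carry the literal string.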







[GitHub] [airflow] jedcunningham commented on a change in pull request #19355: Ensure the example DAGs are all working

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #19355:
URL: https://github.com/apache/airflow/pull/19355#discussion_r746107308



##########
File path: airflow/example_dags/example_kubernetes_executor_config.py
##########
@@ -25,147 +25,133 @@
 from airflow import DAG
 from airflow.decorators import task
 from airflow.example_dags.libs.helper import print_stuff
-from airflow.settings import AIRFLOW_HOME
 
 log = logging.getLogger(__name__)
 
+
 try:
     from kubernetes.client import models as k8s
-
-    with DAG(
-        dag_id='example_kubernetes_executor_config',
-        schedule_interval=None,
-        start_date=datetime(2021, 1, 1),
-        catchup=False,
-        tags=['example3'],
-    ) as dag:
-        # You can use annotations on your kubernetes pods!
-        start_task_executor_config = {
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
-        }
-
-        @task(executor_config=start_task_executor_config)
-        def start_task():
-            print_stuff()
-
-        start_task = start_task()
-
-        # [START task_with_volume]
-        executor_config_volume_mount = {
-            "pod_override": k8s.V1Pod(
-                spec=k8s.V1PodSpec(
-                    containers=[
-                        k8s.V1Container(
-                            name="base",
-                            volume_mounts=[
-                                k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
-                            ],
-                        )
-                    ],
-                    volumes=[
-                        k8s.V1Volume(
-                            name="example-kubernetes-test-volume",
-                            host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
-                        )
-                    ],
-                )
-            ),
-        }
-
-        @task(executor_config=executor_config_volume_mount)
-        def test_volume_mount():
-            """
-            Tests whether the volume has been mounted.
-            """
-            with open('/foo/volume_mount_test.txt', 'w') as foo:
-                foo.write('Hello')
-
-            return_code = os.system("cat /foo/volume_mount_test.txt")
-            if return_code != 0:
-                raise ValueError(f"Error when checking volume mount. Return code {return_code}")
-
-        volume_task = test_volume_mount()
-        # [END task_with_volume]
-
-        # [START task_with_template]
-        executor_config_template = {
-            "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/basic_template.yaml"),
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
-        }
-
-        @task(executor_config=executor_config_template)
-        def task_with_template():
-            print_stuff()
-
-        task_with_template = task_with_template()
-        # [END task_with_template]
-
-        # [START task_with_sidecar]
-        executor_config_sidecar = {
-            "pod_override": k8s.V1Pod(
-                spec=k8s.V1PodSpec(
-                    containers=[
-                        k8s.V1Container(
-                            name="base",
-                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                        ),
-                        k8s.V1Container(
-                            name="sidecar",
-                            image="ubuntu",
-                            args=["echo \"retrieved from mount\" > /shared/test.txt"],
-                            command=["bash", "-cx"],
-                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                        ),
-                    ],
-                    volumes=[
-                        k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
-                    ],
-                )
-            ),
-        }
-
-        @task(executor_config=executor_config_sidecar)
-        def test_sharedvolume_mount():
-            """
-            Tests whether the volume has been mounted.
-            """
-            for i in range(5):
-                try:
-                    return_code = os.system("cat /shared/test.txt")
-                    if return_code != 0:
-                        raise ValueError(f"Error when checking volume mount. Return code {return_code}")
-                except ValueError as e:
-                    if i > 4:
-                        raise e
-
-        sidecar_task = test_sharedvolume_mount()
-        # [END task_with_sidecar]
-
-        # Test that we can add labels to pods
-        executor_config_non_root = {
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))
-        }
-
-        @task(executor_config=executor_config_non_root)
-        def non_root_task():
-            print_stuff()
-
-        third_task = non_root_task()
-
-        executor_config_other_ns = {
-            "KubernetesExecutor": {"namespace": "test-namespace", "labels": {"release": "stable"}}
-        }
-
-        @task(executor_config=executor_config_other_ns)
-        def other_namespace_task():
-            print_stuff()
-
-        other_ns_task = other_namespace_task()
-
-        start_task >> volume_task >> third_task
-        start_task >> other_ns_task
-        start_task >> sidecar_task
-        start_task >> task_with_template
 except ImportError as e:
     log.warning("Could not import DAGs in example_kubernetes_executor_config.py: %s", str(e))
     log.warning("Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']")
+
+
+with DAG(
+    dag_id='example_kubernetes_executor_config',

Review comment:
       Yep, I think so: https://github.com/apache/airflow/pull/19355#pullrequestreview-797771124







[GitHub] [airflow] uranusjr commented on a change in pull request #19355: Ensure the example DAGs are all working

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19355:
URL: https://github.com/apache/airflow/pull/19355#discussion_r740892129



##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -18,86 +18,114 @@
 """
 This is an example dag for using the Kubernetes Executor.
 """
-import os
+import logging
 from datetime import datetime
 
 from airflow import DAG
 from airflow.decorators import task
 from airflow.example_dags.libs.helper import print_stuff
 
-with DAG(
-    dag_id='example_kubernetes_executor',
-    schedule_interval=None,
-    start_date=datetime(2021, 1, 1),
-    catchup=False,
-    tags=['example', 'example2'],
-) as dag:
-    # You don't have to use any special KubernetesExecutor configuration if you don't want to
-    @task
-    def start_task():
-        print_stuff()
+log = logging.getLogger(__name__)
 
-    # But you can if you want to
-    kube_exec_config_special = {"KubernetesExecutor": {"image": "airflow/ci:latest"}}
+try:
+    from kubernetes.client import models as k8s
 

Review comment:
       Probably better to put everything under this line into the `else` block instead. 
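A minimal sketch of the `try` / `except` / `else` layout this comment refers to, assuming the intent is to keep only the optional import inside `try`. The `executor_config` name and the `None` fallback are illustrative and not taken from the PR.

```python
import logging

log = logging.getLogger(__name__)

try:
    # Only the statement that can fail with ImportError lives in the try block.
    from kubernetes.client import models as k8s
except ImportError as e:
    # Optional dependency is missing: warn and skip building the config.
    log.warning("Could not import kubernetes models: %s", e)
    executor_config = None  # illustrative fallback, not from the PR
else:
    # Runs only when the import succeeded, so `k8s` is always bound here.
    executor_config = {
        "pod_override": k8s.V1Pod(
            metadata=k8s.V1ObjectMeta(labels={"foo": "bar"}),
        )
    }
```

Compared with wrapping the whole DAG in `try`, this keeps an unrelated `ImportError` raised while building the DAG from being reported as a missing kubernetes dependency, and it avoids referencing `k8s` on a path where the import failed.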







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #19355: Ensure the example DAGs are all working

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #19355:
URL: https://github.com/apache/airflow/pull/19355#discussion_r743131138



##########
File path: airflow/example_dags/example_kubernetes_executor_config.py
##########
@@ -23,149 +23,156 @@
 from datetime import datetime
 
 from airflow import DAG
+from airflow.configuration import AIRFLOW_HOME
 from airflow.decorators import task
 from airflow.example_dags.libs.helper import print_stuff
-from airflow.settings import AIRFLOW_HOME
 
 log = logging.getLogger(__name__)
 
 try:
     from kubernetes.client import models as k8s
-
-    with DAG(
-        dag_id='example_kubernetes_executor_config',
-        schedule_interval=None,
-        start_date=datetime(2021, 1, 1),
-        catchup=False,
-        tags=['example3'],
-    ) as dag:
-        # You can use annotations on your kubernetes pods!
-        start_task_executor_config = {
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
-        }
-
-        @task(executor_config=start_task_executor_config)
-        def start_task():
-            print_stuff()
-
-        start_task = start_task()
-
-        # [START task_with_volume]
-        executor_config_volume_mount = {
-            "pod_override": k8s.V1Pod(
-                spec=k8s.V1PodSpec(
-                    containers=[
-                        k8s.V1Container(
-                            name="base",
-                            volume_mounts=[
-                                k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
-                            ],
-                        )
-                    ],
-                    volumes=[
-                        k8s.V1Volume(
-                            name="example-kubernetes-test-volume",
-                            host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
-                        )
-                    ],
-                )
-            ),
-        }
-
-        @task(executor_config=executor_config_volume_mount)
-        def test_volume_mount():
-            """
-            Tests whether the volume has been mounted.
-            """
-            with open('/foo/volume_mount_test.txt', 'w') as foo:
-                foo.write('Hello')
-
-            return_code = os.system("cat /foo/volume_mount_test.txt")
-            if return_code != 0:
-                raise ValueError(f"Error when checking volume mount. Return code {return_code}")
-
-        volume_task = test_volume_mount()
-        # [END task_with_volume]
-
-        # [START task_with_template]
-        executor_config_template = {
-            "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/basic_template.yaml"),
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
-        }
-
-        @task(executor_config=executor_config_template)
-        def task_with_template():
-            print_stuff()
-
-        task_with_template = task_with_template()
-        # [END task_with_template]
-
-        # [START task_with_sidecar]
-        executor_config_sidecar = {
-            "pod_override": k8s.V1Pod(
-                spec=k8s.V1PodSpec(
-                    containers=[
-                        k8s.V1Container(
-                            name="base",
-                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                        ),
-                        k8s.V1Container(
-                            name="sidecar",
-                            image="ubuntu",
-                            args=["echo \"retrieved from mount\" > /shared/test.txt"],
-                            command=["bash", "-cx"],
-                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                        ),
-                    ],
-                    volumes=[
-                        k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
-                    ],
-                )
-            ),
-        }
-
-        @task(executor_config=executor_config_sidecar)
-        def test_sharedvolume_mount():
-            """
-            Tests whether the volume has been mounted.
-            """
-            for i in range(5):
-                try:
-                    return_code = os.system("cat /shared/test.txt")
-                    if return_code != 0:
-                        raise ValueError(f"Error when checking volume mount. Return code {return_code}")
-                except ValueError as e:
-                    if i > 4:
-                        raise e
-
-        sidecar_task = test_sharedvolume_mount()
-        # [END task_with_sidecar]
-
-        # Test that we can add labels to pods
-        executor_config_non_root = {
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))
-        }
-
-        @task(executor_config=executor_config_non_root)
-        def non_root_task():
-            print_stuff()
-
-        third_task = non_root_task()
-
-        executor_config_other_ns = {
-            "KubernetesExecutor": {"namespace": "test-namespace", "labels": {"release": "stable"}}
-        }
-
-        @task(executor_config=executor_config_other_ns)
-        def other_namespace_task():
-            print_stuff()
-
-        other_ns_task = other_namespace_task()
-
-        start_task >> volume_task >> third_task
-        start_task >> other_ns_task
-        start_task >> sidecar_task
-        start_task >> task_with_template
 except ImportError as e:
     log.warning("Could not import DAGs in example_kubernetes_executor_config.py: %s", str(e))
-    log.warning("Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']")
+    log.warning("Install kubernetes dependencies with: pip install apache-airflow[cncf.kubernetes]")
+
+with DAG(
+    dag_id='example_kubernetes_executor_config',
+    schedule_interval=None,
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=['example3'],
+) as dag:
+    # You can use annotations on your kubernetes pods!
+    start_task_executor_config = {
+        "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
+    }
+
+    @task(executor_config=start_task_executor_config)
+    def start_task():
+        print_stuff()
+
+    start_task = start_task()
+
+    # [START task_with_volume]
+    executor_config_volume_mount = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        volume_mounts=[
+                            k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
+                        ],
+                    )
+                ],
+                volumes=[
+                    k8s.V1Volume(
+                        name="example-kubernetes-test-volume",
+                        host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
+                    )
+                ],
+            )
+        ),
+    }
+
+    @task(executor_config=executor_config_volume_mount)
+    def test_volume_mount():
+        """
+        Tests whether the volume has been mounted.
+        """
+
+        with open('/foo/volume_mount_test.txt', 'w') as foo:
+            foo.write('Hello')
+
+        return_code = os.system("cat /foo/volume_mount_test.txt")
+        if return_code != 0:
+            raise ValueError(f"Error when checking volume mount. Return code {return_code}")
+
+    volume_task = test_volume_mount()
+    # [END task_with_volume]
+
+    # [START task_with_template]
+    # Be careful when changing the base image in the pod_template_file.
+    # Using, for example, ubuntu:latest as base image
+    # here would fail because it doesn't have `airflow` commands.
+    # A scenario where changing the base image could be needed is if you have a special library that
+    # you want to run only in this task. In that case, you build the image
+    # with the special library, on top the image in your Dockerfile.
+    executor_config_template = {
+        "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/pod_template_file.yaml"),

Review comment:
       > I think we should move this example into the docs so we don't actually try and run it.
   
   That's also what I think. From my troubleshooting, there's nothing wrong with the examples; what's missing is documentation on how to run them.
   cc: @kaxil 
   
   







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #19355: Ensure the example DAGs are all working

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #19355:
URL: https://github.com/apache/airflow/pull/19355#discussion_r743131138



##########
File path: airflow/example_dags/example_kubernetes_executor_config.py
##########
@@ -23,149 +23,156 @@
 from datetime import datetime
 
 from airflow import DAG
+from airflow.configuration import AIRFLOW_HOME
 from airflow.decorators import task
 from airflow.example_dags.libs.helper import print_stuff
-from airflow.settings import AIRFLOW_HOME
 
 log = logging.getLogger(__name__)
 
 try:
     from kubernetes.client import models as k8s
-
-    with DAG(
-        dag_id='example_kubernetes_executor_config',
-        schedule_interval=None,
-        start_date=datetime(2021, 1, 1),
-        catchup=False,
-        tags=['example3'],
-    ) as dag:
-        # You can use annotations on your kubernetes pods!
-        start_task_executor_config = {
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
-        }
-
-        @task(executor_config=start_task_executor_config)
-        def start_task():
-            print_stuff()
-
-        start_task = start_task()
-
-        # [START task_with_volume]
-        executor_config_volume_mount = {
-            "pod_override": k8s.V1Pod(
-                spec=k8s.V1PodSpec(
-                    containers=[
-                        k8s.V1Container(
-                            name="base",
-                            volume_mounts=[
-                                k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
-                            ],
-                        )
-                    ],
-                    volumes=[
-                        k8s.V1Volume(
-                            name="example-kubernetes-test-volume",
-                            host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
-                        )
-                    ],
-                )
-            ),
-        }
-
-        @task(executor_config=executor_config_volume_mount)
-        def test_volume_mount():
-            """
-            Tests whether the volume has been mounted.
-            """
-            with open('/foo/volume_mount_test.txt', 'w') as foo:
-                foo.write('Hello')
-
-            return_code = os.system("cat /foo/volume_mount_test.txt")
-            if return_code != 0:
-                raise ValueError(f"Error when checking volume mount. Return code {return_code}")
-
-        volume_task = test_volume_mount()
-        # [END task_with_volume]
-
-        # [START task_with_template]
-        executor_config_template = {
-            "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/basic_template.yaml"),
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
-        }
-
-        @task(executor_config=executor_config_template)
-        def task_with_template():
-            print_stuff()
-
-        task_with_template = task_with_template()
-        # [END task_with_template]
-
-        # [START task_with_sidecar]
-        executor_config_sidecar = {
-            "pod_override": k8s.V1Pod(
-                spec=k8s.V1PodSpec(
-                    containers=[
-                        k8s.V1Container(
-                            name="base",
-                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                        ),
-                        k8s.V1Container(
-                            name="sidecar",
-                            image="ubuntu",
-                            args=["echo \"retrieved from mount\" > /shared/test.txt"],
-                            command=["bash", "-cx"],
-                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                        ),
-                    ],
-                    volumes=[
-                        k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
-                    ],
-                )
-            ),
-        }
-
-        @task(executor_config=executor_config_sidecar)
-        def test_sharedvolume_mount():
-            """
-            Tests whether the volume has been mounted.
-            """
-            for i in range(5):
-                try:
-                    return_code = os.system("cat /shared/test.txt")
-                    if return_code != 0:
-                        raise ValueError(f"Error when checking volume mount. Return code {return_code}")
-                except ValueError as e:
-                    if i > 4:
-                        raise e
-
-        sidecar_task = test_sharedvolume_mount()
-        # [END task_with_sidecar]
-
-        # Test that we can add labels to pods
-        executor_config_non_root = {
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))
-        }
-
-        @task(executor_config=executor_config_non_root)
-        def non_root_task():
-            print_stuff()
-
-        third_task = non_root_task()
-
-        executor_config_other_ns = {
-            "KubernetesExecutor": {"namespace": "test-namespace", "labels": {"release": "stable"}}
-        }
-
-        @task(executor_config=executor_config_other_ns)
-        def other_namespace_task():
-            print_stuff()
-
-        other_ns_task = other_namespace_task()
-
-        start_task >> volume_task >> third_task
-        start_task >> other_ns_task
-        start_task >> sidecar_task
-        start_task >> task_with_template
 except ImportError as e:
     log.warning("Could not import DAGs in example_kubernetes_executor_config.py: %s", str(e))
-    log.warning("Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']")
+    log.warning("Install kubernetes dependencies with: pip install apache-airflow[cncf.kubernetes]")
+
+with DAG(
+    dag_id='example_kubernetes_executor_config',
+    schedule_interval=None,
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=['example3'],
+) as dag:
+    # You can use annotations on your kubernetes pods!
+    start_task_executor_config = {
+        "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
+    }
+
+    @task(executor_config=start_task_executor_config)
+    def start_task():
+        print_stuff()
+
+    start_task = start_task()
+
+    # [START task_with_volume]
+    executor_config_volume_mount = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        volume_mounts=[
+                            k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
+                        ],
+                    )
+                ],
+                volumes=[
+                    k8s.V1Volume(
+                        name="example-kubernetes-test-volume",
+                        host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
+                    )
+                ],
+            )
+        ),
+    }
+
+    @task(executor_config=executor_config_volume_mount)
+    def test_volume_mount():
+        """
+        Tests whether the volume has been mounted.
+        """
+
+        with open('/foo/volume_mount_test.txt', 'w') as foo:
+            foo.write('Hello')
+
+        return_code = os.system("cat /foo/volume_mount_test.txt")
+        if return_code != 0:
+            raise ValueError(f"Error when checking volume mount. Return code {return_code}")
+
+    volume_task = test_volume_mount()
+    # [END task_with_volume]
+
+    # [START task_with_template]
+    # Be careful when changing the base image in the pod_template_file.
+    # Using, for example, ubuntu:latest as base image
+    # here would fail because it doesn't have `airflow` commands.
+    # A scenario where changing the base image could be needed is if you have a special library that
+    # you want to run only in this task. In that case, you build the image
+    # with the special library, on top the image in your Dockerfile.
+    executor_config_template = {
+        "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/pod_template_file.yaml"),

Review comment:
       > I think we should move this example into the docs so we don't actually try and run it.
   
   That's also what I think. From my troubleshooting, there's nothing wrong with the examples; what's missing is documentation on how to run them.
   cc: @kaxil 
   
   

##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -38,57 +47,107 @@ def start_task():
         print_stuff()
 
     # But you can if you want to
-    kube_exec_config_special = {"KubernetesExecutor": {"image": "airflow/ci:latest"}}
+    kube_exec_config_special = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        # Please note that the image here must be similar to the one in your Dockerfile.
+                        # Using, for example, ubuntu:latest as base image
+                        # here would fail because it doesn't have `airflow` commands.
+                        # A scenario where changing the base image could be needed is if you have a
+                        # special library that you want to run only in this task. In that case,
+                        # you build the image with the special library, on top the image in your Dockerfile.
+                        image="apache/airflow:2.2.1-python3.9",
+                    ),
+                ]
+            )
+        )
+    }
 
     @task(executor_config=kube_exec_config_special)
     def one_task():
         print_stuff()
 
     # Use the zip binary, which is only found in this special docker image
-    kube_exec_config_zip_binary = {"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}}
+    kube_exec_config_zip_binary = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        # Please note that the image here must be similar to the one in your Dockerfile.
+                        # Using, for example, ubuntu:latest as base image
+                        # here would fail because it doesn't have `airflow` commands.
+                        # A scenario where changing the base image could be needed is if you have a
+                        # special library that you want to run only in this task. In that case,
+                        # you build the image with the special library, on top the image in your Dockerfile.
+                        image="apache/airflow:2.2.1-python3.9-zip",

Review comment:
       I couldn't get the env var to work.







[GitHub] [airflow] ephraimbuddy closed pull request #19355: Ensure the example DAGs are all working

Posted by GitBox <gi...@apache.org>.
ephraimbuddy closed pull request #19355:
URL: https://github.com/apache/airflow/pull/19355


   





[GitHub] [airflow] jedcunningham commented on a change in pull request #19355: Ensure the example DAGs are all working

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #19355:
URL: https://github.com/apache/airflow/pull/19355#discussion_r742857463



##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -38,57 +47,107 @@ def start_task():
         print_stuff()
 
     # But you can if you want to
-    kube_exec_config_special = {"KubernetesExecutor": {"image": "airflow/ci:latest"}}
+    kube_exec_config_special = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        # Please note that the image here must be similar to the one in your Dockerfile.
+                        # Using, for example, ubuntu:latest as base image
+                        # here would fail because it doesn't have `airflow` commands.
+                        # A scenario where changing the base image could be needed is if you have a
+                        # special library that you want to run only in this task. In that case,
+                        # you build the image with the special library, on top the image in your Dockerfile.
+                        image="apache/airflow:2.2.1-python3.9",

Review comment:
       To make this portable, what if we used `[kubernetes] worker_container_repository` and `[kubernetes] worker_container_tag` instead? Then it'd be more likely to work, and we could keep a similar comment framed as a hypothetical, e.g. "Imagine you need a custom library for only this task: build a new image off of your normal image and provide it here. (We simply use the default image to ensure this example will run.)"
   
   If we wanted it to "always" work, we'd really need to use those configs OR pull the image out of the `pod_template_file`.
   
   I sorta wonder if there is value in having a running example of this vs just documenting it though.
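
A minimal sketch of the config-based idea above, not the code from this PR: it builds the image string from the two `[kubernetes]` options instead of hard-coding a tag. The helper name `default_worker_image` is hypothetical. In a DAG file the `conf` argument would presumably be `airflow.configuration.conf`; a standard-library `ConfigParser` with made-up values stands in here so the sketch runs without Airflow installed.

```python
from configparser import ConfigParser


def default_worker_image(conf, section="kubernetes"):
    """Return "<repository>:<tag>" built from the worker image settings.

    `conf` is any object exposing ``get(section, key)``. The helper itself is
    hypothetical and only illustrates the suggestion in the review comment.
    """
    repository = conf.get(section, "worker_container_repository")
    tag = conf.get(section, "worker_container_tag")
    if not repository or not tag:
        raise ValueError(
            "worker_container_repository and worker_container_tag must both be set"
        )
    return f"{repository}:{tag}"


# Stand-in configuration; these values are assumptions for illustration only.
demo_conf = ConfigParser()
demo_conf.read_dict(
    {
        "kubernetes": {
            "worker_container_repository": "apache/airflow",
            "worker_container_tag": "2.2.1",
        }
    }
)

image = default_worker_image(demo_conf)
print(image)
```

The resulting string could then be passed as `image=` to the `k8s.V1Container(name="base", ...)` inside `pod_override`, so the example follows whatever image the deployment already uses.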

##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -38,57 +47,107 @@ def start_task():
         print_stuff()
 
     # But you can if you want to
-    kube_exec_config_special = {"KubernetesExecutor": {"image": "airflow/ci:latest"}}
+    kube_exec_config_special = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        # Please note that the image here must be similar to the one in your Dockerfile.
+                        # Using, for example, ubuntu:latest as base image
+                        # here would fail because it doesn't have `airflow` commands.
+                        # A scenario where changing the base image could be needed is if you have a
+                        # special library that you want to run only in this task. In that case,
+                        # you build the image with the special library, on top the image in your Dockerfile.
+                        image="apache/airflow:2.2.1-python3.9",
+                    ),
+                ]
+            )
+        )
+    }
 
     @task(executor_config=kube_exec_config_special)
     def one_task():
         print_stuff()
 
     # Use the zip binary, which is only found in this special docker image
-    kube_exec_config_zip_binary = {"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}}
+    kube_exec_config_zip_binary = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        # Please note that the image here must be similar to the one in your Dockerfile.
+                        # Using, for example, ubuntu:latest as base image
+                        # here would fail because it doesn't have `airflow` commands.
+                        # A scenario where changing the base image could be needed is if you have a
+                        # special library that you want to run only in this task. In that case,
+                        # you build the image with the special library, on top the image in your Dockerfile.
+                        image="apache/airflow:2.2.1-python3.9-zip",
+                    ),
+                ]
+            )
+        )
+    }
 
     @task(task_id="two_task", executor_config=kube_exec_config_zip_binary)
     def assert_zip_binary():
         """
         Checks whether Zip is installed.
-
         :raises SystemError: if zip is not installed
         """
         return_code = os.system("zip")
         if return_code != 0:
             raise SystemError("The zip binary is not found")
 
     # Limit resources on this operator/task with node affinity & tolerations
-    affinity = {
-        'podAntiAffinity': {
-            'requiredDuringSchedulingIgnoredDuringExecution': [
-                {
-                    'topologyKey': 'kubernetes.io/hostname',
-                    'labelSelector': {
-                        'matchExpressions': [{'key': 'app', 'operator': 'In', 'values': ['airflow']}]
-                    },
-                }
+    # Use k8s_client.V1Affinity to define node affinity & tolerations

Review comment:
       ```suggestion
       # Use k8s_client.V1Affinity to define node affinity
   ```

##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -38,57 +47,107 @@ def start_task():
         print_stuff()
 
     # But you can if you want to
-    kube_exec_config_special = {"KubernetesExecutor": {"image": "airflow/ci:latest"}}
+    kube_exec_config_special = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        # Please note that the image here must be similar to the one in your Dockerfile.
+                        # Using, for example, ubuntu:latest as base image
+                        # here would fail because it doesn't have `airflow` commands.
+                        # A scenario where changing the base image could be needed is if you have a
+                        # special library that you want to run only in this task. In that case,
+                        # you build the image with the special library, on top the image in your Dockerfile.
+                        image="apache/airflow:2.2.1-python3.9",
+                    ),
+                ]
+            )
+        )
+    }
 
     @task(executor_config=kube_exec_config_special)
     def one_task():
         print_stuff()
 
     # Use the zip binary, which is only found in this special docker image
-    kube_exec_config_zip_binary = {"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}}
+    kube_exec_config_zip_binary = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        # Please note that the image here must be similar to the one in your Dockerfile.
+                        # Using, for example, ubuntu:latest as base image
+                        # here would fail because it doesn't have `airflow` commands.
+                        # A scenario where changing the base image could be needed is if you have a
+                        # special library that you want to run only in this task. In that case,
+                        # you build the image with the special library, on top the image in your Dockerfile.
+                        image="apache/airflow:2.2.1-python3.9-zip",

Review comment:
       I'm not a fan of this example. It can't be done in a portable way, nor do I think we should publish a `zip` image just for an example DAG. Did you try the env var check example I mentioned yesterday instead?
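
The env var check referred to here is not shown in this thread. As a rough sketch of what such a check might look like, assume the task's `pod_override` sets a variable on the `base` container (for instance through the `env` field of `k8s.V1Container`); the task body could then verify it with plain `os.environ`. The variable name `EXAMPLE_POD_MARKER`, its value, and the helper `assert_env_var` are all hypothetical.

```python
import os


def assert_env_var(name, expected, environ=None):
    """Raise SystemError unless `name` is set to `expected` in the environment.

    `environ` defaults to os.environ; a plain dict can be passed in to try the
    check outside of a pod.
    """
    environ = os.environ if environ is None else environ
    actual = environ.get(name)
    if actual != expected:
        raise SystemError(f"Expected {name}={expected!r}, found {actual!r}")
    return actual


# Simulate the environment that the overridden pod would provide.
fake_pod_env = {"EXAMPLE_POD_MARKER": "from-pod-override"}
print(assert_env_var("EXAMPLE_POD_MARKER", "from-pod-override", environ=fake_pod_env))
```

Unlike the `zip` check, this needs no special image: the default worker image is enough, and the task only passes when the override was actually applied to the pod.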

##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -38,57 +47,107 @@ def start_task():
         print_stuff()
 
     # But you can if you want to
-    kube_exec_config_special = {"KubernetesExecutor": {"image": "airflow/ci:latest"}}
+    kube_exec_config_special = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        # Please note that the image here must be similar to the one in your Dockerfile.
+                        # Using, for example, ubuntu:latest as base image
+                        # here would fail because it doesn't have `airflow` commands.
+                        # A scenario where changing the base image could be needed is if you have a
+                        # special library that you want to run only in this task. In that case,
+                        # you build the image with the special library, on top the image in your Dockerfile.
+                        image="apache/airflow:2.2.1-python3.9",
+                    ),
+                ]
+            )
+        )
+    }
 
     @task(executor_config=kube_exec_config_special)
     def one_task():
         print_stuff()
 
     # Use the zip binary, which is only found in this special docker image
-    kube_exec_config_zip_binary = {"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}}
+    kube_exec_config_zip_binary = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        # Please note that the image here must be similar to the one in your Dockerfile.
+                        # Using, for example, ubuntu:latest as base image
+                        # here would fail because it doesn't have `airflow` commands.
+                        # A scenario where changing the base image could be needed is if you have a
+                        # special library that you want to run only in this task. In that case,
+                        # you build the image with the special library, on top the image in your Dockerfile.
+                        image="apache/airflow:2.2.1-python3.9-zip",
+                    ),
+                ]
+            )
+        )
+    }
 
     @task(task_id="two_task", executor_config=kube_exec_config_zip_binary)
     def assert_zip_binary():
         """
         Checks whether Zip is installed.
-
         :raises SystemError: if zip is not installed
         """
         return_code = os.system("zip")
         if return_code != 0:
             raise SystemError("The zip binary is not found")
 
     # Limit resources on this operator/task with node affinity & tolerations
-    affinity = {
-        'podAntiAffinity': {
-            'requiredDuringSchedulingIgnoredDuringExecution': [
-                {
-                    'topologyKey': 'kubernetes.io/hostname',
-                    'labelSelector': {
-                        'matchExpressions': [{'key': 'app', 'operator': 'In', 'values': ['airflow']}]
-                    },
-                }
+    # Use k8s_client.V1Affinity to define node affinity & tolerations
+    k8s_affinity = k8s.V1Affinity(
+        pod_anti_affinity=k8s.V1PodAntiAffinity(
+            required_during_scheduling_ignored_during_execution=[
+                k8s.V1PodAffinityTerm(
+                    label_selector=k8s.V1LabelSelector(
+                        match_expressions=[
+                            k8s.V1LabelSelectorRequirement(key='app', operator='In', values=['airflow'])
+                        ]
+                    ),
+                    topology_key='kubernetes.io/hostname',
+                )
             ]
-        }
-    }
+        )
+    )
 
-    tolerations = [{'key': 'dedicated', 'operator': 'Equal', 'value': 'airflow'}]
+    # Use k8s_client.V1Toleration to define node tolerations
+    k8s_tolerations = [k8s.V1Toleration(key='dedicated', operator='Equal', value='airflow')]
 
-    kube_exec_config_resource_limits = {
-        "KubernetesExecutor": {
-            "request_memory": "128Mi",
-            "limit_memory": "128Mi",
-            "tolerations": tolerations,
-            "affinity": affinity,
-        }
+    # Use k8s_client.V1ResourceRequirements to define resource limits
+    k8s_resource_limits = k8s.V1ResourceRequirements(requests={'memory': '128Mi'}, limits={'memory': '128Mi'})
+    # use k8s_client.V1PodSpec to define pod spec with affinity, tolerations, and resource limits
+    k8s_exec_config_resources = {

Review comment:
       We should change this name, as it's not just resources.
   
   Also, I think we should reorder these examples. I have to imagine resources are used significantly more than custom worker images (which probably should be the last example).

##########
File path: airflow/example_dags/example_kubernetes_executor_config.py
##########
@@ -23,149 +23,156 @@
 from datetime import datetime
 
 from airflow import DAG
+from airflow.configuration import AIRFLOW_HOME
 from airflow.decorators import task
 from airflow.example_dags.libs.helper import print_stuff
-from airflow.settings import AIRFLOW_HOME
 
 log = logging.getLogger(__name__)
 
 try:
     from kubernetes.client import models as k8s
-
-    with DAG(
-        dag_id='example_kubernetes_executor_config',
-        schedule_interval=None,
-        start_date=datetime(2021, 1, 1),
-        catchup=False,
-        tags=['example3'],
-    ) as dag:
-        # You can use annotations on your kubernetes pods!
-        start_task_executor_config = {
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
-        }
-
-        @task(executor_config=start_task_executor_config)
-        def start_task():
-            print_stuff()
-
-        start_task = start_task()
-
-        # [START task_with_volume]
-        executor_config_volume_mount = {
-            "pod_override": k8s.V1Pod(
-                spec=k8s.V1PodSpec(
-                    containers=[
-                        k8s.V1Container(
-                            name="base",
-                            volume_mounts=[
-                                k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
-                            ],
-                        )
-                    ],
-                    volumes=[
-                        k8s.V1Volume(
-                            name="example-kubernetes-test-volume",
-                            host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
-                        )
-                    ],
-                )
-            ),
-        }
-
-        @task(executor_config=executor_config_volume_mount)
-        def test_volume_mount():
-            """
-            Tests whether the volume has been mounted.
-            """
-            with open('/foo/volume_mount_test.txt', 'w') as foo:
-                foo.write('Hello')
-
-            return_code = os.system("cat /foo/volume_mount_test.txt")
-            if return_code != 0:
-                raise ValueError(f"Error when checking volume mount. Return code {return_code}")
-
-        volume_task = test_volume_mount()
-        # [END task_with_volume]
-
-        # [START task_with_template]
-        executor_config_template = {
-            "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/basic_template.yaml"),
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
-        }
-
-        @task(executor_config=executor_config_template)
-        def task_with_template():
-            print_stuff()
-
-        task_with_template = task_with_template()
-        # [END task_with_template]
-
-        # [START task_with_sidecar]
-        executor_config_sidecar = {
-            "pod_override": k8s.V1Pod(
-                spec=k8s.V1PodSpec(
-                    containers=[
-                        k8s.V1Container(
-                            name="base",
-                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                        ),
-                        k8s.V1Container(
-                            name="sidecar",
-                            image="ubuntu",
-                            args=["echo \"retrieved from mount\" > /shared/test.txt"],
-                            command=["bash", "-cx"],
-                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                        ),
-                    ],
-                    volumes=[
-                        k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
-                    ],
-                )
-            ),
-        }
-
-        @task(executor_config=executor_config_sidecar)
-        def test_sharedvolume_mount():
-            """
-            Tests whether the volume has been mounted.
-            """
-            for i in range(5):
-                try:
-                    return_code = os.system("cat /shared/test.txt")
-                    if return_code != 0:
-                        raise ValueError(f"Error when checking volume mount. Return code {return_code}")
-                except ValueError as e:
-                    if i > 4:
-                        raise e
-
-        sidecar_task = test_sharedvolume_mount()
-        # [END task_with_sidecar]
-
-        # Test that we can add labels to pods
-        executor_config_non_root = {
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))
-        }
-
-        @task(executor_config=executor_config_non_root)
-        def non_root_task():
-            print_stuff()
-
-        third_task = non_root_task()
-
-        executor_config_other_ns = {
-            "KubernetesExecutor": {"namespace": "test-namespace", "labels": {"release": "stable"}}
-        }
-
-        @task(executor_config=executor_config_other_ns)
-        def other_namespace_task():
-            print_stuff()
-
-        other_ns_task = other_namespace_task()
-
-        start_task >> volume_task >> third_task
-        start_task >> other_ns_task
-        start_task >> sidecar_task
-        start_task >> task_with_template
 except ImportError as e:
     log.warning("Could not import DAGs in example_kubernetes_executor_config.py: %s", str(e))
-    log.warning("Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']")
+    log.warning("Install kubernetes dependencies with: pip install apache-airflow[cncf.kubernetes]")
+
+with DAG(
+    dag_id='example_kubernetes_executor_config',
+    schedule_interval=None,
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=['example3'],
+) as dag:
+    # You can use annotations on your kubernetes pods!
+    start_task_executor_config = {
+        "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
+    }
+
+    @task(executor_config=start_task_executor_config)
+    def start_task():
+        print_stuff()
+
+    start_task = start_task()
+
+    # [START task_with_volume]
+    executor_config_volume_mount = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        volume_mounts=[
+                            k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
+                        ],
+                    )
+                ],
+                volumes=[
+                    k8s.V1Volume(
+                        name="example-kubernetes-test-volume",
+                        host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
+                    )
+                ],
+            )
+        ),
+    }
+
+    @task(executor_config=executor_config_volume_mount)
+    def test_volume_mount():
+        """
+        Tests whether the volume has been mounted.
+        """
+
+        with open('/foo/volume_mount_test.txt', 'w') as foo:
+            foo.write('Hello')
+
+        return_code = os.system("cat /foo/volume_mount_test.txt")
+        if return_code != 0:
+            raise ValueError(f"Error when checking volume mount. Return code {return_code}")
+
+    volume_task = test_volume_mount()
+    # [END task_with_volume]
+
+    # [START task_with_template]
+    # Be careful when changing the base image in the pod_template_file.
+    # Using, for example, ubuntu:latest as base image
+    # here would fail because it doesn't have `airflow` commands.
+    # A scenario where changing the base image could be needed is if you have a special library that
+    # you want to run only in this task. In that case, you build the image
+    # with the special library, on top the image in your Dockerfile.
+    executor_config_template = {
+        "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/pod_template_file.yaml"),

Review comment:
       This file doesn't exist by default; it just happens to be where the official chart puts it. We also couldn't add a custom pod_template_file that would work in every deployment.
   
   I think we should move this example into the docs so we don't actually try to run it.
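
A minimal sketch of the concern above, assuming one wanted to keep the example runnable rather than move it into the docs: only add `pod_template_file` to the executor config when the file is actually present. The helper name `build_executor_config` is hypothetical and not part of Airflow, and the `pod_override` value is a placeholder string standing in for a `k8s.V1Pod` object.

```python
import os
import tempfile


def build_executor_config(template_path, pod_override=None):
    """Build an executor_config dict, adding pod_template_file only if it exists.

    Hypothetical helper for illustration; not part of Airflow.
    """
    config = {}
    if pod_override is not None:
        config["pod_override"] = pod_override
    # The template is optional: skip the key when the file is absent so the
    # example does not point at a path that only some deployments provide.
    if os.path.isfile(template_path):
        config["pod_template_file"] = template_path
    return config


# A path that does not exist yields a config without the template key.
missing_config = build_executor_config(
    os.path.join(tempfile.gettempdir(), "no_such_dir", "pod_template_file.yaml"),
    pod_override="placeholder-pod",
)

# A path that exists is passed through unchanged.
with tempfile.NamedTemporaryFile(suffix=".yaml") as template:
    present_config = build_executor_config(template.name)
    present_path = template.name
```

This keeps the DAG importable everywhere, at the cost of silently dropping the template; moving the snippet into the docs, as suggested, avoids that trade-off.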







[GitHub] [airflow] kaxil merged pull request #19355: Ensure the example DAGs are all working

Posted by GitBox <gi...@apache.org>.
kaxil merged pull request #19355:
URL: https://github.com/apache/airflow/pull/19355


   





[GitHub] [airflow] uranusjr commented on a change in pull request #19355: Ensure the example DAGs are all working

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19355:
URL: https://github.com/apache/airflow/pull/19355#discussion_r747317824



##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -16,88 +16,214 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-This is an example dag for using the Kubernetes Executor.
+This is an example dag for using a Kubernetes Executor Configuration.
 """
+import logging
 import os
 from datetime import datetime
 
 from airflow import DAG
+from airflow.configuration import conf
 from airflow.decorators import task
 from airflow.example_dags.libs.helper import print_stuff
 
+log = logging.getLogger(__name__)
+
+worker_container_repository = conf.get('kubernetes', 'worker_container_repository')
+worker_container_tag = conf.get('kubernetes', 'worker_container_tag')
+
+try:
+    from kubernetes.client import models as k8s
+except ImportError as e:
+    log.warning("Could not import DAGs in example_kubernetes_executor.py: %s", str(e))
+    log.warning("Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']")
+
+
 with DAG(
     dag_id='example_kubernetes_executor',
     schedule_interval=None,
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=['example', 'example2'],
+    tags=['example3'],

Review comment:
       Not particularly related, but what do these tags mean in the first place?

##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -16,88 +16,214 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-This is an example dag for using the Kubernetes Executor.
+This is an example dag for using a Kubernetes Executor Configuration.
 """
+import logging
 import os
 from datetime import datetime
 
 from airflow import DAG
+from airflow.configuration import conf
 from airflow.decorators import task
 from airflow.example_dags.libs.helper import print_stuff
 
+log = logging.getLogger(__name__)
+
+worker_container_repository = conf.get('kubernetes', 'worker_container_repository')
+worker_container_tag = conf.get('kubernetes', 'worker_container_tag')
+
+try:
+    from kubernetes.client import models as k8s
+except ImportError as e:
+    log.warning("Could not import DAGs in example_kubernetes_executor.py: %s", str(e))
+    log.warning("Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']")

Review comment:
       The same applies to the other example DAG file.

##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -16,88 +16,214 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-This is an example dag for using the Kubernetes Executor.
+This is an example dag for using a Kubernetes Executor Configuration.
 """
+import logging
 import os
 from datetime import datetime
 
 from airflow import DAG
+from airflow.configuration import conf
 from airflow.decorators import task
 from airflow.example_dags.libs.helper import print_stuff
 
+log = logging.getLogger(__name__)
+
+worker_container_repository = conf.get('kubernetes', 'worker_container_repository')
+worker_container_tag = conf.get('kubernetes', 'worker_container_tag')
+
+try:
+    from kubernetes.client import models as k8s
+except ImportError as e:
+    log.warning("Could not import DAGs in example_kubernetes_executor.py: %s", str(e))
+    log.warning("Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']")
+
+
 with DAG(
     dag_id='example_kubernetes_executor',
     schedule_interval=None,
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=['example', 'example2'],
+    tags=['example3'],
 ) as dag:
-    # You don't have to use any special KubernetesExecutor configuration if you don't want to
-    @task
+    # You can use annotations on your kubernetes pods!
+    start_task_executor_config = {
+        "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
+    }
+
+    @task(executor_config=start_task_executor_config)
     def start_task():
         print_stuff()
 
-    # But you can if you want to
-    kube_exec_config_special = {"KubernetesExecutor": {"image": "airflow/ci:latest"}}
-
-    @task(executor_config=kube_exec_config_special)
-    def one_task():
-        print_stuff()
+    start_task = start_task()
 
-    # Use the zip binary, which is only found in this special docker image
-    kube_exec_config_zip_binary = {"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}}
+    # [START task_with_volume]
+    executor_config_volume_mount = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        volume_mounts=[
+                            k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
+                        ],
+                    )
+                ],
+                volumes=[
+                    k8s.V1Volume(
+                        name="example-kubernetes-test-volume",
+                        host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
+                    )
+                ],
+            )
+        ),
+    }
 
-    @task(task_id="two_task", executor_config=kube_exec_config_zip_binary)
-    def assert_zip_binary():
+    @task(executor_config=executor_config_volume_mount)
+    def test_volume_mount():
         """
-        Checks whether Zip is installed.
-
-        :raises SystemError: if zip is not installed
+        Tests whether the volume has been mounted.
         """
-        return_code = os.system("zip")
+
+        with open('/foo/volume_mount_test.txt', 'w') as foo:
+            foo.write('Hello')
+
+        return_code = os.system("cat /foo/volume_mount_test.txt")
         if return_code != 0:
-            raise SystemError("The zip binary is not found")
-
-    # Limit resources on this operator/task with node affinity & tolerations
-    affinity = {
-        'podAntiAffinity': {
-            'requiredDuringSchedulingIgnoredDuringExecution': [
-                {
-                    'topologyKey': 'kubernetes.io/hostname',
-                    'labelSelector': {
-                        'matchExpressions': [{'key': 'app', 'operator': 'In', 'values': ['airflow']}]
-                    },
-                }
-            ]
-        }
+            raise ValueError(f"Error when checking volume mount. Return code {return_code}")
+
+    volume_task = test_volume_mount()
+    # [END task_with_volume]
+
+    # [START task_with_sidecar]
+    executor_config_sidecar = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
+                    ),
+                    k8s.V1Container(
+                        name="sidecar",
+                        image="ubuntu",
+                        args=["echo \"retrieved from mount\" > /shared/test.txt"],
+                        command=["bash", "-cx"],
+                        volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
+                    ),
+                ],
+                volumes=[
+                    k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
+                ],
+            )
+        ),
     }
 
-    tolerations = [{'key': 'dedicated', 'operator': 'Equal', 'value': 'airflow'}]
+    @task(executor_config=executor_config_sidecar)
+    def test_sharedvolume_mount():
+        """
+        Tests whether the volume has been mounted.
+        """
+        for i in range(5):
+            try:
+                return_code = os.system("cat /shared/test.txt")
+                if return_code != 0:
+                    raise ValueError(f"Error when checking volume mount. Return code {return_code}")
+            except ValueError as e:
+                if i > 4:
+                    raise e
 
-    kube_exec_config_resource_limits = {
-        "KubernetesExecutor": {
-            "request_memory": "128Mi",
-            "limit_memory": "128Mi",
-            "tolerations": tolerations,
-            "affinity": affinity,
-        }
+    sidecar_task = test_sharedvolume_mount()
+    # [END task_with_sidecar]
+
+    # You can add labels to pods
+    executor_config_non_root = {
+        "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))
     }
 
-    @task(executor_config=kube_exec_config_resource_limits)
-    def three_task():
+    @task(executor_config=executor_config_non_root)
+    def non_root_task():
         print_stuff()
 
-    # Add arbitrary labels to worker pods
-    kube_exec_config_pod_labels = {"KubernetesExecutor": {"labels": {"foo": "bar"}}}
+    third_task = non_root_task()
+
+    executor_config_other_ns = {
+        "pod_override": k8s.V1Pod(
+            metadata=k8s.V1ObjectMeta(namespace="test-namespace", labels={'release': 'stable'})
+        )
+    }
 
-    @task(executor_config=kube_exec_config_pod_labels)
-    def four_task():
+    @task(executor_config=executor_config_other_ns)
+    def other_namespace_task():
         print_stuff()
 
-    start_task = start_task()
-    one_task = one_task()
-    two_task = assert_zip_binary()
-    three_task = three_task()
-    four_task = four_task()
+    other_ns_task = other_namespace_task()
+
+    # You can also change the base image, here we used the worker image for demonstration.
+    # Note that the image must have the same configuration as the
+    # worker image. Could be that you want to run this task in a special docker image that has a zip
+    # library built-in. You build the special docker image on top your worker image.
+    kube_exec_config_special = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base", image=f"{worker_container_repository}:{worker_container_tag}"
+                    ),
+                ]
+            )
+        )
+    }
+
+    @task(executor_config=kube_exec_config_special)
+    def base_image_override_task():
+        print_stuff()
+
+    base_image_task = base_image_override_task()
+
+    # Use k8s_client.V1Affinity to define node affinity
+    k8s_affinity = k8s.V1Affinity(
+        pod_anti_affinity=k8s.V1PodAntiAffinity(
+            required_during_scheduling_ignored_during_execution=[
+                k8s.V1PodAffinityTerm(
+                    label_selector=k8s.V1LabelSelector(
+                        match_expressions=[
+                            k8s.V1LabelSelectorRequirement(key='app', operator='In', values=['airflow'])
+                        ]
+                    ),
+                    topology_key='kubernetes.io/hostname',
+                )
+            ]
+        )
+    )
+
+    # Use k8s_client.V1Toleration to define node tolerations
+    k8s_tolerations = [k8s.V1Toleration(key='dedicated', operator='Equal', value='airflow')]
+
+    # Use k8s_client.V1ResourceRequirements to define resource limits
+    k8s_resource_requirements = k8s.V1ResourceRequirements(
+        requests={'memory': '512Mi'}, limits={'memory': '512Mi'}
+    )
+
+    kube_exec_config_resource_limits = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        resources=k8s_resource_requirements,
+                    )
+                ],
+                affinity=k8s_affinity,
+                tolerations=k8s_tolerations,
+            )
+        )
+    }
+
+    @task(executor_config=kube_exec_config_resource_limits)
+    def task_with_resource_limits():
+        print_stuff()
+
+    four_task = task_with_resource_limits()
 
-    start_task >> [one_task, two_task, three_task, four_task]
+    (start_task >> [volume_task, other_ns_task, sidecar_task] >> third_task >> [base_image_task, four_task])

Review comment:
       ```suggestion
       start_task >> [volume_task, other_ns_task, sidecar_task] >> third_task >> [base_image_task, four_task]
   ```
   
   Are the outermost parentheses needed? (nit)
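
To illustrate why the parentheses are optional, here is a small sketch using a hypothetical `Node` class as a stand-in for an Airflow task (it is not Airflow's implementation). It only assumes that dependencies are wired through `__rshift__` and `__rrshift__`, and shows that wrapping the whole expression statement in parentheses produces the same edges.

```python
class Node:
    """Hypothetical stand-in for a task; records downstream edges by name."""

    def __init__(self, name):
        self.name = name
        self.downstream = []

    def __rshift__(self, other):
        # Handles `node >> other` and `node >> [a, b]`.
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            self.downstream.append(target.name)
        return other

    def __rrshift__(self, other):
        # Handles `[a, b] >> node`, since list defines no __rshift__.
        for source in other:
            source.downstream.append(self.name)
        return self


def build(parenthesized):
    """Wire start >> [a, b] >> end and return the sorted edge list."""
    start, a, b, end = (Node(n) for n in ("start", "a", "b", "end"))
    if parenthesized:
        (start >> [a, b] >> end)
    else:
        start >> [a, b] >> end
    nodes = [start, a, b, end]
    return sorted((n.name, d) for n in nodes for d in n.downstream)
```

Both forms evaluate the same expression left to right; the outer parentheses only matter when the chain has to be split across several lines.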

##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -16,88 +16,214 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-This is an example dag for using the Kubernetes Executor.
+This is an example dag for using a Kubernetes Executor Configuration.
 """
+import logging
 import os
 from datetime import datetime
 
 from airflow import DAG
+from airflow.configuration import conf
 from airflow.decorators import task
 from airflow.example_dags.libs.helper import print_stuff
 
+log = logging.getLogger(__name__)
+
+worker_container_repository = conf.get('kubernetes', 'worker_container_repository')
+worker_container_tag = conf.get('kubernetes', 'worker_container_tag')
+
+try:
+    from kubernetes.client import models as k8s
+except ImportError as e:
+    log.warning("Could not import DAGs in example_kubernetes_executor.py: %s", str(e))
+    log.warning("Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']")

Review comment:
       ```suggestion
   try:
       from kubernetes.client import models as k8s
   except ImportError:
       log.warning("Could not import DAGs in example_kubernetes_executor.py", exc_info=True)
       log.warning("Install Kubernetes dependencies with: pip install apache-airflow[cncf.kubernetes]")
   ```
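
A short sketch of what `exc_info=True` changes, using only the standard `logging` module. The logger name and the helper `capture_import_warning` are made up for this illustration; the point is that the traceback is attached to the log record, so the message no longer needs `str(e)` interpolated into it.

```python
import io
import logging


def capture_import_warning(module_name):
    """Try to import module_name and return whatever the warning logged."""
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    logger = logging.getLogger("example_import_warning")  # illustrative name
    logger.addHandler(handler)
    logger.propagate = False
    try:
        __import__(module_name)
    except ImportError:
        # exc_info=True appends the active exception's traceback to the record.
        logger.warning("Could not import %s", module_name, exc_info=True)
    finally:
        logger.removeHandler(handler)
    return stream.getvalue()


# A module name assumed not to be installed anywhere.
output = capture_import_warning("hypothetical_missing_module_for_demo")
```

The `as e` binding becomes unnecessary with this form, which is why the suggestion drops it.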







[GitHub] [airflow] uranusjr commented on a change in pull request #19355: Ensure the example DAGs are all working

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19355:
URL: https://github.com/apache/airflow/pull/19355#discussion_r747331069



##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -16,88 +16,214 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-This is an example dag for using the Kubernetes Executor.
+This is an example dag for using a Kubernetes Executor Configuration.
 """
+import logging
 import os
 from datetime import datetime
 
 from airflow import DAG
+from airflow.configuration import conf
 from airflow.decorators import task
 from airflow.example_dags.libs.helper import print_stuff
 
+log = logging.getLogger(__name__)
+
+worker_container_repository = conf.get('kubernetes', 'worker_container_repository')
+worker_container_tag = conf.get('kubernetes', 'worker_container_tag')
+
+try:
+    from kubernetes.client import models as k8s
+except ImportError as e:
+    log.warning("Could not import DAGs in example_kubernetes_executor.py: %s", str(e))
+    log.warning("Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']")
+
+
 with DAG(
     dag_id='example_kubernetes_executor',
     schedule_interval=None,
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=['example', 'example2'],
+    tags=['example3'],

Review comment:
       So… this particular change here does not really change anything?







[GitHub] [airflow] mik-laj commented on a change in pull request #19355: Update the example DAGs

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #19355:
URL: https://github.com/apache/airflow/pull/19355#discussion_r740628714



##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -18,86 +18,114 @@
 """
 This is an example dag for using the Kubernetes Executor.
 """
-import os
+import logging
 from datetime import datetime
 
 from airflow import DAG
 from airflow.decorators import task
 from airflow.example_dags.libs.helper import print_stuff
 
-with DAG(
-    dag_id='example_kubernetes_executor',
-    schedule_interval=None,
-    start_date=datetime(2021, 1, 1),
-    catchup=False,
-    tags=['example', 'example2'],
-) as dag:
-    # You don't have to use any special KubernetesExecutor configuration if you don't want to
-    @task
-    def start_task():
-        print_stuff()
+log = logging.getLogger(__name__)
 
-    # But you can if you want to
-    kube_exec_config_special = {"KubernetesExecutor": {"image": "airflow/ci:latest"}}
+try:
+    from kubernetes.client import models as k8s
 
-    @task(executor_config=kube_exec_config_special)
-    def one_task():
-        print_stuff()
+    with DAG(
+        dag_id='example_kubernetes_executor',
+        schedule_interval=None,
+        start_date=datetime(2021, 1, 1),
+        catchup=False,
+        tags=['example', 'example2'],
+    ) as dag:
+        # You don't have to use any special KubernetesExecutor configuration if you don't want to
+        @task
+        def start_task():
+            print_stuff()
 
-    # Use the zip binary, which is only found in this special docker image
-    kube_exec_config_zip_binary = {"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}}
-
-    @task(task_id="two_task", executor_config=kube_exec_config_zip_binary)
-    def assert_zip_binary():
-        """
-        Checks whether Zip is installed.
+        # But you can if you want to
+        kube_exec_config_special = {
+            "pod_override": k8s.V1Pod(
+                spec=k8s.V1PodSpec(
+                    containers=[
+                        k8s.V1Container(
+                            name="base",
+                        ),
+                        k8s.V1Container(
+                            name="sidecar",
+                            image="ubuntu",
+                            command=["/bin/sh", "-c"],
+                            args=["echo 'Hello world'"],
+                        ),
+                    ]
+                )
+            )
+        }
 
-        :raises SystemError: if zip is not installed
-        """
-        return_code = os.system("zip")
-        if return_code != 0:
-            raise SystemError("The zip binary is not found")
+        @task(executor_config=kube_exec_config_special)
+        def one_task():
+            print_stuff()
 
-    # Limit resources on this operator/task with node affinity & tolerations
-    affinity = {
-        'podAntiAffinity': {
-            'requiredDuringSchedulingIgnoredDuringExecution': [
-                {
-                    'topologyKey': 'kubernetes.io/hostname',
-                    'labelSelector': {
-                        'matchExpressions': [{'key': 'app', 'operator': 'In', 'values': ['airflow']}]
-                    },
-                }
-            ]
-        }
-    }
+        # Limit resources on this operator/task with node affinity & tolerations
+        # Use k8s_client.V1Affinity to define node affinity & tolerations
+        k8s_affinity = k8s.V1Affinity(
+            pod_anti_affinity=k8s.V1PodAntiAffinity(
+                required_during_scheduling_ignored_during_execution=[
+                    k8s.V1PodAffinityTerm(
+                        label_selector=k8s.V1LabelSelector(
+                            match_expressions=[
+                                k8s.V1LabelSelectorRequirement(key='app', operator='In', values=['airflow'])
+                            ]
+                        ),
+                        topology_key='kubernetes.io/hostname',
+                    )
+                ]
+            )
+        )
 
-    tolerations = [{'key': 'dedicated', 'operator': 'Equal', 'value': 'airflow'}]
+        # Use k8s_client.V1Toleration to define node tolerations
+        k8s_tolerations = [k8s.V1Toleration(key='dedicated', operator='Equal', value='airflow')]
 
-    kube_exec_config_resource_limits = {
-        "KubernetesExecutor": {
-            "request_memory": "128Mi",
-            "limit_memory": "128Mi",
-            "tolerations": tolerations,
-            "affinity": affinity,
+        # Use k8s_client.V1ResourceRequirements to define resource limits
+        k8s_resource_limits = k8s.V1ResourceRequirements(
+            requests={'memory': '128Mi'}, limits={'memory': '128Mi'}
+        )
+        # use k8s_client.V1PodSpec to define pod spec with affinity, tolerations, and resource limits
+        k8s_exec_config_resources = {
+            "pod_override": k8s.V1Pod(
+                spec=k8s.V1PodSpec(
+                    containers=[
+                        k8s.V1Container(
+                            name="base",
+                            resources=k8s_resource_limits,
+                        )
+                    ],
+                    affinity=k8s_affinity,
+                    tolerations=k8s_tolerations,
+                )
+            )
         }
-    }
 
-    @task(executor_config=kube_exec_config_resource_limits)
-    def three_task():
-        print_stuff()
+        @task(executor_config=k8s_exec_config_resources)
+        def two_task():
+            print_stuff()
 
-    # Add arbitrary labels to worker pods
-    kube_exec_config_pod_labels = {"KubernetesExecutor": {"labels": {"foo": "bar"}}}
+        # Use k8s_client.V1PodSpec to define pod spec with labels
+        kube_exec_config_pod_labels = {
+            "pod_override": k8s.V1Pod(
+                metadata=k8s.V1ObjectMeta(labels={"foo": "bar"}),
+            )
+        }
 
-    @task(executor_config=kube_exec_config_pod_labels)
-    def four_task():
-        print_stuff()
+        @task(executor_config=kube_exec_config_pod_labels)
+        def three_task():
+            print_stuff()
 
-    start_task = start_task()
-    one_task = one_task()
-    two_task = assert_zip_binary()
-    three_task = three_task()
-    four_task = four_task()
+        start_task = start_task()
+        one_task = one_task()
+        two_task = two_task()
+        three_task = three_task()
 
-    start_task >> [one_task, two_task, three_task, four_task]
+        start_task >> [one_task, two_task, three_task]
+except ImportError as e:
+    log.warning("Could not import DAGs in example_kubernetes_executor_config.py: %s", str(e))
+    log.warning("Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']")

Review comment:
       ```suggestion
       log.warning("Install kubernetes dependencies with: pip install 'apache-airflow[cncf.kubernetes]'")
   ```
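
The quoting in this suggestion can be checked with the standard `shlex` module. This is only a sketch of the shell-quoting point: square brackets are glob characters in many shells (zsh, for example, can reject an unquoted `[...]` that matches no file), so the whole requirement string is quoted rather than just the extra's name.

```python
import shlex

requirement = "apache-airflow[cncf.kubernetes]"

# shlex.quote wraps the argument in single quotes because "[" and "]"
# are not in its set of shell-safe characters.
command = "pip install " + shlex.quote(requirement)

# Splitting the command again recovers the original, unmodified requirement.
argv = shlex.split(command)
```

The result matches the form in the suggestion, with the quotes around the full `apache-airflow[cncf.kubernetes]` argument.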







[GitHub] [airflow] mik-laj commented on a change in pull request #19355: Update the example DAGs

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #19355:
URL: https://github.com/apache/airflow/pull/19355#discussion_r740628714



##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -18,86 +18,114 @@
 """
 This is an example dag for using the Kubernetes Executor.
 """
-import os
+import logging
 from datetime import datetime
 
 from airflow import DAG
 from airflow.decorators import task
 from airflow.example_dags.libs.helper import print_stuff
 
-with DAG(
-    dag_id='example_kubernetes_executor',
-    schedule_interval=None,
-    start_date=datetime(2021, 1, 1),
-    catchup=False,
-    tags=['example', 'example2'],
-) as dag:
-    # You don't have to use any special KubernetesExecutor configuration if you don't want to
-    @task
-    def start_task():
-        print_stuff()
+log = logging.getLogger(__name__)
 
-    # But you can if you want to
-    kube_exec_config_special = {"KubernetesExecutor": {"image": "airflow/ci:latest"}}
+try:
+    from kubernetes.client import models as k8s
 
-    @task(executor_config=kube_exec_config_special)
-    def one_task():
-        print_stuff()
+    with DAG(
+        dag_id='example_kubernetes_executor',
+        schedule_interval=None,
+        start_date=datetime(2021, 1, 1),
+        catchup=False,
+        tags=['example', 'example2'],
+    ) as dag:
+        # You don't have to use any special KubernetesExecutor configuration if you don't want to
+        @task
+        def start_task():
+            print_stuff()
 
-    # Use the zip binary, which is only found in this special docker image
-    kube_exec_config_zip_binary = {"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}}
-
-    @task(task_id="two_task", executor_config=kube_exec_config_zip_binary)
-    def assert_zip_binary():
-        """
-        Checks whether Zip is installed.
+        # But you can if you want to
+        kube_exec_config_special = {
+            "pod_override": k8s.V1Pod(
+                spec=k8s.V1PodSpec(
+                    containers=[
+                        k8s.V1Container(
+                            name="base",
+                        ),
+                        k8s.V1Container(
+                            name="sidecar",
+                            image="ubuntu",
+                            command=["/bin/sh", "-c"],
+                            args=["echo 'Hello world'"],
+                        ),
+                    ]
+                )
+            )
+        }
 
-        :raises SystemError: if zip is not installed
-        """
-        return_code = os.system("zip")
-        if return_code != 0:
-            raise SystemError("The zip binary is not found")
+        @task(executor_config=kube_exec_config_special)
+        def one_task():
+            print_stuff()
 
-    # Limit resources on this operator/task with node affinity & tolerations
-    affinity = {
-        'podAntiAffinity': {
-            'requiredDuringSchedulingIgnoredDuringExecution': [
-                {
-                    'topologyKey': 'kubernetes.io/hostname',
-                    'labelSelector': {
-                        'matchExpressions': [{'key': 'app', 'operator': 'In', 'values': ['airflow']}]
-                    },
-                }
-            ]
-        }
-    }
+        # Limit resources on this operator/task with node affinity & tolerations
+        # Use k8s_client.V1Affinity to define node affinity & tolerations
+        k8s_affinity = k8s.V1Affinity(
+            pod_anti_affinity=k8s.V1PodAntiAffinity(
+                required_during_scheduling_ignored_during_execution=[
+                    k8s.V1PodAffinityTerm(
+                        label_selector=k8s.V1LabelSelector(
+                            match_expressions=[
+                                k8s.V1LabelSelectorRequirement(key='app', operator='In', values=['airflow'])
+                            ]
+                        ),
+                        topology_key='kubernetes.io/hostname',
+                    )
+                ]
+            )
+        )
 
-    tolerations = [{'key': 'dedicated', 'operator': 'Equal', 'value': 'airflow'}]
+        # Use k8s_client.V1Toleration to define node tolerations
+        k8s_tolerations = [k8s.V1Toleration(key='dedicated', operator='Equal', value='airflow')]
 
-    kube_exec_config_resource_limits = {
-        "KubernetesExecutor": {
-            "request_memory": "128Mi",
-            "limit_memory": "128Mi",
-            "tolerations": tolerations,
-            "affinity": affinity,
+        # Use k8s_client.V1ResourceRequirements to define resource limits
+        k8s_resource_limits = k8s.V1ResourceRequirements(
+            requests={'memory': '128Mi'}, limits={'memory': '128Mi'}
+        )
+        # use k8s_client.V1PodSpec to define pod spec with affinity, tolerations, and resource limits
+        k8s_exec_config_resources = {
+            "pod_override": k8s.V1Pod(
+                spec=k8s.V1PodSpec(
+                    containers=[
+                        k8s.V1Container(
+                            name="base",
+                            resources=k8s_resource_limits,
+                        )
+                    ],
+                    affinity=k8s_affinity,
+                    tolerations=k8s_tolerations,
+                )
+            )
         }
-    }
 
-    @task(executor_config=kube_exec_config_resource_limits)
-    def three_task():
-        print_stuff()
+        @task(executor_config=k8s_exec_config_resources)
+        def two_task():
+            print_stuff()
 
-    # Add arbitrary labels to worker pods
-    kube_exec_config_pod_labels = {"KubernetesExecutor": {"labels": {"foo": "bar"}}}
+        # Use k8s_client.V1PodSpec to define pod spec with labels
+        kube_exec_config_pod_labels = {
+            "pod_override": k8s.V1Pod(
+                metadata=k8s.V1ObjectMeta(labels={"foo": "bar"}),
+            )
+        }
 
-    @task(executor_config=kube_exec_config_pod_labels)
-    def four_task():
-        print_stuff()
+        @task(executor_config=kube_exec_config_pod_labels)
+        def three_task():
+            print_stuff()
 
-    start_task = start_task()
-    one_task = one_task()
-    two_task = assert_zip_binary()
-    three_task = three_task()
-    four_task = four_task()
+        start_task = start_task()
+        one_task = one_task()
+        two_task = two_task()
+        three_task = three_task()
 
-    start_task >> [one_task, two_task, three_task, four_task]
+        start_task >> [one_task, two_task, three_task]
+except ImportError as e:
+    log.warning("Could not import DAGs in example_kubernetes_executor_config.py: %s", str(e))
+    log.warning("Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']")

Review comment:
       ```suggestion
       log.warning("Install kubernetes dependencies with: pip install apache-'airflow[cncf.kubernetes]'")
   ```
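
The point of the suggestion is shell quoting: square brackets are glob characters in some shells (zsh, for example), so the extras specifier is safer when quoted. As a minimal sketch, assuming the hint were built in Python rather than typed by hand, the standard-library `shlex.quote` produces a quoted form of the requirement. The variable names are illustrative and not part of the PR.

```python
import shlex

# Illustrative names; only the requirement string comes from the diff above.
requirement = "apache-airflow[cncf.kubernetes]"

# shlex.quote wraps the string in single quotes because '[' and ']' are
# not in its set of shell-safe characters.
install_hint = f"pip install {shlex.quote(requirement)}"
print(install_hint)
```

Quoting the whole requirement, rather than only part of it, is the more common way to write it, although both forms reach `pip` as the same argument.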














[GitHub] [airflow] uranusjr commented on a change in pull request #19355: Ensure the example DAGs are all working

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19355:
URL: https://github.com/apache/airflow/pull/19355#discussion_r740892129



##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -18,86 +18,114 @@
 """
 This is an example dag for using the Kubernetes Executor.
 """
-import os
+import logging
 from datetime import datetime
 
 from airflow import DAG
 from airflow.decorators import task
 from airflow.example_dags.libs.helper import print_stuff
 
-with DAG(
-    dag_id='example_kubernetes_executor',
-    schedule_interval=None,
-    start_date=datetime(2021, 1, 1),
-    catchup=False,
-    tags=['example', 'example2'],
-) as dag:
-    # You don't have to use any special KubernetesExecutor configuration if you don't want to
-    @task
-    def start_task():
-        print_stuff()
+log = logging.getLogger(__name__)
 
-    # But you can if you want to
-    kube_exec_config_special = {"KubernetesExecutor": {"image": "airflow/ci:latest"}}
+try:
+    from kubernetes.client import models as k8s
 

Review comment:
       Probably better to put everything under this line into the `else` block instead. 
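
A minimal sketch of that layout, assuming only the `kubernetes` client import needs guarding: the `except` branch logs the problem and the `else` branch holds everything that uses `k8s`, so nothing references the name when the import fails. The `pod_labels_config` name is hypothetical, and the DAG definition itself is left out so the sketch runs without Airflow installed.

```python
import logging

log = logging.getLogger(__name__)

try:
    from kubernetes.client import models as k8s
except ImportError as e:
    # Import failed: log a hint and define nothing that depends on `k8s`.
    log.warning("Could not import DAGs in example_kubernetes_executor.py: %s", e)
    log.warning("Install kubernetes dependencies with: pip install 'apache-airflow[cncf.kubernetes]'")
    pod_labels_config = None  # hypothetical placeholder, not in the PR
else:
    # Runs only when the import succeeded, so `k8s` is always bound here.
    # In the real file, the `with DAG(...)` block would live in this branch.
    pod_labels_config = {
        "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"foo": "bar"}))
    }
```

Keeping the `try` body down to the single import also means an unrelated `ImportError` raised while building the DAG is not silently swallowed.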







[GitHub] [airflow] ephraimbuddy commented on pull request #19355: Ensure the example DAGs are all working

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #19355:
URL: https://github.com/apache/airflow/pull/19355#issuecomment-957181808


   I'm not sure why `virtualenv` tasks (both normal and decorated) fail in the Kubernetes cluster.
   cc: @dimberman @jedcunningham 
   
   





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #19355: Ensure the example DAGs are all working

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #19355:
URL: https://github.com/apache/airflow/pull/19355#discussion_r747331358



##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -16,88 +16,214 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-This is an example dag for using the Kubernetes Executor.
+This is an example dag for using a Kubernetes Executor Configuration.
 """
+import logging
 import os
 from datetime import datetime
 
 from airflow import DAG
+from airflow.configuration import conf
 from airflow.decorators import task
 from airflow.example_dags.libs.helper import print_stuff
 
+log = logging.getLogger(__name__)
+
+worker_container_repository = conf.get('kubernetes', 'worker_container_repository')
+worker_container_tag = conf.get('kubernetes', 'worker_container_tag')
+
+try:
+    from kubernetes.client import models as k8s
+except ImportError as e:
+    log.warning("Could not import DAGs in example_kubernetes_executor.py: %s", str(e))
+    log.warning("Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']")
+
+
 with DAG(
     dag_id='example_kubernetes_executor',
     schedule_interval=None,
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=['example', 'example2'],
+    tags=['example3'],
 ) as dag:
-    # You don't have to use any special KubernetesExecutor configuration if you don't want to
-    @task
+    # You can use annotations on your kubernetes pods!
+    start_task_executor_config = {
+        "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
+    }
+
+    @task(executor_config=start_task_executor_config)
     def start_task():
         print_stuff()
 
-    # But you can if you want to
-    kube_exec_config_special = {"KubernetesExecutor": {"image": "airflow/ci:latest"}}
-
-    @task(executor_config=kube_exec_config_special)
-    def one_task():
-        print_stuff()
+    start_task = start_task()
 
-    # Use the zip binary, which is only found in this special docker image
-    kube_exec_config_zip_binary = {"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}}
+    # [START task_with_volume]
+    executor_config_volume_mount = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        volume_mounts=[
+                            k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
+                        ],
+                    )
+                ],
+                volumes=[
+                    k8s.V1Volume(
+                        name="example-kubernetes-test-volume",
+                        host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
+                    )
+                ],
+            )
+        ),
+    }
 
-    @task(task_id="two_task", executor_config=kube_exec_config_zip_binary)
-    def assert_zip_binary():
+    @task(executor_config=executor_config_volume_mount)
+    def test_volume_mount():
         """
-        Checks whether Zip is installed.
-
-        :raises SystemError: if zip is not installed
+        Tests whether the volume has been mounted.
         """
-        return_code = os.system("zip")
+
+        with open('/foo/volume_mount_test.txt', 'w') as foo:
+            foo.write('Hello')
+
+        return_code = os.system("cat /foo/volume_mount_test.txt")
         if return_code != 0:
-            raise SystemError("The zip binary is not found")
-
-    # Limit resources on this operator/task with node affinity & tolerations
-    affinity = {
-        'podAntiAffinity': {
-            'requiredDuringSchedulingIgnoredDuringExecution': [
-                {
-                    'topologyKey': 'kubernetes.io/hostname',
-                    'labelSelector': {
-                        'matchExpressions': [{'key': 'app', 'operator': 'In', 'values': ['airflow']}]
-                    },
-                }
-            ]
-        }
+            raise ValueError(f"Error when checking volume mount. Return code {return_code}")
+
+    volume_task = test_volume_mount()
+    # [END task_with_volume]
+
+    # [START task_with_sidecar]
+    executor_config_sidecar = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
+                    ),
+                    k8s.V1Container(
+                        name="sidecar",
+                        image="ubuntu",
+                        args=["echo \"retrieved from mount\" > /shared/test.txt"],
+                        command=["bash", "-cx"],
+                        volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
+                    ),
+                ],
+                volumes=[
+                    k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
+                ],
+            )
+        ),
     }
 
-    tolerations = [{'key': 'dedicated', 'operator': 'Equal', 'value': 'airflow'}]
+    @task(executor_config=executor_config_sidecar)
+    def test_sharedvolume_mount():
+        """
+        Tests whether the volume has been mounted.
+        """
+        for i in range(5):
+            try:
+                return_code = os.system("cat /shared/test.txt")
+                if return_code != 0:
+                    raise ValueError(f"Error when checking volume mount. Return code {return_code}")
+            except ValueError as e:
+                if i > 4:
+                    raise e
 
-    kube_exec_config_resource_limits = {
-        "KubernetesExecutor": {
-            "request_memory": "128Mi",
-            "limit_memory": "128Mi",
-            "tolerations": tolerations,
-            "affinity": affinity,
-        }
+    sidecar_task = test_sharedvolume_mount()
+    # [END task_with_sidecar]
+
+    # You can add labels to pods
+    executor_config_non_root = {
+        "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))
     }
 
-    @task(executor_config=kube_exec_config_resource_limits)
-    def three_task():
+    @task(executor_config=executor_config_non_root)
+    def non_root_task():
         print_stuff()
 
-    # Add arbitrary labels to worker pods
-    kube_exec_config_pod_labels = {"KubernetesExecutor": {"labels": {"foo": "bar"}}}
+    third_task = non_root_task()
+
+    executor_config_other_ns = {
+        "pod_override": k8s.V1Pod(
+            metadata=k8s.V1ObjectMeta(namespace="test-namespace", labels={'release': 'stable'})
+        )
+    }
 
-    @task(executor_config=kube_exec_config_pod_labels)
-    def four_task():
+    @task(executor_config=executor_config_other_ns)
+    def other_namespace_task():
         print_stuff()
 
-    start_task = start_task()
-    one_task = one_task()
-    two_task = assert_zip_binary()
-    three_task = three_task()
-    four_task = four_task()
+    other_ns_task = other_namespace_task()
+
+    # You can also change the base image, here we used the worker image for demonstration.
+    # Note that the image must have the same configuration as the
+    # worker image. Could be that you want to run this task in a special docker image that has a zip
+    # library built-in. You build the special docker image on top your worker image.
+    kube_exec_config_special = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base", image=f"{worker_container_repository}:{worker_container_tag}"
+                    ),
+                ]
+            )
+        )
+    }
+
+    @task(executor_config=kube_exec_config_special)
+    def base_image_override_task():
+        print_stuff()
+
+    base_image_task = base_image_override_task()
+
+    # Use k8s_client.V1Affinity to define node affinity
+    k8s_affinity = k8s.V1Affinity(
+        pod_anti_affinity=k8s.V1PodAntiAffinity(
+            required_during_scheduling_ignored_during_execution=[
+                k8s.V1PodAffinityTerm(
+                    label_selector=k8s.V1LabelSelector(
+                        match_expressions=[
+                            k8s.V1LabelSelectorRequirement(key='app', operator='In', values=['airflow'])
+                        ]
+                    ),
+                    topology_key='kubernetes.io/hostname',
+                )
+            ]
+        )
+    )
+
+    # Use k8s_client.V1Toleration to define node tolerations
+    k8s_tolerations = [k8s.V1Toleration(key='dedicated', operator='Equal', value='airflow')]
+
+    # Use k8s_client.V1ResourceRequirements to define resource limits
+    k8s_resource_requirements = k8s.V1ResourceRequirements(
+        requests={'memory': '512Mi'}, limits={'memory': '512Mi'}
+    )
+
+    kube_exec_config_resource_limits = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        resources=k8s_resource_requirements,
+                    )
+                ],
+                affinity=k8s_affinity,
+                tolerations=k8s_tolerations,
+            )
+        )
+    }
+
+    @task(executor_config=kube_exec_config_resource_limits)
+    def task_with_resource_limits():
+        print_stuff()
+
+    four_task = task_with_resource_limits()
 
-    start_task >> [one_task, two_task, three_task, four_task]
+    (start_task >> [volume_task, other_ns_task, sidecar_task] >> third_task >> [base_image_task, four_task])

Review comment:
       Not needed; I previously had a lot of tasks here.







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #19355: Ensure the example DAGs are all working

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #19355:
URL: https://github.com/apache/airflow/pull/19355#discussion_r747330179



##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -16,88 +16,214 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-This is an example dag for using the Kubernetes Executor.
+This is an example dag for using a Kubernetes Executor Configuration.
 """
+import logging
 import os
 from datetime import datetime
 
 from airflow import DAG
+from airflow.configuration import conf
 from airflow.decorators import task
 from airflow.example_dags.libs.helper import print_stuff
 
+log = logging.getLogger(__name__)
+
+worker_container_repository = conf.get('kubernetes', 'worker_container_repository')
+worker_container_tag = conf.get('kubernetes', 'worker_container_tag')
+
+try:
+    from kubernetes.client import models as k8s
+except ImportError as e:
+    log.warning("Could not import DAGs in example_kubernetes_executor.py: %s", str(e))
+    log.warning("Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']")
+
+
 with DAG(
     dag_id='example_kubernetes_executor',
     schedule_interval=None,
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=['example', 'example2'],
+    tags=['example3'],

Review comment:
       It just groups DAGs so that one can easily filter them in the UI by tag. Here it is used only for demonstration purposes.







[GitHub] [airflow] uranusjr commented on a change in pull request #19355: Ensure the example DAGs are all working

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19355:
URL: https://github.com/apache/airflow/pull/19355#discussion_r747332136



##########
File path: docs/apache-airflow/example_dags/example_pod_template_file.py
##########
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+from datetime import datetime
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.example_dags.libs.helper import print_stuff
+from airflow.settings import AIRFLOW_HOME
+
+log = logging.getLogger(__name__)
+
+try:
+    from kubernetes.client import models as k8s
+except ImportError as e:
+    log.warning("Could not import DAGs in example_pod_template_file.py: %s", str(e))
+    log.warning("Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']")

Review comment:
       Same comment as on the `ImportError` block in the other file.







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #19355: Ensure the example DAGs are all working

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #19355:
URL: https://github.com/apache/airflow/pull/19355#discussion_r747333742



##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -16,88 +16,214 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-This is an example dag for using the Kubernetes Executor.
+This is an example dag for using a Kubernetes Executor Configuration.
 """
+import logging
 import os
 from datetime import datetime
 
 from airflow import DAG
+from airflow.configuration import conf
 from airflow.decorators import task
 from airflow.example_dags.libs.helper import print_stuff
 
+log = logging.getLogger(__name__)
+
+worker_container_repository = conf.get('kubernetes', 'worker_container_repository')
+worker_container_tag = conf.get('kubernetes', 'worker_container_tag')
+
+try:
+    from kubernetes.client import models as k8s
+except ImportError as e:
+    log.warning("Could not import DAGs in example_kubernetes_executor.py: %s", str(e))
+    log.warning("Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']")
+
+
 with DAG(
     dag_id='example_kubernetes_executor',
     schedule_interval=None,
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=['example', 'example2'],
+    tags=['example3'],

Review comment:
       The DAG where it was defined is `example_kubernetes_executor_config.py`. I worked on that file and later renamed it to `example_kubernetes_executor.py`. The original `example_kubernetes_executor.py` was deleted, so it now appears as if I worked on it.







[GitHub] [airflow] mik-laj commented on a change in pull request #19355: Update the example DAGs

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #19355:
URL: https://github.com/apache/airflow/pull/19355#discussion_r740628714



##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -18,86 +18,114 @@
 """
 This is an example dag for using the Kubernetes Executor.
 """
-import os
+import logging
 from datetime import datetime
 
 from airflow import DAG
 from airflow.decorators import task
 from airflow.example_dags.libs.helper import print_stuff
 
-with DAG(
-    dag_id='example_kubernetes_executor',
-    schedule_interval=None,
-    start_date=datetime(2021, 1, 1),
-    catchup=False,
-    tags=['example', 'example2'],
-) as dag:
-    # You don't have to use any special KubernetesExecutor configuration if you don't want to
-    @task
-    def start_task():
-        print_stuff()
+log = logging.getLogger(__name__)
 
-    # But you can if you want to
-    kube_exec_config_special = {"KubernetesExecutor": {"image": "airflow/ci:latest"}}
+try:
+    from kubernetes.client import models as k8s
 
-    @task(executor_config=kube_exec_config_special)
-    def one_task():
-        print_stuff()
+    with DAG(
+        dag_id='example_kubernetes_executor',
+        schedule_interval=None,
+        start_date=datetime(2021, 1, 1),
+        catchup=False,
+        tags=['example', 'example2'],
+    ) as dag:
+        # You don't have to use any special KubernetesExecutor configuration if you don't want to
+        @task
+        def start_task():
+            print_stuff()
 
-    # Use the zip binary, which is only found in this special docker image
-    kube_exec_config_zip_binary = {"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}}
-
-    @task(task_id="two_task", executor_config=kube_exec_config_zip_binary)
-    def assert_zip_binary():
-        """
-        Checks whether Zip is installed.
+        # But you can if you want to
+        kube_exec_config_special = {
+            "pod_override": k8s.V1Pod(
+                spec=k8s.V1PodSpec(
+                    containers=[
+                        k8s.V1Container(
+                            name="base",
+                        ),
+                        k8s.V1Container(
+                            name="sidecar",
+                            image="ubuntu",
+                            command=["/bin/sh", "-c"],
+                            args=["echo 'Hello world'"],
+                        ),
+                    ]
+                )
+            )
+        }
 
-        :raises SystemError: if zip is not installed
-        """
-        return_code = os.system("zip")
-        if return_code != 0:
-            raise SystemError("The zip binary is not found")
+        @task(executor_config=kube_exec_config_special)
+        def one_task():
+            print_stuff()
 
-    # Limit resources on this operator/task with node affinity & tolerations
-    affinity = {
-        'podAntiAffinity': {
-            'requiredDuringSchedulingIgnoredDuringExecution': [
-                {
-                    'topologyKey': 'kubernetes.io/hostname',
-                    'labelSelector': {
-                        'matchExpressions': [{'key': 'app', 'operator': 'In', 'values': ['airflow']}]
-                    },
-                }
-            ]
-        }
-    }
+        # Limit resources on this operator/task with node affinity & tolerations
+        # Use k8s_client.V1Affinity to define node affinity & tolerations
+        k8s_affinity = k8s.V1Affinity(
+            pod_anti_affinity=k8s.V1PodAntiAffinity(
+                required_during_scheduling_ignored_during_execution=[
+                    k8s.V1PodAffinityTerm(
+                        label_selector=k8s.V1LabelSelector(
+                            match_expressions=[
+                                k8s.V1LabelSelectorRequirement(key='app', operator='In', values=['airflow'])
+                            ]
+                        ),
+                        topology_key='kubernetes.io/hostname',
+                    )
+                ]
+            )
+        )
 
-    tolerations = [{'key': 'dedicated', 'operator': 'Equal', 'value': 'airflow'}]
+        # Use k8s_client.V1Toleration to define node tolerations
+        k8s_tolerations = [k8s.V1Toleration(key='dedicated', operator='Equal', value='airflow')]
 
-    kube_exec_config_resource_limits = {
-        "KubernetesExecutor": {
-            "request_memory": "128Mi",
-            "limit_memory": "128Mi",
-            "tolerations": tolerations,
-            "affinity": affinity,
+        # Use k8s_client.V1ResourceRequirements to define resource limits
+        k8s_resource_limits = k8s.V1ResourceRequirements(
+            requests={'memory': '128Mi'}, limits={'memory': '128Mi'}
+        )
+        # use k8s_client.V1PodSpec to define pod spec with affinity, tolerations, and resource limits
+        k8s_exec_config_resources = {
+            "pod_override": k8s.V1Pod(
+                spec=k8s.V1PodSpec(
+                    containers=[
+                        k8s.V1Container(
+                            name="base",
+                            resources=k8s_resource_limits,
+                        )
+                    ],
+                    affinity=k8s_affinity,
+                    tolerations=k8s_tolerations,
+                )
+            )
         }
-    }
 
-    @task(executor_config=kube_exec_config_resource_limits)
-    def three_task():
-        print_stuff()
+        @task(executor_config=k8s_exec_config_resources)
+        def two_task():
+            print_stuff()
 
-    # Add arbitrary labels to worker pods
-    kube_exec_config_pod_labels = {"KubernetesExecutor": {"labels": {"foo": "bar"}}}
+        # Use k8s_client.V1PodSpec to define pod spec with labels
+        kube_exec_config_pod_labels = {
+            "pod_override": k8s.V1Pod(
+                metadata=k8s.V1ObjectMeta(labels={"foo": "bar"}),
+            )
+        }
 
-    @task(executor_config=kube_exec_config_pod_labels)
-    def four_task():
-        print_stuff()
+        @task(executor_config=kube_exec_config_pod_labels)
+        def three_task():
+            print_stuff()
 
-    start_task = start_task()
-    one_task = one_task()
-    two_task = assert_zip_binary()
-    three_task = three_task()
-    four_task = four_task()
+        start_task = start_task()
+        one_task = one_task()
+        two_task = two_task()
+        three_task = three_task()
 
-    start_task >> [one_task, two_task, three_task, four_task]
+        start_task >> [one_task, two_task, three_task]
+except ImportError as e:
+    log.warning("Could not import DAGs in example_kubernetes_executor_config.py: %s", str(e))
+    log.warning("Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']")

Review comment:
       ```suggestion
       log.warning("Install kubernetes dependencies with: pip install apache-'airflow[cncf.kubernetes]'")
   ```

##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -18,86 +18,114 @@
 """
 This is an example dag for using the Kubernetes Executor.
 """
-import os
+import logging
 from datetime import datetime
 
 from airflow import DAG
 from airflow.decorators import task
 from airflow.example_dags.libs.helper import print_stuff
 
-with DAG(
-    dag_id='example_kubernetes_executor',
-    schedule_interval=None,
-    start_date=datetime(2021, 1, 1),
-    catchup=False,
-    tags=['example', 'example2'],
-) as dag:
-    # You don't have to use any special KubernetesExecutor configuration if you don't want to
-    @task
-    def start_task():
-        print_stuff()
+log = logging.getLogger(__name__)
 
-    # But you can if you want to
-    kube_exec_config_special = {"KubernetesExecutor": {"image": "airflow/ci:latest"}}
+try:
+    from kubernetes.client import models as k8s
 
-    @task(executor_config=kube_exec_config_special)
-    def one_task():
-        print_stuff()
+    with DAG(
+        dag_id='example_kubernetes_executor',
+        schedule_interval=None,
+        start_date=datetime(2021, 1, 1),
+        catchup=False,
+        tags=['example', 'example2'],
+    ) as dag:
+        # You don't have to use any special KubernetesExecutor configuration if you don't want to
+        @task
+        def start_task():
+            print_stuff()
 
-    # Use the zip binary, which is only found in this special docker image
-    kube_exec_config_zip_binary = {"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}}
-
-    @task(task_id="two_task", executor_config=kube_exec_config_zip_binary)
-    def assert_zip_binary():
-        """
-        Checks whether Zip is installed.
+        # But you can if you want to
+        kube_exec_config_special = {
+            "pod_override": k8s.V1Pod(
+                spec=k8s.V1PodSpec(
+                    containers=[
+                        k8s.V1Container(
+                            name="base",
+                        ),
+                        k8s.V1Container(
+                            name="sidecar",
+                            image="ubuntu",
+                            command=["/bin/sh", "-c"],
+                            args=["echo 'Hello world'"],
+                        ),
+                    ]
+                )
+            )
+        }
 
-        :raises SystemError: if zip is not installed
-        """
-        return_code = os.system("zip")
-        if return_code != 0:
-            raise SystemError("The zip binary is not found")
+        @task(executor_config=kube_exec_config_special)
+        def one_task():
+            print_stuff()
 
-    # Limit resources on this operator/task with node affinity & tolerations
-    affinity = {
-        'podAntiAffinity': {
-            'requiredDuringSchedulingIgnoredDuringExecution': [
-                {
-                    'topologyKey': 'kubernetes.io/hostname',
-                    'labelSelector': {
-                        'matchExpressions': [{'key': 'app', 'operator': 'In', 'values': ['airflow']}]
-                    },
-                }
-            ]
-        }
-    }
+        # Limit resources on this operator/task with node affinity & tolerations
+        # Use k8s_client.V1Affinity to define node affinity & tolerations
+        k8s_affinity = k8s.V1Affinity(
+            pod_anti_affinity=k8s.V1PodAntiAffinity(
+                required_during_scheduling_ignored_during_execution=[
+                    k8s.V1PodAffinityTerm(
+                        label_selector=k8s.V1LabelSelector(
+                            match_expressions=[
+                                k8s.V1LabelSelectorRequirement(key='app', operator='In', values=['airflow'])
+                            ]
+                        ),
+                        topology_key='kubernetes.io/hostname',
+                    )
+                ]
+            )
+        )
 
-    tolerations = [{'key': 'dedicated', 'operator': 'Equal', 'value': 'airflow'}]
+        # Use k8s_client.V1Toleration to define node tolerations
+        k8s_tolerations = [k8s.V1Toleration(key='dedicated', operator='Equal', value='airflow')]
 
-    kube_exec_config_resource_limits = {
-        "KubernetesExecutor": {
-            "request_memory": "128Mi",
-            "limit_memory": "128Mi",
-            "tolerations": tolerations,
-            "affinity": affinity,
+        # Use k8s_client.V1ResourceRequirements to define resource limits
+        k8s_resource_limits = k8s.V1ResourceRequirements(
+            requests={'memory': '128Mi'}, limits={'memory': '128Mi'}
+        )
+        # use k8s_client.V1PodSpec to define pod spec with affinity, tolerations, and resource limits
+        k8s_exec_config_resources = {
+            "pod_override": k8s.V1Pod(
+                spec=k8s.V1PodSpec(
+                    containers=[
+                        k8s.V1Container(
+                            name="base",
+                            resources=k8s_resource_limits,
+                        )
+                    ],
+                    affinity=k8s_affinity,
+                    tolerations=k8s_tolerations,
+                )
+            )
         }
-    }
 
-    @task(executor_config=kube_exec_config_resource_limits)
-    def three_task():
-        print_stuff()
+        @task(executor_config=k8s_exec_config_resources)
+        def two_task():
+            print_stuff()
 
-    # Add arbitrary labels to worker pods
-    kube_exec_config_pod_labels = {"KubernetesExecutor": {"labels": {"foo": "bar"}}}
+        # Use k8s_client.V1PodSpec to define pod spec with labels
+        kube_exec_config_pod_labels = {
+            "pod_override": k8s.V1Pod(
+                metadata=k8s.V1ObjectMeta(labels={"foo": "bar"}),
+            )
+        }
 
-    @task(executor_config=kube_exec_config_pod_labels)
-    def four_task():
-        print_stuff()
+        @task(executor_config=kube_exec_config_pod_labels)
+        def three_task():
+            print_stuff()
 
-    start_task = start_task()
-    one_task = one_task()
-    two_task = assert_zip_binary()
-    three_task = three_task()
-    four_task = four_task()
+        start_task = start_task()
+        one_task = one_task()
+        two_task = two_task()
+        three_task = three_task()
 
-    start_task >> [one_task, two_task, three_task, four_task]
+        start_task >> [one_task, two_task, three_task]
+except ImportError as e:
+    log.warning("Could not import DAGs in example_kubernetes_executor_config.py: %s", str(e))
+    log.warning("Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']")

Review comment:
       ```suggestion
       log.warning("Install kubernetes dependencies with: pip install 'apache-airflow[cncf.kubernetes]'")
   ```
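
For context on the quoting in this suggestion: shells such as zsh treat unquoted square brackets as glob patterns, so the whole requirement has to sit inside one pair of quotes. A minimal sketch of building the hint with the standard library's `shlex.quote`, so the quoting cannot be misplaced. The helper name `pip_install_hint` is hypothetical and not part of Airflow.

```python
import shlex


def pip_install_hint(package: str, extra: str) -> str:
    """Return a copy-pasteable ``pip install`` command for ``package[extra]``."""
    requirement = f"{package}[{extra}]"
    # shlex.quote wraps the whole requirement in single quotes because
    # "[" and "]" are not in its set of shell-safe characters.
    return f"pip install {shlex.quote(requirement)}"


hint = pip_install_hint("apache-airflow", "cncf.kubernetes")
print(hint)
```

The resulting string could then be passed to `log.warning("Install kubernetes dependencies with: %s", hint)` instead of hand-writing the quotes in the message.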







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #19355: Ensure the example DAGs are all working

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #19355:
URL: https://github.com/apache/airflow/pull/19355#discussion_r743131138



##########
File path: airflow/example_dags/example_kubernetes_executor_config.py
##########
@@ -23,149 +23,156 @@
 from datetime import datetime
 
 from airflow import DAG
+from airflow.configuration import AIRFLOW_HOME
 from airflow.decorators import task
 from airflow.example_dags.libs.helper import print_stuff
-from airflow.settings import AIRFLOW_HOME
 
 log = logging.getLogger(__name__)
 
 try:
     from kubernetes.client import models as k8s
-
-    with DAG(
-        dag_id='example_kubernetes_executor_config',
-        schedule_interval=None,
-        start_date=datetime(2021, 1, 1),
-        catchup=False,
-        tags=['example3'],
-    ) as dag:
-        # You can use annotations on your kubernetes pods!
-        start_task_executor_config = {
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
-        }
-
-        @task(executor_config=start_task_executor_config)
-        def start_task():
-            print_stuff()
-
-        start_task = start_task()
-
-        # [START task_with_volume]
-        executor_config_volume_mount = {
-            "pod_override": k8s.V1Pod(
-                spec=k8s.V1PodSpec(
-                    containers=[
-                        k8s.V1Container(
-                            name="base",
-                            volume_mounts=[
-                                k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
-                            ],
-                        )
-                    ],
-                    volumes=[
-                        k8s.V1Volume(
-                            name="example-kubernetes-test-volume",
-                            host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
-                        )
-                    ],
-                )
-            ),
-        }
-
-        @task(executor_config=executor_config_volume_mount)
-        def test_volume_mount():
-            """
-            Tests whether the volume has been mounted.
-            """
-            with open('/foo/volume_mount_test.txt', 'w') as foo:
-                foo.write('Hello')
-
-            return_code = os.system("cat /foo/volume_mount_test.txt")
-            if return_code != 0:
-                raise ValueError(f"Error when checking volume mount. Return code {return_code}")
-
-        volume_task = test_volume_mount()
-        # [END task_with_volume]
-
-        # [START task_with_template]
-        executor_config_template = {
-            "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/basic_template.yaml"),
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
-        }
-
-        @task(executor_config=executor_config_template)
-        def task_with_template():
-            print_stuff()
-
-        task_with_template = task_with_template()
-        # [END task_with_template]
-
-        # [START task_with_sidecar]
-        executor_config_sidecar = {
-            "pod_override": k8s.V1Pod(
-                spec=k8s.V1PodSpec(
-                    containers=[
-                        k8s.V1Container(
-                            name="base",
-                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                        ),
-                        k8s.V1Container(
-                            name="sidecar",
-                            image="ubuntu",
-                            args=["echo \"retrieved from mount\" > /shared/test.txt"],
-                            command=["bash", "-cx"],
-                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                        ),
-                    ],
-                    volumes=[
-                        k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
-                    ],
-                )
-            ),
-        }
-
-        @task(executor_config=executor_config_sidecar)
-        def test_sharedvolume_mount():
-            """
-            Tests whether the volume has been mounted.
-            """
-            for i in range(5):
-                try:
-                    return_code = os.system("cat /shared/test.txt")
-                    if return_code != 0:
-                        raise ValueError(f"Error when checking volume mount. Return code {return_code}")
-                except ValueError as e:
-                    if i > 4:
-                        raise e
-
-        sidecar_task = test_sharedvolume_mount()
-        # [END task_with_sidecar]
-
-        # Test that we can add labels to pods
-        executor_config_non_root = {
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))
-        }
-
-        @task(executor_config=executor_config_non_root)
-        def non_root_task():
-            print_stuff()
-
-        third_task = non_root_task()
-
-        executor_config_other_ns = {
-            "KubernetesExecutor": {"namespace": "test-namespace", "labels": {"release": "stable"}}
-        }
-
-        @task(executor_config=executor_config_other_ns)
-        def other_namespace_task():
-            print_stuff()
-
-        other_ns_task = other_namespace_task()
-
-        start_task >> volume_task >> third_task
-        start_task >> other_ns_task
-        start_task >> sidecar_task
-        start_task >> task_with_template
 except ImportError as e:
     log.warning("Could not import DAGs in example_kubernetes_executor_config.py: %s", str(e))
-    log.warning("Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']")
+    log.warning("Install kubernetes dependencies with: pip install apache-airflow[cncf.kubernetes]")
+
+with DAG(
+    dag_id='example_kubernetes_executor_config',
+    schedule_interval=None,
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=['example3'],
+) as dag:
+    # You can use annotations on your kubernetes pods!
+    start_task_executor_config = {
+        "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
+    }
+
+    @task(executor_config=start_task_executor_config)
+    def start_task():
+        print_stuff()
+
+    start_task = start_task()
+
+    # [START task_with_volume]
+    executor_config_volume_mount = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        volume_mounts=[
+                            k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
+                        ],
+                    )
+                ],
+                volumes=[
+                    k8s.V1Volume(
+                        name="example-kubernetes-test-volume",
+                        host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
+                    )
+                ],
+            )
+        ),
+    }
+
+    @task(executor_config=executor_config_volume_mount)
+    def test_volume_mount():
+        """
+        Tests whether the volume has been mounted.
+        """
+
+        with open('/foo/volume_mount_test.txt', 'w') as foo:
+            foo.write('Hello')
+
+        return_code = os.system("cat /foo/volume_mount_test.txt")
+        if return_code != 0:
+            raise ValueError(f"Error when checking volume mount. Return code {return_code}")
+
+    volume_task = test_volume_mount()
+    # [END task_with_volume]
+
+    # [START task_with_template]
+    # Be careful when changing the base image in the pod_template_file.
+    # Using, for example, ubuntu:latest as base image
+    # here would fail because it doesn't have `airflow` commands.
+    # A scenario where changing the base image could be needed is if you have a special library that
+    # you want to run only in this task. In that case, you build the image
+    # with the special library, on top of the image in your Dockerfile.
+    executor_config_template = {
+        "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/pod_template_file.yaml"),

Review comment:
       > I think we should move this example into the docs so we don't actually try and run it.
   
   That's also what I think. From my troubleshooting, there's nothing wrong with the examples; what is missing is documentation on how to run them.
   cc: @kaxil 
   
   

##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -38,57 +47,107 @@ def start_task():
         print_stuff()
 
     # But you can if you want to
-    kube_exec_config_special = {"KubernetesExecutor": {"image": "airflow/ci:latest"}}
+    kube_exec_config_special = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        # Please note that the image here must be similar to the one in your Dockerfile.
+                        # Using, for example, ubuntu:latest as base image
+                        # here would fail because it doesn't have `airflow` commands.
+                        # A scenario where changing the base image could be needed is if you have a
+                        # special library that you want to run only in this task. In that case,
+                        # you build the image with the special library, on top of the image in your Dockerfile.
+                        image="apache/airflow:2.2.1-python3.9",
+                    ),
+                ]
+            )
+        )
+    }
 
     @task(executor_config=kube_exec_config_special)
     def one_task():
         print_stuff()
 
     # Use the zip binary, which is only found in this special docker image
-    kube_exec_config_zip_binary = {"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}}
+    kube_exec_config_zip_binary = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        # Please note that the image here must be similar to the one in your Dockerfile.
+                        # Using, for example, ubuntu:latest as base image
+                        # here would fail because it doesn't have `airflow` commands.
+                        # A scenario where changing the base image could be needed is if you have a
+                        # special library that you want to run only in this task. In that case,
+                        # you build the image with the special library, on top of the image in your Dockerfile.
+                        image="apache/airflow:2.2.1-python3.9-zip",

Review comment:
       I couldn't get the env var to work







[GitHub] [airflow] kaxil commented on a change in pull request #19355: Ensure the example DAGs are all working

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #19355:
URL: https://github.com/apache/airflow/pull/19355#discussion_r742444508



##########
File path: airflow/example_dags/example_kubernetes_executor_config.py
##########
@@ -89,7 +89,6 @@ def test_volume_mount():
 
         # [START task_with_template]
         executor_config_template = {
-            "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/basic_template.yaml"),

Review comment:
       Can you update this please @ephraimbuddy 







[GitHub] [airflow] ephraimbuddy commented on pull request #19355: Ensure the example DAGs are all working

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #19355:
URL: https://github.com/apache/airflow/pull/19355#issuecomment-957181808


   I'm not sure why `virtualenv` tasks (both normal and decorated) fail in the Kubernetes cluster.
   cc: @dimberman @jedcunningham 
   
   





[GitHub] [airflow] uranusjr commented on a change in pull request #19355: Ensure the example DAGs are all working

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19355:
URL: https://github.com/apache/airflow/pull/19355#discussion_r740892129



##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -18,86 +18,114 @@
 """
 This is an example dag for using the Kubernetes Executor.
 """
-import os
+import logging
 from datetime import datetime
 
 from airflow import DAG
 from airflow.decorators import task
 from airflow.example_dags.libs.helper import print_stuff
 
-with DAG(
-    dag_id='example_kubernetes_executor',
-    schedule_interval=None,
-    start_date=datetime(2021, 1, 1),
-    catchup=False,
-    tags=['example', 'example2'],
-) as dag:
-    # You don't have to use any special KubernetesExecutor configuration if you don't want to
-    @task
-    def start_task():
-        print_stuff()
+log = logging.getLogger(__name__)
 
-    # But you can if you want to
-    kube_exec_config_special = {"KubernetesExecutor": {"image": "airflow/ci:latest"}}
+try:
+    from kubernetes.client import models as k8s
 

Review comment:
       Probably better to put everything under this line into the `else` block instead. 
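
The `try`/`except`/`else` layout suggested here can be sketched as follows. Only the import sits inside `try`, so an `ImportError` raised later, while the dependent code is being built, is not swallowed by the handler. This is a standalone illustration rather than the PR's code: `build_examples` and the names it returns are hypothetical, and `json` stands in for `kubernetes.client.models`.

```python
import importlib
import logging

log = logging.getLogger(__name__)


def build_examples(module_name: str) -> list:
    """Build example names only if the optional dependency imports cleanly."""
    try:
        # Keep the guarded region as small as possible: just the import.
        module = importlib.import_module(module_name)
    except ImportError as e:
        log.warning("Could not import %s: %s", module_name, e)
        return []
    else:
        # Runs only when the import succeeded. Errors raised here are
        # not caught by the ``except ImportError`` clause above.
        return [f"dag_using_{module.__name__}"]


available = build_examples("json")
missing = build_examples("module_that_is_not_installed_xyz")
```

In a DAG file the same shape would put the `with DAG(...)` block under `else:`, leaving the two `log.warning` calls as the only content of the `except` branch.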







[GitHub] [airflow] kaxil commented on a change in pull request #19355: Ensure the example DAGs are all working

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #19355:
URL: https://github.com/apache/airflow/pull/19355#discussion_r743261815



##########
File path: airflow/example_dags/example_kubernetes_executor_config.py
##########
@@ -23,149 +23,156 @@
 from datetime import datetime
 
 from airflow import DAG
+from airflow.configuration import AIRFLOW_HOME
 from airflow.decorators import task
 from airflow.example_dags.libs.helper import print_stuff
-from airflow.settings import AIRFLOW_HOME
 
 log = logging.getLogger(__name__)
 
 try:
     from kubernetes.client import models as k8s
-
-    with DAG(
-        dag_id='example_kubernetes_executor_config',
-        schedule_interval=None,
-        start_date=datetime(2021, 1, 1),
-        catchup=False,
-        tags=['example3'],
-    ) as dag:
-        # You can use annotations on your kubernetes pods!
-        start_task_executor_config = {
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
-        }
-
-        @task(executor_config=start_task_executor_config)
-        def start_task():
-            print_stuff()
-
-        start_task = start_task()
-
-        # [START task_with_volume]
-        executor_config_volume_mount = {
-            "pod_override": k8s.V1Pod(
-                spec=k8s.V1PodSpec(
-                    containers=[
-                        k8s.V1Container(
-                            name="base",
-                            volume_mounts=[
-                                k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
-                            ],
-                        )
-                    ],
-                    volumes=[
-                        k8s.V1Volume(
-                            name="example-kubernetes-test-volume",
-                            host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
-                        )
-                    ],
-                )
-            ),
-        }
-
-        @task(executor_config=executor_config_volume_mount)
-        def test_volume_mount():
-            """
-            Tests whether the volume has been mounted.
-            """
-            with open('/foo/volume_mount_test.txt', 'w') as foo:
-                foo.write('Hello')
-
-            return_code = os.system("cat /foo/volume_mount_test.txt")
-            if return_code != 0:
-                raise ValueError(f"Error when checking volume mount. Return code {return_code}")
-
-        volume_task = test_volume_mount()
-        # [END task_with_volume]
-
-        # [START task_with_template]
-        executor_config_template = {
-            "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/basic_template.yaml"),
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
-        }
-
-        @task(executor_config=executor_config_template)
-        def task_with_template():
-            print_stuff()
-
-        task_with_template = task_with_template()
-        # [END task_with_template]
-
-        # [START task_with_sidecar]
-        executor_config_sidecar = {
-            "pod_override": k8s.V1Pod(
-                spec=k8s.V1PodSpec(
-                    containers=[
-                        k8s.V1Container(
-                            name="base",
-                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                        ),
-                        k8s.V1Container(
-                            name="sidecar",
-                            image="ubuntu",
-                            args=["echo \"retrieved from mount\" > /shared/test.txt"],
-                            command=["bash", "-cx"],
-                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                        ),
-                    ],
-                    volumes=[
-                        k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
-                    ],
-                )
-            ),
-        }
-
-        @task(executor_config=executor_config_sidecar)
-        def test_sharedvolume_mount():
-            """
-            Tests whether the volume has been mounted.
-            """
-            for i in range(5):
-                try:
-                    return_code = os.system("cat /shared/test.txt")
-                    if return_code != 0:
-                        raise ValueError(f"Error when checking volume mount. Return code {return_code}")
-                except ValueError as e:
-                    if i > 4:
-                        raise e
-
-        sidecar_task = test_sharedvolume_mount()
-        # [END task_with_sidecar]
-
-        # Test that we can add labels to pods
-        executor_config_non_root = {
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))
-        }
-
-        @task(executor_config=executor_config_non_root)
-        def non_root_task():
-            print_stuff()
-
-        third_task = non_root_task()
-
-        executor_config_other_ns = {
-            "KubernetesExecutor": {"namespace": "test-namespace", "labels": {"release": "stable"}}
-        }
-
-        @task(executor_config=executor_config_other_ns)
-        def other_namespace_task():
-            print_stuff()
-
-        other_ns_task = other_namespace_task()
-
-        start_task >> volume_task >> third_task
-        start_task >> other_ns_task
-        start_task >> sidecar_task
-        start_task >> task_with_template
 except ImportError as e:
     log.warning("Could not import DAGs in example_kubernetes_executor_config.py: %s", str(e))
-    log.warning("Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']")
+    log.warning("Install kubernetes dependencies with: pip install apache-airflow[cncf.kubernetes]")
+
+with DAG(
+    dag_id='example_kubernetes_executor_config',
+    schedule_interval=None,
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=['example3'],
+) as dag:
+    # You can use annotations on your kubernetes pods!
+    start_task_executor_config = {
+        "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
+    }
+
+    @task(executor_config=start_task_executor_config)
+    def start_task():
+        print_stuff()
+
+    start_task = start_task()
+
+    # [START task_with_volume]
+    executor_config_volume_mount = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        volume_mounts=[
+                            k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
+                        ],
+                    )
+                ],
+                volumes=[
+                    k8s.V1Volume(
+                        name="example-kubernetes-test-volume",
+                        host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
+                    )
+                ],
+            )
+        ),
+    }
+
+    @task(executor_config=executor_config_volume_mount)
+    def test_volume_mount():
+        """
+        Tests whether the volume has been mounted.
+        """
+
+        with open('/foo/volume_mount_test.txt', 'w') as foo:
+            foo.write('Hello')
+
+        return_code = os.system("cat /foo/volume_mount_test.txt")
+        if return_code != 0:
+            raise ValueError(f"Error when checking volume mount. Return code {return_code}")
+
+    volume_task = test_volume_mount()
+    # [END task_with_volume]
+
+    # [START task_with_template]
+    # Be careful when changing the base image in the pod_template_file.
+    # Using, for example, ubuntu:latest as base image
+    # here would fail because it doesn't have `airflow` commands.
+    # A scenario where changing the base image could be needed is if you have a special library that
+    # you want to run only in this task. In that case, you build the image
+    # with the special library, on top of the image in your Dockerfile.
+    executor_config_template = {
+        "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/pod_template_file.yaml"),

Review comment:
       Sounds good







[GitHub] [airflow] kaxil commented on a change in pull request #19355: Ensure the example DAGs are all working

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #19355:
URL: https://github.com/apache/airflow/pull/19355#discussion_r746106743



##########
File path: airflow/example_dags/example_kubernetes_executor_config.py
##########
@@ -25,147 +25,133 @@
 from airflow import DAG
 from airflow.decorators import task
 from airflow.example_dags.libs.helper import print_stuff
-from airflow.settings import AIRFLOW_HOME
 
 log = logging.getLogger(__name__)
 
+
 try:
     from kubernetes.client import models as k8s
-
-    with DAG(
-        dag_id='example_kubernetes_executor_config',
-        schedule_interval=None,
-        start_date=datetime(2021, 1, 1),
-        catchup=False,
-        tags=['example3'],
-    ) as dag:
-        # You can use annotations on your kubernetes pods!
-        start_task_executor_config = {
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
-        }
-
-        @task(executor_config=start_task_executor_config)
-        def start_task():
-            print_stuff()
-
-        start_task = start_task()
-
-        # [START task_with_volume]
-        executor_config_volume_mount = {
-            "pod_override": k8s.V1Pod(
-                spec=k8s.V1PodSpec(
-                    containers=[
-                        k8s.V1Container(
-                            name="base",
-                            volume_mounts=[
-                                k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
-                            ],
-                        )
-                    ],
-                    volumes=[
-                        k8s.V1Volume(
-                            name="example-kubernetes-test-volume",
-                            host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
-                        )
-                    ],
-                )
-            ),
-        }
-
-        @task(executor_config=executor_config_volume_mount)
-        def test_volume_mount():
-            """
-            Tests whether the volume has been mounted.
-            """
-            with open('/foo/volume_mount_test.txt', 'w') as foo:
-                foo.write('Hello')
-
-            return_code = os.system("cat /foo/volume_mount_test.txt")
-            if return_code != 0:
-                raise ValueError(f"Error when checking volume mount. Return code {return_code}")
-
-        volume_task = test_volume_mount()
-        # [END task_with_volume]
-
-        # [START task_with_template]
-        executor_config_template = {
-            "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/basic_template.yaml"),
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
-        }
-
-        @task(executor_config=executor_config_template)
-        def task_with_template():
-            print_stuff()
-
-        task_with_template = task_with_template()
-        # [END task_with_template]
-
-        # [START task_with_sidecar]
-        executor_config_sidecar = {
-            "pod_override": k8s.V1Pod(
-                spec=k8s.V1PodSpec(
-                    containers=[
-                        k8s.V1Container(
-                            name="base",
-                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                        ),
-                        k8s.V1Container(
-                            name="sidecar",
-                            image="ubuntu",
-                            args=["echo \"retrieved from mount\" > /shared/test.txt"],
-                            command=["bash", "-cx"],
-                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                        ),
-                    ],
-                    volumes=[
-                        k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
-                    ],
-                )
-            ),
-        }
-
-        @task(executor_config=executor_config_sidecar)
-        def test_sharedvolume_mount():
-            """
-            Tests whether the volume has been mounted.
-            """
-            for i in range(5):
-                try:
-                    return_code = os.system("cat /shared/test.txt")
-                    if return_code != 0:
-                        raise ValueError(f"Error when checking volume mount. Return code {return_code}")
-                except ValueError as e:
-                    if i > 4:
-                        raise e
-
-        sidecar_task = test_sharedvolume_mount()
-        # [END task_with_sidecar]
-
-        # Test that we can add labels to pods
-        executor_config_non_root = {
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))
-        }
-
-        @task(executor_config=executor_config_non_root)
-        def non_root_task():
-            print_stuff()
-
-        third_task = non_root_task()
-
-        executor_config_other_ns = {
-            "KubernetesExecutor": {"namespace": "test-namespace", "labels": {"release": "stable"}}
-        }
-
-        @task(executor_config=executor_config_other_ns)
-        def other_namespace_task():
-            print_stuff()
-
-        other_ns_task = other_namespace_task()
-
-        start_task >> volume_task >> third_task
-        start_task >> other_ns_task
-        start_task >> sidecar_task
-        start_task >> task_with_template
 except ImportError as e:
     log.warning("Could not import DAGs in example_kubernetes_executor_config.py: %s", str(e))
     log.warning("Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']")
+
+
+with DAG(
+    dag_id='example_kubernetes_executor_config',

Review comment:
       Should we just have a single example DAG file for all Kubernetes Executor tasks?
   
   WDYT @ephraimbuddy @jedcunningham 







[GitHub] [airflow] kaxil commented on a change in pull request #19355: Ensure the example DAGs are all working

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #19355:
URL: https://github.com/apache/airflow/pull/19355#discussion_r746113056



##########
File path: airflow/example_dags/example_kubernetes_executor_config.py
##########
@@ -25,147 +25,133 @@
 from airflow import DAG
 from airflow.decorators import task
 from airflow.example_dags.libs.helper import print_stuff
-from airflow.settings import AIRFLOW_HOME
 
 log = logging.getLogger(__name__)
 
+
 try:
     from kubernetes.client import models as k8s
-
-    with DAG(
-        dag_id='example_kubernetes_executor_config',
-        schedule_interval=None,
-        start_date=datetime(2021, 1, 1),
-        catchup=False,
-        tags=['example3'],
-    ) as dag:
-        # You can use annotations on your kubernetes pods!
-        start_task_executor_config = {
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
-        }
-
-        @task(executor_config=start_task_executor_config)
-        def start_task():
-            print_stuff()
-
-        start_task = start_task()
-
-        # [START task_with_volume]
-        executor_config_volume_mount = {
-            "pod_override": k8s.V1Pod(
-                spec=k8s.V1PodSpec(
-                    containers=[
-                        k8s.V1Container(
-                            name="base",
-                            volume_mounts=[
-                                k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
-                            ],
-                        )
-                    ],
-                    volumes=[
-                        k8s.V1Volume(
-                            name="example-kubernetes-test-volume",
-                            host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
-                        )
-                    ],
-                )
-            ),
-        }
-
-        @task(executor_config=executor_config_volume_mount)
-        def test_volume_mount():
-            """
-            Tests whether the volume has been mounted.
-            """
-            with open('/foo/volume_mount_test.txt', 'w') as foo:
-                foo.write('Hello')
-
-            return_code = os.system("cat /foo/volume_mount_test.txt")
-            if return_code != 0:
-                raise ValueError(f"Error when checking volume mount. Return code {return_code}")
-
-        volume_task = test_volume_mount()
-        # [END task_with_volume]
-
-        # [START task_with_template]
-        executor_config_template = {
-            "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/basic_template.yaml"),
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
-        }
-
-        @task(executor_config=executor_config_template)
-        def task_with_template():
-            print_stuff()
-
-        task_with_template = task_with_template()
-        # [END task_with_template]
-
-        # [START task_with_sidecar]
-        executor_config_sidecar = {
-            "pod_override": k8s.V1Pod(
-                spec=k8s.V1PodSpec(
-                    containers=[
-                        k8s.V1Container(
-                            name="base",
-                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                        ),
-                        k8s.V1Container(
-                            name="sidecar",
-                            image="ubuntu",
-                            args=["echo \"retrieved from mount\" > /shared/test.txt"],
-                            command=["bash", "-cx"],
-                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                        ),
-                    ],
-                    volumes=[
-                        k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
-                    ],
-                )
-            ),
-        }
-
-        @task(executor_config=executor_config_sidecar)
-        def test_sharedvolume_mount():
-            """
-            Tests whether the volume has been mounted.
-            """
-            for i in range(5):
-                try:
-                    return_code = os.system("cat /shared/test.txt")
-                    if return_code != 0:
-                        raise ValueError(f"Error when checking volume mount. Return code {return_code}")
-                except ValueError as e:
-                    if i > 4:
-                        raise e
-
-        sidecar_task = test_sharedvolume_mount()
-        # [END task_with_sidecar]
-
-        # Test that we can add labels to pods
-        executor_config_non_root = {
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))
-        }
-
-        @task(executor_config=executor_config_non_root)
-        def non_root_task():
-            print_stuff()
-
-        third_task = non_root_task()
-
-        executor_config_other_ns = {
-            "KubernetesExecutor": {"namespace": "test-namespace", "labels": {"release": "stable"}}
-        }
-
-        @task(executor_config=executor_config_other_ns)
-        def other_namespace_task():
-            print_stuff()
-
-        other_ns_task = other_namespace_task()
-
-        start_task >> volume_task >> third_task
-        start_task >> other_ns_task
-        start_task >> sidecar_task
-        start_task >> task_with_template
 except ImportError as e:
     log.warning("Could not import DAGs in example_kubernetes_executor_config.py: %s", str(e))
     log.warning("Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']")
+
+
+with DAG(
+    dag_id='example_kubernetes_executor_config',

Review comment:
       Cool, your "comment" went unnoticed in between review comments :D 
   
   @ephraimbuddy, can you update this, please?







[GitHub] [airflow] ephraimbuddy commented on pull request #19355: Ensure the example DAGs are all working

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #19355:
URL: https://github.com/apache/airflow/pull/19355#issuecomment-966391100


   Hi @kaxil, please take a look at this doc error; I haven't been able to figure it out.





[GitHub] [airflow] jedcunningham commented on a change in pull request #19355: Ensure the example DAGs are all working

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #19355:
URL: https://github.com/apache/airflow/pull/19355#discussion_r742857463



##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -38,57 +47,107 @@ def start_task():
         print_stuff()
 
     # But you can if you want to
-    kube_exec_config_special = {"KubernetesExecutor": {"image": "airflow/ci:latest"}}
+    kube_exec_config_special = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        # Please note that the image here must be similar to the one in your Dockerfile.
+                        # Using, for example, ubuntu:latest as base image
+                        # here would fail because it doesn't have `airflow` commands.
+                        # A scenario where changing the base image could be needed is if you have a
+                        # special library that you want to run only in this task. In that case,
+                        # you build the image with the special library, on top the image in your Dockerfile.
+                        image="apache/airflow:2.2.1-python3.9",

Review comment:
       To make this portable, what if we used `[kubernetes] worker_container_repository` and `[kubernetes] worker_container_tag` instead? Then it'd be more likely to work, and we could keep a similar comment framed as a hypothetical, e.g. "Imagine you need a custom library for only this task: build a new image off of your normal image and provide it here. (We simply use the default image to ensure this example will run.)"
   
   If we wanted it to "always" work, we'd really need to use those configs OR pull the image out of the `pod_template_file`.
   
   I sorta wonder if there is value in having a running example of this vs just documenting it though.
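
A minimal sketch of the portable-image idea above, using only the standard library so it runs without Airflow installed. The config snippet, the helper name `worker_image`, and the sample values are assumptions for illustration. In a real DAG the two values would more likely be read through Airflow's own config object (`airflow.configuration.conf.get("kubernetes", ...)`) rather than parsed by hand.

```python
import configparser

# Hypothetical stand-in for the relevant part of airflow.cfg.
SAMPLE_CFG = """
[kubernetes]
worker_container_repository = apache/airflow
worker_container_tag = 2.2.1-python3.9
"""


def worker_image(cfg_text: str) -> str:
    """Build '<repository>:<tag>' from the [kubernetes] section."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    repository = parser.get("kubernetes", "worker_container_repository")
    tag = parser.get("kubernetes", "worker_container_tag")
    return f"{repository}:{tag}"


image = worker_image(SAMPLE_CFG)
print(image)
```

The resulting string is what would be passed as `image=` to the `base` `k8s.V1Container` in the `pod_override`, so the example follows whatever image the deployment already uses instead of a hard-coded tag.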

##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -38,57 +47,107 @@ def start_task():
         print_stuff()
 
     # But you can if you want to
-    kube_exec_config_special = {"KubernetesExecutor": {"image": "airflow/ci:latest"}}
+    kube_exec_config_special = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        # Please note that the image here must be similar to the one in your Dockerfile.
+                        # Using, for example, ubuntu:latest as base image
+                        # here would fail because it doesn't have `airflow` commands.
+                        # A scenario where changing the base image could be needed is if you have a
+                        # special library that you want to run only in this task. In that case,
+                        # you build the image with the special library, on top the image in your Dockerfile.
+                        image="apache/airflow:2.2.1-python3.9",
+                    ),
+                ]
+            )
+        )
+    }
 
     @task(executor_config=kube_exec_config_special)
     def one_task():
         print_stuff()
 
     # Use the zip binary, which is only found in this special docker image
-    kube_exec_config_zip_binary = {"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}}
+    kube_exec_config_zip_binary = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        # Please note that the image here must be similar to the one in your Dockerfile.
+                        # Using, for example, ubuntu:latest as base image
+                        # here would fail because it doesn't have `airflow` commands.
+                        # A scenario where changing the base image could be needed is if you have a
+                        # special library that you want to run only in this task. In that case,
+                        # you build the image with the special library, on top the image in your Dockerfile.
+                        image="apache/airflow:2.2.1-python3.9-zip",
+                    ),
+                ]
+            )
+        )
+    }
 
     @task(task_id="two_task", executor_config=kube_exec_config_zip_binary)
     def assert_zip_binary():
         """
         Checks whether Zip is installed.
-
         :raises SystemError: if zip is not installed
         """
         return_code = os.system("zip")
         if return_code != 0:
             raise SystemError("The zip binary is not found")
 
     # Limit resources on this operator/task with node affinity & tolerations
-    affinity = {
-        'podAntiAffinity': {
-            'requiredDuringSchedulingIgnoredDuringExecution': [
-                {
-                    'topologyKey': 'kubernetes.io/hostname',
-                    'labelSelector': {
-                        'matchExpressions': [{'key': 'app', 'operator': 'In', 'values': ['airflow']}]
-                    },
-                }
+    # Use k8s_client.V1Affinity to define node affinity & tolerations

Review comment:
       ```suggestion
       # Use k8s_client.V1Affinity to define node affinity
   ```

##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -38,57 +47,107 @@ def start_task():
         print_stuff()
 
     # But you can if you want to
-    kube_exec_config_special = {"KubernetesExecutor": {"image": "airflow/ci:latest"}}
+    kube_exec_config_special = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        # Please note that the image here must be similar to the one in your Dockerfile.
+                        # Using, for example, ubuntu:latest as base image
+                        # here would fail because it doesn't have `airflow` commands.
+                        # A scenario where changing the base image could be needed is if you have a
+                        # special library that you want to run only in this task. In that case,
+                        # you build the image with the special library, on top the image in your Dockerfile.
+                        image="apache/airflow:2.2.1-python3.9",
+                    ),
+                ]
+            )
+        )
+    }
 
     @task(executor_config=kube_exec_config_special)
     def one_task():
         print_stuff()
 
     # Use the zip binary, which is only found in this special docker image
-    kube_exec_config_zip_binary = {"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}}
+    kube_exec_config_zip_binary = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        # Please note that the image here must be similar to the one in your Dockerfile.
+                        # Using, for example, ubuntu:latest as base image
+                        # here would fail because it doesn't have `airflow` commands.
+                        # A scenario where changing the base image could be needed is if you have a
+                        # special library that you want to run only in this task. In that case,
+                        # you build the image with the special library, on top the image in your Dockerfile.
+                        image="apache/airflow:2.2.1-python3.9-zip",

Review comment:
       I'm not a fan of this example. It can't be done in a portable way, nor do I think we should publish a `zip` image just for an example DAG. Did you try the env var check example I mentioned yesterday instead?
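
The env var check referred to above is not shown in this thread, so the following is only a guess at its shape: instead of shipping a special image, the pod override would set an environment variable on the `base` container, and the task would assert that the variable is visible. The variable name `EXAMPLE_POD_OVERRIDE` and the helper `assert_env_var` are hypothetical, and the variable is set in-process here purely so the sketch runs without a cluster.

```python
import os


def assert_env_var(name: str, expected: str) -> str:
    """Raise if the variable is missing or differs from the expected value."""
    actual = os.environ.get(name)
    if actual != expected:
        raise ValueError(f"Expected {name}={expected!r}, got {actual!r}")
    return actual


# Stand-in for what the pod override would do on a real worker pod.
os.environ["EXAMPLE_POD_OVERRIDE"] = "true"
print(assert_env_var("EXAMPLE_POD_OVERRIDE", "true"))
```

In the DAG itself, the variable would presumably be supplied through `env=[k8s.V1EnvVar(name=..., value=...)]` on the `base` container, which keeps the example portable because no custom image has to be published.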

##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -38,57 +47,107 @@ def start_task():
         print_stuff()
 
     # But you can if you want to
-    kube_exec_config_special = {"KubernetesExecutor": {"image": "airflow/ci:latest"}}
+    kube_exec_config_special = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        # Please note that the image here must be similar to the one in your Dockerfile.
+                        # Using, for example, ubuntu:latest as base image
+                        # here would fail because it doesn't have `airflow` commands.
+                        # A scenario where changing the base image could be needed is if you have a
+                        # special library that you want to run only in this task. In that case,
+                        # you build the image with the special library, on top the image in your Dockerfile.
+                        image="apache/airflow:2.2.1-python3.9",
+                    ),
+                ]
+            )
+        )
+    }
 
     @task(executor_config=kube_exec_config_special)
     def one_task():
         print_stuff()
 
     # Use the zip binary, which is only found in this special docker image
-    kube_exec_config_zip_binary = {"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}}
+    kube_exec_config_zip_binary = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        # Please note that the image here must be similar to the one in your Dockerfile.
+                        # Using, for example, ubuntu:latest as base image
+                        # here would fail because it doesn't have `airflow` commands.
+                        # A scenario where changing the base image could be needed is if you have a
+                        # special library that you want to run only in this task. In that case,
+                        # you build the image with the special library, on top the image in your Dockerfile.
+                        image="apache/airflow:2.2.1-python3.9-zip",
+                    ),
+                ]
+            )
+        )
+    }
 
     @task(task_id="two_task", executor_config=kube_exec_config_zip_binary)
     def assert_zip_binary():
         """
         Checks whether Zip is installed.
-
         :raises SystemError: if zip is not installed
         """
         return_code = os.system("zip")
         if return_code != 0:
             raise SystemError("The zip binary is not found")
 
     # Limit resources on this operator/task with node affinity & tolerations
-    affinity = {
-        'podAntiAffinity': {
-            'requiredDuringSchedulingIgnoredDuringExecution': [
-                {
-                    'topologyKey': 'kubernetes.io/hostname',
-                    'labelSelector': {
-                        'matchExpressions': [{'key': 'app', 'operator': 'In', 'values': ['airflow']}]
-                    },
-                }
+    # Use k8s_client.V1Affinity to define node affinity & tolerations
+    k8s_affinity = k8s.V1Affinity(
+        pod_anti_affinity=k8s.V1PodAntiAffinity(
+            required_during_scheduling_ignored_during_execution=[
+                k8s.V1PodAffinityTerm(
+                    label_selector=k8s.V1LabelSelector(
+                        match_expressions=[
+                            k8s.V1LabelSelectorRequirement(key='app', operator='In', values=['airflow'])
+                        ]
+                    ),
+                    topology_key='kubernetes.io/hostname',
+                )
             ]
-        }
-    }
+        )
+    )
 
-    tolerations = [{'key': 'dedicated', 'operator': 'Equal', 'value': 'airflow'}]
+    # Use k8s_client.V1Toleration to define node tolerations
+    k8s_tolerations = [k8s.V1Toleration(key='dedicated', operator='Equal', value='airflow')]
 
-    kube_exec_config_resource_limits = {
-        "KubernetesExecutor": {
-            "request_memory": "128Mi",
-            "limit_memory": "128Mi",
-            "tolerations": tolerations,
-            "affinity": affinity,
-        }
+    # Use k8s_client.V1ResourceRequirements to define resource limits
+    k8s_resource_limits = k8s.V1ResourceRequirements(requests={'memory': '128Mi'}, limits={'memory': '128Mi'})
+    # use k8s_client.V1PodSpec to define pod spec with affinity, tolerations, and resource limits
+    k8s_exec_config_resources = {

Review comment:
       We should change this name, as it's not just resources.
   
   Also, I think we should reorder these examples. I have to imagine resources are used significantly more than custom worker images (which probably should be the last example).

##########
File path: airflow/example_dags/example_kubernetes_executor_config.py
##########
@@ -23,149 +23,156 @@
 from datetime import datetime
 
 from airflow import DAG
+from airflow.configuration import AIRFLOW_HOME
 from airflow.decorators import task
 from airflow.example_dags.libs.helper import print_stuff
-from airflow.settings import AIRFLOW_HOME
 
 log = logging.getLogger(__name__)
 
 try:
     from kubernetes.client import models as k8s
-
-    with DAG(
-        dag_id='example_kubernetes_executor_config',
-        schedule_interval=None,
-        start_date=datetime(2021, 1, 1),
-        catchup=False,
-        tags=['example3'],
-    ) as dag:
-        # You can use annotations on your kubernetes pods!
-        start_task_executor_config = {
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
-        }
-
-        @task(executor_config=start_task_executor_config)
-        def start_task():
-            print_stuff()
-
-        start_task = start_task()
-
-        # [START task_with_volume]
-        executor_config_volume_mount = {
-            "pod_override": k8s.V1Pod(
-                spec=k8s.V1PodSpec(
-                    containers=[
-                        k8s.V1Container(
-                            name="base",
-                            volume_mounts=[
-                                k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
-                            ],
-                        )
-                    ],
-                    volumes=[
-                        k8s.V1Volume(
-                            name="example-kubernetes-test-volume",
-                            host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
-                        )
-                    ],
-                )
-            ),
-        }
-
-        @task(executor_config=executor_config_volume_mount)
-        def test_volume_mount():
-            """
-            Tests whether the volume has been mounted.
-            """
-            with open('/foo/volume_mount_test.txt', 'w') as foo:
-                foo.write('Hello')
-
-            return_code = os.system("cat /foo/volume_mount_test.txt")
-            if return_code != 0:
-                raise ValueError(f"Error when checking volume mount. Return code {return_code}")
-
-        volume_task = test_volume_mount()
-        # [END task_with_volume]
-
-        # [START task_with_template]
-        executor_config_template = {
-            "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/basic_template.yaml"),
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
-        }
-
-        @task(executor_config=executor_config_template)
-        def task_with_template():
-            print_stuff()
-
-        task_with_template = task_with_template()
-        # [END task_with_template]
-
-        # [START task_with_sidecar]
-        executor_config_sidecar = {
-            "pod_override": k8s.V1Pod(
-                spec=k8s.V1PodSpec(
-                    containers=[
-                        k8s.V1Container(
-                            name="base",
-                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                        ),
-                        k8s.V1Container(
-                            name="sidecar",
-                            image="ubuntu",
-                            args=["echo \"retrieved from mount\" > /shared/test.txt"],
-                            command=["bash", "-cx"],
-                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                        ),
-                    ],
-                    volumes=[
-                        k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
-                    ],
-                )
-            ),
-        }
-
-        @task(executor_config=executor_config_sidecar)
-        def test_sharedvolume_mount():
-            """
-            Tests whether the volume has been mounted.
-            """
-            for i in range(5):
-                try:
-                    return_code = os.system("cat /shared/test.txt")
-                    if return_code != 0:
-                        raise ValueError(f"Error when checking volume mount. Return code {return_code}")
-                except ValueError as e:
-                    if i > 4:
-                        raise e
-
-        sidecar_task = test_sharedvolume_mount()
-        # [END task_with_sidecar]
-
-        # Test that we can add labels to pods
-        executor_config_non_root = {
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))
-        }
-
-        @task(executor_config=executor_config_non_root)
-        def non_root_task():
-            print_stuff()
-
-        third_task = non_root_task()
-
-        executor_config_other_ns = {
-            "KubernetesExecutor": {"namespace": "test-namespace", "labels": {"release": "stable"}}
-        }
-
-        @task(executor_config=executor_config_other_ns)
-        def other_namespace_task():
-            print_stuff()
-
-        other_ns_task = other_namespace_task()
-
-        start_task >> volume_task >> third_task
-        start_task >> other_ns_task
-        start_task >> sidecar_task
-        start_task >> task_with_template
 except ImportError as e:
     log.warning("Could not import DAGs in example_kubernetes_executor_config.py: %s", str(e))
-    log.warning("Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']")
+    log.warning("Install kubernetes dependencies with: pip install apache-airflow[cncf.kubernetes]")
+
+with DAG(
+    dag_id='example_kubernetes_executor_config',
+    schedule_interval=None,
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=['example3'],
+) as dag:
+    # You can use annotations on your kubernetes pods!
+    start_task_executor_config = {
+        "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
+    }
+
+    @task(executor_config=start_task_executor_config)
+    def start_task():
+        print_stuff()
+
+    start_task = start_task()
+
+    # [START task_with_volume]
+    executor_config_volume_mount = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        volume_mounts=[
+                            k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
+                        ],
+                    )
+                ],
+                volumes=[
+                    k8s.V1Volume(
+                        name="example-kubernetes-test-volume",
+                        host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
+                    )
+                ],
+            )
+        ),
+    }
+
+    @task(executor_config=executor_config_volume_mount)
+    def test_volume_mount():
+        """
+        Tests whether the volume has been mounted.
+        """
+
+        with open('/foo/volume_mount_test.txt', 'w') as foo:
+            foo.write('Hello')
+
+        return_code = os.system("cat /foo/volume_mount_test.txt")
+        if return_code != 0:
+            raise ValueError(f"Error when checking volume mount. Return code {return_code}")
+
+    volume_task = test_volume_mount()
+    # [END task_with_volume]
+
+    # [START task_with_template]
+    # Be careful when changing the base image in the pod_template_file.
+    # Using, for example, ubuntu:latest as base image
+    # here would fail because it doesn't have `airflow` commands.
+    # A scenario where changing the base image could be needed is if you have a special library that
+    # you want to run only in this task. In that case, you build the image
+    # with the special library, on top the image in your Dockerfile.
+    executor_config_template = {
+        "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/pod_template_file.yaml"),

Review comment:
       This file doesn't exist by default; it just happens to be where the official chart puts it. We also couldn't add a custom pod_template_file that would work in all instances.
   
   I think we should move this example into the docs so we don't actually try to run it.
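
For illustration only, and not what this PR does: a minimal sketch of a different way to avoid running against a missing template, by referencing the file only when it is present. The helper name `template_executor_config` is hypothetical, and returning an empty dict to mean "no override" is an assumption about how the caller would use the result.

```python
import os
from typing import Any, Dict


def template_executor_config(template_path: str) -> Dict[str, Any]:
    """Return an executor_config that points at the template if it exists.

    An empty dict is returned when the file is missing, so the task would
    fall back to whatever the deployment configures by default.
    """
    if not os.path.isfile(template_path):
        return {}
    return {"pod_template_file": template_path}
```

In the example DAG the argument would be the `os.path.join(AIRFLOW_HOME, "pod_templates/pod_template_file.yaml")` path from the diff above.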







[GitHub] [airflow] kaxil commented on a change in pull request #19355: Ensure the example DAGs are all working

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #19355:
URL: https://github.com/apache/airflow/pull/19355#discussion_r743261815



##########
File path: airflow/example_dags/example_kubernetes_executor_config.py
##########
@@ -23,149 +23,156 @@
 from datetime import datetime
 
 from airflow import DAG
+from airflow.configuration import AIRFLOW_HOME
 from airflow.decorators import task
 from airflow.example_dags.libs.helper import print_stuff
-from airflow.settings import AIRFLOW_HOME
 
 log = logging.getLogger(__name__)
 
 try:
     from kubernetes.client import models as k8s
-
-    with DAG(
-        dag_id='example_kubernetes_executor_config',
-        schedule_interval=None,
-        start_date=datetime(2021, 1, 1),
-        catchup=False,
-        tags=['example3'],
-    ) as dag:
-        # You can use annotations on your kubernetes pods!
-        start_task_executor_config = {
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
-        }
-
-        @task(executor_config=start_task_executor_config)
-        def start_task():
-            print_stuff()
-
-        start_task = start_task()
-
-        # [START task_with_volume]
-        executor_config_volume_mount = {
-            "pod_override": k8s.V1Pod(
-                spec=k8s.V1PodSpec(
-                    containers=[
-                        k8s.V1Container(
-                            name="base",
-                            volume_mounts=[
-                                k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
-                            ],
-                        )
-                    ],
-                    volumes=[
-                        k8s.V1Volume(
-                            name="example-kubernetes-test-volume",
-                            host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
-                        )
-                    ],
-                )
-            ),
-        }
-
-        @task(executor_config=executor_config_volume_mount)
-        def test_volume_mount():
-            """
-            Tests whether the volume has been mounted.
-            """
-            with open('/foo/volume_mount_test.txt', 'w') as foo:
-                foo.write('Hello')
-
-            return_code = os.system("cat /foo/volume_mount_test.txt")
-            if return_code != 0:
-                raise ValueError(f"Error when checking volume mount. Return code {return_code}")
-
-        volume_task = test_volume_mount()
-        # [END task_with_volume]
-
-        # [START task_with_template]
-        executor_config_template = {
-            "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/basic_template.yaml"),
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
-        }
-
-        @task(executor_config=executor_config_template)
-        def task_with_template():
-            print_stuff()
-
-        task_with_template = task_with_template()
-        # [END task_with_template]
-
-        # [START task_with_sidecar]
-        executor_config_sidecar = {
-            "pod_override": k8s.V1Pod(
-                spec=k8s.V1PodSpec(
-                    containers=[
-                        k8s.V1Container(
-                            name="base",
-                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                        ),
-                        k8s.V1Container(
-                            name="sidecar",
-                            image="ubuntu",
-                            args=["echo \"retrieved from mount\" > /shared/test.txt"],
-                            command=["bash", "-cx"],
-                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                        ),
-                    ],
-                    volumes=[
-                        k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
-                    ],
-                )
-            ),
-        }
-
-        @task(executor_config=executor_config_sidecar)
-        def test_sharedvolume_mount():
-            """
-            Tests whether the volume has been mounted.
-            """
-            for i in range(5):
-                try:
-                    return_code = os.system("cat /shared/test.txt")
-                    if return_code != 0:
-                        raise ValueError(f"Error when checking volume mount. Return code {return_code}")
-                except ValueError as e:
-                    if i > 4:
-                        raise e
-
-        sidecar_task = test_sharedvolume_mount()
-        # [END task_with_sidecar]
-
-        # Test that we can add labels to pods
-        executor_config_non_root = {
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))
-        }
-
-        @task(executor_config=executor_config_non_root)
-        def non_root_task():
-            print_stuff()
-
-        third_task = non_root_task()
-
-        executor_config_other_ns = {
-            "KubernetesExecutor": {"namespace": "test-namespace", "labels": {"release": "stable"}}
-        }
-
-        @task(executor_config=executor_config_other_ns)
-        def other_namespace_task():
-            print_stuff()
-
-        other_ns_task = other_namespace_task()
-
-        start_task >> volume_task >> third_task
-        start_task >> other_ns_task
-        start_task >> sidecar_task
-        start_task >> task_with_template
 except ImportError as e:
     log.warning("Could not import DAGs in example_kubernetes_executor_config.py: %s", str(e))
-    log.warning("Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']")
+    log.warning("Install kubernetes dependencies with: pip install apache-airflow[cncf.kubernetes]")
+
+with DAG(
+    dag_id='example_kubernetes_executor_config',
+    schedule_interval=None,
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=['example3'],
+) as dag:
+    # You can use annotations on your kubernetes pods!
+    start_task_executor_config = {
+        "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
+    }
+
+    @task(executor_config=start_task_executor_config)
+    def start_task():
+        print_stuff()
+
+    start_task = start_task()
+
+    # [START task_with_volume]
+    executor_config_volume_mount = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        volume_mounts=[
+                            k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
+                        ],
+                    )
+                ],
+                volumes=[
+                    k8s.V1Volume(
+                        name="example-kubernetes-test-volume",
+                        host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
+                    )
+                ],
+            )
+        ),
+    }
+
+    @task(executor_config=executor_config_volume_mount)
+    def test_volume_mount():
+        """
+        Tests whether the volume has been mounted.
+        """
+
+        with open('/foo/volume_mount_test.txt', 'w') as foo:
+            foo.write('Hello')
+
+        return_code = os.system("cat /foo/volume_mount_test.txt")
+        if return_code != 0:
+            raise ValueError(f"Error when checking volume mount. Return code {return_code}")
+
+    volume_task = test_volume_mount()
+    # [END task_with_volume]
+
+    # [START task_with_template]
+    # Be careful when changing the base image in the pod_template_file.
+    # Using, for example, ubuntu:latest as base image
+    # here would fail because it doesn't have `airflow` commands.
+    # A scenario where changing the base image could be needed is if you have a special library that
+    # you want to run only in this task. In that case, you build the image
+    # with the special library, on top the image in your Dockerfile.
+    executor_config_template = {
+        "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/pod_template_file.yaml"),

Review comment:
       Sounds good







[GitHub] [airflow] ephraimbuddy commented on pull request #19355: Ensure the example DAGs are all working

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #19355:
URL: https://github.com/apache/airflow/pull/19355#issuecomment-957181808


   I'm not sure why `virtualenv` tasks (both normal and decorated) fail in the Kubernetes cluster.
   cc: @dimberman @jedcunningham 
   
   





[GitHub] [airflow] ephraimbuddy closed pull request #19355: Ensure the example DAGs are all working

Posted by GitBox <gi...@apache.org>.
ephraimbuddy closed pull request #19355:
URL: https://github.com/apache/airflow/pull/19355


   





[GitHub] [airflow] uranusjr commented on a change in pull request #19355: Ensure the example DAGs are all working

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19355:
URL: https://github.com/apache/airflow/pull/19355#discussion_r747331069



##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -16,88 +16,214 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-This is an example dag for using the Kubernetes Executor.
+This is an example dag for using a Kubernetes Executor Configuration.
 """
+import logging
 import os
 from datetime import datetime
 
 from airflow import DAG
+from airflow.configuration import conf
 from airflow.decorators import task
 from airflow.example_dags.libs.helper import print_stuff
 
+log = logging.getLogger(__name__)
+
+worker_container_repository = conf.get('kubernetes', 'worker_container_repository')
+worker_container_tag = conf.get('kubernetes', 'worker_container_tag')
+
+try:
+    from kubernetes.client import models as k8s
+except ImportError as e:
+    log.warning("Could not import DAGs in example_kubernetes_executor.py: %s", str(e))
+    log.warning("Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']")
+
+
 with DAG(
     dag_id='example_kubernetes_executor',
     schedule_interval=None,
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=['example', 'example2'],
+    tags=['example3'],

Review comment:
       So… this particular change does not really mean anything?







[GitHub] [airflow] ephraimbuddy closed pull request #19355: Ensure the example DAGs are all working

Posted by GitBox <gi...@apache.org>.
ephraimbuddy closed pull request #19355:
URL: https://github.com/apache/airflow/pull/19355


   





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #19355: Ensure the example DAGs are all working

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #19355:
URL: https://github.com/apache/airflow/pull/19355#discussion_r743132527



##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -38,57 +47,107 @@ def start_task():
         print_stuff()
 
     # But you can if you want to
-    kube_exec_config_special = {"KubernetesExecutor": {"image": "airflow/ci:latest"}}
+    kube_exec_config_special = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        # Please note that the image here must be similar to the one in your Dockerfile.
+                        # Using, for example, ubuntu:latest as base image
+                        # here would fail because it doesn't have `airflow` commands.
+                        # A scenario where changing the base image could be needed is if you have a
+                        # special library that you want to run only in this task. In that case,
+                        # you build the image with the special library, on top the image in your Dockerfile.
+                        image="apache/airflow:2.2.1-python3.9",
+                    ),
+                ]
+            )
+        )
+    }
 
     @task(executor_config=kube_exec_config_special)
     def one_task():
         print_stuff()
 
     # Use the zip binary, which is only found in this special docker image
-    kube_exec_config_zip_binary = {"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}}
+    kube_exec_config_zip_binary = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        # Please note that the image here must be similar to the one in your Dockerfile.
+                        # Using, for example, ubuntu:latest as base image
+                        # here would fail because it doesn't have `airflow` commands.
+                        # A scenario where changing the base image could be needed is if you have a
+                        # special library that you want to run only in this task. In that case,
+                        # you build the image with the special library, on top the image in your Dockerfile.
+                        image="apache/airflow:2.2.1-python3.9-zip",

Review comment:
       I couldn't get the env var to work







[GitHub] [airflow] jedcunningham commented on a change in pull request #19355: Ensure the example DAGs are all working

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #19355:
URL: https://github.com/apache/airflow/pull/19355#discussion_r742857463



##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -38,57 +47,107 @@ def start_task():
         print_stuff()
 
     # But you can if you want to
-    kube_exec_config_special = {"KubernetesExecutor": {"image": "airflow/ci:latest"}}
+    kube_exec_config_special = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        # Please note that the image here must be similar to the one in your Dockerfile.
+                        # Using, for example, ubuntu:latest as base image
+                        # here would fail because it doesn't have `airflow` commands.
+                        # A scenario where changing the base image could be needed is if you have a
+                        # special library that you want to run only in this task. In that case,
+                        # you build the image with the special library, on top the image in your Dockerfile.
+                        image="apache/airflow:2.2.1-python3.9",

Review comment:
       To make this portable, what if we used `[kubernetes] worker_container_repository` and `[kubernetes] worker_container_tag` instead? Then it would be more likely to work, and we could keep a similar comment framed as a hypothetical, e.g. "Imagine you need a custom library for only this task: build a new image off of your normal image and provide it here. (We simply use the default image to ensure this example will run.)"
   
   If we wanted it to "always" work, we'd really need to use those configs OR pull the image out of the `pod_template_file`.
   
   I sorta wonder if there is value in having a running example of this vs just documenting it though.
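
A rough sketch of the idea above, composing the image reference from the two `[kubernetes]` options instead of hard-coding it. The helper name `worker_image` and the stand-in config values are assumptions made for illustration; with Airflow installed, `conf.get` from `airflow.configuration` could presumably be passed as `get_option`, since it takes a section and a key.

```python
from typing import Callable


def worker_image(get_option: Callable[[str, str], str]) -> str:
    """Build "repository:tag" from the two [kubernetes] options.

    get_option is any callable taking (section, key) and returning a string.
    """
    repository = get_option("kubernetes", "worker_container_repository")
    tag = get_option("kubernetes", "worker_container_tag")
    return f"{repository}:{tag}"


# Stand-in for the Airflow config so the sketch runs without Airflow installed.
# These values are illustrative only.
_fake_conf = {
    ("kubernetes", "worker_container_repository"): "apache/airflow",
    ("kubernetes", "worker_container_tag"): "2.2.1-python3.9",
}

image = worker_image(lambda section, key: _fake_conf[(section, key)])
print(image)
```

The resulting string would then be passed as `image=` on the `base` container in the `pod_override`, in place of the literal tag in the diff.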

##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -38,57 +47,107 @@ def start_task():
         print_stuff()
 
     # But you can if you want to
-    kube_exec_config_special = {"KubernetesExecutor": {"image": "airflow/ci:latest"}}
+    kube_exec_config_special = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        # Please note that the image here must be similar to the one in your Dockerfile.
+                        # Using, for example, ubuntu:latest as base image
+                        # here would fail because it doesn't have `airflow` commands.
+                        # A scenario where changing the base image could be needed is if you have a
+                        # special library that you want to run only in this task. In that case,
+                        # you build the image with the special library, on top the image in your Dockerfile.
+                        image="apache/airflow:2.2.1-python3.9",
+                    ),
+                ]
+            )
+        )
+    }
 
     @task(executor_config=kube_exec_config_special)
     def one_task():
         print_stuff()
 
     # Use the zip binary, which is only found in this special docker image
-    kube_exec_config_zip_binary = {"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}}
+    kube_exec_config_zip_binary = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        # Please note that the image here must be similar to the one in your Dockerfile.
+                        # Using, for example, ubuntu:latest as base image
+                        # here would fail because it doesn't have `airflow` commands.
+                        # A scenario where changing the base image could be needed is if you have a
+                        # special library that you want to run only in this task. In that case,
+                        # you build the image with the special library, on top the image in your Dockerfile.
+                        image="apache/airflow:2.2.1-python3.9-zip",
+                    ),
+                ]
+            )
+        )
+    }
 
     @task(task_id="two_task", executor_config=kube_exec_config_zip_binary)
     def assert_zip_binary():
         """
         Checks whether Zip is installed.
-
         :raises SystemError: if zip is not installed
         """
         return_code = os.system("zip")
         if return_code != 0:
             raise SystemError("The zip binary is not found")
 
     # Limit resources on this operator/task with node affinity & tolerations
-    affinity = {
-        'podAntiAffinity': {
-            'requiredDuringSchedulingIgnoredDuringExecution': [
-                {
-                    'topologyKey': 'kubernetes.io/hostname',
-                    'labelSelector': {
-                        'matchExpressions': [{'key': 'app', 'operator': 'In', 'values': ['airflow']}]
-                    },
-                }
+    # Use k8s_client.V1Affinity to define node affinity & tolerations

Review comment:
       ```suggestion
       # Use k8s_client.V1Affinity to define node affinity
   ```

##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -38,57 +47,107 @@ def start_task():
         print_stuff()
 
     # But you can if you want to
-    kube_exec_config_special = {"KubernetesExecutor": {"image": "airflow/ci:latest"}}
+    kube_exec_config_special = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        # Please note that the image here must be similar to the one in your Dockerfile.
+                        # Using, for example, ubuntu:latest as base image
+                        # here would fail because it doesn't have `airflow` commands.
+                        # A scenario where changing the base image could be needed is if you have a
+                        # special library that you want to run only in this task. In that case,
+                        # you build the image with the special library, on top the image in your Dockerfile.
+                        image="apache/airflow:2.2.1-python3.9",
+                    ),
+                ]
+            )
+        )
+    }
 
     @task(executor_config=kube_exec_config_special)
     def one_task():
         print_stuff()
 
     # Use the zip binary, which is only found in this special docker image
-    kube_exec_config_zip_binary = {"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}}
+    kube_exec_config_zip_binary = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        # Please note that the image here must be similar to the one in your Dockerfile.
+                        # Using, for example, ubuntu:latest as base image
+                        # here would fail because it doesn't have `airflow` commands.
+                        # A scenario where changing the base image could be needed is if you have a
+                        # special library that you want to run only in this task. In that case,
+                        # you build the image with the special library, on top the image in your Dockerfile.
+                        image="apache/airflow:2.2.1-python3.9-zip",

Review comment:
       I'm not a fan of this example. It can't be done in a portable way, nor do I think we should publish a `zip` image just for an example DAG. Did you try the env var check example I mentioned yesterday instead?
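
The env var check mentioned above is not quoted in this thread, so the following is only a sketch of what such an example might look like. It assumes the task's `pod_override` sets one environment variable on the `base` container and the task then asserts it is present, which avoids needing a special image. The variable name, its value, and both function names are hypothetical.

```python
import os

# Hypothetical variable name and value, chosen only for this sketch.
ENV_VAR_NAME = "EXAMPLE_POD_OVERRIDE_VAR"
ENV_VAR_VALUE = "set-by-pod-override"

try:
    from kubernetes.client import models as k8s
except ImportError:
    k8s = None  # the kubernetes client is optional for this sketch


def build_env_var_executor_config():
    """Return an executor_config that sets one env var on the base container."""
    if k8s is None:
        return None
    return {
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",
                        env=[k8s.V1EnvVar(name=ENV_VAR_NAME, value=ENV_VAR_VALUE)],
                    )
                ]
            )
        )
    }


def assert_env_var():
    """Raise SystemError unless the env var from the pod override is present."""
    value = os.environ.get(ENV_VAR_NAME)
    if value != ENV_VAR_VALUE:
        raise SystemError(f"{ENV_VAR_NAME} is not set to the expected value")
    return value
```

In a DAG, `assert_env_var` would be wrapped with `@task(executor_config=build_env_var_executor_config())`, so the check only passes when the override was actually applied to the worker pod.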

##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -38,57 +47,107 @@ def start_task():
         print_stuff()
 
     # But you can if you want to
-    kube_exec_config_special = {"KubernetesExecutor": {"image": "airflow/ci:latest"}}
+    kube_exec_config_special = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        # Please note that the image here must be similar to the one in your Dockerfile.
+                        # Using, for example, ubuntu:latest as base image
+                        # here would fail because it doesn't have `airflow` commands.
+                        # A scenario where changing the base image could be needed is if you have a
+                        # special library that you want to run only in this task. In that case,
+                        # you build the image with the special library, on top the image in your Dockerfile.
+                        image="apache/airflow:2.2.1-python3.9",
+                    ),
+                ]
+            )
+        )
+    }
 
     @task(executor_config=kube_exec_config_special)
     def one_task():
         print_stuff()
 
     # Use the zip binary, which is only found in this special docker image
-    kube_exec_config_zip_binary = {"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}}
+    kube_exec_config_zip_binary = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        # Please note that the image here must be similar to the one in your Dockerfile.
+                        # Using, for example, ubuntu:latest as base image
+                        # here would fail because it doesn't have `airflow` commands.
+                        # A scenario where changing the base image could be needed is if you have a
+                        # special library that you want to run only in this task. In that case,
+                        # you build the image with the special library, on top the image in your Dockerfile.
+                        image="apache/airflow:2.2.1-python3.9-zip",
+                    ),
+                ]
+            )
+        )
+    }
 
     @task(task_id="two_task", executor_config=kube_exec_config_zip_binary)
     def assert_zip_binary():
         """
         Checks whether Zip is installed.
-
         :raises SystemError: if zip is not installed
         """
         return_code = os.system("zip")
         if return_code != 0:
             raise SystemError("The zip binary is not found")
 
     # Limit resources on this operator/task with node affinity & tolerations
-    affinity = {
-        'podAntiAffinity': {
-            'requiredDuringSchedulingIgnoredDuringExecution': [
-                {
-                    'topologyKey': 'kubernetes.io/hostname',
-                    'labelSelector': {
-                        'matchExpressions': [{'key': 'app', 'operator': 'In', 'values': ['airflow']}]
-                    },
-                }
+    # Use k8s_client.V1Affinity to define node affinity & tolerations
+    k8s_affinity = k8s.V1Affinity(
+        pod_anti_affinity=k8s.V1PodAntiAffinity(
+            required_during_scheduling_ignored_during_execution=[
+                k8s.V1PodAffinityTerm(
+                    label_selector=k8s.V1LabelSelector(
+                        match_expressions=[
+                            k8s.V1LabelSelectorRequirement(key='app', operator='In', values=['airflow'])
+                        ]
+                    ),
+                    topology_key='kubernetes.io/hostname',
+                )
             ]
-        }
-    }
+        )
+    )
 
-    tolerations = [{'key': 'dedicated', 'operator': 'Equal', 'value': 'airflow'}]
+    # Use k8s_client.V1Toleration to define node tolerations
+    k8s_tolerations = [k8s.V1Toleration(key='dedicated', operator='Equal', value='airflow')]
 
-    kube_exec_config_resource_limits = {
-        "KubernetesExecutor": {
-            "request_memory": "128Mi",
-            "limit_memory": "128Mi",
-            "tolerations": tolerations,
-            "affinity": affinity,
-        }
+    # Use k8s_client.V1ResourceRequirements to define resource limits
+    k8s_resource_limits = k8s.V1ResourceRequirements(requests={'memory': '128Mi'}, limits={'memory': '128Mi'})
+    # use k8s_client.V1PodSpec to define pod spec with affinity, tolerations, and resource limits
+    k8s_exec_config_resources = {

Review comment:
       We should change this name, as it's not just resources.
   
   Also, I think we should reorder these examples. I have to imagine resources are used significantly more than custom worker images (which probably should be the last example).
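
The hunk above is cut off at the dictionary's opening brace, so the following is a hedged sketch of how the affinity, tolerations, and resource settings from the diff might be combined under a single `pod_override`. The function name `build_scheduling_executor_config` is hypothetical and only illustrates a name that is not limited to resources; it is not the name the PR settled on.

```python
try:
    from kubernetes.client import models as k8s
except ImportError:
    k8s = None  # the kubernetes client is optional for this sketch


def build_scheduling_executor_config():
    """Combine affinity, tolerations, and resource limits in one pod_override."""
    if k8s is None:
        return None

    # Pod anti-affinity, as in the diff above.
    affinity = k8s.V1Affinity(
        pod_anti_affinity=k8s.V1PodAntiAffinity(
            required_during_scheduling_ignored_during_execution=[
                k8s.V1PodAffinityTerm(
                    label_selector=k8s.V1LabelSelector(
                        match_expressions=[
                            k8s.V1LabelSelectorRequirement(
                                key="app", operator="In", values=["airflow"]
                            )
                        ]
                    ),
                    topology_key="kubernetes.io/hostname",
                )
            ]
        )
    )
    tolerations = [k8s.V1Toleration(key="dedicated", operator="Equal", value="airflow")]
    resources = k8s.V1ResourceRequirements(
        requests={"memory": "128Mi"}, limits={"memory": "128Mi"}
    )

    # Resources attach to the container; affinity and tolerations attach to the pod spec.
    return {
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[k8s.V1Container(name="base", resources=resources)],
                affinity=affinity,
                tolerations=tolerations,
            )
        )
    }
```

The split matters when reading the example: resource requests and limits are per-container settings, while affinity and tolerations apply to the whole pod.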

##########
File path: airflow/example_dags/example_kubernetes_executor_config.py
##########
@@ -23,149 +23,156 @@
 from datetime import datetime
 
 from airflow import DAG
+from airflow.configuration import AIRFLOW_HOME
 from airflow.decorators import task
 from airflow.example_dags.libs.helper import print_stuff
-from airflow.settings import AIRFLOW_HOME
 
 log = logging.getLogger(__name__)
 
 try:
     from kubernetes.client import models as k8s
-
-    with DAG(
-        dag_id='example_kubernetes_executor_config',
-        schedule_interval=None,
-        start_date=datetime(2021, 1, 1),
-        catchup=False,
-        tags=['example3'],
-    ) as dag:
-        # You can use annotations on your kubernetes pods!
-        start_task_executor_config = {
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
-        }
-
-        @task(executor_config=start_task_executor_config)
-        def start_task():
-            print_stuff()
-
-        start_task = start_task()
-
-        # [START task_with_volume]
-        executor_config_volume_mount = {
-            "pod_override": k8s.V1Pod(
-                spec=k8s.V1PodSpec(
-                    containers=[
-                        k8s.V1Container(
-                            name="base",
-                            volume_mounts=[
-                                k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
-                            ],
-                        )
-                    ],
-                    volumes=[
-                        k8s.V1Volume(
-                            name="example-kubernetes-test-volume",
-                            host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
-                        )
-                    ],
-                )
-            ),
-        }
-
-        @task(executor_config=executor_config_volume_mount)
-        def test_volume_mount():
-            """
-            Tests whether the volume has been mounted.
-            """
-            with open('/foo/volume_mount_test.txt', 'w') as foo:
-                foo.write('Hello')
-
-            return_code = os.system("cat /foo/volume_mount_test.txt")
-            if return_code != 0:
-                raise ValueError(f"Error when checking volume mount. Return code {return_code}")
-
-        volume_task = test_volume_mount()
-        # [END task_with_volume]
-
-        # [START task_with_template]
-        executor_config_template = {
-            "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/basic_template.yaml"),
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
-        }
-
-        @task(executor_config=executor_config_template)
-        def task_with_template():
-            print_stuff()
-
-        task_with_template = task_with_template()
-        # [END task_with_template]
-
-        # [START task_with_sidecar]
-        executor_config_sidecar = {
-            "pod_override": k8s.V1Pod(
-                spec=k8s.V1PodSpec(
-                    containers=[
-                        k8s.V1Container(
-                            name="base",
-                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                        ),
-                        k8s.V1Container(
-                            name="sidecar",
-                            image="ubuntu",
-                            args=["echo \"retrieved from mount\" > /shared/test.txt"],
-                            command=["bash", "-cx"],
-                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                        ),
-                    ],
-                    volumes=[
-                        k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
-                    ],
-                )
-            ),
-        }
-
-        @task(executor_config=executor_config_sidecar)
-        def test_sharedvolume_mount():
-            """
-            Tests whether the volume has been mounted.
-            """
-            for i in range(5):
-                try:
-                    return_code = os.system("cat /shared/test.txt")
-                    if return_code != 0:
-                        raise ValueError(f"Error when checking volume mount. Return code {return_code}")
-                except ValueError as e:
-                    if i > 4:
-                        raise e
-
-        sidecar_task = test_sharedvolume_mount()
-        # [END task_with_sidecar]
-
-        # Test that we can add labels to pods
-        executor_config_non_root = {
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))
-        }
-
-        @task(executor_config=executor_config_non_root)
-        def non_root_task():
-            print_stuff()
-
-        third_task = non_root_task()
-
-        executor_config_other_ns = {
-            "KubernetesExecutor": {"namespace": "test-namespace", "labels": {"release": "stable"}}
-        }
-
-        @task(executor_config=executor_config_other_ns)
-        def other_namespace_task():
-            print_stuff()
-
-        other_ns_task = other_namespace_task()
-
-        start_task >> volume_task >> third_task
-        start_task >> other_ns_task
-        start_task >> sidecar_task
-        start_task >> task_with_template
 except ImportError as e:
     log.warning("Could not import DAGs in example_kubernetes_executor_config.py: %s", str(e))
-    log.warning("Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']")
+    log.warning("Install kubernetes dependencies with: pip install apache-airflow[cncf.kubernetes]")
+
+with DAG(
+    dag_id='example_kubernetes_executor_config',
+    schedule_interval=None,
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=['example3'],
+) as dag:
+    # You can use annotations on your kubernetes pods!
+    start_task_executor_config = {
+        "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
+    }
+
+    @task(executor_config=start_task_executor_config)
+    def start_task():
+        print_stuff()
+
+    start_task = start_task()
+
+    # [START task_with_volume]
+    executor_config_volume_mount = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        volume_mounts=[
+                            k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
+                        ],
+                    )
+                ],
+                volumes=[
+                    k8s.V1Volume(
+                        name="example-kubernetes-test-volume",
+                        host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
+                    )
+                ],
+            )
+        ),
+    }
+
+    @task(executor_config=executor_config_volume_mount)
+    def test_volume_mount():
+        """
+        Tests whether the volume has been mounted.
+        """
+
+        with open('/foo/volume_mount_test.txt', 'w') as foo:
+            foo.write('Hello')
+
+        return_code = os.system("cat /foo/volume_mount_test.txt")
+        if return_code != 0:
+            raise ValueError(f"Error when checking volume mount. Return code {return_code}")
+
+    volume_task = test_volume_mount()
+    # [END task_with_volume]
+
+    # [START task_with_template]
+    # Be careful when changing the base image in the pod_template_file.
+    # Using, for example, ubuntu:latest as base image
+    # here would fail because it doesn't have `airflow` commands.
+    # A scenario where changing the base image could be needed is if you have a special library that
+    # you want to run only in this task. In that case, you build the image
+    # with the special library, on top the image in your Dockerfile.
+    executor_config_template = {
+        "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/pod_template_file.yaml"),

Review comment:
       This file doesn't exist by default; it just so happens to be where the official chart puts it. We also couldn't add a custom pod_template_file that works in all instances.
   
   I think we should move this example into the docs so we don't actually try to run it.



