Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/11/19 20:28:55 UTC

[GitHub] [airflow] kaxil opened a new pull request #12494: Make kubernetes requirement optional for Example DAGs

kaxil opened a new pull request #12494:
URL: https://github.com/apache/airflow/pull/12494


   
   ![image](https://user-images.githubusercontent.com/8811558/99720551-cf404c00-2aa5-11eb-8617-75ae65749db8.png)
   
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #12494: Make kubernetes requirement optional for Example DAGs

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12494:
URL: https://github.com/apache/airflow/pull/12494#issuecomment-730635502


   The PR needs to run all tests because it modifies the core of Airflow! Please rebase it onto the latest master or ask a committer to re-run it!





[GitHub] [airflow] XD-DENG commented on a change in pull request #12494: Make kubernetes requirement optional for Example DAGs

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on a change in pull request #12494:
URL: https://github.com/apache/airflow/pull/12494#discussion_r527194194



##########
File path: airflow/example_dags/example_kubernetes_executor_config.py
##########
@@ -31,130 +30,146 @@
     'owner': 'airflow',
 }
 
-with DAG(
-    dag_id='example_kubernetes_executor_config',
-    default_args=default_args,
-    schedule_interval=None,
-    start_date=days_ago(2),
-    tags=['example3'],
-) as dag:
+log = logging.getLogger(__name__)
+
+try:
+    from kubernetes.client import models as k8s
+
+    with DAG(
+        dag_id='example_kubernetes_executor_config',
+        default_args=default_args,
+        schedule_interval=None,
+        start_date=days_ago(2),
+        tags=['example3'],
+    ) as dag:
 
-    def test_sharedvolume_mount():
-        """
-        Tests whether the volume has been mounted.
-        """
-        for i in range(5):
-            try:
-                return_code = os.system("cat /shared/test.txt")
-                if return_code != 0:
-                    raise ValueError(f"Error when checking volume mount. Return code {return_code}")
-            except ValueError as e:
-                if i > 4:
-                    raise e
+        def test_sharedvolume_mount():
+            """
+            Tests whether the volume has been mounted.
+            """
+            for i in range(5):
+                try:
+                    return_code = os.system("cat /shared/test.txt")
+                    if return_code != 0:
+                        raise ValueError(f"Error when checking volume mount. Return code {return_code}")
+                except ValueError as e:
+                    if i > 4:
+                        raise e
 
-    def test_volume_mount():
-        """
-        Tests whether the volume has been mounted.
-        """
-        with open('/foo/volume_mount_test.txt', 'w') as foo:
-            foo.write('Hello')
+        def test_volume_mount():
+            """
+            Tests whether the volume has been mounted.
+            """
+            with open('/foo/volume_mount_test.txt', 'w') as foo:
+                foo.write('Hello')
 
-        return_code = os.system("cat /foo/volume_mount_test.txt")
-        if return_code != 0:
-            raise ValueError(f"Error when checking volume mount. Return code {return_code}")
+            return_code = os.system("cat /foo/volume_mount_test.txt")
+            if return_code != 0:
+                raise ValueError(f"Error when checking volume mount. Return code {return_code}")
 
-    # You can use annotations on your kubernetes pods!
-    start_task = PythonOperator(
-        task_id="start_task",
-        python_callable=print_stuff,
-        executor_config={
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
-        },
-    )
+        # You can use annotations on your kubernetes pods!
+        start_task = PythonOperator(
+            task_id="start_task",
+            python_callable=print_stuff,
+            executor_config={
+                "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
+            },
+        )
 
-    # [START task_with_volume]
-    volume_task = PythonOperator(
-        task_id="task_with_volume",
-        python_callable=test_volume_mount,
-        executor_config={
-            "pod_override": k8s.V1Pod(
-                spec=k8s.V1PodSpec(
-                    containers=[
-                        k8s.V1Container(
-                            name="base",
-                            volume_mounts=[
-                                k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
-                            ],
-                        )
-                    ],
-                    volumes=[
-                        k8s.V1Volume(
-                            name="example-kubernetes-test-volume",
-                            host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
-                        )
-                    ],
-                )
-            ),
-        },
-    )
-    # [END task_with_volume]
+        # [START task_with_volume]
+        volume_task = PythonOperator(
+            task_id="task_with_volume",
+            python_callable=test_volume_mount,
+            executor_config={
+                "pod_override": k8s.V1Pod(
+                    spec=k8s.V1PodSpec(
+                        containers=[
+                            k8s.V1Container(
+                                name="base",
+                                volume_mounts=[
+                                    k8s.V1VolumeMount(
+                                        mount_path="/foo/", name="example-kubernetes-test-volume"
+                                    )
+                                ],
+                            )
+                        ],
+                        volumes=[
+                            k8s.V1Volume(
+                                name="example-kubernetes-test-volume",
+                                host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
+                            )
+                        ],
+                    )
+                ),
+            },
+        )
+        # [END task_with_volume]
 
-    # [START task_with_template]
-    task_with_template = PythonOperator(
-        task_id="task_with_template",
-        python_callable=print_stuff,
-        executor_config={
-            "pod_template_file": "/usr/local/airflow/pod_templates/basic_template.yaml",
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
-        },
-    )
-    # [END task_with_template]
+        # [START task_with_template]
+        task_with_template = PythonOperator(
+            task_id="task_with_template",
+            python_callable=print_stuff,
+            executor_config={
+                "pod_template_file": "/usr/local/airflow/pod_templates/basic_template.yaml",
+                "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
+            },
+        )
+        # [END task_with_template]
 
-    # [START task_with_sidecar]
-    sidecar_task = PythonOperator(
-        task_id="task_with_sidecar",
-        python_callable=test_sharedvolume_mount,
-        executor_config={
-            "pod_override": k8s.V1Pod(
-                spec=k8s.V1PodSpec(
-                    containers=[
-                        k8s.V1Container(
-                            name="base",
-                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                        ),
-                        k8s.V1Container(
-                            name="sidecar",
-                            image="ubuntu",
-                            args=["echo \"retrieved from mount\" > /shared/test.txt"],
-                            command=["bash", "-cx"],
-                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                        ),
-                    ],
-                    volumes=[
-                        k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
-                    ],
-                )
-            ),
-        },
-    )
-    # [END task_with_sidecar]
+        # [START task_with_sidecar]
+        sidecar_task = PythonOperator(
+            task_id="task_with_sidecar",
+            python_callable=test_sharedvolume_mount,
+            executor_config={
+                "pod_override": k8s.V1Pod(
+                    spec=k8s.V1PodSpec(
+                        containers=[
+                            k8s.V1Container(
+                                name="base",
+                                volume_mounts=[
+                                    k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")
+                                ],
+                            ),
+                            k8s.V1Container(
+                                name="sidecar",
+                                image="ubuntu",
+                                args=["echo \"retrieved from mount\" > /shared/test.txt"],
+                                command=["bash", "-cx"],
+                                volume_mounts=[
+                                    k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")
+                                ],
+                            ),
+                        ],
+                        volumes=[
+                            k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
+                        ],
+                    )
+                ),
+            },
+        )
+        # [END task_with_sidecar]
 
-    # Test that we can add labels to pods
-    third_task = PythonOperator(
-        task_id="non_root_task",
-        python_callable=print_stuff,
-        executor_config={"pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))},
-    )
+        # Test that we can add labels to pods
+        third_task = PythonOperator(
+            task_id="non_root_task",
+            python_callable=print_stuff,
+            executor_config={
+                "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))
+            },
+        )
 
-    other_ns_task = PythonOperator(
-        task_id="other_namespace_task",
-        python_callable=print_stuff,
-        executor_config={
-            "KubernetesExecutor": {"namespace": "test-namespace", "labels": {"release": "stable"}}
-        },
-    )
+        other_ns_task = PythonOperator(
+            task_id="other_namespace_task",
+            python_callable=print_stuff,
+            executor_config={
+                "KubernetesExecutor": {"namespace": "test-namespace", "labels": {"release": "stable"}}
+            },
+        )
 
-    start_task >> volume_task >> third_task
-    start_task >> other_ns_task
-    start_task >> sidecar_task
-    start_task >> task_with_template
+        start_task >> volume_task >> third_task
+        start_task >> other_ns_task
+        start_task >> sidecar_task
+        start_task >> task_with_template
+except ImportError as e:
+    log.warning("Could not import DAGs in example_kubernetes_executor_config.py: %s", str(e))
+    log.warning("Install kubernetes dependencies with: pip install apache-airflow['kubernetes']")

Review comment:
       Shall we use `cncf.kubernetes` instead, given that `kubernetes` is planned to be deprecated in 2.1?
   https://github.com/apache/airflow/blob/41cf172d6af1a5406367e179df763ef1992e4108/setup.py#L604
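
For readers following the thread, the pattern under discussion (guarding an optional dependency so that the module still loads without it) can be sketched in isolation. This is a minimal illustration, not the code merged in the PR: the helper name `optional_import` is hypothetical, and the install hint uses the `cncf.kubernetes` extra only because the review comment above suggests it.

```python
import importlib
import logging

log = logging.getLogger(__name__)


def optional_import(module_name, extra):
    """Hypothetical helper: return the module, or None (with a warning) if it is missing."""
    try:
        return importlib.import_module(module_name)
    except ImportError as e:
        # ModuleNotFoundError is a subclass of ImportError, so it is caught here too.
        log.warning("Could not import %s: %s", module_name, e)
        log.warning("Install the dependency with: pip install 'apache-airflow[%s]'", extra)
        return None


# Assumption: the extra name follows the reviewer's suggestion.
k8s = optional_import("kubernetes.client.models", "cncf.kubernetes")

if k8s is not None:
    # Only touch kubernetes models once the import is known to have succeeded.
    pod_metadata = k8s.V1ObjectMeta(annotations={"test": "annotation"})
```

Compared with wrapping the whole DAG definition in `try`/`except ImportError`, this keeps the guarded region small, so an unrelated `ImportError` raised further down is not silently turned into a warning. Quoting the whole requirement in the hint also stops shells such as zsh from interpreting the square brackets.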







[GitHub] [airflow] kaxil merged pull request #12494: Make kubernetes requirement optional for Example DAGs

Posted by GitBox <gi...@apache.org>.
kaxil merged pull request #12494:
URL: https://github.com/apache/airflow/pull/12494


   








[GitHub] [airflow] kaxil commented on a change in pull request #12494: Make kubernetes requirement optional for Example DAGs

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #12494:
URL: https://github.com/apache/airflow/pull/12494#discussion_r527195807



##########
File path: airflow/example_dags/example_kubernetes_executor_config.py
##########
@@ -31,130 +30,146 @@
     'owner': 'airflow',
 }
 
-with DAG(
-    dag_id='example_kubernetes_executor_config',
-    default_args=default_args,
-    schedule_interval=None,
-    start_date=days_ago(2),
-    tags=['example3'],
-) as dag:
+log = logging.getLogger(__name__)
+
+try:
+    from kubernetes.client import models as k8s
+
+    with DAG(
+        dag_id='example_kubernetes_executor_config',
+        default_args=default_args,
+        schedule_interval=None,
+        start_date=days_ago(2),
+        tags=['example3'],
+    ) as dag:
 
-    def test_sharedvolume_mount():
-        """
-        Tests whether the volume has been mounted.
-        """
-        for i in range(5):
-            try:
-                return_code = os.system("cat /shared/test.txt")
-                if return_code != 0:
-                    raise ValueError(f"Error when checking volume mount. Return code {return_code}")
-            except ValueError as e:
-                if i > 4:
-                    raise e
+        def test_sharedvolume_mount():
+            """
+            Tests whether the volume has been mounted.
+            """
+            for i in range(5):
+                try:
+                    return_code = os.system("cat /shared/test.txt")
+                    if return_code != 0:
+                        raise ValueError(f"Error when checking volume mount. Return code {return_code}")
+                except ValueError as e:
+                    if i > 4:
+                        raise e
 
-    def test_volume_mount():
-        """
-        Tests whether the volume has been mounted.
-        """
-        with open('/foo/volume_mount_test.txt', 'w') as foo:
-            foo.write('Hello')
+        def test_volume_mount():
+            """
+            Tests whether the volume has been mounted.
+            """
+            with open('/foo/volume_mount_test.txt', 'w') as foo:
+                foo.write('Hello')
 
-        return_code = os.system("cat /foo/volume_mount_test.txt")
-        if return_code != 0:
-            raise ValueError(f"Error when checking volume mount. Return code {return_code}")
+            return_code = os.system("cat /foo/volume_mount_test.txt")
+            if return_code != 0:
+                raise ValueError(f"Error when checking volume mount. Return code {return_code}")
 
-    # You can use annotations on your kubernetes pods!
-    start_task = PythonOperator(
-        task_id="start_task",
-        python_callable=print_stuff,
-        executor_config={
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
-        },
-    )
+        # You can use annotations on your kubernetes pods!
+        start_task = PythonOperator(
+            task_id="start_task",
+            python_callable=print_stuff,
+            executor_config={
+                "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
+            },
+        )
 
-    # [START task_with_volume]
-    volume_task = PythonOperator(
-        task_id="task_with_volume",
-        python_callable=test_volume_mount,
-        executor_config={
-            "pod_override": k8s.V1Pod(
-                spec=k8s.V1PodSpec(
-                    containers=[
-                        k8s.V1Container(
-                            name="base",
-                            volume_mounts=[
-                                k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
-                            ],
-                        )
-                    ],
-                    volumes=[
-                        k8s.V1Volume(
-                            name="example-kubernetes-test-volume",
-                            host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
-                        )
-                    ],
-                )
-            ),
-        },
-    )
-    # [END task_with_volume]
+        # [START task_with_volume]
+        volume_task = PythonOperator(
+            task_id="task_with_volume",
+            python_callable=test_volume_mount,
+            executor_config={
+                "pod_override": k8s.V1Pod(
+                    spec=k8s.V1PodSpec(
+                        containers=[
+                            k8s.V1Container(
+                                name="base",
+                                volume_mounts=[
+                                    k8s.V1VolumeMount(
+                                        mount_path="/foo/", name="example-kubernetes-test-volume"
+                                    )
+                                ],
+                            )
+                        ],
+                        volumes=[
+                            k8s.V1Volume(
+                                name="example-kubernetes-test-volume",
+                                host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
+                            )
+                        ],
+                    )
+                ),
+            },
+        )
+        # [END task_with_volume]
 
-    # [START task_with_template]
-    task_with_template = PythonOperator(
-        task_id="task_with_template",
-        python_callable=print_stuff,
-        executor_config={
-            "pod_template_file": "/usr/local/airflow/pod_templates/basic_template.yaml",
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
-        },
-    )
-    # [END task_with_template]
+        # [START task_with_template]
+        task_with_template = PythonOperator(
+            task_id="task_with_template",
+            python_callable=print_stuff,
+            executor_config={
+                "pod_template_file": "/usr/local/airflow/pod_templates/basic_template.yaml",
+                "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
+            },
+        )
+        # [END task_with_template]
 
-    # [START task_with_sidecar]
-    sidecar_task = PythonOperator(
-        task_id="task_with_sidecar",
-        python_callable=test_sharedvolume_mount,
-        executor_config={
-            "pod_override": k8s.V1Pod(
-                spec=k8s.V1PodSpec(
-                    containers=[
-                        k8s.V1Container(
-                            name="base",
-                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                        ),
-                        k8s.V1Container(
-                            name="sidecar",
-                            image="ubuntu",
-                            args=["echo \"retrieved from mount\" > /shared/test.txt"],
-                            command=["bash", "-cx"],
-                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                        ),
-                    ],
-                    volumes=[
-                        k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
-                    ],
-                )
-            ),
-        },
-    )
-    # [END task_with_sidecar]
+        # [START task_with_sidecar]
+        sidecar_task = PythonOperator(
+            task_id="task_with_sidecar",
+            python_callable=test_sharedvolume_mount,
+            executor_config={
+                "pod_override": k8s.V1Pod(
+                    spec=k8s.V1PodSpec(
+                        containers=[
+                            k8s.V1Container(
+                                name="base",
+                                volume_mounts=[
+                                    k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")
+                                ],
+                            ),
+                            k8s.V1Container(
+                                name="sidecar",
+                                image="ubuntu",
+                                args=["echo \"retrieved from mount\" > /shared/test.txt"],
+                                command=["bash", "-cx"],
+                                volume_mounts=[
+                                    k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")
+                                ],
+                            ),
+                        ],
+                        volumes=[
+                            k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
+                        ],
+                    )
+                ),
+            },
+        )
+        # [END task_with_sidecar]
 
-    # Test that we can add labels to pods
-    third_task = PythonOperator(
-        task_id="non_root_task",
-        python_callable=print_stuff,
-        executor_config={"pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))},
-    )
+        # Test that we can add labels to pods
+        third_task = PythonOperator(
+            task_id="non_root_task",
+            python_callable=print_stuff,
+            executor_config={
+                "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))
+            },
+        )
 
-    other_ns_task = PythonOperator(
-        task_id="other_namespace_task",
-        python_callable=print_stuff,
-        executor_config={
-            "KubernetesExecutor": {"namespace": "test-namespace", "labels": {"release": "stable"}}
-        },
-    )
+        other_ns_task = PythonOperator(
+            task_id="other_namespace_task",
+            python_callable=print_stuff,
+            executor_config={
+                "KubernetesExecutor": {"namespace": "test-namespace", "labels": {"release": "stable"}}
+            },
+        )
 
-    start_task >> volume_task >> third_task
-    start_task >> other_ns_task
-    start_task >> sidecar_task
-    start_task >> task_with_template
+        start_task >> volume_task >> third_task
+        start_task >> other_ns_task
+        start_task >> sidecar_task
+        start_task >> task_with_template
+except ImportError as e:
+    log.warning("Could not import DAGs in example_kubernetes_executor_config.py: %s", str(e))
+    log.warning("Install kubernetes dependencies with: pip install apache-airflow['kubernetes']")

Review comment:
       fixed in a73dcb846
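
For readers skimming the hunk above, the core of the change is an optional-import guard: the DAG is only defined when `kubernetes` can be imported, and a warning is logged otherwise. A minimal standalone sketch of that pattern follows. The helper name `optional_import` and its `extra` parameter are hypothetical and are not part of this PR or of Airflow; the PR itself uses an inline `try`/`except ImportError` around the `with DAG(...)` block.

```python
import importlib
import logging

log = logging.getLogger(__name__)


def optional_import(module_name, extra):
    """Return the imported module, or None (after logging a warning) if it is missing.

    Hypothetical helper for illustration only; `extra` is the name of the
    pip extra that would provide the module.
    """
    try:
        return importlib.import_module(module_name)
    except ImportError as e:
        # Mirrors the two warnings logged in the diff above.
        log.warning("Could not import %s: %s", module_name, e)
        log.warning("Install the dependency with: pip install apache-airflow['%s']", extra)
        return None


# Usage: only build the Kubernetes-specific objects when the import succeeded.
k8s = optional_import("kubernetes.client.models", extra="kubernetes")
if k8s is not None:
    pod_metadata = k8s.V1ObjectMeta(labels={"release": "stable"})
```

One trade-off of guarding this way is that a missing dependency no longer surfaces as an import error for the example DAG; the warning in the log is the only signal.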







[GitHub] [airflow] github-actions[bot] commented on pull request #12494: Make kubernetes requirement optional for Example DAGs

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12494:
URL: https://github.com/apache/airflow/pull/12494#issuecomment-730628834


   The PR needs to run all tests because it modifies the core of Airflow! Please rebase it onto the latest master or ask a committer to re-run it!





[GitHub] [airflow] potiuk commented on a change in pull request #12494: Make kubernetes requirement optional for Example DAGs

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #12494:
URL: https://github.com/apache/airflow/pull/12494#discussion_r527194933



##########
File path: airflow/example_dags/example_kubernetes_executor_config.py
##########
@@ -31,130 +30,146 @@
     'owner': 'airflow',
 }
 
-with DAG(
-    dag_id='example_kubernetes_executor_config',
-    default_args=default_args,
-    schedule_interval=None,
-    start_date=days_ago(2),
-    tags=['example3'],
-) as dag:
+log = logging.getLogger(__name__)
+
+try:
+    from kubernetes.client import models as k8s
+
+    with DAG(
+        dag_id='example_kubernetes_executor_config',
+        default_args=default_args,
+        schedule_interval=None,
+        start_date=days_ago(2),
+        tags=['example3'],
+    ) as dag:
 
-    def test_sharedvolume_mount():
-        """
-        Tests whether the volume has been mounted.
-        """
-        for i in range(5):
-            try:
-                return_code = os.system("cat /shared/test.txt")
-                if return_code != 0:
-                    raise ValueError(f"Error when checking volume mount. Return code {return_code}")
-            except ValueError as e:
-                if i > 4:
-                    raise e
+        def test_sharedvolume_mount():
+            """
+            Tests whether the volume has been mounted.
+            """
+            for i in range(5):
+                try:
+                    return_code = os.system("cat /shared/test.txt")
+                    if return_code != 0:
+                        raise ValueError(f"Error when checking volume mount. Return code {return_code}")
+                except ValueError as e:
+                    if i > 4:
+                        raise e
 
-    def test_volume_mount():
-        """
-        Tests whether the volume has been mounted.
-        """
-        with open('/foo/volume_mount_test.txt', 'w') as foo:
-            foo.write('Hello')
+        def test_volume_mount():
+            """
+            Tests whether the volume has been mounted.
+            """
+            with open('/foo/volume_mount_test.txt', 'w') as foo:
+                foo.write('Hello')
 
-        return_code = os.system("cat /foo/volume_mount_test.txt")
-        if return_code != 0:
-            raise ValueError(f"Error when checking volume mount. Return code {return_code}")
+            return_code = os.system("cat /foo/volume_mount_test.txt")
+            if return_code != 0:
+                raise ValueError(f"Error when checking volume mount. Return code {return_code}")
 
-    # You can use annotations on your kubernetes pods!
-    start_task = PythonOperator(
-        task_id="start_task",
-        python_callable=print_stuff,
-        executor_config={
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
-        },
-    )
+        # You can use annotations on your kubernetes pods!
+        start_task = PythonOperator(
+            task_id="start_task",
+            python_callable=print_stuff,
+            executor_config={
+                "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
+            },
+        )
 
-    # [START task_with_volume]
-    volume_task = PythonOperator(
-        task_id="task_with_volume",
-        python_callable=test_volume_mount,
-        executor_config={
-            "pod_override": k8s.V1Pod(
-                spec=k8s.V1PodSpec(
-                    containers=[
-                        k8s.V1Container(
-                            name="base",
-                            volume_mounts=[
-                                k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
-                            ],
-                        )
-                    ],
-                    volumes=[
-                        k8s.V1Volume(
-                            name="example-kubernetes-test-volume",
-                            host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
-                        )
-                    ],
-                )
-            ),
-        },
-    )
-    # [END task_with_volume]
+        # [START task_with_volume]
+        volume_task = PythonOperator(
+            task_id="task_with_volume",
+            python_callable=test_volume_mount,
+            executor_config={
+                "pod_override": k8s.V1Pod(
+                    spec=k8s.V1PodSpec(
+                        containers=[
+                            k8s.V1Container(
+                                name="base",
+                                volume_mounts=[
+                                    k8s.V1VolumeMount(
+                                        mount_path="/foo/", name="example-kubernetes-test-volume"
+                                    )
+                                ],
+                            )
+                        ],
+                        volumes=[
+                            k8s.V1Volume(
+                                name="example-kubernetes-test-volume",
+                                host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
+                            )
+                        ],
+                    )
+                ),
+            },
+        )
+        # [END task_with_volume]
 
-    # [START task_with_template]
-    task_with_template = PythonOperator(
-        task_id="task_with_template",
-        python_callable=print_stuff,
-        executor_config={
-            "pod_template_file": "/usr/local/airflow/pod_templates/basic_template.yaml",
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
-        },
-    )
-    # [END task_with_template]
+        # [START task_with_template]
+        task_with_template = PythonOperator(
+            task_id="task_with_template",
+            python_callable=print_stuff,
+            executor_config={
+                "pod_template_file": "/usr/local/airflow/pod_templates/basic_template.yaml",
+                "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
+            },
+        )
+        # [END task_with_template]
 
-    # [START task_with_sidecar]
-    sidecar_task = PythonOperator(
-        task_id="task_with_sidecar",
-        python_callable=test_sharedvolume_mount,
-        executor_config={
-            "pod_override": k8s.V1Pod(
-                spec=k8s.V1PodSpec(
-                    containers=[
-                        k8s.V1Container(
-                            name="base",
-                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                        ),
-                        k8s.V1Container(
-                            name="sidecar",
-                            image="ubuntu",
-                            args=["echo \"retrieved from mount\" > /shared/test.txt"],
-                            command=["bash", "-cx"],
-                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                        ),
-                    ],
-                    volumes=[
-                        k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
-                    ],
-                )
-            ),
-        },
-    )
-    # [END task_with_sidecar]
+        # [START task_with_sidecar]
+        sidecar_task = PythonOperator(
+            task_id="task_with_sidecar",
+            python_callable=test_sharedvolume_mount,
+            executor_config={
+                "pod_override": k8s.V1Pod(
+                    spec=k8s.V1PodSpec(
+                        containers=[
+                            k8s.V1Container(
+                                name="base",
+                                volume_mounts=[
+                                    k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")
+                                ],
+                            ),
+                            k8s.V1Container(
+                                name="sidecar",
+                                image="ubuntu",
+                                args=["echo \"retrieved from mount\" > /shared/test.txt"],
+                                command=["bash", "-cx"],
+                                volume_mounts=[
+                                    k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")
+                                ],
+                            ),
+                        ],
+                        volumes=[
+                            k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
+                        ],
+                    )
+                ),
+            },
+        )
+        # [END task_with_sidecar]
 
-    # Test that we can add labels to pods
-    third_task = PythonOperator(
-        task_id="non_root_task",
-        python_callable=print_stuff,
-        executor_config={"pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))},
-    )
+        # Test that we can add labels to pods
+        third_task = PythonOperator(
+            task_id="non_root_task",
+            python_callable=print_stuff,
+            executor_config={
+                "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))
+            },
+        )
 
-    other_ns_task = PythonOperator(
-        task_id="other_namespace_task",
-        python_callable=print_stuff,
-        executor_config={
-            "KubernetesExecutor": {"namespace": "test-namespace", "labels": {"release": "stable"}}
-        },
-    )
+        other_ns_task = PythonOperator(
+            task_id="other_namespace_task",
+            python_callable=print_stuff,
+            executor_config={
+                "KubernetesExecutor": {"namespace": "test-namespace", "labels": {"release": "stable"}}
+            },
+        )
 
-    start_task >> volume_task >> third_task
-    start_task >> other_ns_task
-    start_task >> sidecar_task
-    start_task >> task_with_template
+        start_task >> volume_task >> third_task
+        start_task >> other_ns_task
+        start_task >> sidecar_task
+        start_task >> task_with_template
+except ImportError as e:
+    log.warning("Could not import DAGs in example_kubernetes_executor_config.py: %s", str(e))
+    log.warning("Install kubernetes dependencies with: pip install apache-airflow['kubernetes']")

Review comment:
       Yep. This is the official one, and it is in the docs.
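
A side observation on the quoted hunk, unrelated to the import guard: in `test_sharedvolume_mount`, the loop runs `for i in range(5)`, so `i` only takes the values 0 through 4 and the `if i > 4` check never fires. As quoted, the `ValueError` is always swallowed and the function returns normally even if the file is never readable. A possible way to write a retry that re-raises on the final attempt is sketched below; `retry_check`, `attempts` and `delay` are hypothetical names chosen for illustration and are not part of this PR.

```python
import time


def retry_check(check, attempts=5, delay=0.0):
    """Call check() up to `attempts` times and return its result.

    A ValueError from check() is retried; on the last attempt it is re-raised.
    Hypothetical helper for illustration only.
    """
    for i in range(attempts):
        try:
            return check()
        except ValueError:
            # i runs from 0 to attempts - 1, so compare against the last index.
            if i == attempts - 1:
                raise
            time.sleep(delay)
```

In the example DAG, `check` would be a function that runs `os.system("cat /shared/test.txt")` and raises `ValueError` on a non-zero return code. A non-zero `delay` would give the sidecar container time to write the file between attempts.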



