Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/08/19 00:40:54 UTC

[GitHub] [airflow] dimberman opened a new pull request #10393: Simplify the K8sExecutor and K8sPodOperator

dimberman opened a new pull request #10393:
URL: https://github.com/apache/airflow/pull/10393


   As discussed in an earlier email thread, this PR removes many of the configuration elements of the K8sExecutor in favor of a much simpler design.
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r480125232



##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -65,11 +60,11 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
         The docker image's CMD is used if this is not provided.
     :type arguments: list[str]
     :param ports: ports for launched pod.
-    :type ports: list[airflow.kubernetes.pod.Port]
+    :type ports: list[k8s.V1ContainerPort]

Review comment:
       I would be happy if there were an entry with an example for each parameter that has changed. We cannot assume that users will know these two classes and be able to migrate on their own.







[GitHub] [airflow] davlum commented on pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
davlum commented on pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#issuecomment-692125069


   Pod override would do exactly that: it would completely override the base definition rather than merge with it.
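To make the override-versus-merge distinction concrete, here is a minimal pure-Python sketch. It models pods as plain nested dicts (an assumption; the PR itself works with `k8s.V1Pod` objects), and the helpers `replace_pod` and `merge_pods` are hypothetical names used only to contrast the two semantics.

```python
import copy
from typing import Any, Dict

Pod = Dict[str, Any]  # pods modelled as plain nested dicts for illustration only


def replace_pod(base: Pod, override: Pod) -> Pod:
    """Override semantics: the base definition is discarded entirely."""
    return copy.deepcopy(override)


def merge_pods(base: Pod, override: Pod) -> Pod:
    """Merge semantics: nested dicts are combined key by key; any other value
    (including lists) from the override replaces the base value."""
    result = copy.deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(result.get(key), dict):
            result[key] = merge_pods(result[key], value)
        else:
            result[key] = copy.deepcopy(value)
    return result


base_pod = {
    "metadata": {"namespace": "default", "labels": {"app": "airflow"}},
    "spec": {
        "restartPolicy": "Never",
        "containers": [{"name": "base", "image": "airflow:latest"}],
    },
}
pod_override = {"metadata": {"annotations": {"test": "annotation"}}}

replaced = replace_pod(base_pod, pod_override)
merged = merge_pods(base_pod, pod_override)
```

Under replace semantics, an annotation-only override like the one in `example_kubernetes_executor_config.py` leaves the pod with no `spec` at all; under merge semantics it only adds the annotation.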





[GitHub] [airflow] kaxil commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r477522918



##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -365,43 +351,53 @@ def create_new_pod_for_operator(self, labels, launcher) -> Tuple[State, k8s.V1Po
                 }
             )
             self.labels.update(labels)
-        pod = pod_generator.PodGenerator(
-            image=self.image,
-            namespace=self.namespace,
-            cmds=self.cmds,
-            args=self.arguments,
-            labels=self.labels,
-            name=self.name,
-            envs=self.env_vars,
-            extract_xcom=self.do_xcom_push,
-            image_pull_policy=self.image_pull_policy,
-            node_selectors=self.node_selectors,
-            annotations=self.annotations,
-            affinity=self.affinity,
-            image_pull_secrets=self.image_pull_secrets,
-            service_account_name=self.service_account_name,
-            hostnetwork=self.hostnetwork,
-            tolerations=self.tolerations,
-            configmaps=self.configmaps,
-            security_context=self.security_context,
-            dnspolicy=self.dnspolicy,
-            schedulername=self.schedulername,
-            init_containers=self.init_containers,
-            restart_policy='Never',
-            priority_class_name=self.priority_class_name,
-            pod_template_file=self.pod_template_file,
-            pod=self.full_pod_spec,
-        ).gen_pod()
-
-        pod = append_to_pod(
-            pod,
-            self.pod_runtime_info_envs +
-            self.ports +  # type: ignore
-            self.resources +
-            self.secrets +  # type: ignore
-            self.volumes +  # type: ignore
-            self.volume_mounts  # type: ignore
-        )
+        if self.pod_template_file:
+            pod = pod_generator.PodGenerator.deserialize_model_file(self.pod_template_file)
+        else:
+            pod = k8s.V1Pod(
+                api_version="v1",
+                kind="Pod",
+                metadata=k8s.V1ObjectMeta(
+                    namespace=self.namespace,
+                    labels=self.labels,
+                    name=self.name,
+                    annotations=self.annotations,
+
+                ),
+                spec=k8s.V1PodSpec(
+                    node_selector=self.node_selectors,
+                    affinity=self.affinity,
+                    tolerations=self.tolerations,
+                    init_containers=self.init_containers,
+                    containers=[
+                        k8s.V1Container(
+                            image=self.image,
+                            name="base",
+                            command=self.cmds,
+                            ports=self.ports,
+                            resources=self.k8s_resources,
+                            volume_mounts=self.volume_mounts,
+                            args=self.arguments,
+                            env=self.env_vars,
+                            env_from=self.env_from,
+                        )
+                    ],
+                    image_pull_secrets=self.image_pull_secrets,
+                    service_account_name=self.service_account_name,
+                    host_network=self.hostnetwork,
+                    security_context=self.security_context,
+                    dns_policy=self.dnspolicy,
+                    scheduler_name=self.schedulername,
+                    restart_policy='Never',
+                    priority_class_name=self.priority_class_name,
+                    volumes=self.volumes,
+                )
+            )
+        for secret in self.secrets:
+            pod = secret.attach_to_pod(pod)
+        if self.do_xcom_push:
+            from airflow.kubernetes.pod_generator import PodGenerator
+            pod = PodGenerator.add_xcom_sidecar(pod)

Review comment:
       ```suggestion
               pod = pod_generator.PodGenerator.add_xcom_sidecar(pod)
   ```







[GitHub] [airflow] dimberman commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r489497070



##########
File path: UPDATING.md
##########
@@ -154,6 +153,480 @@ The Old and New provider configuration keys that have changed are as follows
 
 For more information, visit https://flask-appbuilder.readthedocs.io/en/latest/security.html#authentication-oauth
 
+### Changes to the KubernetesExecutor
+
+#### The KubernetesExecutor Will No Longer Read from the airflow.cfg for Base Pod Configurations

Review comment:
       @mik-laj added







[GitHub] [airflow] ashb commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r486521086



##########
File path: chart/values.yaml
##########
@@ -541,32 +543,11 @@ config:
     namespace: '{{ .Release.Namespace }}'
     airflow_configmap: '{{ include "airflow_config" . }}'
     airflow_local_settings_configmap: '{{ include "airflow_config" . }}'
+    pod_template_file: '{{ include "airflow_pod_template_file" . }}/{{ .Values.podTemplateFile }}'
     worker_container_repository: '{{ .Values.images.airflow.repository | default .Values.defaultAirflowRepository }}'
     worker_container_tag: '{{ .Values.images.airflow.tag | default .Values.defaultAirflowTag }}'
     worker_container_image_pull_policy: '{{ .Values.images.airflow.pullPolicy }}'
-    worker_service_account_name: '{{ .Release.Name }}-worker-serviceaccount'
-    image_pull_secrets: '{{ template "registry_secret" . }}'
-    dags_in_image: '{{ ternary "False" "True" (or .Values.dags.gitSync.enabled .Values.dags.persistence.enabled) }}'
     delete_worker_pods: 'True'
-    run_as_user: '{{ .Values.uid }}'
-    fs_group: '{{ .Values.gid }}'
-    git_dags_folder_mount_point: '{{- if or .Values.dags.gitSync.enabled .Values.dags.persistence.enabled }}{{ include "airflow_dags_mount_path" . }}{{end}}'
-    dags_volume_mount_point: '{{- if or .Values.dags.gitSync.enabled .Values.dags.persistence.enabled }}{{ include "airflow_dags_mount_path" . }}{{ end }}'
-    dags_volume_claim: '{{- if .Values.dags.persistence.enabled }}{{ include "airflow_dags_volume_claim" . }}{{ end }}'
-    dags_volume_subpath: '{{- if .Values.dags.persistence.enabled }}{{.Values.dags.gitSync.dest }}/{{ .Values.dags.gitSync.subPath }}{{ end }}'
-    git_repo: '{{- if and .Values.dags.gitSync.enabled (not .Values.dags.persistence.enabled) }}{{ .Values.dags.gitSync.repo }}{{ end }}'
-    git_branch: '{{ .Values.dags.gitSync.branch }}'
-    git_sync_rev: '{{ .Values.dags.gitSync.rev }}'
-    git_sync_depth: '{{ .Values.dags.gitSync.depth }}'
-    git_sync_root: '{{ .Values.dags.gitSync.root }}'
-    git_sync_dest: '{{ .Values.dags.gitSync.dest }}'
-    git_sync_container_repository: '{{ .Values.dags.gitSync.containerRepository }}'
-    git_sync_container_tag: '{{ .Values.dags.gitSync.containerTag }}'
-    git_sync_init_container_name: '{{ .Values.dags.gitSync.containerName }}'
-    git_sync_run_as_user: '{{ .Values.uid }}'
-    git_ssh_known_hosts_configmap_name: '{{- if .Values.dags.gitSync.knownHosts }}{{ include "airflow_config" . }}{{ end }}'
-    git_ssh_key_secret_name: '{{- if .Values.dags.gitSync.sshKeySecret }}{{ .Values.dags.gitSync.sshKeySecret }}{{ end }}'
-    git_sync_credentials_secret: '{{- if .Values.dags.gitSync.credentialsSecret }}{{ .Values.dags.gitSync.credentialsSecret }}{{ end }}'

Review comment:
       This section can go, yeah -- just using this as a place to comment on.
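With these keys gone from `config`, the equivalent settings would move into the pod template referenced by `pod_template_file`. The following is a rough sketch that assumes the removed keys map onto the standard Kubernetes pod spec fields noted in the comments. All names and values are hypothetical placeholders. The git-sync settings (previously the `git_sync_*` keys) would similarly become an extra container, such as an init container, and are omitted here.

```yaml
# Hypothetical base pod template; every name and value below is a placeholder.
apiVersion: v1
kind: Pod
metadata:
  name: placeholder-name
spec:
  serviceAccountName: my-release-worker-serviceaccount  # was worker_service_account_name
  imagePullSecrets:                                     # was image_pull_secrets
    - name: my-registry-secret
  securityContext:
    runAsUser: 50000                                    # was run_as_user
    fsGroup: 50000                                      # was fs_group
  restartPolicy: Never
  containers:
    - name: base                                        # the PR's code names the worker container "base"
      image: my-registry/airflow:my-tag                 # placeholder image
      volumeMounts:
        - name: dags
          mountPath: /opt/airflow/dags                  # was dags_volume_mount_point
          readOnly: true
  volumes:
    - name: dags
      persistentVolumeClaim:
        claimName: my-dags-claim                        # was dags_volume_claim
```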







[GitHub] [airflow] turbaszek commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r472916901



##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -175,7 +175,7 @@ def __init__(self,  # pylint: disable=too-many-arguments,too-many-locals
                  affinity: Optional[Dict] = None,
                  config_file: Optional[str] = None,
                  node_selectors: Optional[Dict] = None,
-                 image_pull_secrets: Optional[str] = None,
+                 image_pull_secrets: List[k8s.V1LocalObjectReference] = None,

Review comment:
       ```suggestion
                    image_pull_secrets: Optional[List[k8s.V1LocalObjectReference]] = None,
   ```
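The annotation change also changes what callers pass. The old `Optional[str]` annotation suggests that the 1.10.x operator took a comma-separated string of secret names. Assuming that is the case, a hypothetical helper like the one below could translate old values. It returns dicts shaped like `V1LocalObjectReference` so that it runs without the `kubernetes` package. With the client installed, each entry would instead be `k8s.V1LocalObjectReference(name=...)`.

```python
from typing import Dict, List, Optional


def legacy_image_pull_secrets(value: Optional[str]) -> Optional[List[Dict[str, str]]]:
    """Hypothetical converter: 'secret_a, secret_b' -> [{'name': 'secret_a'}, {'name': 'secret_b'}].

    Returns None for a missing or empty value, matching the new Optional default.
    """
    if not value:
        return None
    names = [part.strip() for part in value.split(",")]
    # Drop empty entries produced by stray commas, e.g. "a,,b" or "a,".
    return [{"name": name} for name in names if name]
```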







[GitHub] [airflow] ashb commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r481271072



##########
File path: airflow/utils/json.py
##########
@@ -20,6 +20,7 @@
 from datetime import date, datetime
 
 import numpy as np
+from kubernetes.client import models as k8s

Review comment:
       ```suggestion
   try:
       from kubernetes.client import models as k8s
   except ImportError:
       k8s = None
   ```

##########
File path: airflow/utils/json.py
##########
@@ -50,5 +51,8 @@ def _default(obj):
         elif isinstance(obj, (np.float_, np.float16, np.float32, np.float64,
                               np.complex_, np.complex64, np.complex128)):
             return float(obj)
+        elif isinstance(obj, k8s.V1Pod):

Review comment:
       ```suggestion
           elif k8s is not None and isinstance(obj, k8s.V1Pod):
   ```







[GitHub] [airflow] mik-laj commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r489372811



##########
File path: docs/howto/operator/kubernetes.rst
##########
@@ -53,13 +53,10 @@ Ultimately, it allows Airflow to act a job orchestrator - no matter the language
 
 How to use cluster ConfigMaps, Secrets, and Volumes with Pod?
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-Kubernetes cluster resources such as ConfigMaps, Secrets, and Volumes can be used with a Pod to be launched.
+Kubernetes cluster resources such as Secrets can be used with a Pod to be launched.
 Utilize the Airflow Kubernetes model classes such as:

Review comment:
       This sentence needs to be corrected. The plural is used when one class is described.







[GitHub] [airflow] dimberman commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r477530455



##########
File path: airflow/example_dags/example_kubernetes_executor_config.py
##########
@@ -52,60 +53,67 @@ def test_volume_mount():
     start_task = PythonOperator(
         task_id="start_task",
         python_callable=print_stuff,
-        executor_config={
-            "KubernetesExecutor": {

Review comment:
       Let me try again. Maybe I'm missing something.







[GitHub] [airflow] dimberman commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r478504368



##########
File path: airflow/example_dags/example_kubernetes_executor_config.py
##########
@@ -52,60 +53,67 @@ def test_volume_mount():
     start_task = PythonOperator(
         task_id="start_task",
         python_callable=print_stuff,
-        executor_config={
-            "KubernetesExecutor": {

Review comment:
       @kaxil yeah, but forcing users to use the to_dict function seems kinda clunky, no?
   
   We store the executor_config in the DB but TBH we don't really need to. Maybe we can just create new fields "kubernetes_executor_config" and "dag_serialization_config" and just remove the "executor_config" as a whole?







[GitHub] [airflow] dimberman commented on pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
dimberman commented on pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#issuecomment-683928708


   @potiuk yes, this is breaking for the KubernetesPodOperator, so we should hold off. I'm hoping to have this merged in the next day or two. Can I please get a lookover? :)





[GitHub] [airflow] kaxil commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r484527338



##########
File path: airflow/example_dags/example_kubernetes_executor_config.py
##########
@@ -52,59 +53,68 @@ def test_volume_mount():
     start_task = PythonOperator(
         task_id="start_task",
         python_callable=print_stuff,
-        executor_config={
-            "KubernetesExecutor": {
-                "annotations": {"test": "annotation"}
-            }
+        executor_config={"podOverride": k8s.V1Pod(
+            metadata=k8s.V1ObjectMeta(
+                annotations={"test": "annotation"}
+            )
+        )
         }
     )
 
     # You can mount volume or secret to the worker pod
     second_task = PythonOperator(
         task_id="four_task",
         python_callable=test_volume_mount,
-        executor_config={
-            "KubernetesExecutor": {
-                "volumes": [
-                    {
-                        "name": "example-kubernetes-test-volume",
-                        "hostPath": {"path": "/tmp/"},
-                    },
+        executor_config={"podOverride": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        volume_mounts=[
+                            k8s.V1VolumeMount(
+                                mount_path="/foo/",
+                                name="example-kubernetes-test-volume"
+                            )
+                        ]
+                    )
                 ],
-                "volume_mounts": [
-                    {
-                        "mountPath": "/foo/",
-                        "name": "example-kubernetes-test-volume",
-                    },
+                volumes=[
+                    k8s.V1Volume(
+                        name="example-kubernetes-test-volume",
+                        host_path=k8s.V1HostPathVolumeSource(
+                            path="/tmp/"
+                        )
+                    )
                 ]
-            }
+            )
+        )
         }
     )
 
     # Test that we can add labels to pods
     third_task = PythonOperator(
         task_id="non_root_task",
         python_callable=print_stuff,
-        executor_config={
-            "KubernetesExecutor": {
-                "labels": {
+        executor_config={"podOverride": k8s.V1Pod(
+            metadata=k8s.V1ObjectMeta(
+                labels={
                     "release": "stable"
                 }
-            }
+            )
+        )
         }
     )
 
     other_ns_task = PythonOperator(
         task_id="other_namespace_task",
         python_callable=print_stuff,
-        executor_config={
-            "KubernetesExecutor": {
-                "namespace": "test-namespace",
-                "labels": {
-                    "release": "stable"
-                }
-            }
-        }
+        # executor_config={"podOverride": k8s.V1Pod(

Review comment:
       Is the intention to remove these comments?








[GitHub] [airflow] turbaszek commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r472917647



##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -365,44 +365,57 @@ def create_new_pod_for_operator(self, labels, launcher) -> Tuple[State, k8s.V1Po
                 }
             )
             self.labels.update(labels)
-        pod = pod_generator.PodGenerator(
-            image=self.image,
-            namespace=self.namespace,
-            cmds=self.cmds,
-            args=self.arguments,
-            labels=self.labels,
-            name=self.name,
-            envs=self.env_vars,
-            extract_xcom=self.do_xcom_push,
-            image_pull_policy=self.image_pull_policy,
-            node_selectors=self.node_selectors,
-            annotations=self.annotations,
-            affinity=self.affinity,
-            image_pull_secrets=self.image_pull_secrets,
-            service_account_name=self.service_account_name,
-            hostnetwork=self.hostnetwork,
-            tolerations=self.tolerations,
-            configmaps=self.configmaps,
-            security_context=self.security_context,
-            dnspolicy=self.dnspolicy,
-            schedulername=self.schedulername,
-            init_containers=self.init_containers,
-            restart_policy='Never',
-            priority_class_name=self.priority_class_name,
-            pod_template_file=self.pod_template_file,
-            pod=self.full_pod_spec,
-        ).gen_pod()
+        from kubernetes.client import models as k8s

Review comment:
       Any particular reason to import this locally? 







[GitHub] [airflow] dimberman commented on pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
dimberman commented on pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#issuecomment-693503615


   @mik-laj can I get another lookover? :)





[GitHub] [airflow] mik-laj commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r489371534



##########
File path: UPDATING.md
##########
@@ -154,6 +153,480 @@ The Old and New provider configuration keys that have changed are as follows
 
 For more information, visit https://flask-appbuilder.readthedocs.io/en/latest/security.html#authentication-oauth
 
+### Changes to the KubernetesExecutor
+
+#### The KubernetesExecutor Will No Longer Read from the airflow.cfg for Base Pod Configurations
+
+In Airflow 2.0, the KubernetesExecutor will require a base pod template written in yaml. This file can exist
+anywhere on the host machine and will be linked using the `pod_template_file` configuration in the airflow.cfg.
+
+The `airflow.cfg` will still accept values for the `worker_container_repository`, the `worker_container_tag`, and
+the default namespace.
+
+#### The `executor_config` Will Now Expect a `kubernetes.client.models.V1Pod` Class When Launching Tasks
+
+In Airflow 1.10.x, users could modify task pods at runtime by passing a dictionary to the `executor_config` variable.
+Users will now have full access to the Kubernetes API via `kubernetes.client.models.V1Pod`.
+
+While in the deprecated version a user would mount a volume using the following dictionary:
+
+```python
+second_task = PythonOperator(
+    task_id="four_task",
+    python_callable=test_volume_mount,
+    executor_config={
+        "KubernetesExecutor": {
+            "volumes": [
+                {
+                    "name": "example-kubernetes-test-volume",
+                    "hostPath": {"path": "/tmp/"},
+                },
+            ],
+            "volume_mounts": [
+                {
+                    "mountPath": "/foo/",
+                    "name": "example-kubernetes-test-volume",
+                },
+            ]
+        }
+    }
+)
+```
+
+In the new model a user can accomplish the same thing using the following code:
+
+```python
+from kubernetes.client import models as k8s
+
+second_task = PythonOperator(
+    task_id="four_task",
+    python_callable=test_volume_mount,
+    executor_config={"KubernetesExecutor": k8s.V1Pod(
+        spec=k8s.V1PodSpec(
+            containers=[
+                k8s.V1Container(
+                    name="base",
+                    volume_mounts=[
+                        k8s.V1VolumeMount(
+                            mount_path="/foo/",
+                            name="example-kubernetes-test-volume"
+                        )
+                    ]
+                )
+            ],
+            volumes=[
+                k8s.V1Volume(
+                    name="example-kubernetes-test-volume",
+                    host_path=k8s.V1HostPathVolumeSource(
+                        path="/tmp/"
+                    )
+                )
+            ]
+        )
+    )
+    }
+)
+```
+For Airflow 2.0, the traditional `executor_config` will continue operation with a deprecation warning,

Review comment:
       Do we have this deprecation warning? I didn't see it in the code, but I might have missed it.
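Here is a minimal sketch of how such a warning could be raised. It assumes the warning fires whenever the `KubernetesExecutor` entry of `executor_config` is still a plain dict. `check_executor_config` is a hypothetical helper, not the code that was merged into Airflow, and it uses only the standard-library `warnings` module.

```python
import warnings
from typing import Any, Dict


def check_executor_config(executor_config: Dict[str, Any]) -> None:
    """Hypothetical helper: warn when the KubernetesExecutor entry is a legacy dict."""
    kube_config = executor_config.get("KubernetesExecutor")
    if isinstance(kube_config, dict):
        # Legacy 1.10-style dict: still accepted, but flagged for removal.
        warnings.warn(
            "Passing a dict to executor_config['KubernetesExecutor'] is deprecated; "
            "pass a kubernetes.client.models.V1Pod instead.",
            DeprecationWarning,
            stacklevel=2,
        )
    # Any other value (for example a V1Pod) passes through silently.
```

Calling it with `{"KubernetesExecutor": {"volumes": []}}` emits one `DeprecationWarning`. A `V1Pod` or any other non-dict value produces no warning. Python hides `DeprecationWarning` by default outside `__main__`, so a test needs `warnings.simplefilter("always")` to observe it.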







[GitHub] [airflow] dimberman commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r480264064



##########
File path: UPDATING.md
##########
@@ -153,6 +152,175 @@ The Old and New provider configuration keys that have changed are as follows
 
 For more information, visit https://flask-appbuilder.readthedocs.io/en/latest/security.html#authentication-oauth
 
+### Changes to the KubernetesExecutor
+
+#### The KubernetesExecutor Will No Longer Read from the airflow.cfg for Base Pod Configurations
+
+In Airflow 2.0, the KubernetesExecutor will require a base pod template written in yaml. This file can exist
+anywhere on the host machine and will be linked using the `pod_template_file` configuration in the airflow.cfg.
+
+The airflow.cfg will still accept values for the `worker_container_repository`, the `worker_container_tag`, and
+the default namespace.
+
+#### The executor_config Will Now Expect a `kubernetes.client.models.V1Pod` Class When Launching Tasks
+
+In Airflow 1.10, users could modify task pods at runtime by passing a dictionary to the `executor_config` variable.
+Users will now have full access to the Kubernetes API via the `kubernetes.client.models.V1Pod` class.
+
+While in the deprecated version a user would mount a volume using the following dictionary:
+
+```python
+second_task = PythonOperator(
+    task_id="four_task",
+    python_callable=test_volume_mount,
+    executor_config={
+        "KubernetesExecutor": {
+            "volumes": [
+                {
+                    "name": "example-kubernetes-test-volume",
+                    "hostPath": {"path": "/tmp/"},
+                },
+            ],
+            "volume_mounts": [
+                {
+                    "mountPath": "/foo/",
+                    "name": "example-kubernetes-test-volume",
+                },
+            ]
+        }
+    }
+)
+```
+
+In the new model a user can accomplish the same thing using the following code:
+
+```python
+from kubernetes.client import models as k8s
+
+second_task = PythonOperator(
+    task_id="four_task",
+    python_callable=test_volume_mount,
+    executor_config={"KubernetesExecutor": k8s.V1Pod(
+        spec=k8s.V1PodSpec(
+            containers=[
+                k8s.V1Container(
+                    name="base",
+                    volume_mounts=[
+                        k8s.V1VolumeMount(
+                            mount_path="/foo/",
+                            name="example-kubernetes-test-volume"
+                        )
+                    ]
+                )
+            ],
+            volumes=[
+                k8s.V1Volume(
+                    name="example-kubernetes-test-volume",
+                    host_path=k8s.V1HostPathVolumeSource(
+                        path="/tmp/"
+                    )
+                )
+            ]
+        )
+    )
+    }
+)
+```
+For Airflow 2.0, the traditional `executor_config` will continue operation with a deprecation warning,
+but will be removed in a future version.
+
+### Changes to the KubernetesPodOperator
+
+Much like the KubernetesExecutor, the KubernetesPodOperator will no longer take Airflow custom classes and will
+instead expect either a pod_template yaml file, or `kubernetes.client.models` objects.
+
+The one notable exception is that we will continue to support the `airflow.kubernetes.secret.Secret` class.
+
+Whereas previously a user would import each individual class to build the pod, like so:
+
+```python
+from airflow.kubernetes.pod import Port
+from airflow.kubernetes.volume import Volume
+from airflow.kubernetes.secret import Secret
+from airflow.kubernetes.volume_mount import VolumeMount
+
+
+volume_config = {
+    'persistentVolumeClaim': {
+        'claimName': 'test-volume'
+    }
+}
+volume = Volume(name='test-volume', configs=volume_config)
+volume_mount = VolumeMount('test-volume',
+                           mount_path='/root/mount_file',
+                           sub_path=None,
+                           read_only=True)
+
+port = Port('http', 80)
+secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn')
+secret_env = Secret('env', 'SQL_CONN', 'airflow-secrets', 'sql_alchemy_conn')
+
+k = KubernetesPodOperator(
+    namespace='default',
+    image="ubuntu:16.04",
+    cmds=["bash", "-cx"],
+    arguments=["echo", "10"],
+    labels={"foo": "bar"},
+    secrets=[secret_file, secret_env],
+    ports=[port],
+    volumes=[volume],
+    volume_mounts=[volume_mount],
+    name="airflow-test-pod",
+    task_id="task",
+    affinity=affinity,
+    is_delete_operator_pod=True,
+    hostnetwork=False,
+    tolerations=tolerations,
+    configmaps=configmaps,
+    init_containers=[init_container],
+    priority_class_name="medium",
+)
+```
+Now the user can use the `kubernetes.client.models` module as a single point of entry for creating all k8s objects.
+
+```python
+from kubernetes.client import models as k8s
+from airflow.kubernetes.secret import Secret
+
+
+configmaps = ['test-configmap-1', 'test-configmap-2']
+
+volume = k8s.V1Volume(
+    name='test-volume',
+    persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name='test-volume'),
+)
+
+port = k8s.V1ContainerPort(name='http', container_port=80)
+secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn')
+secret_env = Secret('env', 'SQL_CONN', 'airflow-secrets', 'sql_alchemy_conn')
+secret_all_keys = Secret('env', None, 'airflow-secrets-2')
+volume_mount = k8s.V1VolumeMount(
+    name='test-volume', mount_path='/root/mount_file', sub_path=None, read_only=True
+)
+
+k = KubernetesPodOperator(
+    namespace='default',
+    image="ubuntu:16.04",
+    cmds=["bash", "-cx"],
+    arguments=["echo", "10"],
+    labels={"foo": "bar"},
+    secrets=[secret_file, secret_env],
+    ports=[port],
+    volumes=[volume],
+    volume_mounts=[volume_mount],
+    name="airflow-test-pod",
+    task_id="task",
+    is_delete_operator_pod=True,
+    hostnetwork=False)
+```
+We decided to keep the Secret class because users seem to like that it simplifies mounting
+Kubernetes secrets into workers.
+

Review comment:
       Added!







[GitHub] [airflow] dimberman commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r489482962



##########
File path: UPDATING.md
##########
@@ -154,6 +153,480 @@ The Old and New provider configuration keys that have changed are as follows
 
 For more information, visit https://flask-appbuilder.readthedocs.io/en/latest/security.html#authentication-oauth
 
+### Changes to the KubernetesExecutor
+
+#### The KubernetesExecutor Will No Longer Read from the airflow.cfg for Base Pod Configurations
+
+In Airflow 2.0, the KubernetesExecutor will require a base pod template written in yaml. This file can exist
+anywhere on the host machine and will be linked using the `pod_template_file` configuration in the airflow.cfg.
+
+The `airflow.cfg` will still accept values for the `worker_container_repository`, the `worker_container_tag`, and
+the default namespace.
+
+#### The `executor_config` Will Now Expect a `kubernetes.client.models.V1Pod` Class When Launching Tasks
+
+In Airflow 1.10.x, users could modify task pods at runtime by passing a dictionary to the `executor_config` variable.
+Users will now have full access to the Kubernetes API via the `kubernetes.client.models.V1Pod` class.
+
+While in the deprecated version a user would mount a volume using the following dictionary:
+
+```python
+second_task = PythonOperator(
+    task_id="four_task",
+    python_callable=test_volume_mount,
+    executor_config={
+        "KubernetesExecutor": {
+            "volumes": [
+                {
+                    "name": "example-kubernetes-test-volume",
+                    "hostPath": {"path": "/tmp/"},
+                },
+            ],
+            "volume_mounts": [
+                {
+                    "mountPath": "/foo/",
+                    "name": "example-kubernetes-test-volume",
+                },
+            ]
+        }
+    }
+)
+```
+
+In the new model a user can accomplish the same thing using the following code:
+
+```python
+from kubernetes.client import models as k8s
+
+second_task = PythonOperator(
+    task_id="four_task",
+    python_callable=test_volume_mount,
+    executor_config={"KubernetesExecutor": k8s.V1Pod(
+        spec=k8s.V1PodSpec(
+            containers=[
+                k8s.V1Container(
+                    name="base",
+                    volume_mounts=[
+                        k8s.V1VolumeMount(
+                            mount_path="/foo/",
+                            name="example-kubernetes-test-volume"
+                        )
+                    ]
+                )
+            ],
+            volumes=[
+                k8s.V1Volume(
+                    name="example-kubernetes-test-volume",
+                    host_path=k8s.V1HostPathVolumeSource(
+                        path="/tmp/"
+                    )
+                )
+            ]
+        )
+    )
+    }
+)
+```
+For Airflow 2.0, the traditional `executor_config` will continue operation with a deprecation warning,

Review comment:
       Hi @mik-laj, the deprecation warning was already merged, so it didn't show up in this PR: https://github.com/apache/airflow/pull/10393/files#diff-868ed785b2a336f20cb6a577dde2502aR198-R201







[GitHub] [airflow] mik-laj commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r480121468



##########
File path: UPDATING.md
##########
@@ -153,6 +152,175 @@ The Old and New provider configuration keys that have changed are as follows
 
 For more information, visit https://flask-appbuilder.readthedocs.io/en/latest/security.html#authentication-oauth
 
+### Changes to the KubernetesExecutor
+
+#### The KubernetesExecutor Will No Longer Read from the airflow.cfg for Base Pod Configurations
+
+In Airflow 2.0, the KubernetesExecutor will require a base pod template written in yaml. This file can exist
+anywhere on the host machine and will be linked using the `pod_template_file` configuration in the airflow.cfg.
+
+The airflow.cfg will still accept values for the `worker_container_repository`, the `worker_container_tag`, and
+the default namespace.
+
+#### The executor_config Will Now Expect a `kubernetes.client.models.V1Pod` Class When Launching Tasks
+
+In Airflow 1.10, users could modify task pods at runtime by passing a dictionary to the `executor_config` variable.
+Users will now have full access to the Kubernetes API via the `kubernetes.client.models.V1Pod` class.
+
+While in the deprecated version a user would mount a volume using the following dictionary:
+
+```python
+second_task = PythonOperator(
+    task_id="four_task",
+    python_callable=test_volume_mount,
+    executor_config={
+        "KubernetesExecutor": {
+            "volumes": [
+                {
+                    "name": "example-kubernetes-test-volume",
+                    "hostPath": {"path": "/tmp/"},
+                },
+            ],
+            "volume_mounts": [
+                {
+                    "mountPath": "/foo/",
+                    "name": "example-kubernetes-test-volume",
+                },
+            ]
+        }
+    }
+)
+```
+
+In the new model a user can accomplish the same thing using the following code:
+
+```python
+from kubernetes.client import models as k8s
+
+second_task = PythonOperator(
+    task_id="four_task",
+    python_callable=test_volume_mount,
+    executor_config={"KubernetesExecutor": k8s.V1Pod(
+        spec=k8s.V1PodSpec(
+            containers=[
+                k8s.V1Container(
+                    name="base",
+                    volume_mounts=[
+                        k8s.V1VolumeMount(
+                            mount_path="/foo/",
+                            name="example-kubernetes-test-volume"
+                        )
+                    ]
+                )
+            ],
+            volumes=[
+                k8s.V1Volume(
+                    name="example-kubernetes-test-volume",
+                    host_path=k8s.V1HostPathVolumeSource(
+                        path="/tmp/"
+                    )
+                )
+            ]
+        )
+    )
+    }
+)
+```
+For Airflow 2.0, the traditional `executor_config` will continue operation with a deprecation warning,
+but will be removed in a future version.
+
+### Changes to the KubernetesPodOperator

Review comment:
       In my opinion, this tutorial will not make it easy to migrate from the old representation to the new one. Many parameters are not described, and the new representation will not always be obvious. Users will have to read the 1.10 code to understand what they should do now.
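One mechanical part of that migration can be sketched. The `kubernetes` Python client exposes API fields under snake_case attribute names (`hostPath` becomes `host_path`, `mountPath` becomes `mount_path`), so the keys of legacy camelCase dicts need to be rewritten. The helpers below are hypothetical and use only the standard library. They also rewrite keys inside free-form maps such as labels or annotations, which a real migration tool would have to skip.

```python
import re
from typing import Any

# An uppercase letter that directly follows a lowercase letter or digit marks a word boundary.
_CAMEL_BOUNDARY = re.compile(r"(?<=[a-z0-9])([A-Z])")


def camel_to_snake(name: str) -> str:
    """Convert a camelCase Kubernetes field name to snake_case (hypothetical helper)."""
    return _CAMEL_BOUNDARY.sub(r"_\1", name).lower()


def legacy_dict_to_snake_case(obj: Any) -> Any:
    """Recursively rewrite dict keys from camelCase to snake_case, leaving values intact."""
    if isinstance(obj, dict):
        return {camel_to_snake(key): legacy_dict_to_snake_case(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return [legacy_dict_to_snake_case(item) for item in obj]
    return obj
```

For a flat object, the result can be unpacked directly into a model, for example `k8s.V1VolumeMount(**legacy_dict_to_snake_case({"mountPath": "/foo/", "name": "example-kubernetes-test-volume"}))`. Nested fields such as `host_path` still need their own model classes (`k8s.V1HostPathVolumeSource`).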







[GitHub] [airflow] kaxil commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r484527717



##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -483,17 +326,23 @@ def run_next(self, next_job: KubernetesJobType) -> None:
         if command[0:3] != ["airflow", "tasks", "run"]:
             raise ValueError('The command must start with ["airflow", "tasks", "run"].')
 
+        base_worker_pod = PodGenerator.deserialize_model_file(self.kube_config.pod_template_file)
+        if not base_worker_pod:
+            raise AirflowException("could not find a valid worker template yaml at {}"
+                                   .format(self.kube_config.pod_template_file))
+
         pod = PodGenerator.construct_pod(
             namespace=self.namespace,
             worker_uuid=self.worker_uuid,
             pod_id=self._create_pod_id(dag_id, task_id),
             dag_id=dag_id,
             task_id=task_id,
+            kube_image=self.kube_config.kube_image,
             try_number=try_number,
             date=execution_date,
             command=command,
-            kube_executor_config=kube_executor_config,
-            worker_config=self.worker_configuration_pod
+            kube_mutation_object=kube_executor_config,
+            base_worker_pod=base_worker_pod

Review comment:
       Wherever this signature was changed, we should add a note to UPDATING.md if we don't already have one.
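A renamed keyword argument can also be softened with a small shim, in addition to a note in UPDATING.md. This is a hypothetical sketch, not something the PR does. `rename_kwargs` and the stand-in `construct_pod` are illustrative names. The rename table comes from the diff above (`kube_executor_config` to `kube_mutation_object`, `worker_config` to `base_worker_pod`).

```python
import functools
import warnings
from typing import Any, Callable, Dict

# Old keyword -> new keyword, as changed in the construct_pod call in the diff above.
RENAMED_KWARGS: Dict[str, str] = {
    "kube_executor_config": "kube_mutation_object",
    "worker_config": "base_worker_pod",
}


def rename_kwargs(renames: Dict[str, str]) -> Callable:
    """Decorator that maps deprecated keyword names onto their replacements."""
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            for old, new in renames.items():
                if old in kwargs:
                    if new in kwargs:
                        raise TypeError(f"Pass either {old!r} or {new!r}, not both")
                    warnings.warn(
                        f"{old!r} is deprecated; use {new!r} instead",
                        DeprecationWarning,
                        stacklevel=2,
                    )
                    kwargs[new] = kwargs.pop(old)
            return func(*args, **kwargs)
        return wrapper
    return decorator


@rename_kwargs(RENAMED_KWARGS)
def construct_pod(*, kube_mutation_object=None, base_worker_pod=None):
    """Hypothetical stand-in for PodGenerator.construct_pod with only the renamed arguments."""
    return kube_mutation_object, base_worker_pod
```

Old call sites keep working and emit one warning per renamed argument. Passing both the old and the new name is rejected, so an ambiguous call cannot silently pick one.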







[GitHub] [airflow] kaxil commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r478507219



##########
File path: airflow/example_dags/example_kubernetes_executor_config.py
##########
@@ -52,60 +53,67 @@ def test_volume_mount():
     start_task = PythonOperator(
         task_id="start_task",
         python_callable=print_stuff,
-        executor_config={
-            "KubernetesExecutor": {

Review comment:
       No, I meant adding code to the serialization module (https://github.com/apache/airflow/blob/e565368f2e988a06b0398b77e66859c138905ae7/airflow/serialization/serialized_objects.py#L159-L216) so that if the object is a `k8s.V1Pod`, it is stored as `k8s.V1Pod.to_dict()`.
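Here is a minimal sketch of that suggestion. It assumes the serializer walks `executor_config` and tags pod objects with a type marker. The `__type`/`__var` key names are modelled on Airflow's serialization envelope and, like `serialize_value`, should be treated as assumptions. The `"pod"` value mirrors the `POD = 'pod'` entry this PR adds to `DagAttributeTypes`. The pod class is passed in so that the sketch does not need the `kubernetes` package; in real code it would be `k8s.V1Pod`.

```python
from typing import Any, Type

# Assumed envelope keys, modelled on Airflow's serialized_objects encoding.
TYPE_KEY = "__type"
VAR_KEY = "__var"
POD_TYPE = "pod"  # mirrors DagAttributeTypes.POD added in this PR


def serialize_value(value: Any, pod_class: Type) -> Any:
    """Recursively replace pod objects with a tagged, JSON-friendly dict."""
    if isinstance(value, pod_class):
        # kubernetes.client model objects provide to_dict(), which returns plain Python types.
        return {TYPE_KEY: POD_TYPE, VAR_KEY: value.to_dict()}
    if isinstance(value, dict):
        return {key: serialize_value(item, pod_class) for key, item in value.items()}
    if isinstance(value, list):
        return [serialize_value(item, pod_class) for item in value]
    return value
```

`to_dict()` on the generated client models returns snake_case keys. Turning the stored dict back into a `V1Pod` therefore needs a matching deserializer, which is not shown here.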







[GitHub] [airflow] dimberman commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r486435798



##########
File path: chart/values.yaml
##########
@@ -541,32 +543,11 @@ config:
     namespace: '{{ .Release.Namespace }}'
     airflow_configmap: '{{ include "airflow_config" . }}'
     airflow_local_settings_configmap: '{{ include "airflow_config" . }}'
+    pod_template_file: '{{ include "airflow_pod_template_file" . }}/{{ .Values.podTemplateFile }}'
     worker_container_repository: '{{ .Values.images.airflow.repository | default .Values.defaultAirflowRepository }}'
     worker_container_tag: '{{ .Values.images.airflow.tag | default .Values.defaultAirflowTag }}'
     worker_container_image_pull_policy: '{{ .Values.images.airflow.pullPolicy }}'
-    worker_service_account_name: '{{ .Release.Name }}-worker-serviceaccount'
-    image_pull_secrets: '{{ template "registry_secret" . }}'
-    dags_in_image: '{{ ternary "False" "True" (or .Values.dags.gitSync.enabled .Values.dags.persistence.enabled) }}'
     delete_worker_pods: 'True'
-    run_as_user: '{{ .Values.uid }}'
-    fs_group: '{{ .Values.gid }}'
-    git_dags_folder_mount_point: '{{- if or .Values.dags.gitSync.enabled .Values.dags.persistence.enabled }}{{ include "airflow_dags_mount_path" . }}{{end}}'
-    dags_volume_mount_point: '{{- if or .Values.dags.gitSync.enabled .Values.dags.persistence.enabled }}{{ include "airflow_dags_mount_path" . }}{{ end }}'
-    dags_volume_claim: '{{- if .Values.dags.persistence.enabled }}{{ include "airflow_dags_volume_claim" . }}{{ end }}'
-    dags_volume_subpath: '{{- if .Values.dags.persistence.enabled }}{{.Values.dags.gitSync.dest }}/{{ .Values.dags.gitSync.subPath }}{{ end }}'
-    git_repo: '{{- if and .Values.dags.gitSync.enabled (not .Values.dags.persistence.enabled) }}{{ .Values.dags.gitSync.repo }}{{ end }}'
-    git_branch: '{{ .Values.dags.gitSync.branch }}'
-    git_sync_rev: '{{ .Values.dags.gitSync.rev }}'
-    git_sync_depth: '{{ .Values.dags.gitSync.depth }}'
-    git_sync_root: '{{ .Values.dags.gitSync.root }}'
-    git_sync_dest: '{{ .Values.dags.gitSync.dest }}'
-    git_sync_container_repository: '{{ .Values.dags.gitSync.containerRepository }}'
-    git_sync_container_tag: '{{ .Values.dags.gitSync.containerTag }}'
-    git_sync_init_container_name: '{{ .Values.dags.gitSync.containerName }}'
-    git_sync_run_as_user: '{{ .Values.uid }}'
-    git_ssh_known_hosts_configmap_name: '{{- if .Values.dags.gitSync.knownHosts }}{{ include "airflow_config" . }}{{ end }}'
-    git_ssh_key_secret_name: '{{- if .Values.dags.gitSync.sshKeySecret }}{{ .Values.dags.gitSync.sshKeySecret }}{{ end }}'
-    git_sync_credentials_secret: '{{- if .Values.dags.gitSync.credentialsSecret }}{{ .Values.dags.gitSync.credentialsSecret }}{{ end }}'

Review comment:
       @ashb OOF good catch thank you







[GitHub] [airflow] dimberman commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r490276351



##########
File path: airflow/kubernetes/pod_generator.py
##########
@@ -396,7 +240,7 @@ def from_legacy_obj(obj) -> Optional[k8s.V1Pod]:
                     limits=limits
                 )
         namespaced['resources'] = resources
-        return PodGenerator(**namespaced).gen_pod()
+        return PodGeneratorDeprecated(**namespaced).gen_pod()

Review comment:
       Yep :)







[GitHub] [airflow] ashb commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r489439653



##########
File path: docs/howto/operator/kubernetes.rst
##########
@@ -53,13 +53,10 @@ Ultimately, it allows Airflow to act a job orchestrator - no matter the language
 
 How to use cluster ConfigMaps, Secrets, and Volumes with Pod?
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-Kubernetes cluster resources such as ConfigMaps, Secrets, and Volumes can be used with a Pod to be launched.
+Kubernetes cluster resources such as Secrets can be used with a Pod to be launched.
 Utilize the Airflow Kubernetes model classes such as:

Review comment:
       Do we still support standard Python dicts? (Line 60)
   
   Also: why did this bit need to change? Should `Volume` or `VolumeMount` no longer be used?







[GitHub] [airflow] dimberman commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r484537691



##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -483,17 +326,23 @@ def run_next(self, next_job: KubernetesJobType) -> None:
         if command[0:3] != ["airflow", "tasks", "run"]:
             raise ValueError('The command must start with ["airflow", "tasks", "run"].')
 
+        base_worker_pod = PodGenerator.deserialize_model_file(self.kube_config.pod_template_file)
+        if not base_worker_pod:
+            raise AirflowException("could not find a valid worker template yaml at {}"
+                                   .format(self.kube_config.pod_template_file))
+
         pod = PodGenerator.construct_pod(
             namespace=self.namespace,
             worker_uuid=self.worker_uuid,
             pod_id=self._create_pod_id(dag_id, task_id),
             dag_id=dag_id,
             task_id=task_id,
+            kube_image=self.kube_config.kube_image,
             try_number=try_number,
             date=execution_date,
             command=command,
-            kube_executor_config=kube_executor_config,
-            worker_config=self.worker_configuration_pod
+            kube_mutation_object=kube_executor_config,
+            base_worker_pod=base_worker_pod

Review comment:
       @kaxil do we need to note this if it's not an external class?







[GitHub] [airflow] kaxil commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r477517592



##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -70,116 +69,25 @@ class KubeConfig:  # pylint: disable=too-many-instance-attributes
     def __init__(self):  # pylint: disable=too-many-statements
         configuration_dict = conf.as_dict(display_sensitive=True)
         self.core_configuration = configuration_dict['core']
-        self.kube_secrets = configuration_dict.get('kubernetes_secrets', {})
-        self.kube_env_vars = configuration_dict.get('kubernetes_environment_variables', {})
-        self.env_from_configmap_ref = conf.get(self.kubernetes_section,
-                                               'env_from_configmap_ref')
-        self.env_from_secret_ref = conf.get(self.kubernetes_section,
-                                            'env_from_secret_ref')
         self.airflow_home = settings.AIRFLOW_HOME
         self.dags_folder = conf.get(self.core_section, 'dags_folder')
         self.parallelism = conf.getint(self.core_section, 'parallelism')
-        self.worker_container_repository = conf.get(
-            self.kubernetes_section, 'worker_container_repository')
-        self.worker_container_tag = conf.get(
-            self.kubernetes_section, 'worker_container_tag')
-        self.kube_image = '{}:{}'.format(
-            self.worker_container_repository, self.worker_container_tag)
-        self.kube_image_pull_policy = conf.get(
-            self.kubernetes_section, "worker_container_image_pull_policy"
-        )
-        self.kube_node_selectors = configuration_dict.get('kubernetes_node_selectors', {})
         self.pod_template_file = conf.get(self.kubernetes_section, 'pod_template_file',
                                           fallback=None)
 
-        kube_worker_annotations = conf.get(self.kubernetes_section, 'worker_annotations')
-        if kube_worker_annotations:
-            self.kube_annotations = json.loads(kube_worker_annotations)
-        else:
-            self.kube_annotations = None
-
-        self.kube_labels = configuration_dict.get('kubernetes_labels', {})
         self.delete_worker_pods = conf.getboolean(
             self.kubernetes_section, 'delete_worker_pods')
         self.delete_worker_pods_on_failure = conf.getboolean(
             self.kubernetes_section, 'delete_worker_pods_on_failure')
         self.worker_pods_creation_batch_size = conf.getint(
             self.kubernetes_section, 'worker_pods_creation_batch_size')
-        self.worker_service_account_name = conf.get(
-            self.kubernetes_section, 'worker_service_account_name')
-        self.image_pull_secrets = conf.get(self.kubernetes_section, 'image_pull_secrets')
-
-        # NOTE: user can build the dags into the docker image directly,
-        # this will set to True if so
-        self.dags_in_image = conf.getboolean(self.kubernetes_section, 'dags_in_image')
 
-        # Run as user for pod security context
-        self.worker_run_as_user = self._get_security_context_val('run_as_user')
-        self.worker_fs_group = self._get_security_context_val('fs_group')
-
-        kube_worker_resources = conf.get(self.kubernetes_section, 'worker_resources')
-        if kube_worker_resources:
-            self.worker_resources = json.loads(kube_worker_resources)
-        else:
-            self.worker_resources = None
-
-        # NOTE: `git_repo` and `git_branch` must be specified together as a pair
-        # The http URL of the git repository to clone from
-        self.git_repo = conf.get(self.kubernetes_section, 'git_repo')
-        # The branch of the repository to be checked out
-        self.git_branch = conf.get(self.kubernetes_section, 'git_branch')
-        # Clone depth for git sync
-        self.git_sync_depth = conf.get(self.kubernetes_section, 'git_sync_depth')
-        # Optionally, the directory in the git repository containing the dags
-        self.git_subpath = conf.get(self.kubernetes_section, 'git_subpath')
-        # Optionally, the root directory for git operations
-        self.git_sync_root = conf.get(self.kubernetes_section, 'git_sync_root')
-        # Optionally, the name at which to publish the checked-out files under --root
-        self.git_sync_dest = conf.get(self.kubernetes_section, 'git_sync_dest')
-        # Optionally, the tag or hash to checkout
-        self.git_sync_rev = conf.get(self.kubernetes_section, 'git_sync_rev')
-        # Optionally, if git_dags_folder_mount_point is set the worker will use
-        # {git_dags_folder_mount_point}/{git_sync_dest}/{git_subpath} as dags_folder
-        self.git_dags_folder_mount_point = conf.get(self.kubernetes_section,
-                                                    'git_dags_folder_mount_point')
-
-        # Optionally a user may supply a (`git_user` AND `git_password`) OR
-        # (`git_ssh_key_secret_name` AND `git_ssh_key_secret_key`) for private repositories
-        self.git_user = conf.get(self.kubernetes_section, 'git_user')
-        self.git_password = conf.get(self.kubernetes_section, 'git_password')
-        self.git_ssh_key_secret_name = conf.get(self.kubernetes_section, 'git_ssh_key_secret_name')
-        self.git_ssh_known_hosts_configmap_name = conf.get(self.kubernetes_section,
-                                                           'git_ssh_known_hosts_configmap_name')
-        self.git_sync_credentials_secret = conf.get(self.kubernetes_section,
-                                                    'git_sync_credentials_secret')
-
-        # NOTE: The user may optionally use a volume claim to mount a PV containing
-        # DAGs directly
-        self.dags_volume_claim = conf.get(self.kubernetes_section, 'dags_volume_claim')
-
-        self.dags_volume_mount_point = conf.get(self.kubernetes_section, 'dags_volume_mount_point')
-
-        # This prop may optionally be set for PV Claims and is used to write logs
-        self.logs_volume_claim = conf.get(self.kubernetes_section, 'logs_volume_claim')
-
-        # This prop may optionally be set for PV Claims and is used to locate DAGs
-        # on a SubPath
-        self.dags_volume_subpath = conf.get(
-            self.kubernetes_section, 'dags_volume_subpath')
-
-        # This prop may optionally be set for PV Claims and is used to locate logs
-        # on a SubPath
-        self.logs_volume_subpath = conf.get(
-            self.kubernetes_section, 'logs_volume_subpath')
-
-        # Optionally, hostPath volume containing DAGs
-        self.dags_volume_host = conf.get(self.kubernetes_section, 'dags_volume_host')
-
-        # Optionally, write logs to a hostPath Volume
-        self.logs_volume_host = conf.get(self.kubernetes_section, 'logs_volume_host')
-
-        # This prop may optionally be set for PV Claims and is used to write logs
-        self.base_log_folder = conf.get(self.logging_section, 'base_log_folder')
+        self.worker_container_repository = conf.get(
+            self.kubernetes_section, 'worker_container_repository')
+        self.worker_container_tag = conf.get(
+            self.kubernetes_section, 'worker_container_tag')
+        self.kube_image = '{}:{}'.format(
+            self.worker_container_repository, self.worker_container_tag)

Review comment:
       ```suggestion
           self.kube_image = f'{self.worker_container_repository}:{self.worker_container_tag}'
   ```
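A minimal sketch of the suggested f-string form, pulled into a standalone helper so it can be run on its own. `build_kube_image` is a hypothetical name used only for illustration, and the repository and tag values are made up. In the PR, both values come from `conf.get(self.kubernetes_section, ...)`.

```python
def build_kube_image(repository: str, tag: str) -> str:
    """Join a container repository and tag into an image reference.

    Hypothetical helper mirroring the suggested
    f'{self.worker_container_repository}:{self.worker_container_tag}'.
    """
    return f"{repository}:{tag}"


# The original str.format() call and the f-string produce the same string.
legacy = "{}:{}".format("apache/airflow", "2.0.0")
assert build_kube_image("apache/airflow", "2.0.0") == legacy
print(build_kube_image("apache/airflow", "2.0.0"))  # apache/airflow:2.0.0
```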







[GitHub] [airflow] ashb commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r481273859



##########
File path: airflow/serialization/enums.py
##########
@@ -42,3 +42,4 @@ class DagAttributeTypes(str, Enum):
     DICT = 'dict'
     SET = 'set'
     TUPLE = 'tuple'
+    POD = 'pod'

Review comment:
       ```suggestion
       POD = 'k8s.V1Pod'
   ```
   
   Maybe?
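For context on the suggestion: because `DagAttributeTypes` mixes in `str`, each member's value is the tag that ends up in the serialized DAG, so a value like `'k8s.V1Pod'` describes the encoded object more precisely than `'pod'`. Below is a trimmed, standalone copy of the enum (an illustration, not the real module) showing that members compare equal to their raw string and can be looked up from it.

```python
from enum import Enum


class DagAttributeTypes(str, Enum):
    """Trimmed illustrative copy of the enum in airflow/serialization/enums.py."""
    DICT = 'dict'
    SET = 'set'
    TUPLE = 'tuple'
    POD = 'k8s.V1Pod'  # value as suggested in the review


# str mixin: the member is also a plain string equal to its value.
assert DagAttributeTypes.POD == 'k8s.V1Pod'
# Decoding a stored tag maps the raw string back to the member.
assert DagAttributeTypes('k8s.V1Pod') is DagAttributeTypes.POD
```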







[GitHub] [airflow] turbaszek commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r472917303



##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -365,44 +365,57 @@ def create_new_pod_for_operator(self, labels, launcher) -> Tuple[State, k8s.V1Po
                 }
             )
             self.labels.update(labels)
-        pod = pod_generator.PodGenerator(
-            image=self.image,
-            namespace=self.namespace,
-            cmds=self.cmds,
-            args=self.arguments,
-            labels=self.labels,
-            name=self.name,
-            envs=self.env_vars,
-            extract_xcom=self.do_xcom_push,
-            image_pull_policy=self.image_pull_policy,
-            node_selectors=self.node_selectors,
-            annotations=self.annotations,
-            affinity=self.affinity,
-            image_pull_secrets=self.image_pull_secrets,
-            service_account_name=self.service_account_name,
-            hostnetwork=self.hostnetwork,
-            tolerations=self.tolerations,
-            configmaps=self.configmaps,
-            security_context=self.security_context,
-            dnspolicy=self.dnspolicy,
-            schedulername=self.schedulername,
-            init_containers=self.init_containers,
-            restart_policy='Never',
-            priority_class_name=self.priority_class_name,
-            pod_template_file=self.pod_template_file,
-            pod=self.full_pod_spec,
-        ).gen_pod()
+        from kubernetes.client import models as k8s
+        pod = None

Review comment:
       ```suggestion
   
   ```
   Probably no need for that, as we have an if/else.







[GitHub] [airflow] kaxil commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r478322665



##########
File path: airflow/example_dags/example_kubernetes_executor_config.py
##########
@@ -52,60 +53,67 @@ def test_volume_mount():
     start_task = PythonOperator(
         task_id="start_task",
         python_callable=print_stuff,
-        executor_config={
-            "KubernetesExecutor": {

Review comment:
       We do store `executor_config` in the TaskInstance table:
   
   https://github.com/apache/airflow/blob/d7602654526fdd2876466371404784bd17cfe0d2/airflow/models/taskinstance.py#L222
   
   However, we use `dill` to serialize it.
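To my understanding, the linked column is a SQLAlchemy `PickleType` with `dill` as the pickler. If that is right, whatever users put into `executor_config` has to survive a pickle round trip before the executor can read it back. The sketch below illustrates that constraint using the stdlib `pickle` as a stand-in for `dill`, which offers a compatible `dumps`/`loads` interface. The config contents are illustrative.

```python
import pickle

# Illustrative 1.10-style executor_config, as a user might pass it to an operator.
executor_config = {
    "KubernetesExecutor": {
        "annotations": {"test": "annotation"},
    }
}

# Persisting the config in the metadata DB amounts to serializing it to bytes...
stored = pickle.dumps(executor_config)

# ...and reading the TaskInstance row back yields an equal (but new) object.
restored = pickle.loads(stored)
assert restored == executor_config
```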







[GitHub] [airflow] kaxil commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r484527163



##########
File path: airflow/example_dags/example_kubernetes_executor_config.py
##########
@@ -52,59 +53,68 @@ def test_volume_mount():
     start_task = PythonOperator(
         task_id="start_task",
         python_callable=print_stuff,
-        executor_config={
-            "KubernetesExecutor": {
-                "annotations": {"test": "annotation"}
-            }
+        executor_config={"podOverride": k8s.V1Pod(

Review comment:
       After https://github.com/apache/airflow/pull/10756 is merged, we need to change `podOverride` -> `pod_override` in this PR







[GitHub] [airflow] dimberman commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r475959589



##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -365,44 +365,57 @@ def create_new_pod_for_operator(self, labels, launcher) -> Tuple[State, k8s.V1Po
                 }
             )
             self.labels.update(labels)
-        pod = pod_generator.PodGenerator(
-            image=self.image,
-            namespace=self.namespace,
-            cmds=self.cmds,
-            args=self.arguments,
-            labels=self.labels,
-            name=self.name,
-            envs=self.env_vars,
-            extract_xcom=self.do_xcom_push,
-            image_pull_policy=self.image_pull_policy,
-            node_selectors=self.node_selectors,
-            annotations=self.annotations,
-            affinity=self.affinity,
-            image_pull_secrets=self.image_pull_secrets,
-            service_account_name=self.service_account_name,
-            hostnetwork=self.hostnetwork,
-            tolerations=self.tolerations,
-            configmaps=self.configmaps,
-            security_context=self.security_context,
-            dnspolicy=self.dnspolicy,
-            schedulername=self.schedulername,
-            init_containers=self.init_containers,
-            restart_policy='Never',
-            priority_class_name=self.priority_class_name,
-            pod_template_file=self.pod_template_file,
-            pod=self.full_pod_spec,
-        ).gen_pod()
+        from kubernetes.client import models as k8s

Review comment:
       Nope, removed that.







[GitHub] [airflow] potiuk commented on pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#issuecomment-683929756


   On vacation now, but yes, I will be on the lookout :).





[GitHub] [airflow] kaxil commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r478507514



##########
File path: airflow/example_dags/example_kubernetes_executor_config.py
##########
@@ -52,60 +53,67 @@ def test_volume_mount():
     start_task = PythonOperator(
         task_id="start_task",
         python_callable=print_stuff,
-        executor_config={
-            "KubernetesExecutor": {

Review comment:
       >We store the executor_config in the DB but TBH we don't really need to. Maybe we can just create new fields "kubernetes_executor_config" and "dag_serialization_config" and just remove the "executor_config" as a whole?
   
   If we don't need to store it in the DB, then sure, we can remove it.







[GitHub] [airflow] dimberman commented on pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
dimberman commented on pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#issuecomment-692088155


   @davlum Interesting, though without the reconcile methods, how would users use the `pod_override` setting, which is an arbitrary pod? We need some way to ensure that features are either added or overwritten across these three stages. It's also worth mentioning that we now have a `generate_dag_yaml` command in the CLI, so users can see what their pods will look like before they launch them (and we plan to add this preview to the UI).
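To illustrate the "added or overwritten" behaviour described above, here is a simplified sketch on plain pod-spec dicts. It assumes the three stages are a base template, the executor-generated pod, and `pod_override`, applied in increasing precedence. Nested dicts are merged key by key, so fields set only in an earlier stage survive. Lists and scalars from a later stage replace the earlier value. `merge_pod_dicts` is hypothetical. The real reconcile methods in `PodGenerator` handle containers and other lists more carefully, and this sketch does not reproduce them.

```python
from typing import Any, Dict


def merge_pod_dicts(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
    """Recursively merge two pod-spec-like dicts; non-None values in `override` win.

    Hypothetical helper: nested dicts are merged key by key, everything else
    (lists, scalars) from `override` replaces the base value. Inputs are not mutated.
    """
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_pod_dicts(merged[key], value)
        elif value is not None:
            merged[key] = value
    return merged


# Hypothetical stages, in order of increasing precedence.
template = {"metadata": {"labels": {"app": "airflow"}}, "spec": {"restartPolicy": "Never"}}
executor_generated = {"metadata": {"name": "task-pod", "labels": {"dag_id": "example"}}}
pod_override = {"metadata": {"annotations": {"test": "annotation"}}}

final = template
for stage in (executor_generated, pod_override):
    final = merge_pod_dicts(final, stage)

# Labels from both stages are kept, and the override's annotations are added.
print(final["metadata"])
```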





[GitHub] [airflow] dimberman commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r488203376



##########
File path: airflow/kubernetes/pod_generator.py
##########
@@ -123,100 +124,18 @@ class PodGenerator:
     Any configuration that is container specific gets applied to
     the first container in the list of containers.
 
-    :param image: The docker image
-    :type image: Optional[str]
-    :param name: name in the metadata section (not the container name)
-    :type name: Optional[str]
-    :param namespace: pod namespace
-    :type namespace: Optional[str]
-    :param volume_mounts: list of kubernetes volumes mounts
-    :type volume_mounts: Optional[List[Union[k8s.V1VolumeMount, dict]]]
-    :param envs: A dict containing the environment variables
-    :type envs: Optional[Dict[str, str]]
-    :param cmds: The command to be run on the first container
-    :type cmds: Optional[List[str]]
-    :param args: The arguments to be run on the pod
-    :type args: Optional[List[str]]
-    :param labels: labels for the pod metadata
-    :type labels: Optional[Dict[str, str]]
-    :param node_selectors: node selectors for the pod
-    :type node_selectors: Optional[Dict[str, str]]
-    :param ports: list of ports. Applies to the first container.
-    :type ports: Optional[List[Union[k8s.V1ContainerPort, dict]]]
-    :param volumes: Volumes to be attached to the first container
-    :type volumes: Optional[List[Union[k8s.V1Volume, dict]]]
-    :param image_pull_policy: Specify a policy to cache or always pull an image
-    :type image_pull_policy: str
-    :param restart_policy: The restart policy of the pod
-    :type restart_policy: str
-    :param image_pull_secrets: Any image pull secrets to be given to the pod.
-        If more than one secret is required, provide a comma separated list:
-        secret_a,secret_b
-    :type image_pull_secrets: str
-    :param init_containers: A list of init containers
-    :type init_containers: Optional[List[k8s.V1Container]]
-    :param service_account_name: Identity for processes that run in a Pod
-    :type service_account_name: Optional[str]
-    :param resources: Resource requirements for the first containers
-    :type resources: Optional[Union[k8s.V1ResourceRequirements, dict]]
-    :param annotations: annotations for the pod
-    :type annotations: Optional[Dict[str, str]]
-    :param affinity: A dict containing a group of affinity scheduling rules
-    :type affinity: Optional[dict]
-    :param hostnetwork: If True enable host networking on the pod
-    :type hostnetwork: bool
-    :param tolerations: A list of kubernetes tolerations
-    :type tolerations: Optional[list]
-    :param security_context: A dict containing the security context for the pod
-    :type security_context: Optional[Union[k8s.V1PodSecurityContext, dict]]
-    :param configmaps: Any configmap refs to envfrom.
-        If more than one configmap is required, provide a comma separated list
-        configmap_a,configmap_b
-    :type configmaps: List[str]
-    :param dnspolicy: Specify a dnspolicy for the pod
-    :type dnspolicy: Optional[str]
-    :param schedulername: Specify a schedulername for the pod
-    :type schedulername: Optional[str]
     :param pod: The fully specified pod. Mutually exclusive with `path_or_string`
     :type pod: Optional[kubernetes.client.models.V1Pod]
     :param pod_template_file: Path to YAML file. Mutually exclusive with `pod`
     :type pod_template_file: Optional[str]
     :param extract_xcom: Whether to bring up a container for xcom
     :type extract_xcom: bool
-    :param priority_class_name: priority class name for the launched Pod
-    :type priority_class_name: str
     """
     def __init__(  # pylint: disable=too-many-arguments,too-many-locals
         self,
-        image: Optional[str] = None,
-        name: Optional[str] = None,
-        namespace: Optional[str] = None,
-        volume_mounts: Optional[List[Union[k8s.V1VolumeMount, dict]]] = None,
-        envs: Optional[Dict[str, str]] = None,
-        cmds: Optional[List[str]] = None,
-        args: Optional[List[str]] = None,
-        labels: Optional[Dict[str, str]] = None,
-        node_selectors: Optional[Dict[str, str]] = None,
-        ports: Optional[List[Union[k8s.V1ContainerPort, dict]]] = None,
-        volumes: Optional[List[Union[k8s.V1Volume, dict]]] = None,
-        image_pull_policy: Optional[str] = None,
-        restart_policy: Optional[str] = None,
-        image_pull_secrets: Optional[str] = None,
-        init_containers: Optional[List[k8s.V1Container]] = None,
-        service_account_name: Optional[str] = None,
-        resources: Optional[Union[k8s.V1ResourceRequirements, dict]] = None,
-        annotations: Optional[Dict[str, str]] = None,
-        affinity: Optional[dict] = None,
-        hostnetwork: bool = False,
-        tolerations: Optional[list] = None,
-        security_context: Optional[Union[k8s.V1PodSecurityContext, dict]] = None,
-        configmaps: Optional[List[str]] = None,
-        dnspolicy: Optional[str] = None,
-        schedulername: Optional[str] = None,
         pod: Optional[k8s.V1Pod] = None,
         pod_template_file: Optional[str] = None,
-        extract_xcom: bool = False,
-        priority_class_name: Optional[str] = None,
+        extract_xcom: bool = True
     ):
         self.validate_pod_generator_args(locals())

Review comment:
       @davlum good point, fixed!
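The trimmed `PodGenerator` docstring above declares `pod` and `pod_template_file` mutually exclusive. As an illustration, here is a standalone sketch of that rule. `validate_pod_source` and its `ValueError` are hypothetical choices and do not reproduce `validate_pod_generator_args`. Only the mutual-exclusivity rule from the docstring is modelled.

```python
from typing import Any, Optional


def validate_pod_source(pod: Optional[Any] = None,
                        pod_template_file: Optional[str] = None) -> None:
    """Reject calls that set both `pod` and `pod_template_file`.

    Hypothetical stand-in; only the rule stated in the docstring is checked.
    """
    if pod is not None and pod_template_file is not None:
        raise ValueError("Cannot pass both `pod` and `pod_template_file`")


# Either source on its own is accepted.
validate_pod_source(pod_template_file="/path/to/pod_template.yaml")

# Supplying both is rejected.
try:
    validate_pod_source(pod=object(), pod_template_file="/path/to/pod_template.yaml")
except ValueError as err:
    print(err)  # Cannot pass both `pod` and `pod_template_file`
```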







[GitHub] [airflow] mik-laj commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r480121916



##########
File path: UPDATING.md
##########
@@ -153,6 +152,175 @@ The Old and New provider configuration keys that have changed are as follows
 
 For more information, visit https://flask-appbuilder.readthedocs.io/en/latest/security.html#authentication-oauth
 
+### Changes to the KubernetesExecutor
+
+#### The KubernetesExecutor Will No Longer Read from the airflow.cfg for Base Pod Configurations
+
+In Airflow 2.0, the KubernetesExecutor will require a base pod template written in yaml. This file can exist
+anywhere on the host machine and will be linked using the `pod_template_file` configuration in the airflow.cfg.
+
+The airflow.cfg will still accept values for the `worker_container_repository`, the `worker_container_tag`, and
+the default namespace.
+
+#### The executor_config Will Now Expect a `kubernetes.client.models.V1Pod` Class When Launching Tasks
+
+In Airflow 1.10, users could modify task pods at runtime by passing a dictionary to the `executor_config` argument.
+Users will now have full access to the Kubernetes API via the `kubernetes.client.models.V1Pod` class.
+
+While in the deprecated version a user would mount a volume using the following dictionary:
+
+```python
+second_task = PythonOperator(
+    task_id="four_task",
+    python_callable=test_volume_mount,
+    executor_config={
+        "KubernetesExecutor": {
+            "volumes": [
+                {
+                    "name": "example-kubernetes-test-volume",
+                    "hostPath": {"path": "/tmp/"},
+                },
+            ],
+            "volume_mounts": [
+                {
+                    "mountPath": "/foo/",
+                    "name": "example-kubernetes-test-volume",
+                },
+            ]
+        }
+    }
+)
+```
+
+In the new model a user can accomplish the same thing using the following code:
+
+```python
+from kubernetes.client import models as k8s
+
+second_task = PythonOperator(
+    task_id="four_task",
+    python_callable=test_volume_mount,
+    executor_config={"KubernetesExecutor": k8s.V1Pod(
+        spec=k8s.V1PodSpec(
+            containers=[
+                k8s.V1Container(
+                    name="base",
+                    volume_mounts=[
+                        k8s.V1VolumeMount(
+                            mount_path="/foo/",
+                            name="example-kubernetes-test-volume"
+                        )
+                    ]
+                )
+            ],
+            volumes=[
+                k8s.V1Volume(
+                    name="example-kubernetes-test-volume",
+                    host_path=k8s.V1HostPathVolumeSource(
+                        path="/tmp/"
+                    )
+                )
+            ]
+        )
+    )
+    }
+)
+```
+For Airflow 2.0, the traditional `executor_config` will continue to work with a deprecation warning,
+but will be removed in a future version.
+
+### Changes to the KubernetesPodOperator
+
+Much like the KubernetesExecutor, the KubernetesPodOperator will no longer take Airflow custom classes and will
+instead expect either a pod_template yaml file, or `kubernetes.client.models` objects.
+
+The one notable exception is that we will continue to support the `airflow.kubernetes.secret.Secret` class.
+
+Whereas previously a user would import each individual class to build the pod as follows:
+
+```python
+from airflow.kubernetes.pod import Port
+from airflow.kubernetes.volume import Volume
+from airflow.kubernetes.secret import Secret
+from airflow.kubernetes.volume_mount import VolumeMount
+
+
+volume_config = {
+    'persistentVolumeClaim': {
+        'claimName': 'test-volume'
+    }
+}
+volume = Volume(name='test-volume', configs=volume_config)
+volume_mount = VolumeMount('test-volume',
+                           mount_path='/root/mount_file',
+                           sub_path=None,
+                           read_only=True)
+
+port = Port('http', 80)
+secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn')
+secret_env = Secret('env', 'SQL_CONN', 'airflow-secrets', 'sql_alchemy_conn')
+
+k = KubernetesPodOperator(
+    namespace='default',
+    image="ubuntu:16.04",
+    cmds=["bash", "-cx"],
+    arguments=["echo", "10"],
+    labels={"foo": "bar"},
+    secrets=[secret_file, secret_env],
+    ports=[port],
+    volumes=[volume],
+    volume_mounts=[volume_mount],
+    name="airflow-test-pod",
+    task_id="task",
+    affinity=affinity,
+    is_delete_operator_pod=True,
+    hostnetwork=False,
+    tolerations=tolerations,
+    configmaps=configmaps,
+    init_containers=[init_container],
+    priority_class_name="medium",
+)
+```
+Now the user can use the `kubernetes.client.models` module as a single point of entry for creating all k8s objects.
+
+```python
+from kubernetes.client import models as k8s
+from airflow.kubernetes.secret import Secret
+
+
+configmaps = ['test-configmap-1', 'test-configmap-2']
+
+volume = k8s.V1Volume(
+    name='test-volume',
+    persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name='test-volume'),
+)
+
+port = k8s.V1ContainerPort(name='http', container_port=80)
+secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn')
+secret_env = Secret('env', 'SQL_CONN', 'airflow-secrets', 'sql_alchemy_conn')
+secret_all_keys = Secret('env', None, 'airflow-secrets-2')
+volume_mount = k8s.V1VolumeMount(
+    name='test-volume', mount_path='/root/mount_file', sub_path=None, read_only=True
+)
+
+k = KubernetesPodOperator(
+    namespace='default',
+    image="ubuntu:16.04",
+    cmds=["bash", "-cx"],
+    arguments=["echo", "10"],
+    labels={"foo": "bar"},
+    secrets=[secret_file, secret_env],
+    ports=[port],
+    volumes=[volume],
+    volume_mounts=[volume_mount],
+    name="airflow-test-pod",
+    task_id="task",
+    is_delete_operator_pod=True,
+    hostnetwork=False)
+```
+We decided to keep the Secret class because users seem to really like how it simplifies mounting
+Kubernetes secrets into workers.
+

Review comment:
       Can you add a migration guide for each configuration option?
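One pattern a per-option migration guide could spell out: the old Airflow classes took Kubernetes JSON-style camelCase dicts, while `kubernetes.client.models` takes snake_case keyword arguments. For example, `{'persistentVolumeClaim': {'claimName': 'test-volume'}}` becomes `persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name='test-volume')` in the diff above. The sketch below mechanically renames the keys of such a dict. Both helpers are hypothetical, and acronym fields such as `hostIPC` are not handled correctly by this simple regex.

```python
import re
from typing import Any


def camel_to_snake(name: str) -> str:
    """Convert a camelCase Kubernetes JSON field name to snake_case."""
    # Insert "_" before every uppercase letter except at the start, then lowercase.
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


def snake_case_keys(obj: Any) -> Any:
    """Recursively rename dict keys from camelCase to snake_case."""
    if isinstance(obj, dict):
        return {camel_to_snake(key): snake_case_keys(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return [snake_case_keys(item) for item in obj]
    return obj


legacy_volume_config = {'persistentVolumeClaim': {'claimName': 'test-volume'}}
print(snake_case_keys(legacy_volume_config))
# {'persistent_volume_claim': {'claim_name': 'test-volume'}}
```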







[GitHub] [airflow] dimberman commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r475960847



##########
File path: tests/kubernetes/test_pod_generator.py
##########
@@ -226,238 +186,169 @@ def test_gen_pod_extract_xcom(self, mock_uuid):
             ],
             'resources': {'requests': {'cpu': '1m'}},
         }
-        self.expected['spec']['containers'].append(container_two)
-        self.expected['spec']['containers'][0]['volumeMounts'].insert(0, {
-            'name': 'xcom',
-            'mountPath': '/airflow/xcom'
-        })
-        self.expected['spec']['volumes'].insert(0, {
-            'name': 'xcom', 'emptyDir': {}
-        })
-        result_dict['spec']['containers'][0]['env'].sort(key=lambda x: x['name'])
-        self.assertEqual(result_dict, self.expected)
+        self.expected.spec.containers.append(container_two)
+        base_container: k8s.V1Container = self.expected.spec.containers[0]
+        base_container.volume_mounts = base_container.volume_mounts or []
+        base_container.volume_mounts.append(k8s.V1VolumeMount(
+            name="xcom",
+            mount_path="/airflow/xcom"
+        ))
+        self.expected.spec.containers[0] = base_container
+        self.expected.spec.volumes = self.expected.spec.volumes or []
+        self.expected.spec.volumes.append(
+            k8s.V1Volume(
+                name='xcom',
+                empty_dir={},
+            )
+        )
+        result_dict = self.k8s_client.sanitize_for_serialization(result)
+        expected_dict = self.k8s_client.sanitize_for_serialization(self.expected)
 
-    def test_from_obj(self):
-        result = PodGenerator.from_obj({
-            "KubernetesExecutor": {
-                "annotations": {"test": "annotation"},
-                "volumes": [
-                    {
-                        "name": "example-kubernetes-test-volume",
-                        "hostPath": {"path": "/tmp/"},
-                    },
-                ],
-                "volume_mounts": [
-                    {
-                        "mountPath": "/foo/",
-                        "name": "example-kubernetes-test-volume",
-                    },
-                ],
-            }
-        })
-        result = self.k8s_client.sanitize_for_serialization(result)
+        self.assertEqual(result_dict, expected_dict)
 
-        self.assertEqual({
-            'apiVersion': 'v1',
-            'kind': 'Pod',
-            'metadata': {
-                'annotations': {'test': 'annotation'},
-            },
-            'spec': {
-                'containers': [{
-                    'args': [],
-                    'command': [],
-                    'env': [],
-                    'envFrom': [],
-                    'name': 'base',
-                    'ports': [],
-                    'volumeMounts': [{
-                        'mountPath': '/foo/',
-                        'name': 'example-kubernetes-test-volume'
-                    }],
-                }],
-                'hostNetwork': False,
-                'imagePullSecrets': [],
-                'volumes': [{
-                    'hostPath': {'path': '/tmp/'},
-                    'name': 'example-kubernetes-test-volume'
-                }],
-            }
-        }, result)
+    def test_from_obj(self):

Review comment:
       @kaxil @ashb @potiuk @turbaszek One remaining question is what we do about the `executor_config`. Do we force users to inject a `k8s.V1Pod` object for their executor config and just manually delete a few fields (like pod names)? Or do we continue to allow this dictionary?
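If the dictionary form stays supported, it has to be translated into a pod somewhere. The sketch below performs that translation on plain dicts, producing the serialized shape the removed `test_from_obj` expected. The input and output are taken from that test, minus the empty default fields. `legacy_config_to_pod_dict` is hypothetical and handles only `annotations`, `volumes` and `volume_mounts`. It is not how `PodGenerator.from_obj` is implemented.

```python
from typing import Any, Dict


def legacy_config_to_pod_dict(executor_config: Dict[str, Any]) -> Dict[str, Any]:
    """Translate a 1.10-style {"KubernetesExecutor": {...}} dict into a dict
    shaped like a serialized k8s.V1Pod (hypothetical, partial helper)."""
    legacy = executor_config.get("KubernetesExecutor", {})
    # Container-level settings go onto the "base" container.
    container: Dict[str, Any] = {"name": "base"}
    if legacy.get("volume_mounts"):
        container["volumeMounts"] = list(legacy["volume_mounts"])
    pod: Dict[str, Any] = {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {},
        "spec": {"containers": [container]},
    }
    # Pod-level settings go into metadata and spec.
    if legacy.get("annotations"):
        pod["metadata"]["annotations"] = dict(legacy["annotations"])
    if legacy.get("volumes"):
        pod["spec"]["volumes"] = list(legacy["volumes"])
    return pod


legacy_config = {
    "KubernetesExecutor": {
        "annotations": {"test": "annotation"},
        "volumes": [{"name": "example-kubernetes-test-volume", "hostPath": {"path": "/tmp/"}}],
        "volume_mounts": [{"mountPath": "/foo/", "name": "example-kubernetes-test-volume"}],
    }
}
pod_dict = legacy_config_to_pod_dict(legacy_config)
```

Forcing a `k8s.V1Pod` instead would make this translation step unnecessary, at the cost of breaking existing DAGs that pass the dictionary.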







[GitHub] [airflow] dimberman commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r480253840



##########
File path: UPDATING.md
##########
@@ -153,6 +152,175 @@ The Old and New provider configuration keys that have changed are as follows
 
 For more information, visit https://flask-appbuilder.readthedocs.io/en/latest/security.html#authentication-oauth
 
+### Changes to the KubernetesExecutor
+
+#### The KubernetesExecutor Will No Longer Read from the airflow.cfg for Base Pod Configurations
+
+In Airflow 2.0, the KubernetesExecutor will require a base pod template written in yaml. This file can exist
+anywhere on the host machine and will be linked using the `pod_template_file` configuration in the airflow.cfg.
+
+The airflow.cfg will still accept values for the `worker_container_repository`, the `worker_container_tag`, and
+the default namespace.
+
+#### The executor_config Will Now Expect a `kubernetes.client.models.V1Pod` Class When Launching Tasks
+
+In Airflow 1.10, users could modify task pods at runtime by passing a dictionary to the `executor_config` argument.
+Users will now have full access to the Kubernetes API via the `kubernetes.client.models.V1Pod` class.
+
+While in the deprecated version a user would mount a volume using the following dictionary:
+
+```python
+second_task = PythonOperator(
+    task_id="four_task",
+    python_callable=test_volume_mount,
+    executor_config={
+        "KubernetesExecutor": {
+            "volumes": [
+                {
+                    "name": "example-kubernetes-test-volume",
+                    "hostPath": {"path": "/tmp/"},
+                },
+            ],
+            "volume_mounts": [
+                {
+                    "mountPath": "/foo/",
+                    "name": "example-kubernetes-test-volume",
+                },
+            ]
+        }
+    }
+)
+```
+
+In the new model a user can accomplish the same thing using the following code:
+
+```python
+from kubernetes.client import models as k8s
+
+second_task = PythonOperator(
+    task_id="four_task",
+    python_callable=test_volume_mount,
+    executor_config={"KubernetesExecutor": k8s.V1Pod(
+        spec=k8s.V1PodSpec(
+            containers=[
+                k8s.V1Container(
+                    name="base",
+                    volume_mounts=[
+                        k8s.V1VolumeMount(
+                            mount_path="/foo/",
+                            name="example-kubernetes-test-volume"
+                        )
+                    ]
+                )
+            ],
+            volumes=[
+                k8s.V1Volume(
+                    name="example-kubernetes-test-volume",
+                    host_path=k8s.V1HostPathVolumeSource(
+                        path="/tmp/"
+                    )
+                )
+            ]
+        )
+    )
+    }
+)
+```
+For Airflow 2.0, the traditional `executor_config` will continue to work with a deprecation warning,
+but will be removed in a future version.
+
+### Changes to the KubernetesPodOperator

Review comment:
       @mik-laj Thank you for that feedback. I've added more detailed docs on the changed parameters. Can you please take a look when you have some time?
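The 1.10-style dictionary and the new `V1Pod` in the diff above describe the same pod. As a rough migration aid, here is a minimal sketch, using only the standard library, of how the legacy keys map onto the pod's serialized (camelCase) form. `legacy_config_to_pod_dict` is a hypothetical helper, not Airflow's conversion code (the `pod_generator.py` diff in this thread suggests legacy input goes through `from_legacy_obj`), and it only handles `annotations`, `volumes` and `volume_mounts`.

```python
from typing import Any, Dict


def legacy_config_to_pod_dict(executor_config: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: translate a 1.10-style ``executor_config`` dict into
    the serialized (camelCase) form of an equivalent single-container pod.

    Only ``annotations``, ``volumes`` and ``volume_mounts`` are handled.
    """
    legacy = executor_config.get("KubernetesExecutor", {})

    # Container-level settings are attached to the container named "base",
    # which is the container the V1Pod example above patches.
    container: Dict[str, Any] = {"name": "base"}
    if legacy.get("volume_mounts"):
        # The legacy entries already use camelCase keys (mountPath), so a
        # shallow copy is enough.
        container["volumeMounts"] = [dict(m) for m in legacy["volume_mounts"]]

    pod: Dict[str, Any] = {"apiVersion": "v1", "kind": "Pod", "spec": {"containers": [container]}}

    # Volumes are pod-level, so they go on the pod spec, not on the container.
    if legacy.get("volumes"):
        pod["spec"]["volumes"] = [dict(v) for v in legacy["volumes"]]

    # Annotations belong to the pod metadata.
    if legacy.get("annotations"):
        pod["metadata"] = {"annotations": dict(legacy["annotations"])}
    return pod


legacy = {
    "KubernetesExecutor": {
        "volumes": [{"name": "example-kubernetes-test-volume", "hostPath": {"path": "/tmp/"}}],
        "volume_mounts": [{"mountPath": "/foo/", "name": "example-kubernetes-test-volume"}],
    }
}
print(legacy_config_to_pod_dict(legacy))
```

The output should line up with the camelCase dict that `sanitize_for_serialization` produces for an equivalent `V1Pod` (compare the `test_from_obj` expectation quoted in this thread), which makes a side-by-side check during migration fairly direct.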







[GitHub] [airflow] potiuk commented on pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#issuecomment-683927786


   Hey @dimberman, should we wait with releasing backports until this one is merged?





[GitHub] [airflow] kaxil commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r477525125



##########
File path: telepresence.log
##########
@@ -0,0 +1,36 @@
+   0.0 TEL | Telepresence 0.105 launched at Mon Aug 24 20:51:33 2020

Review comment:
       Don't think you wanted to add this file :)







[GitHub] [airflow] kaxil commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r488149744



##########
File path: UPDATING.md
##########
@@ -154,6 +153,480 @@ The Old and New provider configuration keys that have changed are as follows
 
 For more information, visit https://flask-appbuilder.readthedocs.io/en/latest/security.html#authentication-oauth
 
+### Changes to the KubernetesExecutor
+
+#### The KubernetesExecutor Will No Longer Read from the airflow.cfg for Base Pod Configurations
+
+In Airflow 2.0, the KubernetesExecutor will require a base pod template written in yaml. This file can exist
+anywhere on the host machine and will be linked using the `pod_template_file` configuration in the airflow.cfg.
+
+The airflow.cfg will still accept values for the `worker_container_repository`, the `worker_container_tag`, and

Review comment:
       ```suggestion
   The `airflow.cfg` will still accept values for the `worker_container_repository`, the `worker_container_tag`, and
   ```

##########
File path: UPDATING.md
##########
@@ -154,6 +153,480 @@ The Old and New provider configuration keys that have changed are as follows
 
 For more information, visit https://flask-appbuilder.readthedocs.io/en/latest/security.html#authentication-oauth
 
+### Changes to the KubernetesExecutor
+
+#### The KubernetesExecutor Will No Longer Read from the airflow.cfg for Base Pod Configurations
+
+In Airflow 2.0, the KubernetesExecutor will require a base pod template written in yaml. This file can exist
+anywhere on the host machine and will be linked using the `pod_template_file` configuration in the airflow.cfg.
+
+The airflow.cfg will still accept values for the `worker_container_repository`, the `worker_container_tag`, and
+the default namespace.
+
+#### The executor_config Will Now Expect a `kubernetes.client.models.V1Pod` Class When Launching Tasks
+
+In airflow 1.10, users could modify task pods at runtime by passing a dictionary to the `executor_config` variable.

Review comment:
       ```suggestion
   In Airflow 1.10.x, users could modify task pods at runtime by passing a dictionary to the `executor_config` variable.
   ```

##########
File path: UPDATING.md
##########
@@ -154,6 +153,480 @@ The Old and New provider configuration keys that have changed are as follows
 
 For more information, visit https://flask-appbuilder.readthedocs.io/en/latest/security.html#authentication-oauth
 
+### Changes to the KubernetesExecutor
+
+#### The KubernetesExecutor Will No Longer Read from the airflow.cfg for Base Pod Configurations
+
+In Airflow 2.0, the KubernetesExecutor will require a base pod template written in yaml. This file can exist
+anywhere on the host machine and will be linked using the `pod_template_file` configuration in the airflow.cfg.
+
+The airflow.cfg will still accept values for the `worker_container_repository`, the `worker_container_tag`, and
+the default namespace.
+
+#### The executor_config Will Now Expect a `kubernetes.client.models.V1Pod` Class When Launching Tasks
+
+In airflow 1.10, users could modify task pods at runtime by passing a dictionary to the `executor_config` variable.
+Users will now have full access to the Kubernetes API via the `kubernetes.client.models.V1Pod` class.
+
+While in the deprecated version a user would mount a volume using the following dictionary:
+
+```python
+second_task = PythonOperator(
+    task_id="four_task",
+    python_callable=test_volume_mount,
+    executor_config={
+        "KubernetesExecutor": {
+            "volumes": [
+                {
+                    "name": "example-kubernetes-test-volume",
+                    "hostPath": {"path": "/tmp/"},
+                },
+            ],
+            "volume_mounts": [
+                {
+                    "mountPath": "/foo/",
+                    "name": "example-kubernetes-test-volume",
+                },
+            ]
+        }
+    }
+)
+```
+
+In the new model a user can accomplish the same thing using the following code:
+
+```python
+from kubernetes.client import models as k8s
+
+second_task = PythonOperator(
+    task_id="four_task",
+    python_callable=test_volume_mount,
+    executor_config={
+        "KubernetesExecutor": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        volume_mounts=[
+                            k8s.V1VolumeMount(
+                                mount_path="/foo/",
+                                name="example-kubernetes-test-volume",
+                            )
+                        ],
+                    )
+                ],
+                volumes=[
+                    k8s.V1Volume(
+                        name="example-kubernetes-test-volume",
+                        host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
+                    )
+                ],
+            )
+        )
+    },
+)
+```
+In Airflow 2.0, the traditional dictionary-based `executor_config` will continue to work with a deprecation warning,
+but support for it will be removed in a future version.
+
+### Changes to the KubernetesPodOperator
+
+Much like the KubernetesExecutor, the KubernetesPodOperator will no longer take Airflow custom classes and will

Review comment:
       ```suggestion
   Much like the `KubernetesExecutor`, the `KubernetesPodOperator` will no longer take Airflow custom classes and will
   ```

##########
File path: UPDATING.md
##########
@@ -154,6 +153,480 @@ The Old and New provider configuration keys that have changed are as follows
 
 For more information, visit https://flask-appbuilder.readthedocs.io/en/latest/security.html#authentication-oauth
 
+### Changes to the KubernetesExecutor
+
+#### The KubernetesExecutor Will No Longer Read from the airflow.cfg for Base Pod Configurations
+
+In Airflow 2.0, the KubernetesExecutor will require a base pod template written in yaml. This file can exist
+anywhere on the host machine and will be linked using the `pod_template_file` configuration in the airflow.cfg.
+
+The airflow.cfg will still accept values for the `worker_container_repository`, the `worker_container_tag`, and
+the default namespace.
+
+#### The executor_config Will Now Expect a `kubernetes.client.models.V1Pod` Class When Launching Tasks

Review comment:
       ```suggestion
   #### The `executor_config` Will Now Expect a `kubernetes.client.models.V1Pod` Class When Launching Tasks
   ```
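The settings that the diff above says remain in `airflow.cfg` can be inspected with the standard library. A minimal sketch, assuming they live in a `[kubernetes]` section; the file path, the image values and the `repository:tag` join are illustrative assumptions, and `read_worker_settings` is a hypothetical helper rather than Airflow's own config loader.

```python
import configparser
from typing import Dict

# Illustrative airflow.cfg excerpt. The [kubernetes] section name, the file
# path and the image values are assumptions made for this sketch.
AIRFLOW_CFG = """
[kubernetes]
pod_template_file = /opt/airflow/pod_template.yaml
worker_container_repository = apache/airflow
worker_container_tag = 2.0.0
namespace = airflow
"""


def read_worker_settings(cfg_text: str) -> Dict[str, str]:
    """Hypothetical helper: pull out the executor settings kept in airflow.cfg."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    section = parser["kubernetes"]
    return {
        # Everything else about the worker pod comes from this YAML template.
        "pod_template_file": section["pod_template_file"],
        # Image and namespace can still be set directly in airflow.cfg.
        "image": f"{section['worker_container_repository']}:{section['worker_container_tag']}",
        "namespace": section["namespace"],
    }


print(read_worker_settings(AIRFLOW_CFG))
```

Keeping only these few keys in `airflow.cfg` leaves the pod template file as the single place where volumes, secrets and security contexts are defined.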







[GitHub] [airflow] dimberman commented on pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
dimberman commented on pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#issuecomment-692154882


   @davlum I don't think we would want the `pod_override` to override all values. The examples I can think of off the top of my head are labels and `volume_mounts`. The base pod file will have volume mounts for the `airflow.cfg` and potentially other secrets. As a non-admin, I should be allowed to ADD volume mounts, but I shouldn't be able to delete them via the override. Same with labels: an admin might have labels or node affinities they don't want messed with.
   
   While it's POSSIBLE to ensure those are there using the `pod_mutation_hook`, doing so places the onus on the admin to fix the user's mistakes.
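To make the proposed semantics concrete, here is a minimal sketch, on plain dicts in serialized pod form, of an additive override: the user can add labels, volumes and volume mounts but cannot delete or overwrite those in the base pod. The helper names and the conflict rules (base labels win, volumes keyed by `name`, mounts keyed by `mountPath`, only the first container merged) are assumptions for illustration, not Airflow's actual `pod_override` merge.

```python
import copy
from typing import Any, Dict, List


def _merge_additive(base: List[Dict[str, Any]], extra: List[Dict[str, Any]], key: str) -> List[Dict[str, Any]]:
    """Keep every base entry and append only override entries with an unseen `key`."""
    seen = {item[key] for item in base}
    return base + [copy.deepcopy(item) for item in extra if item[key] not in seen]


def apply_additive_override(base_pod: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical merge: the override may ADD labels, volumes and volume
    mounts, but never removes or replaces anything the base pod defines."""
    pod = copy.deepcopy(base_pod)

    # Labels: admin-set keys win on conflict; new user keys are added.
    labels = pod.setdefault("metadata", {}).setdefault("labels", {})
    for name, value in override.get("metadata", {}).get("labels", {}).items():
        labels.setdefault(name, value)

    spec = pod.setdefault("spec", {})
    override_spec = override.get("spec", {})

    # Volumes are identified by name.
    spec["volumes"] = _merge_additive(spec.get("volumes", []), override_spec.get("volumes", []), "name")

    # Mounts are identified by mountPath, because one volume can be mounted at
    # several paths (test_kubernetes_executor.yaml mounts airflow-config twice).
    containers = spec.setdefault("containers", [])
    if not containers:
        containers.append({"name": "base"})
    override_containers = override_spec.get("containers", [])
    extra_mounts = override_containers[0].get("volumeMounts", []) if override_containers else []
    containers[0]["volumeMounts"] = _merge_additive(
        containers[0].get("volumeMounts", []), extra_mounts, "mountPath"
    )
    return pod
```

With this rule, a user override that redeclares `airflow-config` as an `emptyDir` is simply ignored, so the admin does not need a `pod_mutation_hook` to restore it.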





[GitHub] [airflow] ashb commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r490275702



##########
File path: airflow/kubernetes/pod_generator.py
##########
@@ -396,7 +240,7 @@ def from_legacy_obj(obj) -> Optional[k8s.V1Pod]:
                     limits=limits
                 )
         namespaced['resources'] = resources
-        return PodGenerator(**namespaced).gen_pod()
+        return PodGeneratorDeprecated(**namespaced).gen_pod()

Review comment:
       Oh, you issue a warning a few lines up (from the previous PR, right?)







[GitHub] [airflow] dimberman merged pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
dimberman merged pull request #10393:
URL: https://github.com/apache/airflow/pull/10393


   





[GitHub] [airflow] turbaszek commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r472917303



##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -365,44 +365,57 @@ def create_new_pod_for_operator(self, labels, launcher) -> Tuple[State, k8s.V1Po
                 }
             )
             self.labels.update(labels)
-        pod = pod_generator.PodGenerator(
-            image=self.image,
-            namespace=self.namespace,
-            cmds=self.cmds,
-            args=self.arguments,
-            labels=self.labels,
-            name=self.name,
-            envs=self.env_vars,
-            extract_xcom=self.do_xcom_push,
-            image_pull_policy=self.image_pull_policy,
-            node_selectors=self.node_selectors,
-            annotations=self.annotations,
-            affinity=self.affinity,
-            image_pull_secrets=self.image_pull_secrets,
-            service_account_name=self.service_account_name,
-            hostnetwork=self.hostnetwork,
-            tolerations=self.tolerations,
-            configmaps=self.configmaps,
-            security_context=self.security_context,
-            dnspolicy=self.dnspolicy,
-            schedulername=self.schedulername,
-            init_containers=self.init_containers,
-            restart_policy='Never',
-            priority_class_name=self.priority_class_name,
-            pod_template_file=self.pod_template_file,
-            pod=self.full_pod_spec,
-        ).gen_pod()
+        from kubernetes.client import models as k8s
+        pod = None

Review comment:
       ```suggestion
   
   ```
   Probably no need for that, as we have an if/else.







[GitHub] [airflow] mik-laj commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r483310169



##########
File path: tests/kubernetes/test_pod_generator.py
##########
@@ -241,238 +213,224 @@ def test_gen_pod_extract_xcom(self, mock_uuid):
             ],
             'resources': {'requests': {'cpu': '1m'}},
         }
-        self.expected['spec']['containers'].append(container_two)
-        self.expected['spec']['containers'][0]['volumeMounts'].insert(0, {
-            'name': 'xcom',
-            'mountPath': '/airflow/xcom'
-        })
-        self.expected['spec']['volumes'].insert(0, {
-            'name': 'xcom', 'emptyDir': {}
-        })
-        result_dict['spec']['containers'][0]['env'].sort(key=lambda x: x['name'])
-        self.assertEqual(result_dict, self.expected)
+        self.expected.spec.containers.append(container_two)
+        base_container: k8s.V1Container = self.expected.spec.containers[0]
+        base_container.volume_mounts = base_container.volume_mounts or []
+        base_container.volume_mounts.append(k8s.V1VolumeMount(
+            name="xcom",
+            mount_path="/airflow/xcom"
+        ))
+        self.expected.spec.containers[0] = base_container
+        self.expected.spec.volumes = self.expected.spec.volumes or []
+        self.expected.spec.volumes.append(
+            k8s.V1Volume(
+                name='xcom',
+                empty_dir={},
+            )
+        )
+        result_dict = self.k8s_client.sanitize_for_serialization(result)
+        expected_dict = self.k8s_client.sanitize_for_serialization(self.expected)
+
+        self.assertEqual(result_dict, expected_dict)
 
     def test_from_obj(self):
-        result = PodGenerator.from_obj({
-            "KubernetesExecutor": {
-                "annotations": {"test": "annotation"},
-                "volumes": [
-                    {
-                        "name": "example-kubernetes-test-volume",
-                        "hostPath": {"path": "/tmp/"},
-                    },
-                ],
-                "volume_mounts": [
-                    {
-                        "mountPath": "/foo/",
-                        "name": "example-kubernetes-test-volume",
-                    },
-                ],
+        result = PodGenerator.from_obj(
+            {
+                "KubernetesExecutor": k8s.V1Pod(
+                    api_version="v1",
+                    kind="Pod",
+                    metadata=k8s.V1ObjectMeta(
+                        name="foo",
+                        annotations={"test": "annotation"}
+                    ),
+                    spec=k8s.V1PodSpec(
+                        containers=[
+                            k8s.V1Container(
+                                name="base",
+                                volume_mounts=[
+                                    k8s.V1VolumeMount(
+                                        mount_path="/foo/",
+                                        name="example-kubernetes-test-volume"
+                                    )
+                                ]
+                            )
+                        ],
+                        volumes=[
+                            k8s.V1Volume(
+                                name="example-kubernetes-test-volume",
+                                host_path=k8s.V1HostPathVolumeSource(
+                                    path="/tmp/"
+                                )
+                            )
+                        ]
+                    )
+                )
             }
-        })
+        )
         result = self.k8s_client.sanitize_for_serialization(result)
 
         self.assertEqual({
             'apiVersion': 'v1',
             'kind': 'Pod',
             'metadata': {
+                'name': 'foo',
                 'annotations': {'test': 'annotation'},
             },
             'spec': {
                 'containers': [{
-                    'args': [],
-                    'command': [],
-                    'env': [],
-                    'envFrom': [],
                     'name': 'base',
-                    'ports': [],
                     'volumeMounts': [{
                         'mountPath': '/foo/',
                         'name': 'example-kubernetes-test-volume'
                     }],
                 }],
-                'hostNetwork': False,
-                'imagePullSecrets': [],
                 'volumes': [{
                     'hostPath': {'path': '/tmp/'},
                     'name': 'example-kubernetes-test-volume'
                 }],
             }
         }, result)
+        # TODO: Should we save this feature?

Review comment:
       ?







[GitHub] [airflow] dimberman commented on pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
dimberman commented on pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#issuecomment-688398119


   @mik-laj fixed those tests, PTAL :)





[GitHub] [airflow] dimberman commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r489492947



##########
File path: docs/howto/operator/kubernetes.rst
##########
@@ -53,13 +53,10 @@ Ultimately, it allows Airflow to act a job orchestrator - no matter the language
 
 How to use cluster ConfigMaps, Secrets, and Volumes with Pod?
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-Kubernetes cluster resources such as ConfigMaps, Secrets, and Volumes can be used with a Pod to be launched.
+Kubernetes cluster resources such as Secrets can be used with a Pod to be launched.
 Utilize the Airflow Kubernetes model classes such as:

Review comment:
       Fixed. @ashb, yes: `Volume` and `VolumeMount` are now `k8s.V1Volume` and `k8s.V1VolumeMount`. I go into more detail on this in UPDATING.md.







[GitHub] [airflow] kaxil commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r477529000



##########
File path: airflow/example_dags/example_kubernetes_executor_config.py
##########
@@ -52,60 +53,67 @@ def test_volume_mount():
     start_task = PythonOperator(
         task_id="start_task",
         python_callable=print_stuff,
-        executor_config={
-            "KubernetesExecutor": {

Review comment:
       > The problem I'm seeing is that k8s.V1Pod objects aren't serializable into dictionaries. Not sure how to get around that and would rather not optimize for a future I don't think will come
   
   We will need it for DAG serialization too. Is it not possible to convert a V1Pod to a dict? I thought each k8s object had some kind of method for that.







[GitHub] [airflow] kaxil commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r478324985



##########
File path: airflow/example_dags/example_kubernetes_executor_config.py
##########
@@ -52,60 +53,67 @@ def test_volume_mount():
     start_task = PythonOperator(
         task_id="start_task",
         python_callable=print_stuff,
-        executor_config={
-            "KubernetesExecutor": {

Review comment:
       `k8s.V1Pod(...).to_dict()` should work for serialization, shouldn't it?
   
   https://github.com/kubernetes-client/python/blob/v11.0.0/kubernetes/client/models/v1_pod.py#L179-L201
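The `TypeError: Object of type 'V1Pod' is not JSON serializable` reported in this thread comes from a JSON encoder that does not know about kubernetes model objects. A minimal sketch of the `to_dict()` route suggested here: a `json.JSONEncoder` subclass that falls back to any object's `to_dict()` method. `ToDictEncoder` is a hypothetical name, not Airflow's encoder in `airflow/utils/json.py`, and it relies only on the generated models exposing `to_dict()`, as the linked `v1_pod.py` does.

```python
import json
from typing import Any


class ToDictEncoder(json.JSONEncoder):
    """JSON encoder that falls back to an object's ``to_dict()`` method.

    Generated kubernetes-client models such as ``V1Pod`` expose ``to_dict()``;
    any other unknown type still raises the usual TypeError.
    """

    def default(self, o: Any) -> Any:
        to_dict = getattr(o, "to_dict", None)
        if callable(to_dict):
            return to_dict()
        return super().default(o)
```

Usage would be along the lines of `json.dumps(task.executor_config, cls=ToDictEncoder)`. One design caveat: `to_dict()` keys the output by Python attribute names (`api_version`, `host_path`) and keeps `None`-valued fields, whereas `ApiClient().sanitize_for_serialization(pod)`, which the tests in this thread use, emits the camelCase Kubernetes API form and drops `None` values. The latter is likely the better choice if the dict has to be turned back into a `V1Pod` later.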







[GitHub] [airflow] dimberman commented on pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
dimberman commented on pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#issuecomment-683468309


   @potiuk @kaxil @ashb @turbaszek all tests passing and added documentation. PTAL!





[GitHub] [airflow] kaxil commented on pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#issuecomment-676427676


   Nice <3 -- some static checks are failing https://github.com/apache/airflow/pull/10393/checks?check_run_id=1003310026





[GitHub] [airflow] ashb commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r486423899



##########
File path: kubernetes_tests/test_kubernetes_executor.yaml
##########
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+################################
+## Airflow Worker Deployment
+#################################
+---
+apiVersion: v1
+kind: Pod
+metadata:
+  name: will-be-overwritten
+  namespace: airflow
+spec:
+  containers:
+    - name: base
+      env:
+        - name: AIRFLOW__CORE__EXECUTOR
+          value: LocalExecutor
+        - name: AIRFLOW_HOME
+          value: /opt/airflow
+        - name: AIRFLOW__CORE__DAGS_FOLDER
+          value: /opt/airflow/dags
+        - name: AIRFLOW__CORE__FERNET_KEY
+          valueFrom:
+            secretKeyRef:
+              key: fernet-key
+              name: airflow-fernet-key
+        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
+          valueFrom:
+            secretKeyRef:
+              key: connection
+              name: airflow-airflow-metadata
+      resources: {}
+      terminationMessagePath: /dev/termination-log
+      terminationMessagePolicy: File
+      volumeMounts:
+        - mountPath: /opt/airflow/logs
+          name: airflow-logs
+        - mountPath: /opt/airflow/airflow.cfg
+          name: airflow-config
+          readOnly: true
+          subPath: airflow.cfg
+        - mountPath: /opt/airflow/config/airflow_local_settings.py
+          name: airflow-config
+          readOnly: true
+          subPath: airflow_local_settings.py
+  dnsPolicy: ClusterFirst
+  enableServiceLinks: true
+  imagePullSecrets:
+    - name: airflow-registry
+  restartPolicy: Never
+  securityContext:
+    fsGroup: 50000
+    runAsUser: 50000

Review comment:
       ```suggestion
       runAsUser: '{{ .Values.uid }}'
       fsGroup: '{{ .Values.gid }}' 
   ```
   
   etc.

##########
File path: chart/values.yaml
##########
@@ -541,32 +543,11 @@ config:
     namespace: '{{ .Release.Namespace }}'
     airflow_configmap: '{{ include "airflow_config" . }}'
     airflow_local_settings_configmap: '{{ include "airflow_config" . }}'
+    pod_template_file: '{{ include "airflow_pod_template_file" . }}/{{ .Values.podTemplateFile }}'
     worker_container_repository: '{{ .Values.images.airflow.repository | default .Values.defaultAirflowRepository }}'
     worker_container_tag: '{{ .Values.images.airflow.tag | default .Values.defaultAirflowTag }}'
     worker_container_image_pull_policy: '{{ .Values.images.airflow.pullPolicy }}'
-    worker_service_account_name: '{{ .Release.Name }}-worker-serviceaccount'
-    image_pull_secrets: '{{ template "registry_secret" . }}'
-    dags_in_image: '{{ ternary "False" "True" (or .Values.dags.gitSync.enabled .Values.dags.persistence.enabled) }}'
     delete_worker_pods: 'True'
-    run_as_user: '{{ .Values.uid }}'
-    fs_group: '{{ .Values.gid }}'
-    git_dags_folder_mount_point: '{{- if or .Values.dags.gitSync.enabled .Values.dags.persistence.enabled }}{{ include "airflow_dags_mount_path" . }}{{end}}'
-    dags_volume_mount_point: '{{- if or .Values.dags.gitSync.enabled .Values.dags.persistence.enabled }}{{ include "airflow_dags_mount_path" . }}{{ end }}'
-    dags_volume_claim: '{{- if .Values.dags.persistence.enabled }}{{ include "airflow_dags_volume_claim" . }}{{ end }}'
-    dags_volume_subpath: '{{- if .Values.dags.persistence.enabled }}{{.Values.dags.gitSync.dest }}/{{ .Values.dags.gitSync.subPath }}{{ end }}'
-    git_repo: '{{- if and .Values.dags.gitSync.enabled (not .Values.dags.persistence.enabled) }}{{ .Values.dags.gitSync.repo }}{{ end }}'
-    git_branch: '{{ .Values.dags.gitSync.branch }}'
-    git_sync_rev: '{{ .Values.dags.gitSync.rev }}'
-    git_sync_depth: '{{ .Values.dags.gitSync.depth }}'
-    git_sync_root: '{{ .Values.dags.gitSync.root }}'
-    git_sync_dest: '{{ .Values.dags.gitSync.dest }}'
-    git_sync_container_repository: '{{ .Values.dags.gitSync.containerRepository }}'
-    git_sync_container_tag: '{{ .Values.dags.gitSync.containerTag }}'
-    git_sync_init_container_name: '{{ .Values.dags.gitSync.containerName }}'
-    git_sync_run_as_user: '{{ .Values.uid }}'
-    git_ssh_known_hosts_configmap_name: '{{- if .Values.dags.gitSync.knownHosts }}{{ include "airflow_config" . }}{{ end }}'
-    git_ssh_key_secret_name: '{{- if .Values.dags.gitSync.sshKeySecret }}{{ .Values.dags.gitSync.sshKeySecret }}{{ end }}'
-    git_sync_credentials_secret: '{{- if .Values.dags.gitSync.credentialsSecret }}{{ .Values.dags.gitSync.credentialsSecret }}{{ end }}'

Review comment:
       Didn't we discuss that the chart would take these helm values and generate the kube template file based on these?







[GitHub] [airflow] dimberman commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r478076339



##########
File path: airflow/example_dags/example_kubernetes_executor_config.py
##########
@@ -52,60 +53,67 @@ def test_volume_mount():
     start_task = PythonOperator(
         task_id="start_task",
         python_callable=print_stuff,
-        executor_config={
-            "KubernetesExecutor": {

Review comment:
       ```  
   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/json.py", line 54, in _default
       raise TypeError(f"Object of type '{obj.__class__.__name__}' is not JSON serializable")
   TypeError: Object of type 'V1Pod' is not JSON serializable
   ```
   
   Yeah, so if I try to make the pod a field, I end up getting this error when I try to render it on the page. I'm not sure what the best way forward is there. Maybe add a field? I don't think we need to change the DB, as the `executor_config` isn't stored in the DB anyway.







[GitHub] [airflow] turbaszek commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r472915639



##########
File path: airflow/kubernetes/pod_generator.py
##########
@@ -96,100 +96,18 @@ class PodGenerator:
     Any configuration that is container specific gets applied to
     the first container in the list of containers.
 
-    :param image: The docker image
-    :type image: Optional[str]
-    :param name: name in the metadata section (not the container name)
-    :type name: Optional[str]
-    :param namespace: pod namespace
-    :type namespace: Optional[str]
-    :param volume_mounts: list of kubernetes volumes mounts
-    :type volume_mounts: Optional[List[Union[k8s.V1VolumeMount, dict]]]
-    :param envs: A dict containing the environment variables
-    :type envs: Optional[Dict[str, str]]
-    :param cmds: The command to be run on the first container
-    :type cmds: Optional[List[str]]
-    :param args: The arguments to be run on the pod
-    :type args: Optional[List[str]]
-    :param labels: labels for the pod metadata
-    :type labels: Optional[Dict[str, str]]
-    :param node_selectors: node selectors for the pod
-    :type node_selectors: Optional[Dict[str, str]]
-    :param ports: list of ports. Applies to the first container.
-    :type ports: Optional[List[Union[k8s.V1ContainerPort, dict]]]
-    :param volumes: Volumes to be attached to the first container
-    :type volumes: Optional[List[Union[k8s.V1Volume, dict]]]
-    :param image_pull_policy: Specify a policy to cache or always pull an image
-    :type image_pull_policy: str
-    :param restart_policy: The restart policy of the pod
-    :type restart_policy: str
-    :param image_pull_secrets: Any image pull secrets to be given to the pod.
-        If more than one secret is required, provide a comma separated list:
-        secret_a,secret_b
-    :type image_pull_secrets: str
-    :param init_containers: A list of init containers
-    :type init_containers: Optional[List[k8s.V1Container]]
-    :param service_account_name: Identity for processes that run in a Pod
-    :type service_account_name: Optional[str]
-    :param resources: Resource requirements for the first containers
-    :type resources: Optional[Union[k8s.V1ResourceRequirements, dict]]
-    :param annotations: annotations for the pod
-    :type annotations: Optional[Dict[str, str]]
-    :param affinity: A dict containing a group of affinity scheduling rules
-    :type affinity: Optional[dict]
-    :param hostnetwork: If True enable host networking on the pod
-    :type hostnetwork: bool
-    :param tolerations: A list of kubernetes tolerations
-    :type tolerations: Optional[list]
-    :param security_context: A dict containing the security context for the pod
-    :type security_context: Optional[Union[k8s.V1PodSecurityContext, dict]]
-    :param configmaps: Any configmap refs to envfrom.
-        If more than one configmap is required, provide a comma separated list
-        configmap_a,configmap_b
-    :type configmaps: List[str]
-    :param dnspolicy: Specify a dnspolicy for the pod
-    :type dnspolicy: Optional[str]
-    :param schedulername: Specify a schedulername for the pod
-    :type schedulername: Optional[str]
     :param pod: The fully specified pod. Mutually exclusive with `path_or_string`
     :type pod: Optional[kubernetes.client.models.V1Pod]
     :param pod_template_file: Path to YAML file. Mutually exclusive with `pod`
     :type pod_template_file: Optional[str]
     :param extract_xcom: Whether to bring up a container for xcom
     :type extract_xcom: bool
-    :param priority_class_name: priority class name for the launched Pod
-    :type priority_class_name: str
     """
     def __init__(  # pylint: disable=too-many-arguments,too-many-locals
         self,
-        image: Optional[str] = None,
-        name: Optional[str] = None,
-        namespace: Optional[str] = None,
-        volume_mounts: Optional[List[Union[k8s.V1VolumeMount, dict]]] = None,
-        envs: Optional[Dict[str, str]] = None,
-        cmds: Optional[List[str]] = None,
-        args: Optional[List[str]] = None,
-        labels: Optional[Dict[str, str]] = None,
-        node_selectors: Optional[Dict[str, str]] = None,
-        ports: Optional[List[Union[k8s.V1ContainerPort, dict]]] = None,
-        volumes: Optional[List[Union[k8s.V1Volume, dict]]] = None,
-        image_pull_policy: Optional[str] = None,
-        restart_policy: Optional[str] = None,
-        image_pull_secrets: Optional[str] = None,
-        init_containers: Optional[List[k8s.V1Container]] = None,
-        service_account_name: Optional[str] = None,
-        resources: Optional[Union[k8s.V1ResourceRequirements, dict]] = None,
-        annotations: Optional[Dict[str, str]] = None,
-        affinity: Optional[dict] = None,
-        hostnetwork: bool = False,
-        tolerations: Optional[list] = None,
-        security_context: Optional[Union[k8s.V1PodSecurityContext, dict]] = None,
-        configmaps: Optional[List[str]] = None,
-        dnspolicy: Optional[str] = None,
-        schedulername: Optional[str] = None,
         pod: Optional[k8s.V1Pod] = None,
         pod_template_file: Optional[str] = None,
-        extract_xcom: bool = False,
-        priority_class_name: Optional[str] = None,
+        extract_xcom=True

Review comment:
       ```suggestion
           extract_xcom: bool = True
   ```







[GitHub] [airflow] dimberman closed pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
dimberman closed pull request #10393:
URL: https://github.com/apache/airflow/pull/10393


   





[GitHub] [airflow] dimberman commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r480253840



##########
File path: UPDATING.md
##########
@@ -153,6 +152,175 @@ The Old and New provider configuration keys that have changed are as follows
 
 For more information, visit https://flask-appbuilder.readthedocs.io/en/latest/security.html#authentication-oauth
 
+### Changes to the KubernetesExecutor
+
+#### The KubernetesExecutor Will No Longer Read from the airflow.cfg for Base Pod Configurations
+
+In Airflow 2.0, the KubernetesExecutor will require a base pod template written in YAML. This file can exist
+anywhere on the host machine and will be linked using the `pod_template_file` configuration in the airflow.cfg.
+
+The airflow.cfg will still accept values for the `worker_container_repository`, the `worker_container_tag`, and
+the default namespace.
+
+#### The executor_config Will Now Expect a `kubernetes.client.models.V1Pod` Class When Launching Tasks
+
+In Airflow 1.10, users could modify task pods at runtime by passing a dictionary to the `executor_config` argument.
+Users will now have full access to the Kubernetes API via the `kubernetes.client.models.V1Pod` class.
+
+While in the deprecated version a user would mount a volume using the following dictionary:
+
+```python
+second_task = PythonOperator(
+    task_id="four_task",
+    python_callable=test_volume_mount,
+    executor_config={
+        "KubernetesExecutor": {
+            "volumes": [
+                {
+                    "name": "example-kubernetes-test-volume",
+                    "hostPath": {"path": "/tmp/"},
+                },
+            ],
+            "volume_mounts": [
+                {
+                    "mountPath": "/foo/",
+                    "name": "example-kubernetes-test-volume",
+                },
+            ]
+        }
+    }
+)
+```
+
+In the new model a user can accomplish the same thing using the following code:
+
+```python
+from kubernetes.client import models as k8s
+
+second_task = PythonOperator(
+    task_id="four_task",
+    python_callable=test_volume_mount,
+    executor_config={
+        "KubernetesExecutor": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        volume_mounts=[
+                            k8s.V1VolumeMount(
+                                mount_path="/foo/",
+                                name="example-kubernetes-test-volume"
+                            )
+                        ]
+                    )
+                ],
+                volumes=[
+                    k8s.V1Volume(
+                        name="example-kubernetes-test-volume",
+                        host_path=k8s.V1HostPathVolumeSource(
+                            path="/tmp/"
+                        )
+                    )
+                ]
+            )
+        )
+    }
+)
+```
+
+For Airflow 2.0, the traditional dictionary-based `executor_config` will continue to work with a deprecation warning,
+but support for it will be removed in a future version.
+
+### Changes to the KubernetesPodOperator

Review comment:
       @mik-laj Thank you for that feedback. I've added more detailed feedback on the changed parameters. Can you please take a look when you have some time?







[GitHub] [airflow] ashb commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r490274765



##########
File path: airflow/kubernetes/pod_generator.py
##########
@@ -396,7 +240,7 @@ def from_legacy_obj(obj) -> Optional[k8s.V1Pod]:
                     limits=limits
                 )
         namespaced['resources'] = resources
-        return PodGenerator(**namespaced).gen_pod()
+        return PodGeneratorDeprecated(**namespaced).gen_pod()

Review comment:
       Can/should we issue a FutureDeprecationWarning here, so that we can remove this code path in 2.1/2.2?
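A minimal sketch of what that could look like with the standard library `warnings` module. Python has no built-in `FutureDeprecationWarning`; its built-in categories include `DeprecationWarning`, `PendingDeprecationWarning`, and `FutureWarning`, so a class by that name would have to be defined by the project. The function below is a hypothetical stand-in for `from_legacy_obj` and returns the legacy dict instead of building a pod.

```python
import warnings
from typing import Any, Dict, Optional


def from_legacy_obj(obj: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """Hypothetical stand-in for the legacy executor_config code path."""
    if obj is None:
        return None
    namespaced = obj.get("KubernetesExecutor", {})
    if not namespaced:
        return None
    warnings.warn(
        "Passing a dictionary to executor_config is deprecated; "
        "pass a kubernetes.client.models.V1Pod instead.",
        DeprecationWarning,
        stacklevel=2,  # attribute the warning to the caller, not this function
    )
    # The real code would build a pod via PodGeneratorDeprecated(**namespaced).gen_pod();
    # this sketch just returns a copy of the legacy dict.
    return dict(namespaced)
```

Design note: since Python 3.7, `DeprecationWarning` is ignored by default unless it is attributed to code in `__main__`, while `FutureWarning` is always shown, so the choice of category decides whether DAG authors see the message without changing their warning filters.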







[GitHub] [airflow] dimberman commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r478597468



##########
File path: airflow/example_dags/example_kubernetes_executor_config.py
##########
@@ -52,60 +53,67 @@ def test_volume_mount():
     start_task = PythonOperator(
         task_id="start_task",
         python_callable=print_stuff,
-        executor_config={
-            "KubernetesExecutor": {

Review comment:
       Ok I got it working with the traditional executor_config. Thanks @kaxil!







[GitHub] [airflow] dimberman commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r486513263



##########
File path: chart/values.yaml
##########
@@ -541,32 +543,11 @@ config:
     namespace: '{{ .Release.Namespace }}'
     airflow_configmap: '{{ include "airflow_config" . }}'
     airflow_local_settings_configmap: '{{ include "airflow_config" . }}'
+    pod_template_file: '{{ include "airflow_pod_template_file" . }}/{{ .Values.podTemplateFile }}'
     worker_container_repository: '{{ .Values.images.airflow.repository | default .Values.defaultAirflowRepository }}'
     worker_container_tag: '{{ .Values.images.airflow.tag | default .Values.defaultAirflowTag }}'
     worker_container_image_pull_policy: '{{ .Values.images.airflow.pullPolicy }}'
-    worker_service_account_name: '{{ .Release.Name }}-worker-serviceaccount'
-    image_pull_secrets: '{{ template "registry_secret" . }}'
-    dags_in_image: '{{ ternary "False" "True" (or .Values.dags.gitSync.enabled .Values.dags.persistence.enabled) }}'
     delete_worker_pods: 'True'
-    run_as_user: '{{ .Values.uid }}'
-    fs_group: '{{ .Values.gid }}'
-    git_dags_folder_mount_point: '{{- if or .Values.dags.gitSync.enabled .Values.dags.persistence.enabled }}{{ include "airflow_dags_mount_path" . }}{{end}}'
-    dags_volume_mount_point: '{{- if or .Values.dags.gitSync.enabled .Values.dags.persistence.enabled }}{{ include "airflow_dags_mount_path" . }}{{ end }}'
-    dags_volume_claim: '{{- if .Values.dags.persistence.enabled }}{{ include "airflow_dags_volume_claim" . }}{{ end }}'
-    dags_volume_subpath: '{{- if .Values.dags.persistence.enabled }}{{.Values.dags.gitSync.dest }}/{{ .Values.dags.gitSync.subPath }}{{ end }}'
-    git_repo: '{{- if and .Values.dags.gitSync.enabled (not .Values.dags.persistence.enabled) }}{{ .Values.dags.gitSync.repo }}{{ end }}'
-    git_branch: '{{ .Values.dags.gitSync.branch }}'
-    git_sync_rev: '{{ .Values.dags.gitSync.rev }}'
-    git_sync_depth: '{{ .Values.dags.gitSync.depth }}'
-    git_sync_root: '{{ .Values.dags.gitSync.root }}'
-    git_sync_dest: '{{ .Values.dags.gitSync.dest }}'
-    git_sync_container_repository: '{{ .Values.dags.gitSync.containerRepository }}'
-    git_sync_container_tag: '{{ .Values.dags.gitSync.containerTag }}'
-    git_sync_init_container_name: '{{ .Values.dags.gitSync.containerName }}'
-    git_sync_run_as_user: '{{ .Values.uid }}'
-    git_ssh_known_hosts_configmap_name: '{{- if .Values.dags.gitSync.knownHosts }}{{ include "airflow_config" . }}{{ end }}'
-    git_ssh_key_secret_name: '{{- if .Values.dags.gitSync.sshKeySecret }}{{ .Values.dags.gitSync.sshKeySecret }}{{ end }}'
-    git_sync_credentials_secret: '{{- if .Values.dags.gitSync.credentialsSecret }}{{ .Values.dags.gitSync.credentialsSecret }}{{ end }}'

Review comment:
       @ashb every value in this section is pulled from other parts of the Helm values file. Should we keep this section around, or delete it and still generate a default pod_template_file from those other values (seeing as this section exists specifically to populate the airflow.cfg)?
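For illustration, deriving a default pod template from values the chart already has could look like the Python sketch below; the chart itself would do this with Helm templates, so this only shows the mapping. The value keys (`images.airflow.repository`, `images.airflow.tag`, `images.airflow.pullPolicy`, `defaultAirflowRepository`, `defaultAirflowTag`, `uid`, `gid`) come from the diff above and the container name `base` follows the examples in this PR; the `build_pod_template` name and the `IfNotPresent` fallback are assumptions.

```python
import json
from typing import Any, Dict


def build_pod_template(values: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: derive a minimal worker pod template from Helm-style values."""
    airflow_image = values.get("images", {}).get("airflow", {})
    # Mirrors the `| default` fallbacks used in chart/values.yaml.
    repository = airflow_image.get("repository") or values["defaultAirflowRepository"]
    tag = airflow_image.get("tag") or values["defaultAirflowTag"]
    return {
        "apiVersion": "v1",
        "kind": "Pod",
        "spec": {
            "containers": [
                {
                    "name": "base",
                    "image": f"{repository}:{tag}",
                    # Assumed fallback when no pull policy is configured.
                    "imagePullPolicy": airflow_image.get("pullPolicy") or "IfNotPresent",
                }
            ],
            # Replaces the removed run_as_user / fs_group config options.
            "securityContext": {"runAsUser": values["uid"], "fsGroup": values["gid"]},
        },
    }
```

Since JSON is also valid YAML 1.2, `json.dumps(build_pod_template(values), indent=2)` could be written out as the file that `pod_template_file` points to.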







[GitHub] [airflow] dimberman commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r477527689



##########
File path: airflow/example_dags/example_kubernetes_executor_config.py
##########
@@ -52,60 +53,67 @@ def test_volume_mount():
     start_task = PythonOperator(
         task_id="start_task",
         python_callable=print_stuff,
-        executor_config={
-            "KubernetesExecutor": {

Review comment:
       (at this point the executor_config has existed for years and no other executor has used it. If it wasn't such a pain I'd push for us to change the name to something like "kubernetes_modifier_pod")







[GitHub] [airflow] mik-laj commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r489370753



##########
File path: UPDATING.md
##########
@@ -154,6 +153,480 @@ The Old and New provider configuration keys that have changed are as follows
 
 For more information, visit https://flask-appbuilder.readthedocs.io/en/latest/security.html#authentication-oauth
 
+### Changes to the KubernetesExecutor
+
+#### The KubernetesExecutor Will No Longer Read from the airflow.cfg for Base Pod Configurations

Review comment:
       Can you add a full list of configuration options that have been deleted? This is important if we are going to work on `airflow upgrade-check`, because having the list we can detect incorrect configuration and notify the user at runtime.
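A minimal sketch of such a check, assuming the removed options live in the `[kubernetes]` section of airflow.cfg. The option list is taken from the `chart/values.yaml` lines this PR deletes and is not claimed to be complete; `find_removed_options` is a hypothetical helper, not part of `airflow upgrade-check`.

```python
import configparser
from typing import List

# Options deleted from the chart's config section in this PR (possibly incomplete).
REMOVED_KUBERNETES_OPTIONS = frozenset({
    "worker_service_account_name", "image_pull_secrets", "dags_in_image",
    "run_as_user", "fs_group", "git_dags_folder_mount_point",
    "dags_volume_mount_point", "dags_volume_claim", "dags_volume_subpath",
    "git_repo", "git_branch", "git_sync_rev", "git_sync_depth",
    "git_sync_root", "git_sync_dest", "git_sync_container_repository",
    "git_sync_container_tag", "git_sync_init_container_name",
    "git_sync_run_as_user", "git_ssh_known_hosts_configmap_name",
    "git_ssh_key_secret_name", "git_sync_credentials_secret",
})


def find_removed_options(cfg_text: str, section: str = "kubernetes") -> List[str]:
    """Return the removed options still set in `section`, sorted by name."""
    # interpolation=None so values containing '%' are not expanded.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    if not parser.has_section(section):
        return []
    # ConfigParser lower-cases option names, matching the lower-case list above.
    return sorted(opt for opt in parser.options(section) if opt in REMOVED_KUBERNETES_OPTIONS)
```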







[GitHub] [airflow] mik-laj commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r483310271



##########
File path: tests/kubernetes/test_pod_generator.py
##########
@@ -241,238 +213,224 @@ def test_gen_pod_extract_xcom(self, mock_uuid):
             ],
             'resources': {'requests': {'cpu': '1m'}},
         }
-        self.expected['spec']['containers'].append(container_two)
-        self.expected['spec']['containers'][0]['volumeMounts'].insert(0, {
-            'name': 'xcom',
-            'mountPath': '/airflow/xcom'
-        })
-        self.expected['spec']['volumes'].insert(0, {
-            'name': 'xcom', 'emptyDir': {}
-        })
-        result_dict['spec']['containers'][0]['env'].sort(key=lambda x: x['name'])
-        self.assertEqual(result_dict, self.expected)
+        self.expected.spec.containers.append(container_two)
+        base_container: k8s.V1Container = self.expected.spec.containers[0]
+        base_container.volume_mounts = base_container.volume_mounts or []
+        base_container.volume_mounts.append(k8s.V1VolumeMount(
+            name="xcom",
+            mount_path="/airflow/xcom"
+        ))
+        self.expected.spec.containers[0] = base_container
+        self.expected.spec.volumes = self.expected.spec.volumes or []
+        self.expected.spec.volumes.append(
+            k8s.V1Volume(
+                name='xcom',
+                empty_dir={},
+            )
+        )
+        result_dict = self.k8s_client.sanitize_for_serialization(result)
+        expected_dict = self.k8s_client.sanitize_for_serialization(self.expected)
+
+        self.assertEqual(result_dict, expected_dict)
 
     def test_from_obj(self):
-        result = PodGenerator.from_obj({
-            "KubernetesExecutor": {
-                "annotations": {"test": "annotation"},
-                "volumes": [
-                    {
-                        "name": "example-kubernetes-test-volume",
-                        "hostPath": {"path": "/tmp/"},
-                    },
-                ],
-                "volume_mounts": [
-                    {
-                        "mountPath": "/foo/",
-                        "name": "example-kubernetes-test-volume",
-                    },
-                ],
+        result = PodGenerator.from_obj(
+            {
+                "KubernetesExecutor": k8s.V1Pod(
+                    api_version="v1",
+                    kind="Pod",
+                    metadata=k8s.V1ObjectMeta(
+                        name="foo",
+                        annotations={"test": "annotation"}
+                    ),
+                    spec=k8s.V1PodSpec(
+                        containers=[
+                            k8s.V1Container(
+                                name="base",
+                                volume_mounts=[
+                                    k8s.V1VolumeMount(
+                                        mount_path="/foo/",
+                                        name="example-kubernetes-test-volume"
+                                    )
+                                ]
+                            )
+                        ],
+                        volumes=[
+                            k8s.V1Volume(
+                                name="example-kubernetes-test-volume",
+                                host_path=k8s.V1HostPathVolumeSource(
+                                    path="/tmp/"
+                                )
+                            )
+                        ]
+                    )
+                )
             }
-        })
+        )
         result = self.k8s_client.sanitize_for_serialization(result)
 
         self.assertEqual({
             'apiVersion': 'v1',
             'kind': 'Pod',
             'metadata': {
+                'name': 'foo',
                 'annotations': {'test': 'annotation'},
             },
             'spec': {
                 'containers': [{
-                    'args': [],
-                    'command': [],
-                    'env': [],
-                    'envFrom': [],
                     'name': 'base',
-                    'ports': [],
                     'volumeMounts': [{
                         'mountPath': '/foo/',
                         'name': 'example-kubernetes-test-volume'
                     }],
                 }],
-                'hostNetwork': False,
-                'imagePullSecrets': [],
                 'volumes': [{
                     'hostPath': {'path': '/tmp/'},
                     'name': 'example-kubernetes-test-volume'
                 }],
             }
         }, result)
+        # TODO: Should we save this feature?
+        # result = PodGenerator.from_obj({
+        #     "KubernetesExecutor": {
+        #         "annotations": {"test": "annotation"},
+        #         "volumes": [
+        #             {
+        #                 "name": "example-kubernetes-test-volume",
+        #                 "hostPath": {"path": "/tmp/"},
+        #             },
+        #         ],
+        #         "volume_mounts": [
+        #             {
+        #                 "mountPath": "/foo/",
+        #                 "name": "example-kubernetes-test-volume",
+        #             },
+        #         ],
+        #     }
+        # })
+        # result = self.k8s_client.sanitize_for_serialization(result)
+        #
+        # self.assertEqual({
+        #     'apiVersion': 'v1',
+        #     'kind': 'Pod',
+        #     'metadata': {
+        #         'annotations': {'test': 'annotation'},
+        #     },
+        #     'spec': {
+        #         'containers': [{
+        #             'args': [],
+        #             'command': [],
+        #             'env': [],
+        #             'envFrom': [],
+        #             'name': 'base',
+        #             'ports': [],
+        #             'volumeMounts': [{
+        #                 'mountPath': '/foo/',
+        #                 'name': 'example-kubernetes-test-volume'
+        #             }],
+        #         }],
+        #         'hostNetwork': False,
+        #         'imagePullSecrets': [],
+        #         'volumes': [{
+        #             'hostPath': {'path': '/tmp/'},
+        #             'name': 'example-kubernetes-test-volume'
+        #         }],
+        #     }
+        # }, result)
 
     @mock.patch('uuid.uuid4')
     def test_reconcile_pods_empty_mutator_pod(self, mock_uuid):
-        mock_uuid.return_value = self.static_uuid
-        base_pod = PodGenerator(
-            image='image1',
-            name='name1',
-            envs={'key1': 'val1'},
-            cmds=['/bin/command1.sh', 'arg1'],
-            ports=[k8s.V1ContainerPort(name='port', container_port=2118)],
-            volumes=[{
-                'hostPath': {'path': '/tmp/'},
-                'name': 'example-kubernetes-test-volume1'
-            }],
-            volume_mounts=[{
-                'mountPath': '/foo/',
-                'name': 'example-kubernetes-test-volume1'
-            }],
-        ).gen_pod()
-
-        mutator_pod = None
-        name = 'name1-' + self.static_uuid.hex
-
-        base_pod.metadata.name = name
-
-        result = PodGenerator.reconcile_pods(base_pod, mutator_pod)
-        self.assertEqual(base_pod, result)
-
-        mutator_pod = k8s.V1Pod()
-        result = PodGenerator.reconcile_pods(base_pod, mutator_pod)
-        self.assertEqual(base_pod, result)
+        pass
+        # mock_uuid.return_value = self.static_uuid

Review comment:
       ?







[GitHub] [airflow] davlum commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
davlum commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r487934539



##########
File path: airflow/kubernetes/pod_generator.py
##########
@@ -123,100 +124,18 @@ class PodGenerator:
     Any configuration that is container specific gets applied to
     the first container in the list of containers.
 
-    :param image: The docker image
-    :type image: Optional[str]
-    :param name: name in the metadata section (not the container name)
-    :type name: Optional[str]
-    :param namespace: pod namespace
-    :type namespace: Optional[str]
-    :param volume_mounts: list of kubernetes volumes mounts
-    :type volume_mounts: Optional[List[Union[k8s.V1VolumeMount, dict]]]
-    :param envs: A dict containing the environment variables
-    :type envs: Optional[Dict[str, str]]
-    :param cmds: The command to be run on the first container
-    :type cmds: Optional[List[str]]
-    :param args: The arguments to be run on the pod
-    :type args: Optional[List[str]]
-    :param labels: labels for the pod metadata
-    :type labels: Optional[Dict[str, str]]
-    :param node_selectors: node selectors for the pod
-    :type node_selectors: Optional[Dict[str, str]]
-    :param ports: list of ports. Applies to the first container.
-    :type ports: Optional[List[Union[k8s.V1ContainerPort, dict]]]
-    :param volumes: Volumes to be attached to the first container
-    :type volumes: Optional[List[Union[k8s.V1Volume, dict]]]
-    :param image_pull_policy: Specify a policy to cache or always pull an image
-    :type image_pull_policy: str
-    :param restart_policy: The restart policy of the pod
-    :type restart_policy: str
-    :param image_pull_secrets: Any image pull secrets to be given to the pod.
-        If more than one secret is required, provide a comma separated list:
-        secret_a,secret_b
-    :type image_pull_secrets: str
-    :param init_containers: A list of init containers
-    :type init_containers: Optional[List[k8s.V1Container]]
-    :param service_account_name: Identity for processes that run in a Pod
-    :type service_account_name: Optional[str]
-    :param resources: Resource requirements for the first containers
-    :type resources: Optional[Union[k8s.V1ResourceRequirements, dict]]
-    :param annotations: annotations for the pod
-    :type annotations: Optional[Dict[str, str]]
-    :param affinity: A dict containing a group of affinity scheduling rules
-    :type affinity: Optional[dict]
-    :param hostnetwork: If True enable host networking on the pod
-    :type hostnetwork: bool
-    :param tolerations: A list of kubernetes tolerations
-    :type tolerations: Optional[list]
-    :param security_context: A dict containing the security context for the pod
-    :type security_context: Optional[Union[k8s.V1PodSecurityContext, dict]]
-    :param configmaps: Any configmap refs to envfrom.
-        If more than one configmap is required, provide a comma separated list
-        configmap_a,configmap_b
-    :type configmaps: List[str]
-    :param dnspolicy: Specify a dnspolicy for the pod
-    :type dnspolicy: Optional[str]
-    :param schedulername: Specify a schedulername for the pod
-    :type schedulername: Optional[str]
     :param pod: The fully specified pod. Mutually exclusive with `path_or_string`
     :type pod: Optional[kubernetes.client.models.V1Pod]
     :param pod_template_file: Path to YAML file. Mutually exclusive with `pod`
     :type pod_template_file: Optional[str]
     :param extract_xcom: Whether to bring up a container for xcom
     :type extract_xcom: bool
-    :param priority_class_name: priority class name for the launched Pod
-    :type priority_class_name: str
     """
     def __init__(  # pylint: disable=too-many-arguments,too-many-locals
         self,
-        image: Optional[str] = None,
-        name: Optional[str] = None,
-        namespace: Optional[str] = None,
-        volume_mounts: Optional[List[Union[k8s.V1VolumeMount, dict]]] = None,
-        envs: Optional[Dict[str, str]] = None,
-        cmds: Optional[List[str]] = None,
-        args: Optional[List[str]] = None,
-        labels: Optional[Dict[str, str]] = None,
-        node_selectors: Optional[Dict[str, str]] = None,
-        ports: Optional[List[Union[k8s.V1ContainerPort, dict]]] = None,
-        volumes: Optional[List[Union[k8s.V1Volume, dict]]] = None,
-        image_pull_policy: Optional[str] = None,
-        restart_policy: Optional[str] = None,
-        image_pull_secrets: Optional[str] = None,
-        init_containers: Optional[List[k8s.V1Container]] = None,
-        service_account_name: Optional[str] = None,
-        resources: Optional[Union[k8s.V1ResourceRequirements, dict]] = None,
-        annotations: Optional[Dict[str, str]] = None,
-        affinity: Optional[dict] = None,
-        hostnetwork: bool = False,
-        tolerations: Optional[list] = None,
-        security_context: Optional[Union[k8s.V1PodSecurityContext, dict]] = None,
-        configmaps: Optional[List[str]] = None,
-        dnspolicy: Optional[str] = None,
-        schedulername: Optional[str] = None,
         pod: Optional[k8s.V1Pod] = None,
         pod_template_file: Optional[str] = None,
-        extract_xcom: bool = False,
-        priority_class_name: Optional[str] = None,
+        extract_xcom: bool = True
     ):
         self.validate_pod_generator_args(locals())

Review comment:
       `validate_pod_generator_args` can be significantly simplified. It was only so complex because of the signature inspection it did for the ridiculous number of arguments being passed. Now the validation is simple: the client passes either `pod` or `pod_template_file`.
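       A minimal sketch of what the simplified check might look like. It assumes the method keeps receiving the `locals()` dict, as in the diff above, and it is written as a plain function rather than a method. Raising `ValueError` is also an assumption; Airflow may prefer one of its own exception types.

```python
from typing import Any, Dict


def validate_pod_generator_args(given_args: Dict[str, Any]) -> None:
    """Hypothetical simplified validator: exactly one of `pod` or
    `pod_template_file` must be supplied. Other keys (e.g. `self`,
    `extract_xcom`) are ignored."""
    has_pod = given_args.get("pod") is not None
    has_template = given_args.get("pod_template_file") is not None
    if has_pod and has_template:
        raise ValueError("`pod` and `pod_template_file` are mutually exclusive")
    if not has_pod and not has_template:
        raise ValueError("Either `pod` or `pod_template_file` is required")
```

       With no signature inspection left, the whole check reduces to two boolean tests.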







[GitHub] [airflow] dimberman commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r479813400



##########
File path: airflow/kubernetes/pod_generator_deprecated.py
##########
@@ -0,0 +1,620 @@
+# Licensed to the Apache Software Foundation (ASF) under one

Review comment:
       @ashb @potiuk @kaxil @turbaszek With this class we can actually keep supporting the existing `executor_config` format, with a deprecation warning, until 2.1 or 2.2. This should help ease the transition. WDYT?
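       A rough sketch of how such a shim could route the old dict format through a deprecated generator while warning the user. The `resolve_executor_config` helper and the `"pod_override"` key are assumptions for illustration. `legacy_converter` stands in for something like `PodGeneratorDeprecated(**config).gen_pod()`.

```python
import warnings
from typing import Any, Callable, Dict, Optional


def resolve_executor_config(
    executor_config: Dict[str, Any],
    legacy_converter: Callable[[Dict[str, Any]], Any],
) -> Optional[Any]:
    """Hypothetical helper: return the pod override for a task.

    Legacy dict-style configs under "KubernetesExecutor" are still accepted,
    but they are converted via `legacy_converter` and flagged with a
    DeprecationWarning.
    """
    legacy = executor_config.get("KubernetesExecutor")
    if isinstance(legacy, dict):
        warnings.warn(
            "Dict-style 'KubernetesExecutor' executor_config is deprecated; "
            "pass a pod object under 'pod_override' instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return legacy_converter(legacy)
    # New-style config: the pod object is used as-is (None if absent).
    return executor_config.get("pod_override")
```

       Keeping the conversion behind a single function would make it easy to delete once the deprecation window closes.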







[GitHub] [airflow] dimberman commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r490275604



##########
File path: airflow/kubernetes/pod_generator.py
##########
@@ -396,7 +240,7 @@ def from_legacy_obj(obj) -> Optional[k8s.V1Pod]:
                     limits=limits
                 )
         namespaced['resources'] = resources
-        return PodGenerator(**namespaced).gen_pod()
+        return PodGeneratorDeprecated(**namespaced).gen_pod()

Review comment:
       @ashb there is a future deprecation https://github.com/apache/airflow/pull/10393/files#diff-868ed785b2a336f20cb6a577dde2502aR198-R201







[GitHub] [airflow] kaxil commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r477516328



##########
File path: airflow/example_dags/example_kubernetes_executor_config.py
##########
@@ -52,60 +53,67 @@ def test_volume_mount():
     start_task = PythonOperator(
         task_id="start_task",
         python_callable=print_stuff,
-        executor_config={
-            "KubernetesExecutor": {

Review comment:
       I think keeping the executor name as a key would be helpful if, in the future, we want to support configs for multiple executors.
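       To illustrate the suggestion: a config keyed by executor name lets each executor pick out only its own section and ignore the rest. The `section_for` helper and the `"CeleryExecutor"` entry are purely hypothetical. Nothing in this PR defines them.

```python
from typing import Any, Dict


def section_for(executor_config: Dict[str, Any], executor_name: str) -> Dict[str, Any]:
    """Return only the settings meant for `executor_name` (empty dict if none)."""
    return executor_config.get(executor_name, {})


# Hypothetical task config carrying settings for two different executors.
executor_config = {
    "KubernetesExecutor": {"image": "airflow:latest"},
    "CeleryExecutor": {"queue": "high_memory"},
}

k8s_settings = section_for(executor_config, "KubernetesExecutor")
celery_settings = section_for(executor_config, "CeleryExecutor")
```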







[GitHub] [airflow] potiuk commented on pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#issuecomment-676255041


   > -2.5k lines, love it. This is still WIP, but we should remember to add a note to UPDATING.md
   
   Indeed. I love the stats on this one.





[GitHub] [airflow] dimberman commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r477525529



##########
File path: airflow/example_dags/example_kubernetes_executor_config.py
##########
@@ -52,60 +53,67 @@ def test_volume_mount():
     start_task = PythonOperator(
         task_id="start_task",
         python_callable=print_stuff,
-        executor_config={
-            "KubernetesExecutor": {

Review comment:
       The problem I'm seeing is that `k8s.V1Pod` objects aren't serializable into dictionaries. I'm not sure how to get around that, and I'd rather not optimize for a future I don't think will come.
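       A small sketch of the problem. It uses a stand-in class (`FakePod`) rather than the real `kubernetes` client so that it runs on its own. The real generated models do expose a `to_dict()` method, which the stand-in imitates. `json.dumps` rejects the object outright. A `default` hook can flatten it, but `json.loads` gives back a plain dict rather than a pod object, so the deserialization side would still need its own handling.

```python
import json
from typing import Any, Dict


class FakePod:
    """Stand-in for k8s.V1Pod: a plain object that json cannot encode directly."""

    def __init__(self, name: str, image: str) -> None:
        self.name = name
        self.image = image

    def to_dict(self) -> Dict[str, Any]:
        return {"name": self.name, "image": self.image}


executor_config = {"KubernetesExecutor": FakePod("base", "airflow:latest")}

# 1) Naive encoding fails: json only handles dicts, lists, strings, numbers, etc.
try:
    json.dumps(executor_config)
    naive_ok = True
except TypeError:
    naive_ok = False

# 2) A `default` hook flattens any object that offers to_dict()...
encoded = json.dumps(executor_config, default=lambda obj: obj.to_dict())

# 3) ...but decoding yields a plain dict, not a FakePod, so the type is lost.
decoded = json.loads(encoded)
```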







[GitHub] [airflow] dimberman commented on a change in pull request #10393: Simplify the K8sExecutor and K8sPodOperator

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #10393:
URL: https://github.com/apache/airflow/pull/10393#discussion_r478094698



##########
File path: airflow/example_dags/example_kubernetes_executor_config.py
##########
@@ -52,60 +53,67 @@ def test_volume_mount():
     start_task = PythonOperator(
         task_id="start_task",
         python_callable=print_stuff,
-        executor_config={
-            "KubernetesExecutor": {

Review comment:
       (Though the downside of adding a field is that it would create even more confusion when transitioning DAGs. Personally, I'd either figure out a solution to this JSON serialization issue or just allow multiple types for the `executor_config`.)
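       The "multiple types" option could look roughly like this. It dispatches on the runtime type of `executor_config`, so a dict keeps the old behaviour while a pod object is used directly. This is only a sketch. `FakePod` stands in for `k8s.V1Pod`, and `pod_from_executor_config` is a hypothetical name.

```python
from functools import singledispatch
from typing import Any, Dict


class FakePod:
    """Stand-in for k8s.V1Pod with just two fields."""

    def __init__(self, name: str = "base", image: str = "") -> None:
        self.name = name
        self.image = image


@singledispatch
def pod_from_executor_config(config: Any) -> FakePod:
    # Fallback for any type that has no registered handler.
    raise TypeError(f"Unsupported executor_config type: {type(config).__name__}")


@pod_from_executor_config.register
def _(config: dict) -> FakePod:
    # Legacy dict format: build a pod from the "KubernetesExecutor" section.
    legacy: Dict[str, Any] = config.get("KubernetesExecutor", {})
    return FakePod(image=legacy.get("image", ""))


@pod_from_executor_config.register
def _(config: FakePod) -> FakePod:
    # New format: the pod object is already what we need.
    return config
```

       Dispatching on type avoids a new field entirely. The trade-off is that the serialized form still has to record which type it came from.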



