Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/10/17 10:08:43 UTC

[GitHub] [airflow] hamedhsn commented on a diff in pull request #22253: Add SparkKubernetesOperator crd implementation

hamedhsn commented on code in PR #22253:
URL: https://github.com/apache/airflow/pull/22253#discussion_r992084488


##########
airflow/kubernetes/custom_object_launcher.py:
##########
@@ -0,0 +1,354 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Launches Custom object"""
+import sys
+import time
+from copy import deepcopy
+from datetime import datetime as dt
+from typing import Optional
+
+import tenacity
+import yaml
+from kubernetes import client, watch
+from kubernetes.client import models as k8s
+from kubernetes.client.rest import ApiException
+
+from airflow.exceptions import AirflowException
+from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+if sys.version_info >= (3, 8):
+    from functools import cached_property
+else:
+    from cached_property import cached_property
+
+
+def should_retry_start_spark_job(exception: BaseException) -> bool:
+    """Check if an Exception indicates a transient error and warrants retrying"""
+    if isinstance(exception, ApiException):
+        return exception.status == 409
+    return False
+
+
+class SparkResources:
+    """spark resources
+    :param request_memory: requested memory
+    :param request_cpu: requested CPU number
+    :param request_ephemeral_storage: requested ephemeral storage
+    :param limit_memory: limit for memory usage
+    :param limit_cpu: Limit for CPU used
+    :param limit_gpu: Limits for GPU used
+    :param limit_ephemeral_storage: Limit for ephemeral storage
+    """
+
+    def __init__(
+        self,
+        **kwargs,
+    ):
+        self.driver_request_cpu = kwargs.get('driver_request_cpu')
+        self.driver_limit_cpu = kwargs.get('driver_limit_cpu')
+        self.driver_limit_memory = kwargs.get('driver_limit_memory')
+        self.executor_request_cpu = kwargs.get('executor_request_cpu')
+        self.executor_limit_cpu = kwargs.get('executor_limit_cpu')
+        self.executor_limit_memory = kwargs.get('executor_limit_memory')
+        self.driver_gpu_name = kwargs.get('driver_gpu_name')
+        self.driver_gpu_quantity = kwargs.get('driver_gpu_quantity')
+        self.executor_gpu_name = kwargs.get('executor_gpu_name')
+        self.executor_gpu_quantity = kwargs.get('executor_gpu_quantity')

Review Comment:
   No particular reason; it is used internally, and it keeps the class much more concise with less boilerplate.
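   For comparison, the explicit-argument alternative being weighed here would look roughly like the sketch below (illustrative only; the field names mirror the kwargs above, GPU fields omitted for brevity):

   ```python
   # Hypothetical explicit-argument version of the constructor above (not part of the PR).
   from typing import Optional


   class SparkResources:
       def __init__(
           self,
           driver_request_cpu: Optional[str] = None,
           driver_limit_cpu: Optional[str] = None,
           driver_limit_memory: Optional[str] = None,
           executor_request_cpu: Optional[str] = None,
           executor_limit_cpu: Optional[str] = None,
           executor_limit_memory: Optional[str] = None,
       ):
           # Every field spelled out explicitly -- more verbose, but self-documenting.
           self.driver_request_cpu = driver_request_cpu
           self.driver_limit_cpu = driver_limit_cpu
           self.driver_limit_memory = driver_limit_memory
           self.executor_request_cpu = executor_request_cpu
           self.executor_limit_cpu = executor_limit_cpu
           self.executor_limit_memory = executor_limit_memory
   ```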



##########
airflow/kubernetes/custom_object_launcher.py:
##########
@@ -0,0 +1,354 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Launches Custom object"""
+import sys
+import time
+from copy import deepcopy
+from datetime import datetime as dt
+from typing import Optional
+
+import tenacity
+import yaml
+from kubernetes import client, watch
+from kubernetes.client import models as k8s
+from kubernetes.client.rest import ApiException
+
+from airflow.exceptions import AirflowException
+from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+if sys.version_info >= (3, 8):
+    from functools import cached_property
+else:
+    from cached_property import cached_property
+
+
+def should_retry_start_spark_job(exception: BaseException) -> bool:
+    """Check if an Exception indicates a transient error and warrants retrying"""
+    if isinstance(exception, ApiException):
+        return exception.status == 409
+    return False
+
+
+class SparkResources:

Review Comment:
   I'm not aware of such a library. Let me know if you know of anything that could be reused.
   



##########
airflow/kubernetes/pod_launcher_deprecated.py:
##########
@@ -239,7 +239,6 @@ def read_pod_logs(
             return self._client.read_namespaced_pod_log(
                 name=pod.metadata.name,
                 namespace=pod.metadata.namespace,
-                container='base',

Review Comment:
   True, this is from the old code. Removed.



##########
airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py:
##########
@@ -138,6 +139,39 @@ def convert_configmap(configmaps) -> k8s.V1EnvFromSource:
     return k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name=configmaps))
 
 
+def convert_secret(secret) -> k8s.V1EnvFromSource:

Review Comment:
   Sure, I created a new module and defined them there.
   They are defined under the `resource_convert` folder.
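   For reference, the secret converter is along these lines (a sketch based on the signature shown in the diff, not a verbatim copy of the new module):

   ```python
   # Sketch of a secret-to-env converter, matching the convert_secret signature above.
   from kubernetes.client import models as k8s


   def convert_secret(secret_name: str) -> k8s.V1EnvFromSource:
       """Expose all keys of the named Kubernetes Secret as environment variables."""
       return k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource(name=secret_name))
   ```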



##########
airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py:
##########
@@ -32,47 +58,411 @@ class SparkKubernetesOperator(BaseOperator):
 
     .. seealso::
         For more detail about Spark Application Object have a look at the reference:
-        https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta2-1.1.0-2.4.5/docs/api-docs.md#sparkapplication
+        https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta2-1.3.3-3.1.1/docs/api-docs.md#sparkapplication
 
-    :param application_file: Defines Kubernetes 'custom_resource_definition' of 'sparkApplication' as either a
-        path to a '.yaml' file, '.json' file, YAML string or JSON string.
+    :param application_file: filepath to kubernetes custom_resource_definition of sparkApplication
+    :param kubernetes_conn_id: the connection to Kubernetes cluster
+    :param image: Docker image you wish to launch. Defaults to hub.docker.com,
+    :param code_path: path to the code in your image,
     :param namespace: kubernetes namespace to put sparkApplication
-    :param kubernetes_conn_id: The :ref:`kubernetes connection id <howto/connection:kubernetes>`
-        for the to Kubernetes cluster.
-    :param api_group: kubernetes api group of sparkApplication
-    :param api_version: kubernetes api version of sparkApplication
+    :param api_group: CRD api group for spark
+            https://github.com/GoogleCloudPlatform/spark-on-k8s-operator#project-status
+    :param api_version: CRD api version

Review Comment:
   I changed the names, and it looks better now.



##########
airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py:
##########
@@ -32,47 +58,411 @@ class SparkKubernetesOperator(BaseOperator):
 
     .. seealso::
         For more detail about Spark Application Object have a look at the reference:
-        https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta2-1.1.0-2.4.5/docs/api-docs.md#sparkapplication
+        https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta2-1.3.3-3.1.1/docs/api-docs.md#sparkapplication
 
-    :param application_file: Defines Kubernetes 'custom_resource_definition' of 'sparkApplication' as either a
-        path to a '.yaml' file, '.json' file, YAML string or JSON string.
+    :param application_file: filepath to kubernetes custom_resource_definition of sparkApplication
+    :param kubernetes_conn_id: the connection to Kubernetes cluster
+    :param image: Docker image you wish to launch. Defaults to hub.docker.com,
+    :param code_path: path to the code in your image,
     :param namespace: kubernetes namespace to put sparkApplication
-    :param kubernetes_conn_id: The :ref:`kubernetes connection id <howto/connection:kubernetes>`
-        for the to Kubernetes cluster.
-    :param api_group: kubernetes api group of sparkApplication
-    :param api_version: kubernetes api version of sparkApplication
+    :param api_group: CRD api group for spark
+            https://github.com/GoogleCloudPlatform/spark-on-k8s-operator#project-status
+    :param api_version: CRD api version
+    :param api_kind: CRD api kind
+    :param api_plural: CRD api plural
+    :param cluster_context: context of the cluster
+    :param labels: labels to apply to the crd.
+    :param config_file: kube configuration file
+    :param resources: resources for the launched pod.
+    :param number_workers: number spark executors
+    :param env_vars: A dictionary of key:value OR list of V1EnvVar items
+    :param env_from: A list of V1EnvFromSource items
+    :param affinity: Affinity scheduling rules for the launched pod.(V1Affinity)
+    :param tolerations: A list of kubernetes tolerations.(V1Toleration)
+    :param volume_mounts: A list of V1VolumeMount items
+    :param volumes: A list of V1Volume items
+    :param config_map_mounts: A dictionary of config_map as key and path as value
+    :param from_env_config_map: Read configmap into a env variable(name of the configmap)
+    :param from_env_secret: Read secret into a env variable(name of the configmap)
+    :param hadoop_config: hadoop base config e.g, AWS s3 config
+    :param application_file: yaml file if passed
+    :param image_pull_secrets: Any image pull secrets to be given to the pod.
+        If more than one secret is required, provide a
+        comma separated list: secret_a,secret_b
+    :param get_logs: get the stdout of the container as logs of the tasks.
+    :param do_xcom_push: If True, the content of the file
+        /airflow/xcom/return.json in the container will also be pushed to an
+        XCom when the container completes.
+    :param restart_policy: restart policy of the driver/executor
+    :param spark_version: spark version
+    :param success_run_history_limit: Number of past successful runs of the application to keep.
+    :param delete_on_termination: What to do when the pod reaches its final
+        state, or the execution is interrupted. If True (default), delete the
+        pod; if False, leave the pod.
+    :param dynamic_allocation: Enable spark dynamic allocation
+    :param dynamic_alloc_max_executors: Max number of executor if dynamic_allocation is enabled
+    :param dynamic_alloc_initial_executors: Initial number of executor if dynamic_allocation is enabled
+    :param dynamic_alloc_min_executors: min number of executor if dynamic_allocation is enabled
+    :param image_pull_policy: Specify a policy to cache or always pull an image.
+    :param service_account_name: Name of the service account
+    :param spark_job_mode: spark job type in spark operator(at the time of writing it just supports cluster)
+    :param spark_job_python_version: version of spark python
+    :param spark_job_type: type of spark job
+    :param startup_timeout_seconds: timeout in seconds to startup the pod.
+    :param log_events_on_failure: Log the pod's events if a failure occurs
+    :param in_cluster: run kubernetes client with in_cluster configuration.
+    :param reattach_on_restart: if the scheduler dies while the pod is running, reattach and monitor
     """
 
-    template_fields: Sequence[str] = ('application_file', 'namespace')
-    template_ext: Sequence[str] = ('.yaml', '.yml', '.json')
+    template_fields = ['application_file', 'namespace']
+    template_ext = ('yaml', 'yml', 'json')
     ui_color = '#f4a460'
 
     def __init__(
         self,
         *,
-        application_file: str,
-        namespace: str | None = None,
-        kubernetes_conn_id: str = 'kubernetes_default',
+        image: Optional[str] = None,
+        code_path: Optional[str] = None,
+        namespace: Optional[str] = 'default',
         api_group: str = 'sparkoperator.k8s.io',
         api_version: str = 'v1beta2',
+        api_kind: str = 'SparkApplication',
+        api_plural: str = 'sparkapplications',
+        cluster_context: Optional[str] = None,
+        config_file: Optional[str] = None,
+        labels: Optional[dict] = None,
+        resources: Optional[dict] = None,
+        number_workers: int = 1,
+        env_vars: Optional[Union[List[k8s.V1EnvVar], Dict]] = None,
+        env_from: Optional[List[k8s.V1EnvFromSource]] = None,
+        affinity: Optional[k8s.V1Affinity] = None,
+        tolerations: Optional[List[k8s.V1Toleration]] = None,
+        volume_mounts: Optional[List[k8s.V1VolumeMount]] = None,
+        volumes: Optional[List[k8s.V1Volume]] = None,
+        config_map_mounts: Optional[Dict[str, str]] = None,
+        from_env_config_map: Optional[List[str]] = None,
+        from_env_secret: Optional[List[str]] = None,
+        hadoop_config: Optional[dict] = None,
+        application_file: Optional[str] = None,
+        image_pull_secrets: Optional[Union[List[k8s.V1LocalObjectReference], str]] = None,
+        get_logs: bool = True,
+        do_xcom_push: bool = False,
+        restart_policy: Optional[dict] = None,
+        spark_version: str = '3.0.0',
+        success_run_history_limit: int = 1,
+        dynamic_allocation: bool = False,
+        dynamic_alloc_max_executors: Optional[int] = None,
+        dynamic_alloc_initial_executors: int = 1,
+        dynamic_alloc_min_executors: int = 1,
+        image_pull_policy: str = 'Always',
+        service_account_name: str = 'default',
+        spark_job_mode: str = 'cluster',
+        spark_job_python_version: str = '3',
+        spark_job_type: str = 'Python',
+        startup_timeout_seconds=600,
+        log_events_on_failure: bool = False,
+        in_cluster: Optional[bool] = None,
+        reattach_on_restart: bool = True,
+        delete_on_termination: bool = True,
+        kubernetes_conn_id: str = 'kubernetes_default',
         **kwargs,
     ) -> None:
+        if kwargs.get('xcom_push') is not None:
+            raise AirflowException("'xcom_push' was deprecated, use 'do_xcom_push' instead")
         super().__init__(**kwargs)
         self.application_file = application_file
         self.namespace = namespace
         self.kubernetes_conn_id = kubernetes_conn_id
+        self.labels = labels or {}
+        self.env_from = env_from or []
+        self.env_vars = convert_env_vars(env_vars) if env_vars else []
+        self.affinity = convert_affinity(affinity) if affinity else k8s.V1Affinity()
+        self.tolerations = (
+            [convert_toleration(toleration) for toleration in tolerations] if tolerations else []
+        )
+        self.volume_mounts = [convert_volume_mount(v) for v in volume_mounts] if volume_mounts else []
+        self.volumes = [convert_volume(volume) for volume in volumes] if volumes else []
+        self.startup_timeout_seconds = startup_timeout_seconds
+        self.reattach_on_restart = reattach_on_restart
+        self.delete_on_termination = delete_on_termination
+        self.application_file = application_file
+        self.image_pull_secrets = convert_image_pull_secrets(image_pull_secrets) if image_pull_secrets else []
+        self.do_xcom_push = do_xcom_push
+        self.name = PodGenerator.make_unique_pod_id(self.task_id)
+        if self.name:
+            self.name = self.name[:MAX_LABEL_LEN]
+        self.cluster_context = cluster_context
+        self.config_file = config_file
+        self.namespace = namespace
+        self.get_logs = get_logs
         self.api_group = api_group
         self.api_version = api_version
-        self.plural = "sparkapplications"
-
-    def execute(self, context: Context):
-        hook = KubernetesHook(conn_id=self.kubernetes_conn_id)
-        self.log.info("Creating sparkApplication")
-        response = hook.create_custom_object(
-            group=self.api_group,
-            version=self.api_version,
-            plural=self.plural,
-            body=self.application_file,
+        self.api_kind = api_kind
+        self.api_plural = api_plural
+        self.code_path = code_path
+        self.dynamic_allocation = dynamic_allocation
+        self.dynamic_alloc_max_executors = dynamic_alloc_max_executors
+        self.dynamic_alloc_min_executors = dynamic_alloc_min_executors
+        self.dynamic_alloc_initial_executors = dynamic_alloc_initial_executors
+        if dynamic_allocation:
+            if not all(
+                [dynamic_alloc_max_executors, dynamic_alloc_min_executors, dynamic_alloc_initial_executors]
+            ):
+                raise AirflowException("Make sure initial/min/max value for dynamic allocation is passed")
+        if config_map_mounts:
+            vols, vols_mounts = convert_configmap_to_volume(config_map_mounts)
+            self.volumes.extend(vols)
+            self.volume_mounts.extend(vols_mounts)
+        if from_env_config_map:
+            self.env_from.extend([convert_configmap(c) for c in from_env_config_map])
+        if from_env_secret:
+            self.env_from.extend([convert_secret(c) for c in from_env_secret])
+        self.log_events_on_failure = log_events_on_failure
+        self.in_cluster = in_cluster
+        self.image_pull_policy = image_pull_policy
+        self.service_account_name = service_account_name
+        self.image = image
+        self.spark_version = spark_version
+        self.spark_job_type = spark_job_type
+        self.spark_job_python_version = spark_job_python_version
+        self.spark_job_mode = spark_job_mode
+        self.success_run_history_limit = success_run_history_limit
+        self.number_workers = number_workers
+        self.spark_obj_spec = None
+        self.restart_policy = restart_policy or {'type': 'Never'}
+        self.hadoop_config = hadoop_config
+        self.job_resources = SparkResources(**resources) if resources else SparkResources()
+
+    def get_kube_clients(self):
+        if self.in_cluster is not None:
+            core_v1_api = kube_client.get_kube_client(
+                in_cluster=self.in_cluster,
+                cluster_context=self.cluster_context,
+                config_file=self.config_file,
+            )
+        else:
+            core_v1_api = kube_client.get_kube_client(
+                cluster_context=self.cluster_context, config_file=self.config_file
+            )
+        custom_obj_api = client.CustomObjectsApi()
+        return core_v1_api, custom_obj_api
+
+    @staticmethod
+    def _get_pod_identifying_label_string(labels) -> str:
+        filtered_labels = {label_id: label for label_id, label in labels.items() if label_id != 'try_number'}
+        return ','.join([label_id + '=' + label for label_id, label in sorted(filtered_labels.items())])
+
+    @staticmethod
+    def create_labels_for_pod(context: Optional[dict] = None, include_try_number: bool = True) -> dict:
+        """
+        Generate labels for the pod to track the pod in case of Operator crash
+        :param include_try_number: add try number to labels
+        :param context: task context provided by airflow DAG
+        :return: dict
+        """
+        if not context:
+            return {}
+
+        ti = context['ti']
+        run_id = context['run_id']
+
+        labels = {
+            'dag_id': ti.dag_id,
+            'task_id': ti.task_id,
+            'run_id': run_id,
+            'spark_kubernetes_operator': 'True',
+            # 'execution_date': context['ts'],
+            # 'try_number': context['ti'].try_number,
+        }
+
+        # If running on Airflow 2.3+:
+        map_index = getattr(ti, 'map_index', -1)
+        if map_index >= 0:
+            labels['map_index'] = map_index
+
+        if include_try_number:
+            labels.update(try_number=ti.try_number)
+
+        # In the case of sub dags this is just useful
+        if context['dag'].is_subdag:
+            labels['parent_dag_id'] = context['dag'].parent_dag.dag_id
+        # Ensure that label is valid for Kube,
+        # and if not truncate/remove invalid chars and replace with short hash.
+        for label_id, label in labels.items():
+            safe_label = pod_generator.make_safe_label_value(str(label))
+            labels[label_id] = safe_label
+        return labels
+
+    @cached_property
+    def pod_manager(self) -> PodManager:
+        return PodManager(kube_client=self.client)
+
+    @staticmethod
+    def _try_numbers_match(context, pod) -> bool:
+        return pod.metadata.labels['try_number'] == context['ti'].try_number
+
+    def build_spark_request_obj(self, kube_client, custom_obj_api):
+        launcher = CustomObjectLauncher(
             namespace=self.namespace,
+            kube_client=kube_client,
+            custom_obj_api=custom_obj_api,
+            api_group=self.api_group,
+            kind=self.api_kind,
+            plural=self.api_plural,
+            api_version=self.api_version,
+            extract_xcom=self.do_xcom_push,
+            application_file=self.application_file,
         )
-        return response
+        launcher.set_body(
+            application_file=self.application_file,
+            name=self.name,
+            namespace=self.namespace,
+            image=self.image,
+            code_path=self.code_path,
+            image_pull_policy=self.image_pull_policy,
+            restart_policy=self.restart_policy,
+            spark_version=self.spark_version,
+            spark_job_type=self.spark_job_type,
+            spark_job_python_version=self.spark_job_python_version,
+            spark_job_mode=self.spark_job_mode,
+            labels=self.labels,
+            success_run_history_limit=self.success_run_history_limit,
+            service_account_name=self.service_account_name,
+            dynamic_allocation=self.dynamic_allocation,
+            dynamic_alloc_initial_executors=self.dynamic_alloc_initial_executors,
+            dynamic_alloc_max_executors=self.dynamic_alloc_max_executors,
+            dynamic_alloc_min_executors=self.dynamic_alloc_min_executors,
+            driver_resource=self.job_resources.driver_resources,
+            executor_resource=self.job_resources.executor_resources,
+            number_workers=self.number_workers,
+            hadoop_config=self.hadoop_config,
+            image_pull_secrets=self.image_pull_secrets,
+            env=self.env_vars,
+            env_from=self.env_from,
+            affinity=self.affinity,
+            tolerations=self.tolerations,
+            volumes=self.volumes,
+            volume_mounts=self.volume_mounts,
+        )
+        return launcher
+
+    def find_spark_job(self, context):
+        labels = self.create_labels_for_pod(context, include_try_number=False)
+        label_selector = self._get_pod_identifying_label_string(labels) + ',spark-role=driver'
+        pod_list = self.client.list_namespaced_pod(self.namespace, label_selector=label_selector).items
+
+        pod = None
+        if len(pod_list) > 1:  # and self.reattach_on_restart:
+            raise AirflowException(f'More than one pod running with labels: {label_selector}')
+        elif len(pod_list) == 1:
+            pod = pod_list[0]
+            self.log.info(
+                "Found matching driver pod %s with labels %s", pod.metadata.name, pod.metadata.labels
+            )
+            self.log.info("`try_number` of task_instance: %s", context['ti'].try_number)
+            self.log.info("`try_number` of pod: %s", pod.metadata.labels['try_number'])
+        return pod
+
+    def get_or_create_spark_crd(self, launcher: CustomObjectLauncher, context):
+        if self.reattach_on_restart:
+            driver_pod = self.find_spark_job(context)
+            if driver_pod:
+                return driver_pod
+
+        # self.log.debug("Starting spark job:\n%s", yaml.safe_dump(launcher.body.to_dict()))
+        driver_pod, spark_obj_spec = launcher.start_spark_job(startup_timeout=self.startup_timeout_seconds)
+        return driver_pod
+
+    def extract_xcom(self, pod):
+        """Retrieves xcom value and kills xcom sidecar container"""
+        result = self.pod_manager.extract_xcom(pod)
+        self.log.info("xcom result: \n%s", result)
+        return json.loads(result)
+
+    def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):
+        pod_phase = remote_pod.status.phase if hasattr(remote_pod, 'status') else None
+        if not self.delete_on_termination:
+            with _suppress(Exception):
+                self.patch_already_checked(pod)
+        if pod_phase != PodPhase.SUCCEEDED:
+            if self.log_events_on_failure:
+                with _suppress(Exception):
+                    for event in self.pod_manager.read_pod_events(pod).items:
+                        self.log.error("Pod Event: %s - %s", event.reason, event.message)
+            with _suppress(Exception):
+                self.process_spark_job_deletion(pod)
+        else:
+            with _suppress(Exception):
+                self.process_spark_job_deletion(pod)
+
+    def process_spark_job_deletion(self, pod):
+        if self.delete_on_termination:
+            self.log.info("Deleting spark job: %s", pod.metadata)
+            self.launcher.delete_spark_job(pod.metadata.name.replace('-driver', ''))
+        else:
+            self.log.info("skipping deleting spark job: %s", pod.metadata.name)
+
+    def execute(self, context):

Review Comment:
   Hmm, I'm not quite getting what you mean. What would be the benefit of creating another operator? It would end up looking very similar to this one.
   Also, the KPO code is not easily reusable; otherwise, I would have used it here to handle the Airflow-related functionality such as logging.
   



##########
airflow/kubernetes/custom_object_launcher.py:
##########
@@ -0,0 +1,354 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Launches Custom object"""
+import sys
+import time
+from copy import deepcopy
+from datetime import datetime as dt
+from typing import Optional
+
+import tenacity
+import yaml
+from kubernetes import client, watch
+from kubernetes.client import models as k8s
+from kubernetes.client.rest import ApiException
+
+from airflow.exceptions import AirflowException
+from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+if sys.version_info >= (3, 8):
+    from functools import cached_property
+else:
+    from cached_property import cached_property
+
+
+def should_retry_start_spark_job(exception: BaseException) -> bool:
+    """Check if an Exception indicates a transient error and warrants retrying"""
+    if isinstance(exception, ApiException):
+        return exception.status == 409
+    return False
+
+
+class SparkResources:
+    """spark resources
+    :param request_memory: requested memory
+    :param request_cpu: requested CPU number
+    :param request_ephemeral_storage: requested ephemeral storage
+    :param limit_memory: limit for memory usage
+    :param limit_cpu: Limit for CPU used
+    :param limit_gpu: Limits for GPU used
+    :param limit_ephemeral_storage: Limit for ephemeral storage
+    """
+
+    def __init__(
+        self,
+        **kwargs,
+    ):
+        self.driver_request_cpu = kwargs.get('driver_request_cpu')
+        self.driver_limit_cpu = kwargs.get('driver_limit_cpu')
+        self.driver_limit_memory = kwargs.get('driver_limit_memory')
+        self.executor_request_cpu = kwargs.get('executor_request_cpu')
+        self.executor_limit_cpu = kwargs.get('executor_limit_cpu')
+        self.executor_limit_memory = kwargs.get('executor_limit_memory')
+        self.driver_gpu_name = kwargs.get('driver_gpu_name')
+        self.driver_gpu_quantity = kwargs.get('driver_gpu_quantity')
+        self.executor_gpu_name = kwargs.get('executor_gpu_name')
+        self.executor_gpu_quantity = kwargs.get('executor_gpu_quantity')
+        self.convert_resources()
+
+    @property
+    def resources(self):
+        """Return job resources"""
+        return {'driver': self.driver_resources, 'executor': self.executor_resources}
+
+    @property
+    def driver_resources(self):
+        """Return resources to use"""
+        driver = {}
+        if self.driver_request_cpu:
+            driver['cores'] = self.driver_request_cpu
+        if self.driver_limit_cpu:
+            driver['coreLimit'] = self.driver_limit_cpu
+        if self.driver_limit_memory:
+            driver['memory'] = self.driver_limit_memory
+        if self.driver_gpu_name and self.driver_gpu_quantity:
+            driver['gpu'] = {'name': self.driver_gpu_name, 'quantity': self.driver_gpu_quantity}
+        return driver
+
+    @property
+    def executor_resources(self):
+        """Return resources to use"""
+        executor = {}
+        if self.executor_request_cpu:
+            executor['cores'] = self.executor_request_cpu
+        if self.executor_limit_cpu:
+            executor['coreLimit'] = self.executor_limit_cpu
+        if self.executor_limit_memory:
+            executor['memory'] = self.executor_limit_memory
+        if self.executor_gpu_name and self.executor_gpu_quantity:
+            executor['gpu'] = {'name': self.executor_gpu_name, 'quantity': self.executor_gpu_quantity}
+        return executor
+
+    def convert_resources(self):
+        if isinstance(self.driver_limit_memory, str):
+            if 'G' in self.driver_limit_memory or 'Gi' in self.driver_limit_memory:
+                self.driver_limit_memory = float(self.driver_limit_memory.rstrip('Gi G')) * 1024
+            elif 'm' in self.driver_limit_memory:
+                self.driver_limit_memory = float(self.driver_limit_memory.rstrip('m'))
+            # Adjusting the memory value as operator adds 40% to the given value
+            self.driver_limit_memory = str(int(self.driver_limit_memory / 1.4)) + 'm'
+
+        if isinstance(self.executor_limit_memory, str):
+            if 'G' in self.executor_limit_memory or 'Gi' in self.executor_limit_memory:
+                self.executor_limit_memory = float(self.executor_limit_memory.rstrip('Gi G')) * 1024
+            elif 'm' in self.executor_limit_memory:
+                self.executor_limit_memory = float(self.executor_limit_memory.rstrip('m'))
+            # Adjusting the memory value as operator adds 40% to the given value
+            self.executor_limit_memory = str(int(self.executor_limit_memory / 1.4)) + 'm'
+
+        if self.driver_request_cpu:
+            self.driver_request_cpu = int(float(self.driver_request_cpu))
+        if self.driver_limit_cpu:
+            self.driver_limit_cpu = str(self.driver_limit_cpu)
+        if self.executor_request_cpu:
+            self.executor_request_cpu = int(float(self.executor_request_cpu))
+        if self.executor_limit_cpu:
+            self.executor_limit_cpu = str(self.executor_limit_cpu)
+
+        if self.driver_gpu_quantity:
+            self.driver_gpu_quantity = int(float(self.driver_gpu_quantity))
+        if self.executor_gpu_quantity:
+            self.executor_gpu_quantity = int(float(self.executor_gpu_quantity))
+
+
+class CustomObjectStatus:
+    """Status of the PODs"""
+
+    SUBMITTED = 'SUBMITTED'
+    RUNNING = 'RUNNING'
+    FAILED = 'FAILED'
+    SUCCEEDED = 'SUCCEEDED'
+    INITIAL_SPEC = {
+        'metadata': {},
+        'spec': {
+            'dynamicAllocation': {'enabled': False},
+            'driver': {},
+            'executor': {},
+        },
+    }
+
+
+class CustomObjectLauncher(LoggingMixin):
+    """Launches PODS"""
+
+    def __init__(
+        self,
+        kube_client: client.CoreV1Api,
+        custom_obj_api: client.CustomObjectsApi,
+        namespace: str = 'default',
+        api_group: str = 'sparkoperator.k8s.io',
+        api_version: str = 'v1beta2',
+        plural: str = 'sparkapplications',
+        kind: str = 'SparkApplication',
+        extract_xcom: bool = False,
+        application_file: Optional[str] = None,
+    ):
+        """
+        Creates the launcher.
+
+        :param kube_client: kubernetes client
+        :param extract_xcom: whether we should extract xcom
+        """
+        super().__init__()
+        self.namespace = namespace
+        self.api_group = api_group
+        self.api_version = api_version
+        self.plural = plural
+        self.kind = kind
+        self._client = kube_client
+        self.custom_obj_api = custom_obj_api
+        self._watch = watch.Watch()
+        self.extract_xcom = extract_xcom
+        self.spark_obj_spec: dict = {}
+        self.pod_spec: dict = {}
+        self.body: dict = {}
+        self.application_file = application_file
+
+    @cached_property
+    def pod_manager(self) -> PodManager:
+        return PodManager(kube_client=self._client)
+
+    @staticmethod
+    def _load_body(file):
+        # try:
+        #     base_body = yaml.safe_load(file)
+        # except Exception:
+        try:
+            with open(file) as data:
+                base_body = yaml.safe_load(data)
+        except yaml.YAMLError as e:
+            raise AirflowException(f"Exception when loading resource definition: {e}\n")

Review Comment:
   I'm not sure what you mean, but the exception is only raised when the YAML file is not valid and cannot be loaded. Maybe the message is not clear enough; I will update it.
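   For example, a clearer wording could be along these lines (just a sketch of the message change, not the final code):

   ```python
   # Possible clearer error message for _load_body -- illustrative only.
   import yaml

   from airflow.exceptions import AirflowException


   def _load_body(file):
       try:
           with open(file) as data:
               return yaml.safe_load(data)
       except yaml.YAMLError as e:
           raise AirflowException(f"Invalid Spark application YAML in {file!r}: {e}")
   ```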



##########
airflow/kubernetes/custom_object_launcher.py:
##########
@@ -0,0 +1,354 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Launches Custom object"""
+import sys
+import time
+from copy import deepcopy
+from datetime import datetime as dt
+from typing import Optional
+
+import tenacity
+import yaml
+from kubernetes import client, watch
+from kubernetes.client import models as k8s
+from kubernetes.client.rest import ApiException
+
+from airflow.exceptions import AirflowException
+from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+if sys.version_info >= (3, 8):
+    from functools import cached_property
+else:
+    from cached_property import cached_property
+
+
+def should_retry_start_spark_job(exception: BaseException) -> bool:
+    """Check if an Exception indicates a transient error and warrants retrying"""
+    if isinstance(exception, ApiException):
+        return exception.status == 409
+    return False
+
+
+class SparkResources:
+    """spark resources
+    :param request_memory: requested memory
+    :param request_cpu: requested CPU number
+    :param request_ephemeral_storage: requested ephemeral storage
+    :param limit_memory: limit for memory usage
+    :param limit_cpu: Limit for CPU used
+    :param limit_gpu: Limits for GPU used
+    :param limit_ephemeral_storage: Limit for ephemeral storage
+    """
+
+    def __init__(
+        self,
+        **kwargs,
+    ):
+        self.driver_request_cpu = kwargs.get('driver_request_cpu')
+        self.driver_limit_cpu = kwargs.get('driver_limit_cpu')
+        self.driver_limit_memory = kwargs.get('driver_limit_memory')
+        self.executor_request_cpu = kwargs.get('executor_request_cpu')
+        self.executor_limit_cpu = kwargs.get('executor_limit_cpu')
+        self.executor_limit_memory = kwargs.get('executor_limit_memory')
+        self.driver_gpu_name = kwargs.get('driver_gpu_name')
+        self.driver_gpu_quantity = kwargs.get('driver_gpu_quantity')
+        self.executor_gpu_name = kwargs.get('executor_gpu_name')
+        self.executor_gpu_quantity = kwargs.get('executor_gpu_quantity')
+        self.convert_resources()
+
+    @property
+    def resources(self):
+        """Return job resources"""
+        return {'driver': self.driver_resources, 'executor': self.executor_resources}
+
+    @property
+    def driver_resources(self):
+        """Return resources to use"""
+        driver = {}
+        if self.driver_request_cpu:
+            driver['cores'] = self.driver_request_cpu
+        if self.driver_limit_cpu:
+            driver['coreLimit'] = self.driver_limit_cpu
+        if self.driver_limit_memory:
+            driver['memory'] = self.driver_limit_memory
+        if self.driver_gpu_name and self.driver_gpu_quantity:
+            driver['gpu'] = {'name': self.driver_gpu_name, 'quantity': self.driver_gpu_quantity}
+        return driver
+
+    @property
+    def executor_resources(self):
+        """Return resources to use"""
+        executor = {}
+        if self.executor_request_cpu:
+            executor['cores'] = self.executor_request_cpu
+        if self.executor_limit_cpu:
+            executor['coreLimit'] = self.executor_limit_cpu
+        if self.executor_limit_memory:
+            executor['memory'] = self.executor_limit_memory
+        if self.executor_gpu_name and self.executor_gpu_quantity:
+            executor['gpu'] = {'name': self.executor_gpu_name, 'quantity': self.executor_gpu_quantity}
+        return executor
+
+    def convert_resources(self):
+        if isinstance(self.driver_limit_memory, str):
+            if 'G' in self.driver_limit_memory or 'Gi' in self.driver_limit_memory:
+                self.driver_limit_memory = float(self.driver_limit_memory.rstrip('Gi G')) * 1024
+            elif 'm' in self.driver_limit_memory:
+                self.driver_limit_memory = float(self.driver_limit_memory.rstrip('m'))
+            # Adjusting the memory value as operator adds 40% to the given value
+            self.driver_limit_memory = str(int(self.driver_limit_memory / 1.4)) + 'm'
+
+        if isinstance(self.executor_limit_memory, str):
+            if 'G' in self.executor_limit_memory or 'Gi' in self.executor_limit_memory:
+                self.executor_limit_memory = float(self.executor_limit_memory.rstrip('Gi G')) * 1024
+            elif 'm' in self.executor_limit_memory:
+                self.executor_limit_memory = float(self.executor_limit_memory.rstrip('m'))
+            # Adjusting the memory value as operator adds 40% to the given value
+            self.executor_limit_memory = str(int(self.executor_limit_memory / 1.4)) + 'm'
+
+        if self.driver_request_cpu:
+            self.driver_request_cpu = int(float(self.driver_request_cpu))
+        if self.driver_limit_cpu:
+            self.driver_limit_cpu = str(self.driver_limit_cpu)
+        if self.executor_request_cpu:
+            self.executor_request_cpu = int(float(self.executor_request_cpu))
+        if self.executor_limit_cpu:
+            self.executor_limit_cpu = str(self.executor_limit_cpu)
+
+        if self.driver_gpu_quantity:
+            self.driver_gpu_quantity = int(float(self.driver_gpu_quantity))
+        if self.executor_gpu_quantity:
+            self.executor_gpu_quantity = int(float(self.executor_gpu_quantity))
+
+
+class CustomObjectStatus:
+    """Status of the PODs"""
+
+    SUBMITTED = 'SUBMITTED'
+    RUNNING = 'RUNNING'
+    FAILED = 'FAILED'
+    SUCCEEDED = 'SUCCEEDED'
+    INITIAL_SPEC = {
+        'metadata': {},
+        'spec': {
+            'dynamicAllocation': {'enabled': False},
+            'driver': {},
+            'executor': {},
+        },
+    }
+
+
+class CustomObjectLauncher(LoggingMixin):
+    """Launches PODS"""
+
+    def __init__(
+        self,
+        kube_client: client.CoreV1Api,
+        custom_obj_api: client.CustomObjectsApi,
+        namespace: str = 'default',
+        api_group: str = 'sparkoperator.k8s.io',
+        api_version: str = 'v1beta2',
+        plural: str = 'sparkapplications',
+        kind: str = 'SparkApplication',
+        extract_xcom: bool = False,
+        application_file: Optional[str] = None,
+    ):
+        """
+        Creates the launcher.
+
+        :param kube_client: kubernetes client
+        :param extract_xcom: whether we should extract xcom
+        """
+        super().__init__()
+        self.namespace = namespace
+        self.api_group = api_group
+        self.api_version = api_version
+        self.plural = plural
+        self.kind = kind
+        self._client = kube_client
+        self.custom_obj_api = custom_obj_api
+        self._watch = watch.Watch()
+        self.extract_xcom = extract_xcom
+        self.spark_obj_spec: dict = {}
+        self.pod_spec: dict = {}
+        self.body: dict = {}
+        self.application_file = application_file
+
+    @cached_property
+    def pod_manager(self) -> PodManager:
+        return PodManager(kube_client=self._client)
+
+    @staticmethod
+    def _load_body(file):
+        # try:
+        #     base_body = yaml.safe_load(file)
+        # except Exception:
+        try:
+            with open(file) as data:
+                base_body = yaml.safe_load(data)
+        except yaml.YAMLError as e:
+            raise AirflowException(f"Exception when loading resource definition: {e}\n")
+        return base_body
+
+    def set_body(self, **kwargs):
+        if self.application_file:
+            self.body = self._load_body(self.application_file)
+        else:
+            self.body = self.get_body(
+                f'{self.api_group}/{self.api_version}', self.kind, CustomObjectStatus.INITIAL_SPEC, **kwargs
+            )
+
+    @tenacity.retry(
+        stop=tenacity.stop_after_attempt(3),
+        wait=tenacity.wait_random_exponential(),
+        reraise=True,
+        retry=tenacity.retry_if_exception(should_retry_start_spark_job),
+    )
+    def start_spark_job(self, startup_timeout: int = 600):
+        """
+        Launches the pod synchronously and waits for completion.
+
+        :param startup_timeout: Timeout for startup of the pod (if pod is pending for too long, fails task)
+        :return:
+        """
+        try:
+            self.log.debug('Spark Job Creation Request Submitted')
+            self.spark_obj_spec = self.custom_obj_api.create_namespaced_custom_object(
+                group=self.api_group,
+                version=self.api_version,
+                namespace=self.namespace,
+                plural=self.plural,
+                body=self.body,
+            )
+            self.log.debug('Spark Job Creation Response: %s', self.spark_obj_spec)
+
+            # Wait for the driver pod to come alive
+            self.pod_spec = k8s.V1Pod(
+                metadata=k8s.V1ObjectMeta(
+                    labels=self.spark_obj_spec['spec']['driver']['labels'],
+                    name=self.spark_obj_spec['metadata']['name'] + '-driver',
+                    namespace=self.namespace,
+                )
+            )
+            curr_time = dt.now()
+            while self.spark_job_not_running(self.spark_obj_spec):
+                self.log.warning(
+                    "Spark job submitted but not yet started: %s", self.spark_obj_spec['metadata']['name']
+                )
+                delta = dt.now() - curr_time
+                if delta.total_seconds() >= startup_timeout:
+                    pod_status = self.pod_manager.read_pod(self.pod_spec).status.container_statuses
+                    raise AirflowException(f"Job took too long to start. pod status: {pod_status}")
+                time.sleep(2)
+        except Exception as e:
+            self.log.exception('Exception when attempting to create spark job: %s', self.body)
+            raise e
+
+        return self.pod_spec, self.spark_obj_spec
+
+    def spark_job_not_running(self, spark_obj_spec):
+        """Tests if spark_obj_spec has not started"""
+        spark_job_info = self.custom_obj_api.get_namespaced_custom_object_status(
+            group=self.api_group,
+            version=self.api_version,
+            namespace=self.namespace,
+            name=spark_obj_spec['metadata']['name'],
+            plural=self.plural,
+        )
+        driver_state = spark_job_info.get('status', {}).get('applicationState', {}).get('state', 'SUBMITTED')
+        if driver_state == CustomObjectStatus.FAILED:
+            err = spark_job_info.get('status', {}).get('applicationState', {}).get('errorMessage')
+            raise AirflowException(f"Spark Job Failed. Error stack:\n{err}")
+        return driver_state == CustomObjectStatus.SUBMITTED
+
+    def delete_spark_job(self, spark_job_name=None):
+        """Deletes spark job"""
+        spark_job_name = spark_job_name or self.spark_obj_spec.get('metadata', {}).get('name')
+        if not spark_job_name:
+            self.log.warning("Spark job not found: %s", spark_job_name)
+            return
+        try:
+            v1 = client.CustomObjectsApi()
+            v1.delete_namespaced_custom_object(
+                group=self.api_group,
+                version=self.api_version,
+                namespace=self.namespace,
+                plural=self.plural,
+                name=spark_job_name,
+            )
+        except ApiException as e:
+            # If the pod is already deleted
+            if e.status != 404:
+                raise
+
+    @staticmethod
+    def get_body(api_version, kind, initial_template, **kwargs):

Review Comment:
   We want to remove the usage of a YAML file here. These are parameters that are passed by the module that creates this CRD; the module already knows them and passes them over.
   
   It could be a YAML file provided to the operator by the user, or it could be a few parameters passed to the operator in Python. In fact, the whole point of this PR is to enable the user to pass parameters in Python and have the operator take care of setting the variables and adjusting the Kubernetes-based settings.
   
   I could define all of these parameters as explicit arguments of the function, but it wouldn't make much difference and would mean a lot more boilerplate code.
   
   If we are able to use a dataclass without backward-compatibility concerns (dataclasses are supported since Python 3.7), then I could make this more readable by defining an object like `K8sBody` and using it throughout.
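   As a rough illustration of the `K8sBody` idea (a minimal sketch with assumed field names; the actual set of parameters would be whatever `get_body` receives today):

   ```python
   # Hypothetical K8sBody dataclass -- not part of the PR, field names are illustrative.
   from dataclasses import dataclass, field
   from typing import Dict, List, Optional

   from kubernetes.client import models as k8s


   @dataclass
   class K8sBody:
       """Typed container for the values currently passed to get_body() as **kwargs."""

       name: Optional[str] = None
       namespace: str = 'default'
       image: Optional[str] = None
       labels: Dict[str, str] = field(default_factory=dict)
       env: List[k8s.V1EnvVar] = field(default_factory=list)
       volumes: List[k8s.V1Volume] = field(default_factory=list)

       def to_spec(self) -> dict:
           """Fold the typed fields into the nested dict expected by the SparkApplication CRD."""
           return {
               'metadata': {'name': self.name, 'namespace': self.namespace, 'labels': self.labels},
               'spec': {'image': self.image},
           }
   ```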



##########
airflow/kubernetes/custom_object_launcher.py:
##########
@@ -0,0 +1,354 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Launches Custom object"""
+import sys
+import time
+from copy import deepcopy
+from datetime import datetime as dt
+from typing import Optional
+
+import tenacity
+import yaml
+from kubernetes import client, watch
+from kubernetes.client import models as k8s
+from kubernetes.client.rest import ApiException
+
+from airflow.exceptions import AirflowException
+from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+if sys.version_info >= (3, 8):
+    from functools import cached_property
+else:
+    from cached_property import cached_property
+
+
+def should_retry_start_spark_job(exception: BaseException) -> bool:
+    """Check if an Exception indicates a transient error and warrants retrying"""
+    if isinstance(exception, ApiException):
+        return exception.status == 409
+    return False
+
+
+class SparkResources:
+    """spark resources
+    :param request_memory: requested memory
+    :param request_cpu: requested CPU number
+    :param request_ephemeral_storage: requested ephemeral storage
+    :param limit_memory: limit for memory usage
+    :param limit_cpu: Limit for CPU used
+    :param limit_gpu: Limits for GPU used
+    :param limit_ephemeral_storage: Limit for ephemeral storage
+    """
+
+    def __init__(
+        self,
+        **kwargs,
+    ):
+        self.driver_request_cpu = kwargs.get('driver_request_cpu')
+        self.driver_limit_cpu = kwargs.get('driver_limit_cpu')
+        self.driver_limit_memory = kwargs.get('driver_limit_memory')
+        self.executor_request_cpu = kwargs.get('executor_request_cpu')
+        self.executor_limit_cpu = kwargs.get('executor_limit_cpu')
+        self.executor_limit_memory = kwargs.get('executor_limit_memory')
+        self.driver_gpu_name = kwargs.get('driver_gpu_name')
+        self.driver_gpu_quantity = kwargs.get('driver_gpu_quantity')
+        self.executor_gpu_name = kwargs.get('executor_gpu_name')
+        self.executor_gpu_quantity = kwargs.get('executor_gpu_quantity')
+        self.convert_resources()
+
+    @property
+    def resources(self):
+        """Return job resources"""
+        return {'driver': self.driver_resources, 'executor': self.executor_resources}
+
+    @property
+    def driver_resources(self):
+        """Return resources to use"""
+        driver = {}
+        if self.driver_request_cpu:
+            driver['cores'] = self.driver_request_cpu
+        if self.driver_limit_cpu:
+            driver['coreLimit'] = self.driver_limit_cpu
+        if self.driver_limit_memory:
+            driver['memory'] = self.driver_limit_memory
+        if self.driver_gpu_name and self.driver_gpu_quantity:
+            driver['gpu'] = {'name': self.driver_gpu_name, 'quantity': self.driver_gpu_quantity}
+        return driver
+
+    @property
+    def executor_resources(self):
+        """Return resources to use"""
+        executor = {}
+        if self.executor_request_cpu:
+            executor['cores'] = self.executor_request_cpu
+        if self.executor_limit_cpu:
+            executor['coreLimit'] = self.executor_limit_cpu
+        if self.executor_limit_memory:
+            executor['memory'] = self.executor_limit_memory
+        if self.executor_gpu_name and self.executor_gpu_quantity:
+            executor['gpu'] = {'name': self.executor_gpu_name, 'quantity': self.executor_gpu_quantity}
+        return executor
+
+    def convert_resources(self):
+        if isinstance(self.driver_limit_memory, str):
+            if 'G' in self.driver_limit_memory or 'Gi' in self.driver_limit_memory:
+                self.driver_limit_memory = float(self.driver_limit_memory.rstrip('Gi G')) * 1024
+            elif 'm' in self.driver_limit_memory:
+                self.driver_limit_memory = float(self.driver_limit_memory.rstrip('m'))
+            # Adjusting the memory value as operator adds 40% to the given value
+            self.driver_limit_memory = str(int(self.driver_limit_memory / 1.4)) + 'm'
+
+        if isinstance(self.executor_limit_memory, str):
+            if 'G' in self.executor_limit_memory or 'Gi' in self.executor_limit_memory:
+                self.executor_limit_memory = float(self.executor_limit_memory.rstrip('Gi G')) * 1024
+            elif 'm' in self.executor_limit_memory:
+                self.executor_limit_memory = float(self.executor_limit_memory.rstrip('m'))
+            # Adjusting the memory value as operator adds 40% to the given value
+            self.executor_limit_memory = str(int(self.executor_limit_memory / 1.4)) + 'm'
+
+        if self.driver_request_cpu:
+            self.driver_request_cpu = int(float(self.driver_request_cpu))
+        if self.driver_limit_cpu:
+            self.driver_limit_cpu = str(self.driver_limit_cpu)
+        if self.executor_request_cpu:
+            self.executor_request_cpu = int(float(self.executor_request_cpu))
+        if self.executor_limit_cpu:
+            self.executor_limit_cpu = str(self.executor_limit_cpu)
+
+        if self.driver_gpu_quantity:
+            self.driver_gpu_quantity = int(float(self.driver_gpu_quantity))
+        if self.executor_gpu_quantity:
+            self.executor_gpu_quantity = int(float(self.executor_gpu_quantity))
+
+
+class CustomObjectStatus:
+    """Status of the PODs"""
+
+    SUBMITTED = 'SUBMITTED'
+    RUNNING = 'RUNNING'
+    FAILED = 'FAILED'
+    SUCCEEDED = 'SUCCEEDED'
+    INITIAL_SPEC = {
+        'metadata': {},
+        'spec': {
+            'dynamicAllocation': {'enabled': False},
+            'driver': {},
+            'executor': {},
+        },
+    }
+
+
+class CustomObjectLauncher(LoggingMixin):
+    """Launches PODS"""
+
+    def __init__(
+        self,
+        kube_client: client.CoreV1Api,
+        custom_obj_api: client.CustomObjectsApi,
+        namespace: str = 'default',
+        api_group: str = 'sparkoperator.k8s.io',
+        api_version: str = 'v1beta2',
+        plural: str = 'sparkapplications',
+        kind: str = 'SparkApplication',
+        extract_xcom: bool = False,
+        application_file: Optional[str] = None,
+    ):
+        """
+        Creates the launcher.
+
+        :param kube_client: kubernetes client
+        :param extract_xcom: whether we should extract xcom
+        """
+        super().__init__()
+        self.namespace = namespace
+        self.api_group = api_group
+        self.api_version = api_version
+        self.plural = plural
+        self.kind = kind
+        self._client = kube_client
+        self.custom_obj_api = custom_obj_api
+        self._watch = watch.Watch()
+        self.extract_xcom = extract_xcom
+        self.spark_obj_spec: dict = {}
+        self.pod_spec: dict = {}
+        self.body: dict = {}
+        self.application_file = application_file
+
+    @cached_property
+    def pod_manager(self) -> PodManager:
+        return PodManager(kube_client=self._client)
+
+    @staticmethod
+    def _load_body(file):
+        # try:
+        #     base_body = yaml.safe_load(file)
+        # except Exception:

Review Comment:
   Removed.



##########
airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py:
##########
@@ -32,47 +58,411 @@ class SparkKubernetesOperator(BaseOperator):
 
     .. seealso::
         For more detail about Spark Application Object have a look at the reference:
-        https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta2-1.1.0-2.4.5/docs/api-docs.md#sparkapplication
+        https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta2-1.3.3-3.1.1/docs/api-docs.md#sparkapplication
 
-    :param application_file: Defines Kubernetes 'custom_resource_definition' of 'sparkApplication' as either a
-        path to a '.yaml' file, '.json' file, YAML string or JSON string.
+    :param application_file: filepath to kubernetes custom_resource_definition of sparkApplication
+    :param kubernetes_conn_id: the connection to the Kubernetes cluster
+    :param image: Docker image you wish to launch. Defaults to hub.docker.com.
+    :param code_path: path to the code in your image.
     :param namespace: kubernetes namespace to put sparkApplication
-    :param kubernetes_conn_id: The :ref:`kubernetes connection id <howto/connection:kubernetes>`
-        for the to Kubernetes cluster.
-    :param api_group: kubernetes api group of sparkApplication
-    :param api_version: kubernetes api version of sparkApplication
+    :param api_group: CRD api group for spark
+            https://github.com/GoogleCloudPlatform/spark-on-k8s-operator#project-status
+    :param api_version: CRD api version
+    :param api_kind: CRD api kind
+    :param api_plural: CRD api plural
+    :param cluster_context: context of the cluster
+    :param labels: labels to apply to the crd.
+    :param config_file: kube configuration file
+    :param resources: resources for the launched pod.
+    :param number_workers: number of Spark executors
+    :param env_vars: A dictionary of key:value OR list of V1EnvVar items
+    :param env_from: A list of V1EnvFromSource items
+    :param affinity: Affinity scheduling rules for the launched pod (V1Affinity).
+    :param tolerations: A list of Kubernetes tolerations (V1Toleration).
+    :param volume_mounts: A list of V1VolumeMount items
+    :param volumes: A list of V1Volume items
+    :param config_map_mounts: A dictionary with the config map name as key and the mount path as value
+    :param from_env_config_map: Read a config map into an env variable (name of the config map)
+    :param from_env_secret: Read a secret into an env variable (name of the secret)
+    :param hadoop_config: Hadoop base config, e.g. AWS S3 config
+    :param image_pull_secrets: Any image pull secrets to be given to the pod.
+        If more than one secret is required, provide a
+        comma separated list: secret_a,secret_b
+    :param get_logs: get the stdout of the container as logs of the tasks.
+    :param do_xcom_push: If True, the content of the file
+        /airflow/xcom/return.json in the container will also be pushed to an
+        XCom when the container completes.
+    :param restart_policy: restart policy of the driver/executor
+    :param spark_version: spark version
+    :param success_run_history_limit: Number of past successful runs of the application to keep.
+    :param delete_on_termination: What to do when the pod reaches its final
+        state, or the execution is interrupted. If True (default), delete the
+        pod; if False, leave the pod.
+    :param dynamic_allocation: Enable spark dynamic allocation
+    :param dynamic_alloc_max_executors: Max number of executors if dynamic_allocation is enabled
+    :param dynamic_alloc_initial_executors: Initial number of executors if dynamic_allocation is enabled
+    :param dynamic_alloc_min_executors: Min number of executors if dynamic_allocation is enabled
+    :param image_pull_policy: Specify a policy to cache or always pull an image.
+    :param service_account_name: Name of the service account
+    :param spark_job_mode: Spark deploy mode in the Spark operator (at the time of writing only 'cluster' is supported)
+    :param spark_job_python_version: Python version for the Spark job
+    :param spark_job_type: type of spark job
+    :param startup_timeout_seconds: timeout in seconds to startup the pod.
+    :param log_events_on_failure: Log the pod's events if a failure occurs
+    :param in_cluster: run kubernetes client with in_cluster configuration.
+    :param reattach_on_restart: if the scheduler dies while the pod is running, reattach and monitor
     """
 
-    template_fields: Sequence[str] = ('application_file', 'namespace')
-    template_ext: Sequence[str] = ('.yaml', '.yml', '.json')
+    template_fields = ['application_file', 'namespace']
+    template_ext = ('.yaml', '.yml', '.json')
     ui_color = '#f4a460'
 
     def __init__(
         self,
         *,
-        application_file: str,
-        namespace: str | None = None,
-        kubernetes_conn_id: str = 'kubernetes_default',
+        image: Optional[str] = None,
+        code_path: Optional[str] = None,
+        namespace: Optional[str] = 'default',
         api_group: str = 'sparkoperator.k8s.io',
         api_version: str = 'v1beta2',
+        api_kind: str = 'SparkApplication',
+        api_plural: str = 'sparkapplications',
+        cluster_context: Optional[str] = None,
+        config_file: Optional[str] = None,
+        labels: Optional[dict] = None,
+        resources: Optional[dict] = None,
+        number_workers: int = 1,
+        env_vars: Optional[Union[List[k8s.V1EnvVar], Dict]] = None,
+        env_from: Optional[List[k8s.V1EnvFromSource]] = None,
+        affinity: Optional[k8s.V1Affinity] = None,
+        tolerations: Optional[List[k8s.V1Toleration]] = None,
+        volume_mounts: Optional[List[k8s.V1VolumeMount]] = None,
+        volumes: Optional[List[k8s.V1Volume]] = None,
+        config_map_mounts: Optional[Dict[str, str]] = None,
+        from_env_config_map: Optional[List[str]] = None,
+        from_env_secret: Optional[List[str]] = None,
+        hadoop_config: Optional[dict] = None,
+        application_file: Optional[str] = None,
+        image_pull_secrets: Optional[Union[List[k8s.V1LocalObjectReference], str]] = None,
+        get_logs: bool = True,
+        do_xcom_push: bool = False,
+        restart_policy: Optional[dict] = None,
+        spark_version: str = '3.0.0',
+        success_run_history_limit: int = 1,
+        dynamic_allocation: bool = False,
+        dynamic_alloc_max_executors: Optional[int] = None,
+        dynamic_alloc_initial_executors: int = 1,
+        dynamic_alloc_min_executors: int = 1,
+        image_pull_policy: str = 'Always',
+        service_account_name: str = 'default',
+        spark_job_mode: str = 'cluster',
+        spark_job_python_version: str = '3',
+        spark_job_type: str = 'Python',
+        startup_timeout_seconds=600,
+        log_events_on_failure: bool = False,
+        in_cluster: Optional[bool] = None,
+        reattach_on_restart: bool = True,
+        delete_on_termination: bool = True,
+        kubernetes_conn_id: str = 'kubernetes_default',
         **kwargs,
     ) -> None:
+        if kwargs.get('xcom_push') is not None:
+            raise AirflowException("'xcom_push' was deprecated, use 'do_xcom_push' instead")
         super().__init__(**kwargs)
         self.application_file = application_file
         self.namespace = namespace
         self.kubernetes_conn_id = kubernetes_conn_id
+        self.labels = labels or {}
+        self.env_from = env_from or []
+        self.env_vars = convert_env_vars(env_vars) if env_vars else []
+        self.affinity = convert_affinity(affinity) if affinity else k8s.V1Affinity()
+        self.tolerations = (
+            [convert_toleration(toleration) for toleration in tolerations] if tolerations else []
+        )
+        self.volume_mounts = [convert_volume_mount(v) for v in volume_mounts] if volume_mounts else []
+        self.volumes = [convert_volume(volume) for volume in volumes] if volumes else []
+        self.startup_timeout_seconds = startup_timeout_seconds
+        self.reattach_on_restart = reattach_on_restart
+        self.delete_on_termination = delete_on_termination
+        self.application_file = application_file
+        self.image_pull_secrets = convert_image_pull_secrets(image_pull_secrets) if image_pull_secrets else []
+        self.do_xcom_push = do_xcom_push
+        self.name = PodGenerator.make_unique_pod_id(self.task_id)
+        if self.name:
+            self.name = self.name[:MAX_LABEL_LEN]
+        self.cluster_context = cluster_context
+        self.config_file = config_file
+        self.namespace = namespace
+        self.get_logs = get_logs
         self.api_group = api_group
         self.api_version = api_version
-        self.plural = "sparkapplications"
-
-    def execute(self, context: Context):
-        hook = KubernetesHook(conn_id=self.kubernetes_conn_id)
-        self.log.info("Creating sparkApplication")
-        response = hook.create_custom_object(
-            group=self.api_group,
-            version=self.api_version,
-            plural=self.plural,
-            body=self.application_file,
+        self.api_kind = api_kind
+        self.api_plural = api_plural
+        self.code_path = code_path
+        self.dynamic_allocation = dynamic_allocation
+        self.dynamic_alloc_max_executors = dynamic_alloc_max_executors
+        self.dynamic_alloc_min_executors = dynamic_alloc_min_executors
+        self.dynamic_alloc_initial_executors = dynamic_alloc_initial_executors
+        if dynamic_allocation:
+            if not all(
+                [dynamic_alloc_max_executors, dynamic_alloc_min_executors, dynamic_alloc_initial_executors]
+            ):
+                raise AirflowException("Make sure initial/min/max value for dynamic allocation is passed")
+        if config_map_mounts:
+            vols, vols_mounts = convert_configmap_to_volume(config_map_mounts)
+            self.volumes.extend(vols)
+            self.volume_mounts.extend(vols_mounts)
+        if from_env_config_map:
+            self.env_from.extend([convert_configmap(c) for c in from_env_config_map])
+        if from_env_secret:
+            self.env_from.extend([convert_secret(c) for c in from_env_secret])
+        self.log_events_on_failure = log_events_on_failure
+        self.in_cluster = in_cluster
+        self.image_pull_policy = image_pull_policy
+        self.service_account_name = service_account_name
+        self.image = image
+        self.spark_version = spark_version
+        self.spark_job_type = spark_job_type
+        self.spark_job_python_version = spark_job_python_version
+        self.spark_job_mode = spark_job_mode
+        self.success_run_history_limit = success_run_history_limit
+        self.number_workers = number_workers
+        self.spark_obj_spec = None
+        self.restart_policy = restart_policy or {'type': 'Never'}
+        self.hadoop_config = hadoop_config
+        self.job_resources = SparkResources(**resources) if resources else SparkResources()
+
+    def get_kube_clients(self):
+        if self.in_cluster is not None:
+            core_v1_api = kube_client.get_kube_client(
+                in_cluster=self.in_cluster,
+                cluster_context=self.cluster_context,
+                config_file=self.config_file,
+            )
+        else:
+            core_v1_api = kube_client.get_kube_client(
+                cluster_context=self.cluster_context, config_file=self.config_file
+            )
+        custom_obj_api = client.CustomObjectsApi()
+        return core_v1_api, custom_obj_api

Review Comment:
   do you have an example of how to use the hook here?
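
   For illustration, a hook-based version might look roughly like this (a sketch only, not necessarily the provider's final API; it assumes `KubernetesHook` from this provider and builds the clients from the `ApiClient` returned by `get_conn()`):

   ```python
   from kubernetes import client

   from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook

   hook = KubernetesHook(conn_id='kubernetes_default')
   api_client = hook.get_conn()  # kubernetes.client.ApiClient built from the Airflow connection
   core_v1_api = client.CoreV1Api(api_client)
   custom_obj_api = client.CustomObjectsApi(api_client)
   ```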



##########
airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py:
##########
@@ -32,47 +58,411 @@ class SparkKubernetesOperator(BaseOperator):
 
     .. seealso::
         For more detail about Spark Application Object have a look at the reference:
-        https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta2-1.1.0-2.4.5/docs/api-docs.md#sparkapplication
+        https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta2-1.3.3-3.1.1/docs/api-docs.md#sparkapplication
 
-    :param application_file: Defines Kubernetes 'custom_resource_definition' of 'sparkApplication' as either a
-        path to a '.yaml' file, '.json' file, YAML string or JSON string.
+    :param application_file: filepath to kubernetes custom_resource_definition of sparkApplication
+    :param kubernetes_conn_id: the connection to the Kubernetes cluster
+    :param image: Docker image you wish to launch. Defaults to hub.docker.com.
+    :param code_path: path to the code in your image.
     :param namespace: kubernetes namespace to put sparkApplication
-    :param kubernetes_conn_id: The :ref:`kubernetes connection id <howto/connection:kubernetes>`
-        for the to Kubernetes cluster.
-    :param api_group: kubernetes api group of sparkApplication
-    :param api_version: kubernetes api version of sparkApplication
+    :param api_group: CRD api group for spark

Review Comment:
   It would have been odd to have two operators that do the same thing but have different signatures.
   I think we should handle both (creating the job from a YAML file or by passing parameters) in one operator; see the rough sketch below.
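
   Something like this (illustrative only; parameter names follow the signature proposed in this PR, and the image/code_path values are hypothetical):

   ```python
   from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

   # Option 1: drive the operator from an existing SparkApplication YAML spec
   from_yaml = SparkKubernetesOperator(
       task_id='spark_from_yaml',
       application_file='spark_pi.yaml',
   )

   # Option 2: build the SparkApplication from operator parameters
   from_params = SparkKubernetesOperator(
       task_id='spark_from_params',
       image='my-registry/spark-app:latest',  # hypothetical image
       code_path='local:///opt/app/job.py',   # hypothetical path inside the image
       number_workers=2,
   )
   ```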



##########
airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py:
##########
@@ -32,47 +58,411 @@ class SparkKubernetesOperator(BaseOperator):
 
     .. seealso::
         For more detail about Spark Application Object have a look at the reference:
-        https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta2-1.1.0-2.4.5/docs/api-docs.md#sparkapplication
+        https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta2-1.3.3-3.1.1/docs/api-docs.md#sparkapplication
 
-    :param application_file: Defines Kubernetes 'custom_resource_definition' of 'sparkApplication' as either a
-        path to a '.yaml' file, '.json' file, YAML string or JSON string.
+    :param application_file: filepath to kubernetes custom_resource_definition of sparkApplication
+    :param kubernetes_conn_id: the connection to the Kubernetes cluster
+    :param image: Docker image you wish to launch. Defaults to hub.docker.com.
+    :param code_path: path to the code in your image.
     :param namespace: kubernetes namespace to put sparkApplication
-    :param kubernetes_conn_id: The :ref:`kubernetes connection id <howto/connection:kubernetes>`
-        for the to Kubernetes cluster.
-    :param api_group: kubernetes api group of sparkApplication
-    :param api_version: kubernetes api version of sparkApplication
+    :param api_group: CRD api group for spark
+            https://github.com/GoogleCloudPlatform/spark-on-k8s-operator#project-status
+    :param api_version: CRD api version

Review Comment:
   I can additionally support YAML-file-based config, and I like that as an option, but I'm not a fan of ditching the parameters from the operator signature tbh. Internally our users rely heavily on the pythonic way of passing parameters; it is much simpler that way than managing an external YAML file alongside your DAG file.
   If the k8s cluster does not need a lot of special configuration, users don't need to pass many parameters anyway.
   
   A practical scenario: say you define a few Spark tasks in your DAG. To debug an issue with any one of them you have to cross-reference the relevant YAML config, which hurts readability; a parameter-based sketch of that scenario follows below.
   
   To address your point on k8s parameters vs operator-related parameters, maybe I can add a prefix to the names of the former so that they are clearly distinguishable by name from the other parameters.
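
   Sketch of that scenario with inline parameters (illustrative only; the image names, code paths, and DAG arguments are hypothetical boilerplate, not part of this PR):

   ```python
   from datetime import datetime

   from airflow import DAG
   from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

   with DAG('spark_jobs', start_date=datetime(2022, 1, 1), schedule_interval=None) as dag:
       extract = SparkKubernetesOperator(
           task_id='extract',
           image='my-registry/etl:latest',           # hypothetical image
           code_path='local:///opt/app/extract.py',  # hypothetical entry point
           number_workers=2,
       )
       transform = SparkKubernetesOperator(
           task_id='transform',
           image='my-registry/etl:latest',
           code_path='local:///opt/app/transform.py',
           number_workers=4,
       )
       extract >> transform  # everything needed to debug either task lives in the DAG file itself
   ```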



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org