You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/10/17 15:30:39 UTC

[GitHub] [airflow] dstandish commented on a diff in pull request #22253: Add SparkKubernetesOperator crd implementation

dstandish commented on code in PR #22253:
URL: https://github.com/apache/airflow/pull/22253#discussion_r997214511


##########
airflow/kubernetes/custom_object_launcher.py:
##########
@@ -0,0 +1,354 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Launches Custom object"""
+import sys
+import time
+from copy import deepcopy
+from datetime import datetime as dt
+from typing import Optional
+
+import tenacity
+import yaml
+from kubernetes import client, watch
+from kubernetes.client import models as k8s
+from kubernetes.client.rest import ApiException
+
+from airflow.exceptions import AirflowException
+from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+if sys.version_info >= (3, 8):
+    from functools import cached_property
+else:
+    from cached_property import cached_property
+
+
+def should_retry_start_spark_job(exception: BaseException) -> bool:
+    """Check if an Exception indicates a transient error and warrants retrying"""
+    if isinstance(exception, ApiException):
+        return exception.status == 409
+    return False
+
+
+class SparkResources:
+    """spark resources
+    :param request_memory: requested memory
+    :param request_cpu: requested CPU number
+    :param request_ephemeral_storage: requested ephemeral storage
+    :param limit_memory: limit for memory usage
+    :param limit_cpu: Limit for CPU used
+    :param limit_gpu: Limits for GPU used
+    :param limit_ephemeral_storage: Limit for ephemeral storage
+    """
+
+    def __init__(
+        self,
+        **kwargs,
+    ):
+        self.driver_request_cpu = kwargs.get('driver_request_cpu')
+        self.driver_limit_cpu = kwargs.get('driver_limit_cpu')
+        self.driver_limit_memory = kwargs.get('driver_limit_memory')
+        self.executor_request_cpu = kwargs.get('executor_request_cpu')
+        self.executor_limit_cpu = kwargs.get('executor_limit_cpu')
+        self.executor_limit_memory = kwargs.get('executor_limit_memory')
+        self.driver_gpu_name = kwargs.get('driver_gpu_name')
+        self.driver_gpu_quantity = kwargs.get('driver_gpu_quantity')
+        self.executor_gpu_name = kwargs.get('executor_gpu_name')
+        self.executor_gpu_quantity = kwargs.get('executor_gpu_quantity')
+        self.convert_resources()
+
+    @property
+    def resources(self):
+        """Return job resources"""
+        return {'driver': self.driver_resources, 'executor': self.executor_resources}
+
+    @property
+    def driver_resources(self):
+        """Return resources to use"""
+        driver = {}
+        if self.driver_request_cpu:
+            driver['cores'] = self.driver_request_cpu
+        if self.driver_limit_cpu:
+            driver['coreLimit'] = self.driver_limit_cpu
+        if self.driver_limit_memory:
+            driver['memory'] = self.driver_limit_memory
+        if self.driver_gpu_name and self.driver_gpu_quantity:
+            driver['gpu'] = {'name': self.driver_gpu_name, 'quantity': self.driver_gpu_quantity}
+        return driver
+
+    @property
+    def executor_resources(self):
+        """Return resources to use"""
+        executor = {}
+        if self.executor_request_cpu:
+            executor['cores'] = self.executor_request_cpu
+        if self.executor_limit_cpu:
+            executor['coreLimit'] = self.executor_limit_cpu
+        if self.executor_limit_memory:
+            executor['memory'] = self.executor_limit_memory
+        if self.executor_gpu_name and self.executor_gpu_quantity:
+            executor['gpu'] = {'name': self.executor_gpu_name, 'quantity': self.executor_gpu_quantity}
+        return executor
+
+    def convert_resources(self):
+        if isinstance(self.driver_limit_memory, str):
+            if 'G' in self.driver_limit_memory or 'Gi' in self.driver_limit_memory:
+                self.driver_limit_memory = float(self.driver_limit_memory.rstrip('Gi G')) * 1024
+            elif 'm' in self.driver_limit_memory:
+                self.driver_limit_memory = float(self.driver_limit_memory.rstrip('m'))
+            # Adjusting the memory value as operator adds 40% to the given value
+            self.driver_limit_memory = str(int(self.driver_limit_memory / 1.4)) + 'm'
+
+        if isinstance(self.executor_limit_memory, str):
+            if 'G' in self.executor_limit_memory or 'Gi' in self.executor_limit_memory:
+                self.executor_limit_memory = float(self.executor_limit_memory.rstrip('Gi G')) * 1024
+            elif 'm' in self.executor_limit_memory:
+                self.executor_limit_memory = float(self.executor_limit_memory.rstrip('m'))
+            # Adjusting the memory value as operator adds 40% to the given value
+            self.executor_limit_memory = str(int(self.executor_limit_memory / 1.4)) + 'm'
+
+        if self.driver_request_cpu:
+            self.driver_request_cpu = int(float(self.driver_request_cpu))
+        if self.driver_limit_cpu:
+            self.driver_limit_cpu = str(self.driver_limit_cpu)
+        if self.executor_request_cpu:
+            self.executor_request_cpu = int(float(self.executor_request_cpu))
+        if self.executor_limit_cpu:
+            self.executor_limit_cpu = str(self.executor_limit_cpu)
+
+        if self.driver_gpu_quantity:
+            self.driver_gpu_quantity = int(float(self.driver_gpu_quantity))
+        if self.executor_gpu_quantity:
+            self.executor_gpu_quantity = int(float(self.executor_gpu_quantity))
+
+
+class CustomObjectStatus:
+    """Status of the PODs"""
+
+    SUBMITTED = 'SUBMITTED'
+    RUNNING = 'RUNNING'
+    FAILED = 'FAILED'
+    SUCCEEDED = 'SUCCEEDED'
+    INITIAL_SPEC = {
+        'metadata': {},
+        'spec': {
+            'dynamicAllocation': {'enabled': False},
+            'driver': {},
+            'executor': {},
+        },
+    }
+
+
+class CustomObjectLauncher(LoggingMixin):
+    """Launches PODS"""
+
+    def __init__(
+        self,
+        kube_client: client.CoreV1Api,
+        custom_obj_api: client.CustomObjectsApi,
+        namespace: str = 'default',
+        api_group: str = 'sparkoperator.k8s.io',
+        api_version: str = 'v1beta2',
+        plural: str = 'sparkapplications',
+        kind: str = 'SparkApplication',
+        extract_xcom: bool = False,
+        application_file: Optional[str] = None,
+    ):
+        """
+        Creates the launcher.
+
+        :param kube_client: kubernetes client
+        :param extract_xcom: whether we should extract xcom
+        """
+        super().__init__()
+        self.namespace = namespace
+        self.api_group = api_group
+        self.api_version = api_version
+        self.plural = plural
+        self.kind = kind
+        self._client = kube_client
+        self.custom_obj_api = custom_obj_api
+        self._watch = watch.Watch()
+        self.extract_xcom = extract_xcom
+        self.spark_obj_spec: dict = {}
+        self.pod_spec: dict = {}
+        self.body: dict = {}
+        self.application_file = application_file
+
+    @cached_property
+    def pod_manager(self) -> PodManager:
+        return PodManager(kube_client=self._client)
+
+    @staticmethod
+    def _load_body(file):
+        # try:
+        #     base_body = yaml.safe_load(file)
+        # except Exception:
+        try:
+            with open(file) as data:
+                base_body = yaml.safe_load(data)
+        except yaml.YAMLError as e:
+            raise AirflowException(f"Exception when loading resource definition: {e}\n")

Review Comment:
   What I mean is: what's the point of converting this `yaml.YAMLError` into an "airflow exception" (`AirflowException`)? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org