Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/10/21 07:30:30 UTC

[GitHub] [airflow] mik-laj commented on a change in pull request #6230: [AIRFLOW-5413] Allow K8S pod to be configured from JSON/YAML file

mik-laj commented on a change in pull request #6230: [AIRFLOW-5413] Allow K8S pod to be configured from JSON/YAML file
URL: https://github.com/apache/airflow/pull/6230#discussion_r336869991
 
 

 ##########
 File path: airflow/kubernetes/pod_generator.py
 ##########
 @@ -301,51 +352,151 @@ def reconcile_pods(base_pod: k8s.V1Pod, client_pod: k8s.V1Pod) -> k8s.V1Pod:
         :type client_pod: k8s.V1Pod
         :return: the merged pods
 
-        This can't be done recursively as certain fields are preserved,
-        some overwritten, and some concatenated, e.g. The command
-        should be preserved from base, the volumes appended to and
-        the other fields overwritten.
+        This can't be done recursively, as certain fields are overwritten and some are concatenated.
         """
 
+        if client_pod is None:
+            return base_pod
+
         client_pod_cp = copy.deepcopy(client_pod)
 
         def merge_objects(base_obj, client_obj):
+            if not base_obj:
+                return client_obj
+            if not client_obj:
+                return base_obj
+
+            client_obj_cp = copy.deepcopy(client_obj)
+
             for base_key in base_obj.to_dict().keys():
                 base_val = getattr(base_obj, base_key, None)
                 if not getattr(client_obj, base_key, None) and base_val:
-                    setattr(client_obj, base_key, base_val)
+                    setattr(client_obj_cp, base_key, base_val)
+            return client_obj_cp
 
         def extend_object_field(base_obj, client_obj, field_name):
+            client_obj_cp = copy.deepcopy(client_obj)
             base_obj_field = getattr(base_obj, field_name, None)
             client_obj_field = getattr(client_obj, field_name, None)
             if not base_obj_field:
-                return
+                return client_obj_cp
             if not client_obj_field:
-                setattr(client_obj, field_name, base_obj_field)
-                return
+                setattr(client_obj_cp, field_name, base_obj_field)
+                return client_obj_cp
             appended_fields = base_obj_field + client_obj_field
-            setattr(client_obj, field_name, appended_fields)
-
-        # Values at the pod and metadata should be overwritten where they exist,
-        # but certain values at the spec and container level must be conserved.
-        base_container = base_pod.spec.containers[0]
-        client_container = client_pod_cp.spec.containers[0]
-
-        extend_object_field(base_container, client_container, 'volume_mounts')
-        extend_object_field(base_container, client_container, 'env')
-        extend_object_field(base_container, client_container, 'env_from')
-        extend_object_field(base_container, client_container, 'ports')
-        extend_object_field(base_container, client_container, 'volume_devices')
-        client_container.command = base_container.command
-        client_container.args = base_container.args
-        merge_objects(base_pod.spec.containers[0], client_pod_cp.spec.containers[0])
-        # Just append any additional containers from the base pod
-        client_pod_cp.spec.containers.extend(base_pod.spec.containers[1:])
-
-        merge_objects(base_pod.metadata, client_pod_cp.metadata)
-
-        extend_object_field(base_pod.spec, client_pod_cp.spec, 'volumes')
-        merge_objects(base_pod.spec, client_pod_cp.spec)
-        merge_objects(base_pod, client_pod_cp)
+            setattr(client_obj_cp, field_name, appended_fields)
+            return client_obj_cp
+
+        if base_pod.spec and not client_pod.spec:
+            client_pod_cp.spec = base_pod.spec
+        elif client_pod_cp.spec and base_pod.spec:
+            client_container = client_pod_cp.spec.containers[0]
+            base_container = base_pod.spec.containers[0]
+            cc1 = extend_object_field(base_container, client_container, 'volume_mounts')
+            cc2 = extend_object_field(base_container, cc1, 'env')
+            cc3 = extend_object_field(base_container, cc2, 'env_from')
+            cc4 = extend_object_field(base_container, cc3, 'ports')
+            cc5 = extend_object_field(base_container, cc4, 'volume_devices')
+
+            cc6 = merge_objects(base_container, cc5)
+            client_pod_cp.spec.containers[0] = cc6
+            # Just append any additional containers from the base pod
+            client_pod_cp.spec.containers.extend(base_pod.spec.containers[1:])
+            merged_spec = extend_object_field(base_pod.spec, client_pod_cp.spec, 'volumes')
+            client_pod_cp.spec = merge_objects(base_pod.spec, merged_spec)
+
+        client_pod_cp.metadata = merge_objects(base_pod.metadata, client_pod_cp.metadata)
+        client_pod_cp = merge_objects(base_pod, client_pod_cp)
 
         return client_pod_cp
+
+    @staticmethod
+    def construct_pod(
+        dag_id: str,
+        task_id: str,
+        pod_id: str,
+        try_number: int,
+        date: str,
+        command: List[str],
+        kube_executor_config: Optional[k8s.V1Pod],
+        worker_config: k8s.V1Pod,
+        namespace: str,
+        worker_uuid: str
+    ) -> k8s.V1Pod:
+        """
+        Construct a pod by gathering and consolidating the configuration from 3 places:
+            - airflow.cfg
+            - executor_config
+            - dynamic arguments
+        """
+
+        dynamic_pod = PodGenerator(
+            namespace=namespace,
+            labels={
+                'airflow-worker': worker_uuid,
+                'dag_id': dag_id,
+                'task_id': task_id,
+                'execution_date': date,
+                'try_number': str(try_number),
+            },
+            cmds=command,
+            name=pod_id
+        ).gen_pod()
+
+        # Reconcile the pod generated by the Operator and the Pod
+        # generated by the .cfg file
+        pod_with_executor_config = PodGenerator.reconcile_pods(worker_config,
+                                                               kube_executor_config)
+        # Reconcile that pod with the dynamic fields.
+        return PodGenerator.reconcile_pods(pod_with_executor_config, dynamic_pod)
+
+    @staticmethod
+    def deserialize_model_file(api_client: ApiClient, path: str) -> k8s.V1Pod:
+        """
+        :param api_client: K8S client object
+        :param path: Path to the file
+        :return: a kubernetes.client.models.V1Pod
+        """
+        pod = None
+        with open(path) as stream:
+            if '.json' in path:
+                pod = json.load(stream)
+            elif '.yaml' in path:
+                pod = yaml.safe_load(stream)
+            elif not pod:
+                raise AirflowConfigException("Path was neither .json nor .yaml")
+
+            # pylint: disable=protected-access
+            return api_client._ApiClient__deserialize_model(pod, k8s.V1Pod)
+
+    @staticmethod
+    def deserialize_model_string(api_client: ApiClient, string: str) -> k8s.V1Pod:
+        """
+        :param api_client: K8S client object
+        :param string: a string of the deployment
+        :return: a kubernetes.client.models.V1Pod
+        """
+        try:
+            pod = json.loads(string)
+        except json.decoder.JSONDecodeError:
+            try:
+                pod = yaml.safe_load(string)
+            except ScannerError:
+                raise AirflowConfigException(
+                    "Could not parse {} as yaml or json".format(string)
+                )
+
+        # pylint: disable=protected-access
+        return api_client._ApiClient__deserialize_model(pod, k8s.V1Pod)
 
 Review comment:
  This is a private method; we should not rely on its existence.
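  For illustration only, not code from this PR: one way to avoid the name-mangled attribute is the public ApiClient.deserialize() entry point, which only needs an object exposing a .data attribute holding the JSON payload. A minimal sketch, assuming the pod dict has already been parsed from the JSON/YAML file; _FakeResponse and deserialize_pod are hypothetical names:

      import json

      from kubernetes import client as k8s
      from kubernetes.client import ApiClient

      class _FakeResponse:
          # Minimal stand-in for an HTTP response; deserialize() only reads `.data`.
          def __init__(self, obj: dict) -> None:
              self.data = json.dumps(obj)

      def deserialize_pod(api_client: ApiClient, pod_dict: dict) -> k8s.V1Pod:
          # ApiClient.deserialize() is part of the generated client's public
          # surface, unlike the private _ApiClient__deserialize_model.
          return api_client.deserialize(_FakeResponse(pod_dict), k8s.V1Pod)

  The cost is a throwaway response wrapper, but it keeps the code off a private method that the generated client does not guarantee between releases.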

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services