You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by di...@apache.org on 2020/05/29 21:16:24 UTC
[airflow] 01/01: [AIRFLOW-4851] Refactor K8S codebase with k8s API
models (#5481)
This is an automated email from the ASF dual-hosted git repository.
dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit d74235912335bf77b026f3db3572aa2fe2798b28
Author: davlum <da...@gmail.com>
AuthorDate: Fri May 29 14:14:41 2020 -0700
[AIRFLOW-4851] Refactor K8S codebase with k8s API models (#5481)
* [AIRFLOW-4851] refactor Airflow kubernetes
* [AIRFLOW-4851] refactor Airflow k8s models
* [AIRFLOW-4851] Fix linting and tests
* Refactor and add some tests
* [AIRFLOW-4851] Add assertions to PodOperator tests
Author: davlum <da...@gmail.com>
Date: Wed Sep 4 17:24:31 2019 -0400
(cherry picked from commit 17d4179db2d4424b2bbd4a7f9546a0ca3628cac2)
---
.../contrib/operators/kubernetes_pod_operator.py | 185 ++---
airflow/executors/kubernetes_executor.py | 102 +--
airflow/kubernetes/k8s_model.py | 55 ++
.../kubernetes_request_factory/__init__.py | 16 -
.../kubernetes_request_factory.py | 258 ------
.../pod_request_factory.py | 139 ----
airflow/kubernetes/pod.py | 124 +--
airflow/kubernetes/pod_generator.py | 470 +++++++----
airflow/kubernetes/pod_launcher.py | 58 +-
airflow/kubernetes/pod_runtime_info_env.py | 28 +-
airflow/kubernetes/secret.py | 63 +-
airflow/kubernetes/volume.py | 20 +-
airflow/kubernetes/volume_mount.py | 24 +-
airflow/kubernetes/worker_configuration.py | 496 ++++++------
airflow/settings.py | 5 +-
tests/executors/test_kubernetes_executor.py | 882 +--------------------
.../test_kubernetes_request_factory.py | 396 ---------
.../test_pod_request_factory.py | 175 ----
.../__init__.py | 0
tests/kubernetes/models/test_pod.py | 76 ++
tests/kubernetes/models/test_secret.py | 115 +++
tests/kubernetes/test_pod_generator.py | 328 ++++++++
tests/kubernetes/test_pod_launcher.py | 18 +-
tests/kubernetes/test_worker_configuration.py | 619 +++++++++++++++
.../kubernetes/test_kubernetes_pod_operator.py | 117 ++-
tests/test_local_settings.py | 4 +-
26 files changed, 2164 insertions(+), 2609 deletions(-)
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index d121510..f35ade4 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -15,17 +15,14 @@
# specific language governing permissions and limitations
# under the License.
"""Executes task in a Kubernetes POD"""
-import re
import warnings
from airflow.exceptions import AirflowException
+from airflow.kubernetes import pod_generator, kube_client, pod_launcher
+from airflow.kubernetes.k8s_model import append_to_pod
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
-from airflow.kubernetes import kube_client, pod_generator, pod_launcher
-from airflow.kubernetes.pod import Resources
-from airflow.utils.helpers import validate_key
from airflow.utils.state import State
-from airflow.version import version as airflow_version
class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-attributes
@@ -49,13 +46,13 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
If more than one secret is required, provide a
comma separated list: secret_a,secret_b
:type image_pull_secrets: str
- :param ports: ports for launched pod.
- :type ports: list[airflow.kubernetes.pod.Port]
- :param volume_mounts: volumeMounts for launched pod.
- :type volume_mounts: list[airflow.kubernetes.volume_mount.VolumeMount]
- :param volumes: volumes for launched pod. Includes ConfigMaps and PersistentVolumes.
- :type volumes: list[airflow.kubernetes.volume.Volume]
- :param labels: labels to apply to the Pod.
+ :param ports: ports for launched pod
+ :type ports: list[airflow.kubernetes.models.port.Port]
+ :param volume_mounts: volumeMounts for launched pod
+ :type volume_mounts: list[airflow.kubernetes.models.volume_mount.VolumeMount]
+ :param volumes: volumes for launched pod. Includes ConfigMaps and PersistentVolumes
+ :type volumes: list[airflow.kubernetes.models.volume.Volume]
+ :param labels: labels to apply to the Pod
:type labels: dict
:param startup_timeout_seconds: timeout in seconds to startup the pod.
:type startup_timeout_seconds: int
@@ -64,10 +61,10 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
:type name: str
:param env_vars: Environment variables initialized in the container. (templated)
:type env_vars: dict
- :param secrets: Kubernetes secrets to inject in the container.
- They can be exposed as environment vars or files in a volume
- :type secrets: list[airflow.kubernetes.secret.Secret]
- :param in_cluster: run kubernetes client with in_cluster configuration.
+ :param secrets: Kubernetes secrets to inject in the container,
+ They can be exposed as environment vars or files in a volume.
+ :type secrets: list[airflow.kubernetes.models.secret.Secret]
+ :param in_cluster: run kubernetes client with in_cluster configuration
:type in_cluster: bool
:param cluster_context: context that points to kubernetes cluster.
Ignored when in_cluster is True. If None, current-context is used.
@@ -105,15 +102,74 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
want mount as env variables.
:type configmaps: list[str]
:param pod_runtime_info_envs: environment variables about
- pod runtime information (ip, namespace, nodeName, podName).
- :type pod_runtime_info_envs: list[PodRuntimeEnv]
- :param security_context: security options the pod should run with (PodSecurityContext).
- :type security_context: dict
- :param dnspolicy: dnspolicy for the pod.
+ pod runtime information (ip, namespace, nodeName, podName)
+ :type pod_runtime_info_envs: list[airflow.kubernetes.models.pod_runtime_info_env.PodRuntimeInfoEnv]
+ :param dnspolicy: Specify a dnspolicy for the pod
:type dnspolicy: str
+ :param full_pod_spec: The complete podSpec
+ :type full_pod_spec: kubernetes.client.models.V1Pod
"""
template_fields = ('cmds', 'arguments', 'env_vars', 'config_file')
+ def execute(self, context):
+ try:
+ client = kube_client.get_kube_client(in_cluster=self.in_cluster,
+ cluster_context=self.cluster_context,
+ config_file=self.config_file)
+
+ pod = pod_generator.PodGenerator(
+ image=self.image,
+ namespace=self.namespace,
+ cmds=self.cmds,
+ args=self.arguments,
+ labels=self.labels,
+ name=self.name,
+ envs=self.env_vars,
+ extract_xcom=self.do_xcom_push,
+ image_pull_policy=self.image_pull_policy,
+ node_selectors=self.node_selectors,
+ annotations=self.annotations,
+ affinity=self.affinity,
+ image_pull_secrets=self.image_pull_secrets,
+ service_account_name=self.service_account_name,
+ hostnetwork=self.hostnetwork,
+ tolerations=self.tolerations,
+ configmaps=self.configmaps,
+ security_context=self.security_context,
+ dnspolicy=self.dnspolicy,
+ resources=self.resources,
+ pod=self.full_pod_spec,
+ ).gen_pod()
+
+ pod = append_to_pod(pod, self.ports)
+ pod = append_to_pod(pod, self.pod_runtime_info_envs)
+ pod = append_to_pod(pod, self.volumes)
+ pod = append_to_pod(pod, self.volume_mounts)
+ pod = append_to_pod(pod, self.secrets)
+
+ self.pod = pod
+
+ launcher = pod_launcher.PodLauncher(kube_client=client,
+ extract_xcom=self.do_xcom_push)
+
+ try:
+ (final_state, result) = launcher.run_pod(
+ pod,
+ startup_timeout=self.startup_timeout_seconds,
+ get_logs=self.get_logs)
+ finally:
+ if self.is_delete_operator_pod:
+ launcher.delete_pod(pod)
+
+ if final_state != State.SUCCESS:
+ raise AirflowException(
+ 'Pod returned a failure: {state}'.format(state=final_state)
+ )
+
+ return result
+ except AirflowException as ex:
+ raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
+
@apply_defaults
def __init__(self, # pylint: disable=too-many-arguments,too-many-locals
namespace,
@@ -136,6 +192,7 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
resources=None,
affinity=None,
config_file=None,
+ do_xcom_push=False,
node_selectors=None,
image_pull_secrets=None,
service_account_name='default',
@@ -146,7 +203,7 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
security_context=None,
pod_runtime_info_envs=None,
dnspolicy=None,
- do_xcom_push=False,
+ full_pod_spec=None,
*args,
**kwargs):
# https://github.com/apache/airflow/blob/2d0eff4ee4fafcf8c7978ac287a8fb968e56605f/UPDATING.md#unification-of-do_xcom_push-flag
@@ -157,6 +214,7 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
DeprecationWarning, stacklevel=2
)
super(KubernetesPodOperator, self).__init__(*args, resources=None, **kwargs)
+ self.pod = None
self.do_xcom_push = do_xcom_push
self.image = image
self.namespace = namespace
@@ -177,7 +235,7 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
self.node_selectors = node_selectors or {}
self.annotations = annotations or {}
self.affinity = affinity or {}
- self.resources = self._set_resources(resources)
+ self.resources = resources
self.config_file = config_file
self.image_pull_secrets = image_pull_secrets
self.service_account_name = service_account_name
@@ -188,83 +246,4 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
self.security_context = security_context or {}
self.pod_runtime_info_envs = pod_runtime_info_envs or []
self.dnspolicy = dnspolicy
-
- def execute(self, context):
- try:
- if self.in_cluster is not None:
- client = kube_client.get_kube_client(in_cluster=self.in_cluster,
- cluster_context=self.cluster_context,
- config_file=self.config_file)
- else:
- client = kube_client.get_kube_client(cluster_context=self.cluster_context,
- config_file=self.config_file)
-
- # Add Airflow Version to the label
- # And a label to identify that pod is launched by KubernetesPodOperator
- self.labels.update(
- {
- 'airflow_version': airflow_version.replace('+', '-'),
- 'kubernetes_pod_operator': 'True',
- }
- )
-
- gen = pod_generator.PodGenerator()
-
- for port in self.ports:
- gen.add_port(port)
- for mount in self.volume_mounts:
- gen.add_mount(mount)
- for volume in self.volumes:
- gen.add_volume(volume)
-
- pod = gen.make_pod(
- namespace=self.namespace,
- image=self.image,
- pod_id=self.name,
- cmds=self.cmds,
- arguments=self.arguments,
- labels=self.labels,
- )
-
- pod.service_account_name = self.service_account_name
- pod.secrets = self.secrets
- pod.envs = self.env_vars
- pod.image_pull_policy = self.image_pull_policy
- pod.image_pull_secrets = self.image_pull_secrets
- pod.annotations = self.annotations
- pod.resources = self.resources
- pod.affinity = self.affinity
- pod.node_selectors = self.node_selectors
- pod.hostnetwork = self.hostnetwork
- pod.tolerations = self.tolerations
- pod.configmaps = self.configmaps
- pod.security_context = self.security_context
- pod.pod_runtime_info_envs = self.pod_runtime_info_envs
- pod.dnspolicy = self.dnspolicy
-
- launcher = pod_launcher.PodLauncher(kube_client=client,
- extract_xcom=self.do_xcom_push)
- try:
- (final_state, result) = launcher.run_pod(
- pod,
- startup_timeout=self.startup_timeout_seconds,
- get_logs=self.get_logs)
- finally:
- if self.is_delete_operator_pod:
- launcher.delete_pod(pod)
-
- if final_state != State.SUCCESS:
- raise AirflowException(
- 'Pod returned a failure: {state}'.format(state=final_state)
- )
- if self.do_xcom_push:
- return result
- except AirflowException as ex:
- raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
-
- def _set_resources(self, resources):
- return Resources(**resources) if resources else Resources()
-
- def _set_name(self, name):
- validate_key(name, max_length=63)
- return re.sub(r'[^a-z0-9.-]+', '-', name.lower())
+ self.full_pod_spec = full_pod_spec
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 32f4fd5..6944274 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -36,8 +36,8 @@ from airflow.configuration import conf
from airflow.kubernetes.pod_launcher import PodLauncher
from airflow.kubernetes.kube_client import get_kube_client
from airflow.kubernetes.worker_configuration import WorkerConfiguration
+from airflow.kubernetes.pod_generator import PodGenerator
from airflow.executors.base_executor import BaseExecutor
-from airflow.executors import Executors
from airflow.models import KubeResourceVersion, KubeWorkerIdentifier, TaskInstance
from airflow.utils.state import State
from airflow.utils.db import provide_session, create_session
@@ -49,87 +49,6 @@ MAX_POD_ID_LEN = 253
MAX_LABEL_LEN = 63
-class KubernetesExecutorConfig:
- def __init__(self, image=None, image_pull_policy=None, request_memory=None,
- request_cpu=None, limit_memory=None, limit_cpu=None, limit_gpu=None,
- gcp_service_account_key=None, node_selectors=None, affinity=None,
- annotations=None, volumes=None, volume_mounts=None, tolerations=None, labels=None):
- self.image = image
- self.image_pull_policy = image_pull_policy
- self.request_memory = request_memory
- self.request_cpu = request_cpu
- self.limit_memory = limit_memory
- self.limit_cpu = limit_cpu
- self.limit_gpu = limit_gpu
- self.gcp_service_account_key = gcp_service_account_key
- self.node_selectors = node_selectors
- self.affinity = affinity
- self.annotations = annotations
- self.volumes = volumes
- self.volume_mounts = volume_mounts
- self.tolerations = tolerations
- self.labels = labels or {}
-
- def __repr__(self):
- return "{}(image={}, image_pull_policy={}, request_memory={}, request_cpu={}, " \
- "limit_memory={}, limit_cpu={}, limit_gpu={}, gcp_service_account_key={}, " \
- "node_selectors={}, affinity={}, annotations={}, volumes={}, " \
- "volume_mounts={}, tolerations={}, labels={})" \
- .format(KubernetesExecutorConfig.__name__, self.image, self.image_pull_policy,
- self.request_memory, self.request_cpu, self.limit_memory,
- self.limit_cpu, self.limit_gpu, self.gcp_service_account_key, self.node_selectors,
- self.affinity, self.annotations, self.volumes, self.volume_mounts,
- self.tolerations, self.labels)
-
- @staticmethod
- def from_dict(obj):
- if obj is None:
- return KubernetesExecutorConfig()
-
- if not isinstance(obj, dict):
- raise TypeError(
- 'Cannot convert a non-dictionary object into a KubernetesExecutorConfig')
-
- namespaced = obj.get(Executors.KubernetesExecutor, {})
-
- return KubernetesExecutorConfig(
- image=namespaced.get('image', None),
- image_pull_policy=namespaced.get('image_pull_policy', None),
- request_memory=namespaced.get('request_memory', None),
- request_cpu=namespaced.get('request_cpu', None),
- limit_memory=namespaced.get('limit_memory', None),
- limit_cpu=namespaced.get('limit_cpu', None),
- limit_gpu=namespaced.get('limit_gpu', None),
- gcp_service_account_key=namespaced.get('gcp_service_account_key', None),
- node_selectors=namespaced.get('node_selectors', None),
- affinity=namespaced.get('affinity', None),
- annotations=namespaced.get('annotations', {}),
- volumes=namespaced.get('volumes', []),
- volume_mounts=namespaced.get('volume_mounts', []),
- tolerations=namespaced.get('tolerations', None),
- labels=namespaced.get('labels', {}),
- )
-
- def as_dict(self):
- return {
- 'image': self.image,
- 'image_pull_policy': self.image_pull_policy,
- 'request_memory': self.request_memory,
- 'request_cpu': self.request_cpu,
- 'limit_memory': self.limit_memory,
- 'limit_cpu': self.limit_cpu,
- 'limit_gpu': self.limit_gpu,
- 'gcp_service_account_key': self.gcp_service_account_key,
- 'node_selectors': self.node_selectors,
- 'affinity': self.affinity,
- 'annotations': self.annotations,
- 'volumes': self.volumes,
- 'volume_mounts': self.volume_mounts,
- 'tolerations': self.tolerations,
- 'labels': self.labels,
- }
-
-
class KubeConfig:
"""Configuration for Kubernetes"""
core_section = 'core'
@@ -473,17 +392,23 @@ class AirflowKubernetesScheduler(LoggingMixin):
self.log.info('Kubernetes job is %s', str(next_job))
key, command, kube_executor_config = next_job
dag_id, task_id, execution_date, try_number = key
- self.log.debug("Kubernetes running for command %s", command)
- self.log.debug("Kubernetes launching image %s", self.kube_config.kube_image)
- pod = self.worker_configuration.make_pod(
- namespace=self.namespace, worker_uuid=self.worker_uuid,
+
+ config_pod = self.worker_configuration.make_pod(
+ namespace=self.namespace,
+ worker_uuid=self.worker_uuid,
pod_id=self._create_pod_id(dag_id, task_id),
dag_id=self._make_safe_label_value(dag_id),
task_id=self._make_safe_label_value(task_id),
try_number=try_number,
execution_date=self._datetime_to_label_safe_datestring(execution_date),
- airflow_command=command, kube_executor_config=kube_executor_config
+ airflow_command=command
)
+ # Reconcile the pod generated by the Operator and the Pod
+ # generated by the .cfg file
+ pod = PodGenerator.reconcile_pods(config_pod, kube_executor_config)
+ self.log.debug("Kubernetes running for command %s", command)
+ self.log.debug("Kubernetes launching image %s", pod.spec.containers[0].image)
+
# the watcher will monitor pods, so we do not block.
self.launcher.run_pod_async(pod, **self.kube_config.kube_client_request_args)
self.log.debug("Kubernetes Job created!")
@@ -841,7 +766,8 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
'Add task %s with command %s with executor_config %s',
key, command, executor_config
)
- kube_executor_config = KubernetesExecutorConfig.from_dict(executor_config)
+
+ kube_executor_config = PodGenerator.from_obj(executor_config)
self.task_queue.put((key, command, kube_executor_config))
def sync(self):
diff --git a/airflow/kubernetes/k8s_model.py b/airflow/kubernetes/k8s_model.py
new file mode 100644
index 0000000..5f27bf7
--- /dev/null
+++ b/airflow/kubernetes/k8s_model.py
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Classes for interacting with Kubernetes API
+"""
+
+from abc import ABC, abstractmethod
+from functools import reduce
+
+
+class K8SModel(ABC):
+ """
+ These Airflow Kubernetes models are here for backwards compatibility
+ reasons only. Ideally clients should use the kubernetes api
+ and the process of
+
+ client input -> Airflow k8s models -> k8s models
+
+ can be avoided. All of these models implement the
+ `attach_to_pod` method so that they integrate with the kubernetes client.
+ """
+ @abstractmethod
+ def attach_to_pod(self, pod):
+ """
+ :param pod: A pod to attach this Kubernetes object to
+ :type pod: kubernetes.client.models.V1Pod
+ :return: The pod with the object attached
+ """
+
+
+def append_to_pod(pod, k8s_objects):
+ """
+ :param pod: A pod to attach a list of Kubernetes objects to
+ :type pod: kubernetes.client.models.V1Pod
+ :param k8s_objects: a potential None list of K8SModels
+ :type k8s_objects: Optional[List[K8SModel]]
+ :return: pod with the objects attached if they exist
+ """
+ if not k8s_objects:
+ return pod
+ return reduce(lambda p, o: o.attach_to_pod(p), k8s_objects, pod)
diff --git a/airflow/kubernetes/kubernetes_request_factory/__init__.py b/airflow/kubernetes/kubernetes_request_factory/__init__.py
deleted file mode 100644
index 13a8339..0000000
--- a/airflow/kubernetes/kubernetes_request_factory/__init__.py
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
diff --git a/airflow/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
deleted file mode 100644
index ad58510..0000000
--- a/airflow/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
+++ /dev/null
@@ -1,258 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from abc import ABCMeta, abstractmethod
-import six
-
-
-class KubernetesRequestFactory:
- """
- Create requests to be sent to kube API.
- Extend this class to talk to kubernetes and generate your specific resources.
- This is equivalent of generating yaml files that can be used by `kubectl`
- """
- __metaclass__ = ABCMeta
-
- @abstractmethod
- def create(self, pod):
- """
- Creates the request for kubernetes API.
-
- :param pod: The pod object
- """
-
- @staticmethod
- def extract_image(pod, req):
- req['spec']['containers'][0]['image'] = pod.image
-
- @staticmethod
- def extract_image_pull_policy(pod, req):
- if pod.image_pull_policy:
- req['spec']['containers'][0]['imagePullPolicy'] = pod.image_pull_policy
-
- @staticmethod
- def add_secret_to_env(env, secret):
- env.append({
- 'name': secret.deploy_target,
- 'valueFrom': {
- 'secretKeyRef': {
- 'name': secret.secret,
- 'key': secret.key
- }
- }
- })
-
- @staticmethod
- def add_runtime_info_env(env, runtime_info):
- env.append({
- 'name': runtime_info.name,
- 'valueFrom': {
- 'fieldRef': {
- 'fieldPath': runtime_info.field_path
- }
- }
- })
-
- @staticmethod
- def extract_labels(pod, req):
- req['metadata']['labels'] = req['metadata'].get('labels', {})
- for k, v in six.iteritems(pod.labels):
- req['metadata']['labels'][k] = v
-
- @staticmethod
- def extract_annotations(pod, req):
- req['metadata']['annotations'] = req['metadata'].get('annotations', {})
- for k, v in six.iteritems(pod.annotations):
- req['metadata']['annotations'][k] = v
-
- @staticmethod
- def extract_affinity(pod, req):
- req['spec']['affinity'] = req['spec'].get('affinity', {})
- for k, v in six.iteritems(pod.affinity):
- req['spec']['affinity'][k] = v
-
- @staticmethod
- def extract_node_selector(pod, req):
- req['spec']['nodeSelector'] = req['spec'].get('nodeSelector', {})
- for k, v in six.iteritems(pod.node_selectors):
- req['spec']['nodeSelector'][k] = v
-
- @staticmethod
- def extract_cmds(pod, req):
- req['spec']['containers'][0]['command'] = pod.cmds
-
- @staticmethod
- def extract_args(pod, req):
- req['spec']['containers'][0]['args'] = pod.args
-
- @staticmethod
- def attach_ports(pod, req):
- req['spec']['containers'][0]['ports'] = (
- req['spec']['containers'][0].get('ports', []))
- if len(pod.ports) > 0:
- req['spec']['containers'][0]['ports'].extend(pod.ports)
-
- @staticmethod
- def attach_volumes(pod, req):
- req['spec']['volumes'] = (
- req['spec'].get('volumes', []))
- if len(pod.volumes) > 0:
- req['spec']['volumes'].extend(pod.volumes)
-
- @staticmethod
- def attach_volume_mounts(pod, req):
- if len(pod.volume_mounts) > 0:
- req['spec']['containers'][0]['volumeMounts'] = (
- req['spec']['containers'][0].get('volumeMounts', []))
- req['spec']['containers'][0]['volumeMounts'].extend(pod.volume_mounts)
-
- @staticmethod
- def extract_name(pod, req):
- req['metadata']['name'] = pod.name
-
- @staticmethod
- def extract_volume_secrets(pod, req):
- vol_secrets = [s for s in pod.secrets if s.deploy_type == 'volume']
- if any(vol_secrets):
- req['spec']['containers'][0]['volumeMounts'] = (
- req['spec']['containers'][0].get('volumeMounts', []))
- req['spec']['volumes'] = (
- req['spec'].get('volumes', []))
- for idx, vol in enumerate(vol_secrets):
- vol_id = 'secretvol' + str(idx)
- req['spec']['containers'][0]['volumeMounts'].append({
- 'mountPath': vol.deploy_target,
- 'name': vol_id,
- 'readOnly': True
- })
- req['spec']['volumes'].append({
- 'name': vol_id,
- 'secret': {
- 'secretName': vol.secret
- }
- })
-
- @staticmethod
- def extract_env_and_secrets(pod, req):
- envs_from_key_secrets = [
- env for env in pod.secrets if env.deploy_type == 'env' and env.key is not None
- ]
-
- if len(pod.envs) > 0 or len(envs_from_key_secrets) > 0 or len(pod.pod_runtime_info_envs) > 0:
- env = []
- for runtime_info in pod.pod_runtime_info_envs:
- KubernetesRequestFactory.add_runtime_info_env(env, runtime_info)
- for k in pod.envs.keys():
- env.append({'name': k, 'value': pod.envs[k]})
- for secret in envs_from_key_secrets:
- KubernetesRequestFactory.add_secret_to_env(env, secret)
-
- req['spec']['containers'][0]['env'] = env
-
- KubernetesRequestFactory._apply_env_from(pod, req)
-
- @staticmethod
- def extract_resources(pod, req):
- if not pod.resources or pod.resources.is_empty_resource_request():
- return
-
- req['spec']['containers'][0]['resources'] = {}
-
- if pod.resources.has_requests():
- req['spec']['containers'][0]['resources']['requests'] = {}
- if pod.resources.request_memory:
- req['spec']['containers'][0]['resources']['requests'][
- 'memory'] = pod.resources.request_memory
- if pod.resources.request_cpu:
- req['spec']['containers'][0]['resources']['requests'][
- 'cpu'] = pod.resources.request_cpu
-
- if pod.resources.has_limits():
- req['spec']['containers'][0]['resources']['limits'] = {}
- if pod.resources.limit_memory:
- req['spec']['containers'][0]['resources']['limits'][
- 'memory'] = pod.resources.limit_memory
- if pod.resources.limit_cpu:
- req['spec']['containers'][0]['resources']['limits'][
- 'cpu'] = pod.resources.limit_cpu
- if pod.resources.limit_gpu:
- req['spec']['containers'][0]['resources']['limits'][
- 'nvidia.com/gpu'] = pod.resources.limit_gpu
-
- @staticmethod
- def extract_init_containers(pod, req):
- if pod.init_containers:
- req['spec']['initContainers'] = pod.init_containers
-
- @staticmethod
- def extract_service_account_name(pod, req):
- if pod.service_account_name:
- req['spec']['serviceAccountName'] = pod.service_account_name
-
- @staticmethod
- def extract_hostnetwork(pod, req):
- if pod.hostnetwork:
- req['spec']['hostNetwork'] = pod.hostnetwork
-
- @staticmethod
- def extract_dnspolicy(pod, req):
- if pod.dnspolicy:
- req['spec']['dnsPolicy'] = pod.dnspolicy
-
- @staticmethod
- def extract_image_pull_secrets(pod, req):
- if pod.image_pull_secrets:
- req['spec']['imagePullSecrets'] = [{
- 'name': pull_secret
- } for pull_secret in pod.image_pull_secrets.split(',')]
-
- @staticmethod
- def extract_tolerations(pod, req):
- if pod.tolerations:
- req['spec']['tolerations'] = pod.tolerations
-
- @staticmethod
- def extract_security_context(pod, req):
- if pod.security_context:
- req['spec']['securityContext'] = pod.security_context
-
- @staticmethod
- def _apply_env_from(pod, req):
- envs_from_secrets = [
- env for env in pod.secrets if env.deploy_type == 'env' and env.key is None
- ]
-
- if pod.configmaps or envs_from_secrets:
- req['spec']['containers'][0]['envFrom'] = []
-
- for secret in envs_from_secrets:
- req['spec']['containers'][0]['envFrom'].append(
- {
- 'secretRef': {
- 'name': secret.secret
- }
- }
- )
-
- for configmap in pod.configmaps:
- req['spec']['containers'][0]['envFrom'].append(
- {
- 'configMapRef': {
- 'name': configmap
- }
- }
- )
diff --git a/airflow/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/kubernetes/kubernetes_request_factory/pod_request_factory.py
deleted file mode 100644
index 3790948..0000000
--- a/airflow/kubernetes/kubernetes_request_factory/pod_request_factory.py
+++ /dev/null
@@ -1,139 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import yaml
-from airflow.kubernetes.pod import Pod
-from airflow.kubernetes.kubernetes_request_factory.kubernetes_request_factory \
- import KubernetesRequestFactory
-
-
-class SimplePodRequestFactory(KubernetesRequestFactory):
- """
- Request generator for a pod.
-
- """
- _yaml = """apiVersion: v1
-kind: Pod
-metadata:
- name: name
-spec:
- containers:
- - name: base
- image: airflow-worker:latest
- command: ["/usr/local/airflow/entrypoint.sh", "/bin/bash sleep 25"]
- restartPolicy: Never
- """
-
- def __init__(self):
-
- pass
-
- def create(self, pod):
- # type: (Pod) -> dict
- req = yaml.safe_load(self._yaml)
- self.extract_name(pod, req)
- self.extract_labels(pod, req)
- self.extract_image(pod, req)
- self.extract_image_pull_policy(pod, req)
- self.extract_cmds(pod, req)
- self.extract_args(pod, req)
- self.extract_node_selector(pod, req)
- self.extract_env_and_secrets(pod, req)
- self.extract_volume_secrets(pod, req)
- self.attach_ports(pod, req)
- self.attach_volumes(pod, req)
- self.attach_volume_mounts(pod, req)
- self.extract_resources(pod, req)
- self.extract_service_account_name(pod, req)
- self.extract_init_containers(pod, req)
- self.extract_image_pull_secrets(pod, req)
- self.extract_annotations(pod, req)
- self.extract_affinity(pod, req)
- self.extract_hostnetwork(pod, req)
- self.extract_tolerations(pod, req)
- self.extract_security_context(pod, req)
- self.extract_dnspolicy(pod, req)
- return req
-
-
-class ExtractXcomPodRequestFactory(KubernetesRequestFactory):
- """
- Request generator for a pod with sidecar container.
-
- """
-
- XCOM_MOUNT_PATH = '/airflow/xcom'
- SIDECAR_CONTAINER_NAME = 'airflow-xcom-sidecar'
- _yaml = """apiVersion: v1
-kind: Pod
-metadata:
- name: name
-spec:
- volumes:
- - name: xcom
- emptyDir: {{}}
- containers:
- - name: base
- image: airflow-worker:latest
- command: ["/usr/local/airflow/entrypoint.sh", "/bin/bash sleep 25"]
- volumeMounts:
- - name: xcom
- mountPath: {xcomMountPath}
- - name: {sidecarContainerName}
- image: alpine
- command:
- - sh
- - -c
- - 'trap "exit 0" INT; while true; do sleep 30; done;'
- volumeMounts:
- - name: xcom
- mountPath: {xcomMountPath}
- resources:
- requests:
- cpu: 1m
- restartPolicy: Never
- """.format(xcomMountPath=XCOM_MOUNT_PATH, sidecarContainerName=SIDECAR_CONTAINER_NAME)
-
- def __init__(self):
- pass
-
- def create(self, pod):
- # type: (Pod) -> dict
- req = yaml.safe_load(self._yaml)
- self.extract_name(pod, req)
- self.extract_labels(pod, req)
- self.extract_image(pod, req)
- self.extract_image_pull_policy(pod, req)
- self.extract_cmds(pod, req)
- self.extract_args(pod, req)
- self.extract_node_selector(pod, req)
- self.extract_env_and_secrets(pod, req)
- self.extract_volume_secrets(pod, req)
- self.attach_ports(pod, req)
- self.attach_volumes(pod, req)
- self.attach_volume_mounts(pod, req)
- self.extract_resources(pod, req)
- self.extract_service_account_name(pod, req)
- self.extract_init_containers(pod, req)
- self.extract_image_pull_secrets(pod, req)
- self.extract_annotations(pod, req)
- self.extract_affinity(pod, req)
- self.extract_hostnetwork(pod, req)
- self.extract_tolerations(pod, req)
- self.extract_security_context(pod, req)
- self.extract_dnspolicy(pod, req)
- return req
diff --git a/airflow/kubernetes/pod.py b/airflow/kubernetes/pod.py
index a9f3dfe..53c9172 100644
--- a/airflow/kubernetes/pod.py
+++ b/airflow/kubernetes/pod.py
@@ -14,9 +14,16 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+"""
+Classes for interacting with Kubernetes API
+"""
+import copy
+import kubernetes.client.models as k8s
+from airflow.kubernetes.k8s_model import K8SModel
-class Resources:
+
+class Resources(K8SModel):
__slots__ = ('request_memory', 'request_cpu', 'limit_memory', 'limit_cpu', 'limit_gpu')
def __init__(
@@ -41,13 +48,20 @@ class Resources:
def has_requests(self):
return self.request_cpu is not None or self.request_memory is not None
- def __str__(self):
- return "Request: [cpu: {}, memory: {}], Limit: [cpu: {}, memory: {}, gpu: {}]".format(
- self.request_cpu, self.request_memory, self.limit_cpu, self.limit_memory, self.limit_gpu
+ def to_k8s_client_obj(self):
+ return k8s.V1ResourceRequirements(
+ limits={'cpu': self.limit_cpu, 'memory': self.limit_memory, 'nvidia.com/gpu': self.limit_gpu},
+ requests={'cpu': self.request_cpu, 'memory': self.request_memory}
)
+ def attach_to_pod(self, pod):
+ cp_pod = copy.deepcopy(pod)
+ resources = self.to_k8s_client_obj()
+ cp_pod.spec.containers[0].resources = resources
+ return cp_pod
+
-class Port:
+class Port(K8SModel):
def __init__(
self,
name=None,
@@ -55,96 +69,12 @@ class Port:
self.name = name
self.container_port = container_port
+ def to_k8s_client_obj(self):
+ return k8s.V1ContainerPort(name=self.name, container_port=self.container_port)
-class Pod:
- """
- Represents a kubernetes pod and manages execution of a single pod.
- :param image: The docker image
- :type image: str
- :param envs: A dict containing the environment variables
- :type envs: dict
- :param cmds: The command to be run on the pod
- :type cmds: list[str]
- :param secrets: Secrets to be launched to the pod
- :type secrets: list[airflow.kubernetes.secret.Secret]
- :param result: The result that will be returned to the operator after
- successful execution of the pod
- :type result: any
- :param image_pull_policy: Specify a policy to cache or always pull an image
- :type image_pull_policy: str
- :param image_pull_secrets: Any image pull secrets to be given to the pod.
- If more than one secret is required, provide a comma separated list:
- secret_a,secret_b
- :type image_pull_secrets: str
- :param affinity: A dict containing a group of affinity scheduling rules
- :type affinity: dict
- :param hostnetwork: If True enable host networking on the pod
- :type hostnetwork: bool
- :param tolerations: A list of kubernetes tolerations
- :type tolerations: list
- :param security_context: A dict containing the security context for the pod
- :type security_context: dict
- :param configmaps: A list containing names of configmaps object
- mounting env variables to the pod
- :type configmaps: list[str]
- :param pod_runtime_info_envs: environment variables about
- pod runtime information (ip, namespace, nodeName, podName)
- :type pod_runtime_info_envs: list[PodRuntimeEnv]
- :param dnspolicy: Specify a dnspolicy for the pod
- :type dnspolicy: str
- """
- def __init__(
- self,
- image,
- envs,
- cmds,
- args=None,
- secrets=None,
- labels=None,
- node_selectors=None,
- name=None,
- ports=None,
- volumes=None,
- volume_mounts=None,
- namespace='default',
- result=None,
- image_pull_policy='IfNotPresent',
- image_pull_secrets=None,
- init_containers=None,
- service_account_name=None,
- resources=None,
- annotations=None,
- affinity=None,
- hostnetwork=False,
- tolerations=None,
- security_context=None,
- configmaps=None,
- pod_runtime_info_envs=None,
- dnspolicy=None
- ):
- self.image = image
- self.envs = envs or {}
- self.cmds = cmds
- self.args = args or []
- self.secrets = secrets or []
- self.result = result
- self.labels = labels or {}
- self.name = name
- self.ports = ports or []
- self.volumes = volumes or []
- self.volume_mounts = volume_mounts or []
- self.node_selectors = node_selectors or {}
- self.namespace = namespace
- self.image_pull_policy = image_pull_policy
- self.image_pull_secrets = image_pull_secrets
- self.init_containers = init_containers
- self.service_account_name = service_account_name
- self.resources = resources or Resources()
- self.annotations = annotations or {}
- self.affinity = affinity or {}
- self.hostnetwork = hostnetwork or False
- self.tolerations = tolerations or []
- self.security_context = security_context
- self.configmaps = configmaps or []
- self.pod_runtime_info_envs = pod_runtime_info_envs or []
- self.dnspolicy = dnspolicy
+ def attach_to_pod(self, pod):
+ cp_pod = copy.deepcopy(pod)
+ port = self.to_k8s_client_obj()
+ cp_pod.spec.containers[0].ports = cp_pod.spec.containers[0].ports or []
+ cp_pod.spec.containers[0].ports.append(port)
+ return cp_pod
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index 56b41d7..249a5ad 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -14,169 +14,335 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
-from airflow.kubernetes.pod import Pod, Port
-from airflow.kubernetes.volume import Volume
-from airflow.kubernetes.volume_mount import VolumeMount
-
+"""
+This module provides an interface between the previous Pod
+API and outputs a kubernetes.client.models.V1Pod.
+The advantage being that the full Kubernetes API
+is supported and no serialization need be written.
+"""
+
+import copy
+import kubernetes.client.models as k8s
+from airflow.executors import Executors
import uuid
-class PodGenerator:
- """Contains Kubernetes Airflow Worker configuration logic"""
-
- def __init__(self, kube_config=None):
- self.kube_config = kube_config
- self.ports = []
- self.volumes = []
- self.volume_mounts = []
- self.init_containers = []
-
- def add_init_container(self,
- name,
- image,
- security_context,
- init_environment,
- volume_mounts
- ):
- """
-
- Adds an init container to the launched pod. useful for pre-
+class PodDefaults:
+ """
+ Static defaults for the PodGenerator
+ """
+ XCOM_MOUNT_PATH = '/airflow/xcom'
+ SIDECAR_CONTAINER_NAME = 'airflow-xcom-sidecar'
+ XCOM_CMD = """import time
+while True:
+ try:
+ time.sleep(3600)
+ except KeyboardInterrupt:
+ exit(0)
+ """
+ VOLUME_MOUNT = k8s.V1VolumeMount(
+ name='xcom',
+ mount_path=XCOM_MOUNT_PATH
+ )
+ VOLUME = k8s.V1Volume(
+ name='xcom',
+ empty_dir=k8s.V1EmptyDirVolumeSource()
+ )
+ SIDECAR_CONTAINER = k8s.V1Container(
+ name=SIDECAR_CONTAINER_NAME,
+ command=['python', '-c', XCOM_CMD],
+ image='python:3.5-alpine',
+ volume_mounts=[VOLUME_MOUNT]
+ )
- Args:
- name (str):
- image (str):
- security_context (dict):
- init_environment (dict):
- volume_mounts (dict):
- Returns:
+class PodGenerator:
+ """
+ Contains Kubernetes Airflow Worker configuration logic
+
+ Represents a kubernetes pod and manages execution of a single pod.
+ :param image: The docker image
+ :type image: str
+ :param envs: A dict containing the environment variables
+ :type envs: Dict[str, str]
+ :param cmds: The command to be run on the pod
+ :type cmds: List[str]
+ :param secrets: Secrets to be launched to the pod
+ :type secrets: List[airflow.kubernetes.models.secret.Secret]
+ :param image_pull_policy: Specify a policy to cache or always pull an image
+ :type image_pull_policy: str
+ :param image_pull_secrets: Any image pull secrets to be given to the pod.
+ If more than one secret is required, provide a comma separated list:
+ secret_a,secret_b
+ :type image_pull_secrets: str
+ :param affinity: A dict containing a group of affinity scheduling rules
+ :type affinity: dict
+ :param hostnetwork: If True enable host networking on the pod
+ :type hostnetwork: bool
+ :param tolerations: A list of kubernetes tolerations
+ :type tolerations: list
+ :param security_context: A dict containing the security context for the pod
+ :type security_context: dict
+ :param configmaps: Any configmap refs to envfrom.
+ If more than one configmap is required, provide a comma separated list
+ configmap_a,configmap_b
+ :type configmaps: str
+ :param dnspolicy: Specify a dnspolicy for the pod
+ :type dnspolicy: str
+ :param pod: The fully specified pod.
+ :type pod: kubernetes.client.models.V1Pod
+ """
+
+ def __init__(
+ self,
+ image,
+ name=None,
+ namespace=None,
+ volume_mounts=None,
+ envs=None,
+ cmds=None,
+ args=None,
+ labels=None,
+ node_selectors=None,
+ ports=None,
+ volumes=None,
+ image_pull_policy='IfNotPresent',
+ restart_policy='Never',
+ image_pull_secrets=None,
+ init_containers=None,
+ service_account_name=None,
+ resources=None,
+ annotations=None,
+ affinity=None,
+ hostnetwork=False,
+ tolerations=None,
+ security_context=None,
+ configmaps=None,
+ dnspolicy=None,
+ pod=None,
+ extract_xcom=False,
+ ):
+ self.ud_pod = pod
+ self.pod = k8s.V1Pod()
+ self.pod.api_version = 'v1'
+ self.pod.kind = 'Pod'
+
+ # Pod Metadata
+ self.metadata = k8s.V1ObjectMeta()
+ self.metadata.labels = labels
+ self.metadata.name = name + "-" + str(uuid.uuid4())[:8] if name else None
+ self.metadata.namespace = namespace
+ self.metadata.annotations = annotations
+
+ # Pod Container
+ self.container = k8s.V1Container(name='base')
+ self.container.image = image
+ self.container.env = []
+
+ if envs:
+ if isinstance(envs, dict):
+ for key, val in envs.items():
+ self.container.env.append(k8s.V1EnvVar(
+ name=key,
+ value=val
+ ))
+ elif isinstance(envs, list):
+ self.container.env.extend(envs)
+
+ configmaps = configmaps or []
+ self.container.env_from = []
+ for configmap in configmaps:
+ self.container.env_from.append(k8s.V1EnvFromSource(
+ config_map_ref=k8s.V1ConfigMapEnvSource(
+ name=configmap
+ )
+ ))
+
+ self.container.command = cmds or []
+ self.container.args = args or []
+ self.container.image_pull_policy = image_pull_policy
+ self.container.ports = ports or []
+ self.container.resources = resources
+ self.container.volume_mounts = volume_mounts or []
+
+ # Pod Spec
+ self.spec = k8s.V1PodSpec(containers=[])
+ self.spec.security_context = security_context
+ self.spec.tolerations = tolerations
+ self.spec.dns_policy = dnspolicy
+ self.spec.host_network = hostnetwork
+ self.spec.affinity = affinity
+ self.spec.service_account_name = service_account_name
+ self.spec.init_containers = init_containers
+ self.spec.volumes = volumes or []
+ self.spec.node_selector = node_selectors
+ self.spec.restart_policy = restart_policy
+
+ self.spec.image_pull_secrets = []
+
+ if image_pull_secrets:
+ for image_pull_secret in image_pull_secrets.split(','):
+ self.spec.image_pull_secrets.append(k8s.V1LocalObjectReference(
+ name=image_pull_secret
+ ))
+
+ # Attach sidecar
+ self.extract_xcom = extract_xcom
+
+ def gen_pod(self):
+ result = self.ud_pod
+
+ if result is None:
+ result = self.pod
+ result.spec = self.spec
+ result.metadata = self.metadata
+ result.spec.containers = [self.container]
+
+ if self.extract_xcom:
+ result = self.add_sidecar(result)
+
+ return result
+
+ @staticmethod
+ def add_sidecar(pod):
+ pod_cp = copy.deepcopy(pod)
+
+ pod_cp.spec.volumes.insert(0, PodDefaults.VOLUME)
+ pod_cp.spec.containers[0].volume_mounts.insert(0, PodDefaults.VOLUME_MOUNT)
+ pod_cp.spec.containers.append(PodDefaults.SIDECAR_CONTAINER)
+
+ return pod_cp
+
+ @staticmethod
+ def from_obj(obj):  # builds a pod from None, a PodGenerator, or an executor_config dict
+ if obj is None:
+ return k8s.V1Pod()
+
+ if isinstance(obj, PodGenerator):
+ return obj.gen_pod()
+
+ if not isinstance(obj, dict):
+ raise TypeError(
+ 'Cannot convert a non-dictionary or non-PodGenerator '
+ 'object into a KubernetesExecutorConfig')
+
+ namespaced = obj.get(Executors.KubernetesExecutor, {})
+
+ resources = namespaced.get('resources')
+
+ if resources is None:
+ requests = {
+ 'cpu': namespaced.get('request_cpu'),
+ 'memory': namespaced.get('request_memory')
- """
- self.init_containers.append(
- {
- 'name': name,
- 'image': image,
- 'securityContext': security_context,
- 'env': init_environment,
- 'volumeMounts': volume_mounts
}
- )
-
- def _get_init_containers(self):
- return self.init_containers
-
- def add_port(self, port): # type: (Port) -> None
- """
- Adds a Port to the generator
-
- :param port: ports for generated pod
- :type port: airflow.kubernetes.pod.Port
- """
- self.ports.append({'name': port.name, 'containerPort': port.container_port})
-
- def add_volume(self, volume): # type: (Volume) -> None
- """
- Adds a Volume to the generator
-
- :param volume: volume for generated pod
- :type volume: airflow.kubernetes.volume.Volume
- """
-
- self._add_volume(name=volume.name, configs=volume.configs)
-
- def _add_volume(self, name, configs):
- """
-
- Args:
- name (str):
- configs (dict): Configurations for the volume.
- Could be used to define PersistentVolumeClaim, ConfigMap, etc...
-
- Returns:
-
- """
- volume_map = {'name': name}
- for k, v in configs.items():
- volume_map[k] = v
-
- self.volumes.append(volume_map)
-
- def add_volume_with_configmap(self, name, config_map):
- self.volumes.append(
- {
- 'name': name,
- 'configMap': config_map
+ limits = {
+ 'cpu': namespaced.get('limit_cpu'),
+ 'memory': namespaced.get('limit_memory')
}
+ all_resources = list(requests.values()) + list(limits.values())
+ if all(r is None for r in all_resources):
+ resources = None
+ else:
+ resources = k8s.V1ResourceRequirements(
+ requests=requests,
+ limits=limits
+ )
+
+ annotations = namespaced.get('annotations', {})
+ gcp_service_account_key = namespaced.get('gcp_service_account_key', None)
+
+ if annotations is not None and gcp_service_account_key is not None:
+ annotations.update({
+ 'iam.cloud.google.com/service-account': gcp_service_account_key
+ })
+
+ pod_spec_generator = PodGenerator(
+ image=namespaced.get('image'),
+ envs=namespaced.get('env'),
+ cmds=namespaced.get('cmds'),
+ args=namespaced.get('args'),
+ labels=namespaced.get('labels'),
+ node_selectors=namespaced.get('node_selectors'),
+ name=namespaced.get('name'),
+ ports=namespaced.get('ports'),
+ volumes=namespaced.get('volumes'),
+ volume_mounts=namespaced.get('volume_mounts'),
+ namespace=namespaced.get('namespace'),
+ image_pull_policy=namespaced.get('image_pull_policy'),
+ restart_policy=namespaced.get('restart_policy'),
+ image_pull_secrets=namespaced.get('image_pull_secrets'),
+ init_containers=namespaced.get('init_containers'),
+ service_account_name=namespaced.get('service_account_name'),
+ resources=resources,
+ annotations=annotations or namespaced.get('annotations'),  # keep the gcp key merged above even when no 'annotations' were given
+ affinity=namespaced.get('affinity'),
+ hostnetwork=namespaced.get('hostnetwork'),
+ tolerations=namespaced.get('tolerations'),
+ security_context=namespaced.get('security_context'),
+ configmaps=namespaced.get('configmaps'),
+ dnspolicy=namespaced.get('dnspolicy'),
+ pod=namespaced.get('pod'),
+ extract_xcom=namespaced.get('extract_xcom'),
)
- def _add_mount(self,
- name,
- mount_path,
- sub_path,
- read_only):
- """
+ return pod_spec_generator.gen_pod()
- Args:
- name (str):
- mount_path (str):
- sub_path (str):
- read_only:
-
- Returns:
-
- """
-
- self.volume_mounts.append({
- 'name': name,
- 'mountPath': mount_path,
- 'subPath': sub_path,
- 'readOnly': read_only
- })
-
- def add_mount(self,
- volume_mount, # type: VolumeMount
- ):
+ @staticmethod
+ def reconcile_pods(base_pod, client_pod):
"""
- Adds a VolumeMount to the generator
-
- :param volume_mount: volume for generated pod
- :type volume_mount: airflow.kubernetes.volume_mount.VolumeMount
+ :param base_pod: has the base attributes which are overwritten if they exist
+ in the client pod and remain if they do not exist in the client_pod
+ :type base_pod: k8s.V1Pod
+ :param client_pod: the pod that the client wants to create.
+ :type client_pod: k8s.V1Pod
+ :return: the merged pods
+
+ This can't be done recursively as certain fields are preserved,
+ some overwritten, and some concatenated, e.g. The command
+ should be preserved from base, the volumes appended to and
+ the other fields overwritten.
"""
- self._add_mount(
- name=volume_mount.name,
- mount_path=volume_mount.mount_path,
- sub_path=volume_mount.sub_path,
- read_only=volume_mount.read_only
- )
- def _get_volumes_and_mounts(self):
- return self.volumes, self.volume_mounts
-
- def _get_image_pull_secrets(self):
- """Extracts any image pull secrets for fetching container(s)"""
- if not self.kube_config.image_pull_secrets:
- return []
- return self.kube_config.image_pull_secrets.split(',')
-
- def make_pod(self, namespace, image, pod_id, cmds, arguments, labels):
- volumes, volume_mounts = self._get_volumes_and_mounts()
- worker_init_container_spec = self._get_init_containers()
-
- return Pod(
- namespace=namespace,
- name=pod_id + "-" + str(uuid.uuid4())[:8],
- image=image,
- cmds=cmds,
- args=arguments,
- labels=labels,
- envs={},
- secrets=[],
- # service_account_name=self.kube_config.worker_service_account_name,
- # image_pull_secrets=self.kube_config.image_pull_secrets,
- init_containers=worker_init_container_spec,
- ports=self.ports,
- volumes=volumes,
- volume_mounts=volume_mounts,
- resources=None
- )
+ client_pod_cp = copy.deepcopy(client_pod)
+
+ def merge_objects(base_obj, client_obj):
+ for base_key in base_obj.to_dict().keys():
+ base_val = getattr(base_obj, base_key, None)
+ if not getattr(client_obj, base_key, None) and base_val:
+ setattr(client_obj, base_key, base_val)
+
+ def extend_object_field(base_obj, client_obj, field_name):
+ base_obj_field = getattr(base_obj, field_name, None)
+ client_obj_field = getattr(client_obj, field_name, None)
+ if not base_obj_field:
+ return
+ if not client_obj_field:
+ setattr(client_obj, field_name, base_obj_field)
+ return
+ appended_fields = base_obj_field + client_obj_field
+ setattr(client_obj, field_name, appended_fields)
+
+ # Values at the pod and metadata should be overwritten where they exist,
+ # but certain values at the spec and container level must be conserved.
+ base_container = base_pod.spec.containers[0]
+ client_container = client_pod_cp.spec.containers[0]
+
+ extend_object_field(base_container, client_container, 'volume_mounts')
+ extend_object_field(base_container, client_container, 'env')
+ extend_object_field(base_container, client_container, 'env_from')
+ extend_object_field(base_container, client_container, 'ports')
+ extend_object_field(base_container, client_container, 'volume_devices')
+ client_container.command = base_container.command
+ client_container.args = base_container.args
+ merge_objects(base_pod.spec.containers[0], client_pod_cp.spec.containers[0])
+ # Just append any additional containers from the base pod
+ client_pod_cp.spec.containers.extend(base_pod.spec.containers[1:])
+
+ merge_objects(base_pod.metadata, client_pod_cp.metadata)
+
+ extend_object_field(base_pod.spec, client_pod_cp.spec, 'volumes')
+ merge_objects(base_pod.spec, client_pod_cp.spec)
+ merge_objects(base_pod, client_pod_cp)
+
+ return client_pod_cp
diff --git a/airflow/kubernetes/pod_launcher.py b/airflow/kubernetes/pod_launcher.py
index 8a9028b..47d8ed5 100644
--- a/airflow/kubernetes/pod_launcher.py
+++ b/airflow/kubernetes/pod_launcher.py
@@ -18,30 +18,22 @@
import json
import time
from datetime import datetime as dt
-from typing import Tuple, Optional
-
-from requests.exceptions import BaseHTTPError
import tenacity
-
from kubernetes import watch, client
from kubernetes.client.rest import ApiException
from kubernetes.stream import stream as kubernetes_stream
+from requests.exceptions import BaseHTTPError
+from airflow import AirflowException
+from airflow.kubernetes.pod_generator import PodDefaults
from airflow.settings import pod_mutation_hook
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State
-from airflow import AirflowException
-
-from airflow.kubernetes.pod import Pod
-from airflow.kubernetes.kubernetes_request_factory import \
- pod_request_factory as pod_factory
-
from .kube_client import get_kube_client
-class PodStatus(object):
- """Status of the PODs"""
+class PodStatus:
PENDING = 'pending'
RUNNING = 'running'
FAILED = 'failed'
@@ -68,35 +60,36 @@ class PodLauncher(LoggingMixin):
cluster_context=cluster_context)
self._watch = watch.Watch()
self.extract_xcom = extract_xcom
- self.kube_req_factory = pod_factory.ExtractXcomPodRequestFactory(
- ) if extract_xcom else pod_factory.SimplePodRequestFactory()
def run_pod_async(self, pod, **kwargs):
"""Runs POD asynchronously"""
pod_mutation_hook(pod)
- req = self.kube_req_factory.create(pod)
- self.log.debug('Pod Creation Request: \n%s', json.dumps(req, indent=2))
+ sanitized_pod = self._client.api_client.sanitize_for_serialization(pod)
+ json_pod = json.dumps(sanitized_pod, indent=2)
+
+ self.log.debug('Pod Creation Request: \n%s', json_pod)
try:
- resp = self._client.create_namespaced_pod(body=req, namespace=pod.namespace, **kwargs)
+ resp = self._client.create_namespaced_pod(body=sanitized_pod,
+ namespace=pod.metadata.namespace, **kwargs)
self.log.debug('Pod Creation Response: %s', resp)
- except ApiException:
- self.log.exception('Exception when attempting to create Namespaced Pod.')
- raise
+ except Exception as e:
+ self.log.exception('Exception when attempting '
+ 'to create Namespaced Pod: %s', json_pod)
+ raise e
return resp
def delete_pod(self, pod):
"""Deletes POD"""
try:
self._client.delete_namespaced_pod(
- pod.name, pod.namespace, body=client.V1DeleteOptions())
+ pod.metadata.name, pod.metadata.namespace, body=client.V1DeleteOptions())
except ApiException as e:
# If the pod is already deleted
if e.status != 404:
raise
def run_pod(self, pod, startup_timeout=120, get_logs=True):
- # type: (Pod, int, bool) -> Tuple[State, Optional[str]]
"""
Launches the pod synchronously and waits for completion.
Args:
@@ -117,7 +110,6 @@ class PodLauncher(LoggingMixin):
return self._monitor_pod(pod, get_logs)
def _monitor_pod(self, pod, get_logs):
- # type: (Pod, bool) -> Tuple[State, Optional[str]]
if get_logs:
logs = self.read_pod_logs(pod)
@@ -126,13 +118,13 @@ class PodLauncher(LoggingMixin):
result = None
if self.extract_xcom:
while self.base_container_is_running(pod):
- self.log.info('Container %s has state %s', pod.name, State.RUNNING)
+ self.log.info('Container %s has state %s', pod.metadata.name, State.RUNNING)
time.sleep(2)
result = self._extract_xcom(pod)
self.log.info(result)
result = json.loads(result)
while self.pod_is_running(pod):
- self.log.info('Pod %s has state %s', pod.name, State.RUNNING)
+ self.log.info('Pod %s has state %s', pod.metadata.name, State.RUNNING)
time.sleep(2)
return self._task_status(self.read_pod(pod)), result
@@ -158,6 +150,8 @@ class PodLauncher(LoggingMixin):
event = self.read_pod(pod)
status = next(iter(filter(lambda s: s.name == 'base',
event.status.container_statuses)), None)
+ if not status:
+ return False
return status.state.running is not None
@tenacity.retry(
@@ -169,8 +163,8 @@ class PodLauncher(LoggingMixin):
"""Reads log from the POD"""
try:
return self._client.read_namespaced_pod_log(
- name=pod.name,
- namespace=pod.namespace,
+ name=pod.metadata.name,
+ namespace=pod.metadata.namespace,
container='base',
follow=True,
tail_lines=10,
@@ -189,7 +183,7 @@ class PodLauncher(LoggingMixin):
def read_pod(self, pod):
"""Read POD information"""
try:
- return self._client.read_namespaced_pod(pod.name, pod.namespace)
+ return self._client.read_namespaced_pod(pod.metadata.name, pod.metadata.namespace)
except BaseHTTPError as e:
raise AirflowException(
'There was an error reading the kubernetes API: {}'.format(e)
@@ -197,19 +191,19 @@ class PodLauncher(LoggingMixin):
def _extract_xcom(self, pod):
resp = kubernetes_stream(self._client.connect_get_namespaced_pod_exec,
- pod.name, pod.namespace,
- container=self.kube_req_factory.SIDECAR_CONTAINER_NAME,
+ pod.metadata.name, pod.metadata.namespace,
+ container=PodDefaults.SIDECAR_CONTAINER_NAME,
command=['/bin/sh'], stdin=True, stdout=True,
stderr=True, tty=False,
_preload_content=False)
try:
result = self._exec_pod_command(
- resp, 'cat {}/return.json'.format(self.kube_req_factory.XCOM_MOUNT_PATH))
+ resp, 'cat {}/return.json'.format(PodDefaults.XCOM_MOUNT_PATH))
self._exec_pod_command(resp, 'kill -s SIGINT 1')
finally:
resp.close()
if result is None:
- raise AirflowException('Failed to extract xcom from pod: {}'.format(pod.name))
+ raise AirflowException('Failed to extract xcom from pod: {}'.format(pod.metadata.name))
return result
def _exec_pod_command(self, resp, command):
diff --git a/airflow/kubernetes/pod_runtime_info_env.py b/airflow/kubernetes/pod_runtime_info_env.py
index f52791e..7d23a7e 100644
--- a/airflow/kubernetes/pod_runtime_info_env.py
+++ b/airflow/kubernetes/pod_runtime_info_env.py
@@ -15,11 +15,15 @@
# specific language governing permissions and limitations
# under the License.
"""
-Classes for using Kubernetes Downward API
+Classes for interacting with Kubernetes API
"""
+import copy
+import kubernetes.client.models as k8s
+from airflow.kubernetes.k8s_model import K8SModel
-class PodRuntimeInfoEnv:
+
+class PodRuntimeInfoEnv(K8SModel):
"""Defines Pod runtime information as environment variable"""
def __init__(self, name, field_path):
@@ -34,3 +38,23 @@ class PodRuntimeInfoEnv:
"""
self.name = name
self.field_path = field_path
+
+ def to_k8s_client_obj(self):
+ """
+ :return: kubernetes.client.models.V1EnvVar named ``name`` whose value comes from the pod field ``field_path``
+ """
+ return k8s.V1EnvVar(
+ name=self.name,
+ value_from=k8s.V1EnvVarSource(
+ field_ref=k8s.V1ObjectFieldSelector(
+ field_path=self.field_path  # must be a keyword: the first positional parameter is api_version
+ )
+ )
+ )
+
+ def attach_to_pod(self, pod):
+ cp_pod = copy.deepcopy(pod)
+ env = self.to_k8s_client_obj()
+ cp_pod.spec.containers[0].env = cp_pod.spec.containers[0].env or []
+ cp_pod.spec.containers[0].env.append(env)
+ return cp_pod
diff --git a/airflow/kubernetes/secret.py b/airflow/kubernetes/secret.py
index fde1ded..8591a88 100644
--- a/airflow/kubernetes/secret.py
+++ b/airflow/kubernetes/secret.py
@@ -14,10 +14,18 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+"""
+Classes for interacting with Kubernetes API
+"""
+
+import uuid
+import copy
+import kubernetes.client.models as k8s
from airflow.exceptions import AirflowConfigException
+from airflow.kubernetes.k8s_model import K8SModel
-class Secret(object):
+class Secret(K8SModel):
"""Defines Kubernetes Secret Volume"""
def __init__(self, deploy_type, deploy_target, secret, key=None):
@@ -38,6 +46,9 @@ class Secret(object):
if not provided in `deploy_type` `env` it will mount all secrets in object
:type key: str or None
"""
+ if deploy_type not in ('env', 'volume'):
+ raise AirflowConfigException("deploy_type must be env or volume")
+
self.deploy_type = deploy_type
self.deploy_target = deploy_target
@@ -53,6 +64,56 @@ class Secret(object):
self.secret = secret
self.key = key
+ def to_env_secret(self):
+ return k8s.V1EnvVar(
+ name=self.deploy_target,
+ value_from=k8s.V1EnvVarSource(
+ secret_key_ref=k8s.V1SecretKeySelector(
+ name=self.secret,
+ key=self.key
+ )
+ )
+ )
+
+ def to_env_from_secret(self):
+ return k8s.V1EnvFromSource(
+ secret_ref=k8s.V1SecretEnvSource(name=self.secret)
+ )
+
+ def to_volume_secret(self):
+ vol_id = 'secretvol{}'.format(uuid.uuid4())
+ return (
+ k8s.V1Volume(
+ name=vol_id,
+ secret=k8s.V1SecretVolumeSource(
+ secret_name=self.secret
+ )
+ ),
+ k8s.V1VolumeMount(
+ mount_path=self.deploy_target,
+ name=vol_id,
+ read_only=True
+ )
+ )
+
+ def attach_to_pod(self, pod):  # returns a modified deep copy; the pod passed in is left untouched
+ cp_pod = copy.deepcopy(pod)
+ if self.deploy_type == 'volume':
+ volume, volume_mount = self.to_volume_secret()
+ cp_pod.spec.volumes = cp_pod.spec.volumes or []  # use the copy's list so the caller's pod is not mutated
+ cp_pod.spec.volumes.append(volume)
+ cp_pod.spec.containers[0].volume_mounts = cp_pod.spec.containers[0].volume_mounts or []  # same: copy's list
+ cp_pod.spec.containers[0].volume_mounts.append(volume_mount)
+ if self.deploy_type == 'env' and self.key is not None:
+ env = self.to_env_secret()
+ cp_pod.spec.containers[0].env = cp_pod.spec.containers[0].env or []
+ cp_pod.spec.containers[0].env.append(env)
+ if self.deploy_type == 'env' and self.key is None:
+ env_from = self.to_env_from_secret()
+ cp_pod.spec.containers[0].env_from = cp_pod.spec.containers[0].env_from or []
+ cp_pod.spec.containers[0].env_from.append(env_from)
+ return cp_pod
+
def __eq__(self, other):
return (
self.deploy_type == other.deploy_type and
diff --git a/airflow/kubernetes/volume.py b/airflow/kubernetes/volume.py
index 94003fe..9d85959 100644
--- a/airflow/kubernetes/volume.py
+++ b/airflow/kubernetes/volume.py
@@ -14,11 +14,15 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+"""
+Classes for interacting with Kubernetes API
+"""
+import copy
+from airflow.kubernetes.k8s_model import K8SModel
-class Volume:
- """Defines Kubernetes Volume"""
+class Volume(K8SModel):
def __init__(self, name, configs):
""" Adds Kubernetes Volume to pod. allows pod to access features like ConfigMaps
and Persistent Volumes
@@ -31,3 +35,15 @@ class Volume:
"""
self.name = name
self.configs = configs
+
+ def to_k8s_client_obj(self):  # returns the volume config as a dict with 'name' set to self.name
+ configs = dict(self.configs)  # shallow copy so the stored (caller-supplied) dict is not mutated
+ configs['name'] = self.name
+ return configs
+
+ def attach_to_pod(self, pod):  # returns a modified deep copy; the pod passed in is left untouched
+ cp_pod = copy.deepcopy(pod)
+ volume = self.to_k8s_client_obj()
+ cp_pod.spec.volumes = cp_pod.spec.volumes or []  # use the copy's list so the caller's pod is not mutated
+ cp_pod.spec.volumes.append(volume)
+ return cp_pod
diff --git a/airflow/kubernetes/volume_mount.py b/airflow/kubernetes/volume_mount.py
index 4bdf09c..74563b4 100644
--- a/airflow/kubernetes/volume_mount.py
+++ b/airflow/kubernetes/volume_mount.py
@@ -14,9 +14,16 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+"""
+Classes for interacting with Kubernetes API
+"""
+import copy
+import kubernetes.client.models as k8s
+from airflow.kubernetes.k8s_model import K8SModel
-class VolumeMount:
+
+class VolumeMount(K8SModel):
"""Defines Kubernetes Volume Mount"""
def __init__(self, name, mount_path, sub_path, read_only):
@@ -35,3 +42,18 @@ class VolumeMount:
self.mount_path = mount_path
self.sub_path = sub_path
self.read_only = read_only
+
+ def to_k8s_client_obj(self):
+ return k8s.V1VolumeMount(
+ name=self.name,
+ mount_path=self.mount_path,
+ sub_path=self.sub_path,
+ read_only=self.read_only
+ )
+
+ def attach_to_pod(self, pod):  # returns a modified deep copy; the pod passed in is left untouched
+ cp_pod = copy.deepcopy(pod)
+ volume_mount = self.to_k8s_client_obj()
+ cp_pod.spec.containers[0].volume_mounts = cp_pod.spec.containers[0].volume_mounts or []  # copy's list, not the caller's
+ cp_pod.spec.containers[0].volume_mounts.append(volume_mount)
+ return cp_pod
diff --git a/airflow/kubernetes/worker_configuration.py b/airflow/kubernetes/worker_configuration.py
index 74e464a..77bb2ee 100644
--- a/airflow/kubernetes/worker_configuration.py
+++ b/airflow/kubernetes/worker_configuration.py
@@ -16,10 +16,13 @@
# under the License.
import os
+
+import kubernetes.client.models as k8s
import six
from airflow.configuration import conf
-from airflow.kubernetes.pod import Pod, Resources
+from airflow.kubernetes.k8s_model import append_to_pod
+from airflow.kubernetes.pod_generator import PodGenerator
from airflow.kubernetes.secret import Secret
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.version import version as airflow_version
@@ -51,123 +54,120 @@ class WorkerConfiguration(LoggingMixin):
return []
# Otherwise, define a git-sync init container
- init_environment = [{
- 'name': 'GIT_SYNC_REPO',
- 'value': self.kube_config.git_repo
- }, {
- 'name': 'GIT_SYNC_BRANCH',
- 'value': self.kube_config.git_branch
- }, {
- 'name': 'GIT_SYNC_ROOT',
- 'value': self.kube_config.git_sync_root
- }, {
- 'name': 'GIT_SYNC_DEST',
- 'value': self.kube_config.git_sync_dest
- }, {
- 'name': 'GIT_SYNC_DEPTH',
- 'value': '1'
- }, {
- 'name': 'GIT_SYNC_ONE_TIME',
- 'value': 'true'
- }, {
- 'name': 'GIT_SYNC_REV',
- 'value': self.kube_config.git_sync_rev
- }]
-
+ init_environment = [k8s.V1EnvVar(
+ name='GIT_SYNC_REPO',
+ value=self.kube_config.git_repo
+ ), k8s.V1EnvVar(
+ name='GIT_SYNC_BRANCH',
+ value=self.kube_config.git_branch
+ ), k8s.V1EnvVar(
+ name='GIT_SYNC_ROOT',
+ value=self.kube_config.git_sync_root
+ ), k8s.V1EnvVar(
+ name='GIT_SYNC_DEST',
+ value=self.kube_config.git_sync_dest
+ ), k8s.V1EnvVar(
+ name='GIT_SYNC_DEPTH',
+ value='1'
+ ), k8s.V1EnvVar(
+ name='GIT_SYNC_ONE_TIME',
+ value='true'
+ ), k8s.V1EnvVar(
+ name='GIT_SYNC_REV',
+ value=self.kube_config.git_sync_rev
+ )]
for env_var_name, env_var_val in six.iteritems(self.kube_config.kube_env_vars):
- init_environment.append({
- 'name': env_var_name,
- 'value': env_var_val
- })
-
+ init_environment.append(k8s.V1EnvVar(
+ name=env_var_name,
+ value=env_var_val
+ ))
if self.kube_config.git_user:
- init_environment.append({
- 'name': 'GIT_SYNC_USERNAME',
- 'value': self.kube_config.git_user
- })
+ init_environment.append(k8s.V1EnvVar(
+ name='GIT_SYNC_USERNAME',
+ value=self.kube_config.git_user
+ ))
if self.kube_config.git_password:
- init_environment.append({
- 'name': 'GIT_SYNC_PASSWORD',
- 'value': self.kube_config.git_password
- })
+ init_environment.append(k8s.V1EnvVar(
+ name='GIT_SYNC_PASSWORD',
+ value=self.kube_config.git_password
+ ))
+
+ volume_mounts = [k8s.V1VolumeMount(
+ mount_path=self.kube_config.git_sync_root,
+ name=self.dags_volume_name,
+ read_only=False
+ )]
if self.kube_config.git_sync_credentials_secret:
init_environment.extend([
- {
- 'name': 'GIT_SYNC_USERNAME',
- 'valueFrom': {
- 'secretKeyRef': {
- 'name': self.kube_config.git_sync_credentials_secret,
- 'key': 'GIT_SYNC_USERNAME'
- }
- }
- },
- {
- 'name': 'GIT_SYNC_PASSWORD',
- 'valueFrom': {
- 'secretKeyRef': {
- 'name': self.kube_config.git_sync_credentials_secret,
- 'key': 'GIT_SYNC_PASSWORD'
- }
- }
- }
+ k8s.V1EnvVar(
+ name='GIT_SYNC_USERNAME',
+ value_from=k8s.V1EnvVarSource(
+ secret_key_ref=k8s.V1SecretKeySelector(
+ name=self.kube_config.git_sync_credentials_secret,
+ key='GIT_SYNC_USERNAME')
+ )
+ ),
+ k8s.V1EnvVar(
+ name='GIT_SYNC_PASSWORD',
+ value_from=k8s.V1EnvVarSource(
+ secret_key_ref=k8s.V1SecretKeySelector(
+ name=self.kube_config.git_sync_credentials_secret,
+ key='GIT_SYNC_PASSWORD')
+ )
+ )
])
- volume_mounts = [{
- 'mountPath': self.kube_config.git_sync_root,
- 'name': self.dags_volume_name,
- 'readOnly': False
- }]
if self.kube_config.git_ssh_key_secret_name:
- volume_mounts.append({
- 'name': self.git_sync_ssh_secret_volume_name,
- 'mountPath': '/etc/git-secret/ssh',
- 'subPath': 'ssh'
- })
- init_environment.extend([
- {
- 'name': 'GIT_SSH_KEY_FILE',
- 'value': '/etc/git-secret/ssh'
- },
- {
- 'name': 'GIT_SYNC_SSH',
- 'value': 'true'
- }])
- if self.kube_config.git_ssh_known_hosts_configmap_name:
- volume_mounts.append({
- 'name': self.git_sync_ssh_known_hosts_volume_name,
- 'mountPath': '/etc/git-secret/known_hosts',
- 'subPath': 'known_hosts'
- })
+ volume_mounts.append(k8s.V1VolumeMount(
+ name=self.git_sync_ssh_secret_volume_name,
+ mount_path='/etc/git-secret/ssh',
+ sub_path='ssh'
+ ))
+
init_environment.extend([
- {
- 'name': 'GIT_KNOWN_HOSTS',
- 'value': 'true'
- },
- {
- 'name': 'GIT_SSH_KNOWN_HOSTS_FILE',
- 'value': '/etc/git-secret/known_hosts'
- }
+ k8s.V1EnvVar(
+ name='GIT_SSH_KEY_FILE',
+ value='/etc/git-secret/ssh'
+ ),
+ k8s.V1EnvVar(
+ name='GIT_SYNC_SSH',
+ value='true'
+ )
])
+
+ if self.kube_config.git_ssh_known_hosts_configmap_name:
+ volume_mounts.append(k8s.V1VolumeMount(
+ name=self.git_sync_ssh_known_hosts_volume_name,
+ mount_path='/etc/git-secret/known_hosts',
+ sub_path='known_hosts'
+ ))
+ init_environment.extend([k8s.V1EnvVar(
+ name='GIT_KNOWN_HOSTS',
+ value='true'
+ ), k8s.V1EnvVar(
+ name='GIT_SSH_KNOWN_HOSTS_FILE',
+ value='/etc/git-secret/known_hosts'
+ )])
else:
- init_environment.append({
- 'name': 'GIT_KNOWN_HOSTS',
- 'value': 'false'
- })
-
- init_containers = [{
- 'name': self.kube_config.git_sync_init_container_name,
- 'image': self.kube_config.git_sync_container,
- 'env': init_environment,
- 'volumeMounts': volume_mounts
- }]
+ init_environment.append(k8s.V1EnvVar(
+ name='GIT_KNOWN_HOSTS',
+ value='false'
+ ))
+
+ init_containers = k8s.V1Container(
+ name=self.kube_config.git_sync_init_container_name,
+ image=self.kube_config.git_sync_container,
+ env=init_environment,
+ volume_mounts=volume_mounts
+ )
if self.kube_config.git_sync_run_as_user != "":
- init_containers[0]['securityContext'] = {
- 'runAsUser': self.kube_config.git_sync_run_as_user # git-sync user
- }
+ init_containers.security_context = k8s.V1SecurityContext(
+ run_as_user=self.kube_config.git_sync_run_as_user or 65533
+ ) # git-sync user
- return init_containers
+ return [init_containers]
def _get_environment(self):
"""Defines any necessary environment variables for the pod executor"""
@@ -196,9 +196,21 @@ class WorkerConfiguration(LoggingMixin):
def _get_configmaps(self):
"""Extracts any configmapRefs to envFrom"""
- if not self.kube_config.env_from_configmap_ref:
- return []
- return self.kube_config.env_from_configmap_ref.split(',')
+ env_from = []
+
+ if self.kube_config.env_from_configmap_ref:
+ for config_map_ref in self.kube_config.env_from_configmap_ref.split(','):
+ env_from.append(
+ k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(config_map_ref))
+ )
+
+ if self.kube_config.env_from_secret_ref:
+ for secret_ref in self.kube_config.env_from_secret_ref.split(','):
+ env_from.append(
+ k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource(secret_ref))
+ )
+
+ return env_from
def _get_secrets(self):
"""Defines any necessary secrets for the pod executor"""
@@ -222,21 +234,23 @@ class WorkerConfiguration(LoggingMixin):
"""Extracts any image pull secrets for fetching container(s)"""
if not self.kube_config.image_pull_secrets:
return []
- return self.kube_config.image_pull_secrets.split(',')
+ pull_secrets = self.kube_config.image_pull_secrets.split(',')
+ return list(map(lambda name: k8s.V1LocalObjectReference(name), pull_secrets))
def _get_security_context(self):
"""Defines the security context"""
- security_context = {}
+
+ security_context = k8s.V1PodSecurityContext()
if self.kube_config.worker_run_as_user != "":
- security_context['runAsUser'] = self.kube_config.worker_run_as_user
+ security_context.run_as_user = self.kube_config.worker_run_as_user
if self.kube_config.worker_fs_group != "":
- security_context['fsGroup'] = self.kube_config.worker_fs_group
+ security_context.fs_group = self.kube_config.worker_fs_group
# set fs_group to 65533 if not explicitly specified and using git ssh keypair auth
- if self.kube_config.git_ssh_key_secret_name and security_context.get('fsGroup') is None:
- security_context['fsGroup'] = 65533
+ if self.kube_config.git_ssh_key_secret_name and security_context.fs_group is None:
+ security_context.fs_group = 65533
return security_context
@@ -246,22 +260,77 @@ class WorkerConfiguration(LoggingMixin):
copy.update(labels)
return copy
- def _get_volumes_and_mounts(self):
+ def _get_volume_mounts(self):
+ """Defines the volume mounts (DAGs, logs, optional configmap files) for the pod executor"""
+ volume_mounts = {
+ self.dags_volume_name: k8s.V1VolumeMount(
+ name=self.dags_volume_name,
+ mount_path=self.generate_dag_volume_mount_path(),
+ read_only=True,
+ ),
+ self.logs_volume_name: k8s.V1VolumeMount(
+ name=self.logs_volume_name,
+ mount_path=self.worker_airflow_logs,
+ )
+ }
+
+ if self.kube_config.dags_volume_subpath:
+ volume_mounts[self.dags_volume_name].sub_path = self.kube_config.dags_volume_subpath
+
+ if self.kube_config.logs_volume_subpath:
+ volume_mounts[self.logs_volume_name].sub_path = self.kube_config.logs_volume_subpath
+
+ if self.kube_config.dags_in_image:
+ del volume_mounts[self.dags_volume_name]
+
+ # Mount the airflow.cfg file via a configmap the user has specified
+ if self.kube_config.airflow_configmap:
+ config_volume_name = 'airflow-config'
+ config_path = '{}/airflow.cfg'.format(self.worker_airflow_home)
+ volume_mounts[config_volume_name] = k8s.V1VolumeMount(
+ name=config_volume_name,
+ mount_path=config_path,
+ sub_path='airflow.cfg',
+ read_only=True
+ )
+ if self.kube_config.airflow_local_settings_configmap:
+ config_path = '{}/config/airflow_local_settings.py'.format(self.worker_airflow_home)
+ # NOTE: sub_path names the Python settings module, mirroring the file name in config_path (.py, not .cfg)
+ if self.kube_config.airflow_local_settings_configmap != self.kube_config.airflow_configmap:
+ config_volume_name = 'airflow-local-settings'
+ volume_mounts[config_volume_name] = k8s.V1VolumeMount(
+ name=config_volume_name,
+ mount_path=config_path,
+ sub_path='airflow_local_settings.py',
+ read_only=True
+ )
+
+ else:
+ volume_mounts['airflow-local-settings'] = k8s.V1VolumeMount(
+ name='airflow-config',
+ mount_path=config_path,
+ sub_path='airflow_local_settings.py',
+ read_only=True
+ )
+
+ return list(volume_mounts.values())
+
+ def _get_volumes(self):
def _construct_volume(name, claim, host):
- volume = {
- 'name': name
- }
+ volume = k8s.V1Volume(name=name)
+
if claim:
- volume['persistentVolumeClaim'] = {
- 'claimName': claim
- }
+ volume.persistent_volume_claim = k8s.V1PersistentVolumeClaimVolumeSource(
+ claim_name=claim
+ )
elif host:
- volume['hostPath'] = {
- 'path': host,
- 'type': ''
- }
+ volume.host_path = k8s.V1HostPathVolumeSource(
+ path=host,
+ type=''
+ )
else:
- volume['emptyDir'] = {}
+ volume.empty_dir = {}
+
return volume
volumes = {
@@ -277,135 +346,70 @@ class WorkerConfiguration(LoggingMixin):
)
}
- volume_mounts = {
- self.dags_volume_name: {
- 'name': self.dags_volume_name,
- 'mountPath': self.generate_dag_volume_mount_path(),
- 'readOnly': True,
- },
- self.logs_volume_name: {
- 'name': self.logs_volume_name,
- 'mountPath': self.worker_airflow_logs,
- }
- }
-
- if self.kube_config.dags_volume_subpath:
- volume_mounts[self.dags_volume_name]['subPath'] = self.kube_config.dags_volume_subpath
-
- if self.kube_config.logs_volume_subpath:
- volume_mounts[self.logs_volume_name]['subPath'] = self.kube_config.logs_volume_subpath
-
if self.kube_config.dags_in_image:
del volumes[self.dags_volume_name]
- del volume_mounts[self.dags_volume_name]
# Get the SSH key from secrets as a volume
if self.kube_config.git_ssh_key_secret_name:
- volumes[self.git_sync_ssh_secret_volume_name] = {
- 'name': self.git_sync_ssh_secret_volume_name,
- 'secret': {
- 'secretName': self.kube_config.git_ssh_key_secret_name,
- 'items': [{
- 'key': self.git_ssh_key_secret_key,
- 'path': 'ssh',
- 'mode': 0o440
- }]
- }
- }
+ volumes[self.git_sync_ssh_secret_volume_name] = k8s.V1Volume(
+ name=self.git_sync_ssh_secret_volume_name,
+ secret=k8s.V1SecretVolumeSource(
+ secret_name=self.kube_config.git_ssh_key_secret_name,
+ items=[k8s.V1KeyToPath(
+ key=self.git_ssh_key_secret_key,
+ path='ssh',
+ mode=0o440
+ )]
+ )
+ )
if self.kube_config.git_ssh_known_hosts_configmap_name:
- volumes[self.git_sync_ssh_known_hosts_volume_name] = {
- 'name': self.git_sync_ssh_known_hosts_volume_name,
- 'configMap': {
- 'name': self.kube_config.git_ssh_known_hosts_configmap_name
- },
- 'mode': 0o440
- }
+ volumes[self.git_sync_ssh_known_hosts_volume_name] = k8s.V1Volume(
+ name=self.git_sync_ssh_known_hosts_volume_name,
+ config_map=k8s.V1ConfigMapVolumeSource(
+ name=self.kube_config.git_ssh_known_hosts_configmap_name,
+ default_mode=0o440
+ )
+ )
+
+ # Mount the airflow.cfg file via a configmap the user has specified
+ if self.kube_config.airflow_configmap:
+ config_volume_name = 'airflow-config'
+ volumes[config_volume_name] = k8s.V1Volume(
+ name=config_volume_name,
+ config_map=k8s.V1ConfigMapVolumeSource(
+ name=self.kube_config.airflow_configmap
+ )
+ )
if self.kube_config.airflow_local_settings_configmap:
config_path = '{}/config/airflow_local_settings.py'.format(self.worker_airflow_home)
if self.kube_config.airflow_local_settings_configmap != self.kube_config.airflow_configmap:
config_volume_name = 'airflow-local-settings'
- volumes[config_volume_name] = {
- 'name': config_volume_name,
- 'configMap': {
- 'name': self.kube_config.airflow_local_settings_configmap
- }
- }
-
- volume_mounts[config_volume_name] = {
- 'name': config_volume_name,
- 'mountPath': config_path,
- 'subPath': 'airflow_local_settings.py',
- 'readOnly': True
- }
-
- else:
- volume_mounts['airflow-local-settings'] = {
- 'name': 'airflow-config',
- 'mountPath': config_path,
- 'subPath': 'airflow_local_settings.py',
- 'readOnly': True
- }
+ volumes[config_volume_name] = k8s.V1Volume(
+ name=config_volume_name,
+ config_map=k8s.V1ConfigMapVolumeSource(
+ name=self.kube_config.airflow_local_settings_configmap
+ )
+ )
- # Mount the airflow.cfg file via a configmap the user has specified
- if self.kube_config.airflow_configmap:
- config_volume_name = 'airflow-config'
- config_path = '{}/airflow.cfg'.format(self.worker_airflow_home)
- volumes[config_volume_name] = {
- 'name': config_volume_name,
- 'configMap': {
- 'name': self.kube_config.airflow_configmap
- }
- }
- volume_mounts[config_volume_name] = {
- 'name': config_volume_name,
- 'mountPath': config_path,
- 'subPath': 'airflow.cfg',
- 'readOnly': True
- }
-
- return volumes, volume_mounts
+ return list(volumes.values())
def generate_dag_volume_mount_path(self):
if self.kube_config.dags_volume_claim or self.kube_config.dags_volume_host:
- dag_volume_mount_path = self.worker_airflow_dags
- else:
- dag_volume_mount_path = self.kube_config.git_dags_folder_mount_point
+ return self.worker_airflow_dags
- return dag_volume_mount_path
+ return self.kube_config.git_dags_folder_mount_point
def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_date,
- try_number, airflow_command, kube_executor_config):
- volumes_dict, volume_mounts_dict = self._get_volumes_and_mounts()
- worker_init_container_spec = self._get_init_containers()
- resources = Resources(
- request_memory=kube_executor_config.request_memory,
- request_cpu=kube_executor_config.request_cpu,
- limit_memory=kube_executor_config.limit_memory,
- limit_cpu=kube_executor_config.limit_cpu,
- limit_gpu=kube_executor_config.limit_gpu
- )
- gcp_sa_key = kube_executor_config.gcp_service_account_key
- annotations = dict(kube_executor_config.annotations) or self.kube_config.kube_annotations
- if gcp_sa_key:
- annotations['iam.cloud.google.com/service-account'] = gcp_sa_key
-
- volumes = [value for value in volumes_dict.values()] + kube_executor_config.volumes
- volume_mounts = [value for value in volume_mounts_dict.values()] + kube_executor_config.volume_mounts
-
- affinity = kube_executor_config.affinity or self.kube_config.kube_affinity
- tolerations = kube_executor_config.tolerations or self.kube_config.kube_tolerations
-
- return Pod(
+ try_number, airflow_command):
+ pod_generator = PodGenerator(
namespace=namespace,
name=pod_id,
- image=kube_executor_config.image or self.kube_config.kube_image,
- image_pull_policy=(kube_executor_config.image_pull_policy or
- self.kube_config.kube_image_pull_policy),
- cmds=airflow_command,
- labels=self._get_labels(kube_executor_config.labels, {
+ image=self.kube_config.kube_image,
+ image_pull_policy=self.kube_config.kube_image_pull_policy,
+ labels={
'airflow-worker': worker_uuid,
'dag_id': dag_id,
'task_id': task_id,
@@ -413,20 +417,22 @@ class WorkerConfiguration(LoggingMixin):
'try_number': str(try_number),
'airflow_version': airflow_version.replace('+', '-'),
'kubernetes_executor': 'True',
- }),
+ },
+ cmds=airflow_command,
+ volumes=self._get_volumes(),
+ volume_mounts=self._get_volume_mounts(),
+ init_containers=self._get_init_containers(),
+ annotations=self.kube_config.kube_annotations,
+ affinity=self.kube_config.kube_affinity,
+ tolerations=self.kube_config.kube_tolerations,
envs=self._get_environment(),
- secrets=self._get_secrets(),
+ node_selectors=self.kube_config.kube_node_selectors,
service_account_name=self.kube_config.worker_service_account_name,
- image_pull_secrets=self.kube_config.image_pull_secrets,
- init_containers=worker_init_container_spec,
- volumes=volumes,
- volume_mounts=volume_mounts,
- resources=resources,
- annotations=annotations,
- node_selectors=(kube_executor_config.node_selectors or
- self.kube_config.kube_node_selectors),
- affinity=affinity,
- tolerations=tolerations,
- security_context=self._get_security_context(),
- configmaps=self._get_configmaps()
)
+ # Build the base pod, then layer on envFrom sources (from _get_configmaps), security context and secrets
+ pod = pod_generator.gen_pod()
+ pod.spec.containers[0].env_from = pod.spec.containers[0].env_from or []
+ pod.spec.containers[0].env_from.extend(self._get_configmaps())
+ pod.spec.security_context = self._get_security_context()
+
+ return append_to_pod(pod, self._get_secrets())
diff --git a/airflow/settings.py b/airflow/settings.py
index 6f82e5d..bf2fb1d 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -182,8 +182,9 @@ def policy(task_instance):
def pod_mutation_hook(pod):
"""
- This setting allows altering ``Pod`` objects before they are passed to
- the Kubernetes client by the ``PodLauncher`` for scheduling.
+ This setting allows altering ``kubernetes.client.models.V1Pod`` objects
+ before they are passed to the Kubernetes client by the ``PodLauncher``
+ for scheduling.
To define a pod mutation hook, add a ``airflow_local_settings`` module
to your PYTHONPATH that defines this ``pod_mutation_hook`` function.
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 95333fc..9073493 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -16,19 +16,16 @@
# under the License.
#
-import uuid
+import random
import re
import string
-import random
-
-from airflow.utils import timezone
-from urllib3 import HTTPResponse
from datetime import datetime
import six
+from urllib3 import HTTPResponse
+from airflow.utils import timezone
from tests.compat import mock
-from tests.test_utils.config import conf_vars
try:
from kubernetes.client.rest import ApiException
@@ -36,15 +33,8 @@ try:
from airflow.configuration import conf # noqa: F401
from airflow.executors.kubernetes_executor import AirflowKubernetesScheduler
from airflow.executors.kubernetes_executor import KubernetesExecutor
- from airflow.executors.kubernetes_executor import KubeConfig
- from airflow.executors.kubernetes_executor import KubernetesExecutorConfig
- from airflow.kubernetes.worker_configuration import WorkerConfiguration
- from airflow.exceptions import AirflowConfigException
from airflow.utils.state import State
- from airflow.version import version as airflow_version
- from airflow.kubernetes.secret import Secret
-except ImportError as e:
- print(e)
+except ImportError:
AirflowKubernetesScheduler = None # type: ignore
if six.PY2:
@@ -130,852 +120,6 @@ class TestAirflowKubernetesScheduler(unittest.TestCase):
self.assertEqual(datetime_obj, new_datetime_obj)
-class TestKubernetesWorkerConfiguration(unittest.TestCase):
- """
- Tests that if dags_volume_subpath/logs_volume_subpath configuration
- options are passed to worker pod config
- """
-
- affinity_config = {
- 'podAntiAffinity': {
- 'requiredDuringSchedulingIgnoredDuringExecution': [
- {
- 'topologyKey': 'kubernetes.io/hostname',
- 'labelSelector': {
- 'matchExpressions': [
- {
- 'key': 'app',
- 'operator': 'In',
- 'values': ['airflow']
- }
- ]
- }
- }
- ]
- }
- }
-
- tolerations_config = [
- {
- 'key': 'dedicated',
- 'operator': 'Equal',
- 'value': 'airflow'
- },
- {
- 'key': 'prod',
- 'operator': 'Exists'
- }
- ]
-
- def setUp(self):
- if AirflowKubernetesScheduler is None:
- self.skipTest("kubernetes python package is not installed")
-
- self.resources = mock.patch(
- 'airflow.kubernetes.worker_configuration.Resources'
- )
-
- for patcher in [self.resources]:
- self.mock_foo = patcher.start()
- self.addCleanup(patcher.stop)
-
- self.kube_config = mock.MagicMock()
- self.kube_config.airflow_home = '/'
- self.kube_config.airflow_dags = 'dags'
- self.kube_config.airflow_dags = 'logs'
- self.kube_config.dags_volume_subpath = None
- self.kube_config.logs_volume_subpath = None
- self.kube_config.dags_in_image = False
- self.kube_config.dags_folder = None
- self.kube_config.git_dags_folder_mount_point = None
- self.kube_config.kube_labels = {'dag_id': 'original_dag_id', 'my_label': 'label_id'}
-
- def tearDown(self):
- self.kube_config = None
-
- def test_worker_configuration_no_subpaths(self):
- worker_config = WorkerConfiguration(self.kube_config)
- volumes, volume_mounts = worker_config._get_volumes_and_mounts()
- volumes_list = [value for value in volumes.values()]
- volume_mounts_list = [value for value in volume_mounts.values()]
- for volume_or_mount in volumes_list + volume_mounts_list:
- if volume_or_mount['name'] not in ['airflow-config', 'airflow-local-settings']:
- self.assertNotIn(
- 'subPath', volume_or_mount,
- "subPath shouldn't be defined"
- )
-
- @conf_vars({
- ('kubernetes', 'git_ssh_known_hosts_configmap_name'): 'airflow-configmap',
- ('kubernetes', 'git_ssh_key_secret_name'): 'airflow-secrets',
- ('kubernetes', 'git_user'): 'some-user',
- ('kubernetes', 'git_password'): 'some-password',
- ('kubernetes', 'git_repo'): 'git@github.com:apache/airflow.git',
- ('kubernetes', 'git_branch'): 'master',
- ('kubernetes', 'git_dags_folder_mount_point'): '/usr/local/airflow/dags',
- ('kubernetes', 'delete_worker_pods'): 'True',
- ('kubernetes', 'kube_client_request_args'): '{"_request_timeout" : [60,360]}',
- })
- def test_worker_configuration_auth_both_ssh_and_user(self):
- with self.assertRaisesRegex(AirflowConfigException,
- 'either `git_user` and `git_password`.*'
- 'or `git_ssh_key_secret_name`.*'
- 'but not both$'):
- KubeConfig()
-
- def test_worker_with_subpaths(self):
- self.kube_config.dags_volume_subpath = 'dags'
- self.kube_config.logs_volume_subpath = 'logs'
- worker_config = WorkerConfiguration(self.kube_config)
- volumes, volume_mounts = worker_config._get_volumes_and_mounts()
-
- for volume in [value for value in volumes.values()]:
- self.assertNotIn(
- 'subPath', volume,
- "subPath isn't valid configuration for a volume"
- )
-
- for volume_mount in [value for value in volume_mounts.values()]:
- if volume_mount['name'] != 'airflow-config':
- self.assertIn(
- 'subPath', volume_mount,
- "subPath should've been passed to volumeMount configuration"
- )
-
- def test_worker_generate_dag_volume_mount_path(self):
- self.kube_config.git_dags_folder_mount_point = '/root/airflow/git/dags'
- self.kube_config.dags_folder = '/root/airflow/dags'
- worker_config = WorkerConfiguration(self.kube_config)
-
- self.kube_config.dags_volume_claim = 'airflow-dags'
- self.kube_config.dags_volume_host = ''
- dag_volume_mount_path = worker_config.generate_dag_volume_mount_path()
- self.assertEqual(dag_volume_mount_path, self.kube_config.dags_folder)
-
- self.kube_config.dags_volume_claim = ''
- self.kube_config.dags_volume_host = '/host/airflow/dags'
- dag_volume_mount_path = worker_config.generate_dag_volume_mount_path()
- self.assertEqual(dag_volume_mount_path, self.kube_config.dags_folder)
-
- self.kube_config.dags_volume_claim = ''
- self.kube_config.dags_volume_host = ''
- dag_volume_mount_path = worker_config.generate_dag_volume_mount_path()
- self.assertEqual(dag_volume_mount_path,
- self.kube_config.git_dags_folder_mount_point)
-
- def test_worker_environment_no_dags_folder(self):
- self.kube_config.airflow_configmap = ''
- self.kube_config.git_dags_folder_mount_point = ''
- self.kube_config.dags_folder = ''
- worker_config = WorkerConfiguration(self.kube_config)
- env = worker_config._get_environment()
-
- self.assertNotIn('AIRFLOW__CORE__DAGS_FOLDER', env)
-
- def test_worker_environment_when_dags_folder_specified(self):
- self.kube_config.airflow_configmap = 'airflow-configmap'
- self.kube_config.git_dags_folder_mount_point = ''
- dags_folder = '/workers/path/to/dags'
- self.kube_config.dags_folder = dags_folder
-
- worker_config = WorkerConfiguration(self.kube_config)
- env = worker_config._get_environment()
-
- self.assertEqual(dags_folder, env['AIRFLOW__CORE__DAGS_FOLDER'])
-
- def test_worker_environment_dags_folder_using_git_sync(self):
- self.kube_config.airflow_configmap = 'airflow-configmap'
- self.kube_config.git_sync_dest = 'repo'
- self.kube_config.git_subpath = 'dags'
- self.kube_config.git_dags_folder_mount_point = '/workers/path/to/dags'
-
- dags_folder = '{}/{}/{}'.format(self.kube_config.git_dags_folder_mount_point,
- self.kube_config.git_sync_dest,
- self.kube_config.git_subpath)
-
- worker_config = WorkerConfiguration(self.kube_config)
- env = worker_config._get_environment()
-
- self.assertEqual(dags_folder, env['AIRFLOW__CORE__DAGS_FOLDER'])
-
- def test_init_environment_using_git_sync_ssh_without_known_hosts(self):
- # Tests the init environment created with git-sync SSH authentication option is correct
- # without known hosts file
- self.kube_config.airflow_configmap = 'airflow-configmap'
- self.kube_config.git_ssh_secret_name = 'airflow-secrets'
- self.kube_config.git_ssh_known_hosts_configmap_name = None
- self.kube_config.dags_volume_claim = None
- self.kube_config.dags_volume_host = None
- self.kube_config.dags_in_image = None
-
- worker_config = WorkerConfiguration(self.kube_config)
- init_containers = worker_config._get_init_containers()
-
- self.assertTrue(init_containers) # check not empty
- env = init_containers[0]['env']
-
- self.assertTrue({'name': 'GIT_SSH_KEY_FILE', 'value': '/etc/git-secret/ssh'} in env)
- self.assertTrue({'name': 'GIT_KNOWN_HOSTS', 'value': 'false'} in env)
- self.assertTrue({'name': 'GIT_SYNC_SSH', 'value': 'true'} in env)
-
- def test_init_environment_using_git_sync_ssh_with_known_hosts(self):
- # Tests the init environment created with git-sync SSH authentication option is correct
- # with known hosts file
- self.kube_config.airflow_configmap = 'airflow-configmap'
- self.kube_config.git_ssh_key_secret_name = 'airflow-secrets'
- self.kube_config.git_ssh_known_hosts_configmap_name = 'airflow-configmap'
- self.kube_config.dags_volume_claim = None
- self.kube_config.dags_volume_host = None
- self.kube_config.dags_in_image = None
-
- worker_config = WorkerConfiguration(self.kube_config)
- init_containers = worker_config._get_init_containers()
-
- self.assertTrue(init_containers) # check not empty
- env = init_containers[0]['env']
-
- self.assertTrue({'name': 'GIT_SSH_KEY_FILE', 'value': '/etc/git-secret/ssh'} in env)
- self.assertTrue({'name': 'GIT_KNOWN_HOSTS', 'value': 'true'} in env)
- self.assertTrue({'name': 'GIT_SSH_KNOWN_HOSTS_FILE',
- 'value': '/etc/git-secret/known_hosts'} in env)
- self.assertTrue({'name': 'GIT_SYNC_SSH', 'value': 'true'} in env)
-
- def test_init_environment_using_git_sync_user_without_known_hosts(self):
- # Tests the init environment created with git-sync User authentication option is correct
- # without known hosts file
- self.kube_config.airflow_configmap = 'airflow-configmap'
- self.kube_config.git_user = 'git_user'
- self.kube_config.git_password = 'git_password'
- self.kube_config.git_ssh_known_hosts_configmap_name = None
- self.kube_config.git_ssh_key_secret_name = None
- self.kube_config.dags_volume_claim = None
- self.kube_config.dags_volume_host = None
- self.kube_config.dags_in_image = None
-
- worker_config = WorkerConfiguration(self.kube_config)
- init_containers = worker_config._get_init_containers()
-
- self.assertTrue(init_containers) # check not empty
- env = init_containers[0]['env']
-
- self.assertFalse({'name': 'GIT_SSH_KEY_FILE', 'value': '/etc/git-secret/ssh'} in env)
- self.assertTrue({'name': 'GIT_SYNC_USERNAME', 'value': 'git_user'} in env)
- self.assertTrue({'name': 'GIT_SYNC_PASSWORD', 'value': 'git_password'} in env)
- self.assertTrue({'name': 'GIT_KNOWN_HOSTS', 'value': 'false'} in env)
- self.assertFalse({'name': 'GIT_SSH_KNOWN_HOSTS_FILE',
- 'value': '/etc/git-secret/known_hosts'} in env)
- self.assertFalse({'name': 'GIT_SYNC_SSH', 'value': 'true'} in env)
-
- def test_init_environment_using_git_sync_user_with_known_hosts(self):
- # Tests the init environment created with git-sync User authentication option is correct
- # with known hosts file
- self.kube_config.airflow_configmap = 'airflow-configmap'
- self.kube_config.git_user = 'git_user'
- self.kube_config.git_password = 'git_password'
- self.kube_config.git_ssh_known_hosts_configmap_name = 'airflow-configmap'
- self.kube_config.git_ssh_key_secret_name = None
- self.kube_config.dags_volume_claim = None
- self.kube_config.dags_volume_host = None
- self.kube_config.dags_in_image = None
-
- worker_config = WorkerConfiguration(self.kube_config)
- init_containers = worker_config._get_init_containers()
-
- self.assertTrue(init_containers) # check not empty
- env = init_containers[0]['env']
-
- self.assertFalse({'name': 'GIT_SSH_KEY_FILE', 'value': '/etc/git-secret/ssh'} in env)
- self.assertTrue({'name': 'GIT_SYNC_USERNAME', 'value': 'git_user'} in env)
- self.assertTrue({'name': 'GIT_SYNC_PASSWORD', 'value': 'git_password'} in env)
- self.assertTrue({'name': 'GIT_KNOWN_HOSTS', 'value': 'true'} in env)
- self.assertTrue({'name': 'GIT_SSH_KNOWN_HOSTS_FILE',
- 'value': '/etc/git-secret/known_hosts'} in env)
- self.assertFalse({'name': 'GIT_SYNC_SSH', 'value': 'true'} in env)
-
- def test_make_pod_git_sync_credentials_secret(self):
- # Tests the pod created with git_sync_credentials_secret will get into the init container
- self.kube_config.git_sync_credentials_secret = 'airflow-git-creds-secret'
- self.kube_config.dags_volume_claim = None
- self.kube_config.dags_volume_host = None
- self.kube_config.dags_in_image = None
- self.kube_config.worker_fs_group = None
-
- worker_config = WorkerConfiguration(self.kube_config)
- kube_executor_config = KubernetesExecutorConfig(annotations=[],
- volumes=[],
- volume_mounts=[])
-
- pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
- "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
- kube_executor_config)
-
- username_env = {
- 'name': 'GIT_SYNC_USERNAME',
- 'valueFrom': {
- 'secretKeyRef': {
- 'name': self.kube_config.git_sync_credentials_secret,
- 'key': 'GIT_SYNC_USERNAME'
- }
- }
- }
- password_env = {
- 'name': 'GIT_SYNC_PASSWORD',
- 'valueFrom': {
- 'secretKeyRef': {
- 'name': self.kube_config.git_sync_credentials_secret,
- 'key': 'GIT_SYNC_PASSWORD'
- }
- }
- }
-
- self.assertIn(username_env, pod.init_containers[0]["env"],
- 'The username env for git credentials did not get into the init container')
-
- self.assertIn(password_env, pod.init_containers[0]["env"],
- 'The password env for git credentials did not get into the init container')
-
- def test_make_pod_git_sync_rev(self):
- # Tests the pod created with git_sync_credentials_secret will get into the init container
- self.kube_config.git_sync_rev = 'sampletag'
- self.kube_config.dags_volume_claim = None
- self.kube_config.dags_volume_host = None
- self.kube_config.dags_in_image = None
- self.kube_config.worker_fs_group = None
- self.kube_config.git_dags_folder_mount_point = 'dags'
- self.kube_config.git_sync_dest = 'repo'
- self.kube_config.git_subpath = 'path'
-
- worker_config = WorkerConfiguration(self.kube_config)
- kube_executor_config = KubernetesExecutorConfig(annotations=[],
- volumes=[],
- volume_mounts=[])
-
- pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
- "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
- kube_executor_config)
-
- rev_env = {
- 'name': 'GIT_SYNC_REV',
- 'value': self.kube_config.git_sync_rev
- }
-
- self.assertIn(rev_env, pod.init_containers[0]["env"],
- 'The git_sync_rev env did not get into the init container')
-
- def test_init_environment_using_git_sync_run_as_user_empty(self):
- # Tests if git_syn_run_as_user is none, then no securityContext created in init container
-
- self.kube_config.dags_volume_claim = None
- self.kube_config.dags_volume_host = None
- self.kube_config.dags_in_image = None
- self.kube_config.git_sync_run_as_user = ''
-
- worker_config = WorkerConfiguration(self.kube_config)
- init_containers = worker_config._get_init_containers()
- self.assertTrue(init_containers) # check not empty
-
- self.assertNotIn(
- 'securityContext', init_containers[0],
- "securityContext shouldn't be defined"
- )
-
- def test_make_pod_run_as_user_0(self):
- # Tests the pod created with run-as-user 0 actually gets that in it's config
- self.kube_config.worker_run_as_user = 0
- self.kube_config.dags_volume_claim = None
- self.kube_config.dags_volume_host = None
- self.kube_config.dags_in_image = None
- self.kube_config.worker_fs_group = None
-
- worker_config = WorkerConfiguration(self.kube_config)
- kube_executor_config = KubernetesExecutorConfig(annotations=[],
- volumes=[],
- volume_mounts=[])
-
- pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
- "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
- kube_executor_config)
-
- self.assertEqual(0, pod.security_context['runAsUser'])
-
- def test_make_pod_assert_labels(self):
- # Tests the pod created has all the expected labels set
- self.kube_config.dags_folder = 'dags'
-
- worker_config = WorkerConfiguration(self.kube_config)
- kube_executor_config = KubernetesExecutorConfig(annotations=[],
- volumes=[],
-
- volume_mounts=[])
- pod = worker_config.make_pod("default", "sample-uuid", "test_pod_id", "test_dag_id",
- "test_task_id", "2019-11-21 11:08:22.920875", 1, "bash -c 'ls /'",
- kube_executor_config)
- expected_labels = {
- 'airflow-worker': 'sample-uuid',
- 'airflow_version': airflow_version.replace('+', '-'),
- 'dag_id': 'test_dag_id',
- 'execution_date': '2019-11-21 11:08:22.920875',
- 'kubernetes_executor': 'True',
- 'my_label': 'label_id',
- 'task_id': 'test_task_id',
- 'try_number': '1'
- }
- self.assertEqual(pod.labels, expected_labels)
-
- def test_make_pod_git_sync_ssh_without_known_hosts(self):
- # Tests the pod created with git-sync SSH authentication option is correct without known hosts
- self.kube_config.airflow_configmap = 'airflow-configmap'
- self.kube_config.git_ssh_key_secret_name = 'airflow-secrets'
- self.kube_config.dags_volume_claim = None
- self.kube_config.dags_volume_host = None
- self.kube_config.dags_in_image = None
- self.kube_config.worker_fs_group = None
-
- worker_config = WorkerConfiguration(self.kube_config)
- kube_executor_config = KubernetesExecutorConfig(annotations=[],
- volumes=[],
- volume_mounts=[])
-
- pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
- "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
- kube_executor_config)
-
- init_containers = worker_config._get_init_containers()
- git_ssh_key_file = next((x['value'] for x in init_containers[0]['env']
- if x['name'] == 'GIT_SSH_KEY_FILE'), None)
- volume_mount_ssh_key = next((x['mountPath'] for x in init_containers[0]['volumeMounts']
- if x['name'] == worker_config.git_sync_ssh_secret_volume_name),
- None)
- self.assertTrue(git_ssh_key_file)
- self.assertTrue(volume_mount_ssh_key)
- self.assertEqual(65533, pod.security_context['fsGroup'])
- self.assertEqual(git_ssh_key_file,
- volume_mount_ssh_key,
- 'The location where the git ssh secret is mounted'
- ' needs to be the same as the GIT_SSH_KEY_FILE path')
-
- def test_make_pod_git_sync_ssh_with_known_hosts(self):
- # Tests the pod created with git-sync SSH authentication option is correct with known hosts
- self.kube_config.airflow_configmap = 'airflow-configmap'
- self.kube_config.git_ssh_secret_name = 'airflow-secrets'
- self.kube_config.dags_volume_claim = None
- self.kube_config.dags_volume_host = None
- self.kube_config.dags_in_image = None
-
- worker_config = WorkerConfiguration(self.kube_config)
-
- init_containers = worker_config._get_init_containers()
- git_ssh_known_hosts_file = next((x['value'] for x in init_containers[0]['env']
- if x['name'] == 'GIT_SSH_KNOWN_HOSTS_FILE'), None)
-
- volume_mount_ssh_known_hosts_file = next(
- (x['mountPath'] for x in init_containers[0]['volumeMounts']
- if x['name'] == worker_config.git_sync_ssh_known_hosts_volume_name),
- None)
- self.assertTrue(git_ssh_known_hosts_file)
- self.assertTrue(volume_mount_ssh_known_hosts_file)
- self.assertEqual(git_ssh_known_hosts_file,
- volume_mount_ssh_known_hosts_file,
- 'The location where the git known hosts file is mounted'
- ' needs to be the same as the GIT_SSH_KNOWN_HOSTS_FILE path')
-
- def test_make_pod_with_empty_executor_config(self):
- self.kube_config.kube_affinity = self.affinity_config
- self.kube_config.kube_tolerations = self.tolerations_config
-
- worker_config = WorkerConfiguration(self.kube_config)
- kube_executor_config = KubernetesExecutorConfig(annotations=[],
- volumes=[],
- volume_mounts=[]
- )
-
- pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
- "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
- kube_executor_config)
-
- self.assertTrue(pod.affinity['podAntiAffinity'] is not None)
- self.assertEqual('app',
- pod.affinity['podAntiAffinity']
- ['requiredDuringSchedulingIgnoredDuringExecution'][0]
- ['labelSelector']
- ['matchExpressions'][0]
- ['key'])
-
- self.assertEqual(2, len(pod.tolerations))
- self.assertEqual('prod', pod.tolerations[1]['key'])
-
- def test_make_pod_with_executor_config(self):
- worker_config = WorkerConfiguration(self.kube_config)
- kube_executor_config = KubernetesExecutorConfig(affinity=self.affinity_config,
- tolerations=self.tolerations_config,
- annotations=[],
- volumes=[],
- volume_mounts=[]
- )
-
- pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
- "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
- kube_executor_config)
-
- self.assertTrue(pod.affinity['podAntiAffinity'] is not None)
- self.assertEqual('app',
- pod.affinity['podAntiAffinity']
- ['requiredDuringSchedulingIgnoredDuringExecution'][0]
- ['labelSelector']
- ['matchExpressions'][0]
- ['key'])
-
- self.assertEqual(2, len(pod.tolerations))
- self.assertEqual('prod', pod.tolerations[1]['key'])
-
- def test_worker_pvc_dags(self):
- # Tests persistence volume config created when `dags_volume_claim` is set
- self.kube_config.dags_volume_claim = 'airflow-dags'
-
- worker_config = WorkerConfiguration(self.kube_config)
- volumes, volume_mounts = worker_config._get_volumes_and_mounts()
-
- init_containers = worker_config._get_init_containers()
-
- dag_volume = [volume for volume in volumes.values() if volume['name'] == 'airflow-dags']
- dag_volume_mount = [mount for mount in volume_mounts.values() if mount['name'] == 'airflow-dags']
-
- self.assertEqual('airflow-dags', dag_volume[0]['persistentVolumeClaim']['claimName'])
- self.assertEqual(1, len(dag_volume_mount))
- self.assertTrue(dag_volume_mount[0]['readOnly'])
- self.assertEqual(0, len(init_containers))
-
- def test_worker_git_dags(self):
- # Tests persistence volume config created when `git_repo` is set
- self.kube_config.dags_volume_claim = None
- self.kube_config.dags_volume_host = None
- self.kube_config.dags_folder = '/usr/local/airflow/dags'
- self.kube_config.worker_dags_folder = '/usr/local/airflow/dags'
-
- self.kube_config.git_sync_container_repository = 'gcr.io/google-containers/git-sync-amd64'
- self.kube_config.git_sync_container_tag = 'v2.0.5'
- self.kube_config.git_sync_container = 'gcr.io/google-containers/git-sync-amd64:v2.0.5'
- self.kube_config.git_sync_init_container_name = 'git-sync-clone'
- self.kube_config.git_subpath = 'dags_folder'
- self.kube_config.git_sync_root = '/git'
- self.kube_config.git_sync_run_as_user = 65533
- self.kube_config.git_dags_folder_mount_point = '/usr/local/airflow/dags/repo/dags_folder'
-
- worker_config = WorkerConfiguration(self.kube_config)
- volumes, volume_mounts = worker_config._get_volumes_and_mounts()
-
- dag_volume = [volume for volume in volumes.values() if volume['name'] == 'airflow-dags']
- dag_volume_mount = [mount for mount in volume_mounts.values() if mount['name'] == 'airflow-dags']
-
- self.assertTrue('emptyDir' in dag_volume[0])
- self.assertEqual(self.kube_config.git_dags_folder_mount_point, dag_volume_mount[0]['mountPath'])
- self.assertTrue(dag_volume_mount[0]['readOnly'])
-
- init_container = worker_config._get_init_containers()[0]
- init_container_volume_mount = [mount for mount in init_container['volumeMounts']
- if mount['name'] == 'airflow-dags']
-
- self.assertEqual('git-sync-clone', init_container['name'])
- self.assertEqual('gcr.io/google-containers/git-sync-amd64:v2.0.5', init_container['image'])
- self.assertEqual(1, len(init_container_volume_mount))
- self.assertFalse(init_container_volume_mount[0]['readOnly'])
- self.assertEqual(65533, init_container['securityContext']['runAsUser'])
-
- def test_worker_container_dags(self):
- # Tests that the 'airflow-dags' persistence volume is NOT created when `dags_in_image` is set
- self.kube_config.dags_in_image = True
-
- worker_config = WorkerConfiguration(self.kube_config)
- volumes, volume_mounts = worker_config._get_volumes_and_mounts()
-
- dag_volume = [volume for volume in volumes.values() if volume['name'] == 'airflow-dags']
- dag_volume_mount = [mount for mount in volume_mounts.values() if mount['name'] == 'airflow-dags']
-
- init_containers = worker_config._get_init_containers()
-
- self.assertEqual(0, len(dag_volume))
- self.assertEqual(0, len(dag_volume_mount))
- self.assertEqual(0, len(init_containers))
-
- def test_set_airflow_config_configmap(self):
- """
- Test that airflow.cfg can be set via configmap by
- checking volume & volume-mounts are set correctly.
- """
- self.kube_config.airflow_home = '/usr/local/airflow'
- self.kube_config.airflow_configmap = 'airflow-configmap'
- self.kube_config.airflow_local_settings_configmap = None
- self.kube_config.dags_folder = '/workers/path/to/dags'
-
- worker_config = WorkerConfiguration(self.kube_config)
- kube_executor_config = KubernetesExecutorConfig(annotations=[],
- volumes=[],
- volume_mounts=[])
-
- pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
- "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
- kube_executor_config)
-
- airflow_config_volume = [
- volume for volume in pod.volumes if volume["name"] == 'airflow-config'
- ]
- # Test that volume_name is found
- self.assertEqual(1, len(airflow_config_volume))
-
- # Test that config map exists
- self.assertEqual(
- {'configMap': {'name': 'airflow-configmap'}, 'name': 'airflow-config'},
- airflow_config_volume[0]
- )
-
- # Test that only 1 Volume Mounts exists with 'airflow-config' name
- # One for airflow.cfg
- volume_mounts = [
- volume_mount for volume_mount in pod.volume_mounts
- if volume_mount['name'] == 'airflow-config'
- ]
-
- self.assertEqual([
- {
- 'mountPath': '/usr/local/airflow/airflow.cfg',
- 'name': 'airflow-config',
- 'readOnly': True,
- 'subPath': 'airflow.cfg',
- }
- ],
- volume_mounts
- )
-
- def test_set_airflow_local_settings_configmap(self):
- """
- Test that airflow_local_settings.py can be set via configmap by
- checking volume & volume-mounts are set correctly.
- """
- self.kube_config.airflow_home = '/usr/local/airflow'
- self.kube_config.airflow_configmap = 'airflow-configmap'
- self.kube_config.airflow_local_settings_configmap = 'airflow-configmap'
- self.kube_config.dags_folder = '/workers/path/to/dags'
-
- worker_config = WorkerConfiguration(self.kube_config)
- kube_executor_config = KubernetesExecutorConfig(annotations=[],
- volumes=[],
- volume_mounts=[])
-
- pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
- "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
- kube_executor_config)
-
- airflow_config_volume = [
- volume for volume in pod.volumes if volume["name"] == 'airflow-config'
- ]
- # Test that volume_name is found
- self.assertEqual(1, len(airflow_config_volume))
-
- # Test that config map exists
- self.assertEqual(
- {'configMap': {'name': 'airflow-configmap'}, 'name': 'airflow-config'},
- airflow_config_volume[0]
- )
-
- # Test that 2 Volume Mounts exists and has 2 different mount-paths
- # One for airflow.cfg
- # Second for airflow_local_settings.py
- volume_mounts = [
- volume_mount for volume_mount in pod.volume_mounts
- if volume_mount['name'] == 'airflow-config'
- ]
- self.assertEqual(2, len(volume_mounts))
-
- six.assertCountEqual(
- self,
- [
- {
- 'mountPath': '/usr/local/airflow/airflow.cfg',
- 'name': 'airflow-config',
- 'readOnly': True,
- 'subPath': 'airflow.cfg',
- },
- {
- 'mountPath': '/usr/local/airflow/config/airflow_local_settings.py',
- 'name': 'airflow-config',
- 'readOnly': True,
- 'subPath': 'airflow_local_settings.py',
- }
- ],
- volume_mounts
- )
-
- def test_set_airflow_configmap_different_for_local_setting(self):
- """
- Test that airflow_local_settings.py can be set via configmap by
- checking volume & volume-mounts are set correctly.
- """
- self.kube_config.airflow_home = '/usr/local/airflow'
- self.kube_config.airflow_configmap = 'airflow-configmap'
- self.kube_config.airflow_local_settings_configmap = 'airflow-ls-configmap'
- self.kube_config.dags_folder = '/workers/path/to/dags'
-
- worker_config = WorkerConfiguration(self.kube_config)
- kube_executor_config = KubernetesExecutorConfig(annotations=[],
- volumes=[],
- volume_mounts=[])
-
- pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
- "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
- kube_executor_config)
-
- airflow_local_settings_volume = [
- volume for volume in pod.volumes if volume["name"] == 'airflow-local-settings'
- ]
- # Test that volume_name is found
- self.assertEqual(1, len(airflow_local_settings_volume))
-
- # Test that config map exists
- self.assertEqual(
- [{'configMap': {'name': 'airflow-ls-configmap'}, 'name': 'airflow-local-settings'}],
- airflow_local_settings_volume
- )
-
- # Test that 2 Volume Mounts exists and has 2 different mount-paths
- # One for airflow.cfg
- # Second for airflow_local_settings.py
- airflow_cfg_volume_mount = [
- volume_mount for volume_mount in pod.volume_mounts
- if volume_mount['name'] == 'airflow-config'
- ]
-
- local_setting_volume_mount = [
- volume_mount for volume_mount in pod.volume_mounts
- if volume_mount['name'] == 'airflow-local-settings'
- ]
- self.assertEqual(1, len(airflow_cfg_volume_mount))
- self.assertEqual(1, len(local_setting_volume_mount))
-
- self.assertEqual(
- [
- {
- 'mountPath': '/usr/local/airflow/config/airflow_local_settings.py',
- 'name': 'airflow-local-settings',
- 'readOnly': True,
- 'subPath': 'airflow_local_settings.py',
- }
- ],
- local_setting_volume_mount
- )
-
- print(airflow_cfg_volume_mount)
-
- self.assertEqual(
- [
- {
- 'mountPath': '/usr/local/airflow/airflow.cfg',
- 'name': 'airflow-config',
- 'readOnly': True,
- 'subPath': 'airflow.cfg',
- }
- ],
- airflow_cfg_volume_mount
- )
-
- def test_kubernetes_environment_variables(self):
- # Tests the kubernetes environment variables get copied into the worker pods
- input_environment = {
- 'ENVIRONMENT': 'prod',
- 'LOG_LEVEL': 'warning'
- }
- self.kube_config.kube_env_vars = input_environment
- worker_config = WorkerConfiguration(self.kube_config)
- env = worker_config._get_environment()
- for key in input_environment:
- self.assertIn(key, env)
- self.assertIn(input_environment[key], env.values())
-
- core_executor = 'AIRFLOW__CORE__EXECUTOR'
- input_environment = {
- core_executor: 'NotLocalExecutor'
- }
- self.kube_config.kube_env_vars = input_environment
- worker_config = WorkerConfiguration(self.kube_config)
- env = worker_config._get_environment()
- self.assertEqual(env[core_executor], 'LocalExecutor')
-
- def test_kubernetes_environment_variables_for_init_container(self):
- self.kube_config.dags_volume_claim = None
- self.kube_config.dags_volume_host = None
- self.kube_config.dags_in_image = None
-
- # Tests the kubernetes environment variables get copied into the worker pods
- input_environment = {
- 'ENVIRONMENT': 'prod',
- 'LOG_LEVEL': 'warning'
- }
- self.kube_config.kube_env_vars = input_environment
- worker_config = WorkerConfiguration(self.kube_config)
- env = worker_config._get_environment()
- for key in input_environment:
- self.assertIn(key, env)
- self.assertIn(input_environment[key], env.values())
-
- init_containers = worker_config._get_init_containers()
-
- self.assertTrue(init_containers) # check not empty
- env = init_containers[0]['env']
-
- self.assertTrue({'name': 'ENVIRONMENT', 'value': 'prod'} in env)
- self.assertTrue({'name': 'LOG_LEVEL', 'value': 'warning'} in env)
-
- def test_get_secrets(self):
- # Test when secretRef is None and kube_secrets is not empty
- self.kube_config.kube_secrets = {
- 'AWS_SECRET_KEY': 'airflow-secret=aws_secret_key',
- 'POSTGRES_PASSWORD': 'airflow-secret=postgres_credentials'
- }
- self.kube_config.env_from_secret_ref = None
- worker_config = WorkerConfiguration(self.kube_config)
- secrets = worker_config._get_secrets()
- secrets.sort(key=lambda secret: secret.deploy_target)
- expected = [
- Secret('env', 'AWS_SECRET_KEY', 'airflow-secret', 'aws_secret_key'),
- Secret('env', 'POSTGRES_PASSWORD', 'airflow-secret', 'postgres_credentials')
- ]
- self.assertListEqual(expected, secrets)
-
- # Test when secret is not empty and kube_secrets is empty dict
- self.kube_config.kube_secrets = {}
- self.kube_config.env_from_secret_ref = 'secret_a,secret_b'
- worker_config = WorkerConfiguration(self.kube_config)
- secrets = worker_config._get_secrets()
- expected = [
- Secret('env', None, 'secret_a'),
- Secret('env', None, 'secret_b')
- ]
- self.assertListEqual(expected, secrets)
-
- def test_get_configmaps(self):
- # Test when configmap is empty
- self.kube_config.env_from_configmap_ref = ''
- worker_config = WorkerConfiguration(self.kube_config)
- configmaps = worker_config._get_configmaps()
- self.assertListEqual([], configmaps)
-
- # test when configmap is not empty
- self.kube_config.env_from_configmap_ref = 'configmap_a,configmap_b'
- worker_config = WorkerConfiguration(self.kube_config)
- configmaps = worker_config._get_configmaps()
- self.assertListEqual(['configmap_a', 'configmap_b'], configmaps)
-
- def test_get_labels(self):
- worker_config = WorkerConfiguration(self.kube_config)
- labels = worker_config._get_labels({'my_kube_executor_label': 'kubernetes'}, {
- 'dag_id': 'override_dag_id',
- })
- self.assertEqual({
- 'my_label': 'label_id',
- 'dag_id': 'override_dag_id',
- 'my_kube_executor_label': 'kubernetes',
- }, labels)
-
-
class TestKubernetesExecutor(unittest.TestCase):
"""
Tests if an ApiException from the Kube Client will cause the task to
@@ -1001,6 +145,9 @@ class TestKubernetesExecutor(unittest.TestCase):
mock_kube_client.create_namespaced_pod = mock.MagicMock(
side_effect=ApiException(http_resp=r))
mock_get_kube_client.return_value = mock_kube_client
+ mock_api_client = mock.MagicMock()
+ mock_api_client.sanitize_for_serialization.return_value = {}
+ mock_kube_client.api_client = mock_api_client
kubernetesExecutor = KubernetesExecutor()
kubernetesExecutor.start()
@@ -1045,10 +192,12 @@ class TestKubernetesExecutor(unittest.TestCase):
executor._change_state(key, State.RUNNING, 'pod_id', 'default')
self.assertTrue(executor.event_buffer[key] == State.RUNNING)
+ @mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
@mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
@mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
@mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
- def test_change_state_success(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher):
+ def test_change_state_success(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher,
+ mock_kube_config):
executor = KubernetesExecutor()
executor.start()
test_time = timezone.utcnow()
@@ -1057,11 +206,12 @@ class TestKubernetesExecutor(unittest.TestCase):
self.assertTrue(executor.event_buffer[key] == State.SUCCESS)
mock_delete_pod.assert_called_once_with('pod_id', 'default')
+ @mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
@mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
@mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
@mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
- def test_change_state_failed_no_deletion(self, mock_delete_pod, mock_get_kube_client,
- mock_kubernetes_job_watcher):
+ def test_change_state_failed(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher,
+ mock_kube_config):
executor = KubernetesExecutor()
executor.kube_config.delete_worker_pods = False
executor.kube_config.delete_worker_pods_on_failure = False
@@ -1072,11 +222,12 @@ class TestKubernetesExecutor(unittest.TestCase):
self.assertTrue(executor.event_buffer[key] == State.FAILED)
mock_delete_pod.assert_not_called()
+ @mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
@mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
@mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
@mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
def test_change_state_skip_pod_deletion(self, mock_delete_pod, mock_get_kube_client,
- mock_kubernetes_job_watcher):
+ mock_kubernetes_job_watcher, mock_kube_config):
test_time = timezone.utcnow()
executor = KubernetesExecutor()
executor.kube_config.delete_worker_pods = False
@@ -1087,11 +238,12 @@ class TestKubernetesExecutor(unittest.TestCase):
self.assertTrue(executor.event_buffer[key] == State.SUCCESS)
mock_delete_pod.assert_not_called()
+ @mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
@mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
@mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
@mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
def test_change_state_failed_pod_deletion(self, mock_delete_pod, mock_get_kube_client,
- mock_kubernetes_job_watcher):
+ mock_kubernetes_job_watcher, mock_kube_config):
executor = KubernetesExecutor()
executor.kube_config.delete_worker_pods_on_failure = True
diff --git a/tests/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py b/tests/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py
deleted file mode 100644
index 87d0073..0000000
--- a/tests/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py
+++ /dev/null
@@ -1,396 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from airflow.kubernetes.kubernetes_request_factory.kubernetes_request_factory import KubernetesRequestFactory
-from airflow.kubernetes.pod import Pod, Resources
-from airflow.kubernetes.pod_runtime_info_env import PodRuntimeInfoEnv
-from airflow.kubernetes.secret import Secret
-from parameterized import parameterized
-import unittest
-import copy
-
-
-class TestKubernetesRequestFactory(unittest.TestCase):
-
- def setUp(self):
-
- self.expected = {
- 'apiVersion': 'v1',
- 'kind': 'Pod',
- 'metadata': {
- 'name': 'name'
- },
- 'spec': {
- 'containers': [{
- 'name': 'base',
- 'image': 'airflow-worker:latest',
- 'command': [
- "/usr/local/airflow/entrypoint.sh",
- "/bin/bash sleep 25"
- ],
- }],
- 'restartPolicy': 'Never'
- }
- }
- self.input_req = copy.deepcopy(self.expected)
-
- def test_extract_image(self):
- image = 'v3.14'
- pod = Pod(image, {}, [])
- KubernetesRequestFactory.extract_image(pod, self.input_req)
- self.expected['spec']['containers'][0]['image'] = image
- self.assertEqual(self.input_req, self.expected)
-
- def test_extract_image_pull_policy(self):
- # Test when pull policy is not none
- pull_policy = 'IfNotPresent'
- pod = Pod('v3.14', {}, [], image_pull_policy=pull_policy)
-
- KubernetesRequestFactory.extract_image_pull_policy(pod, self.input_req)
- self.expected['spec']['containers'][0]['imagePullPolicy'] = pull_policy
- self.assertEqual(self.input_req, self.expected)
-
- def test_add_secret_to_env(self):
- secret = Secret('env', 'target', 'my-secret', 'KEY')
- secret_list = []
- self.expected = [{
- 'name': 'TARGET',
- 'valueFrom': {
- 'secretKeyRef': {
- 'name': 'my-secret',
- 'key': 'KEY'
- }
- }
- }]
- KubernetesRequestFactory.add_secret_to_env(secret_list, secret)
- self.assertListEqual(secret_list, self.expected)
-
- def test_extract_labels(self):
- # Test when labels are not empty
- labels = {'label_a': 'val_a', 'label_b': 'val_b'}
- pod = Pod('v3.14', {}, [], labels=labels)
- self.expected['metadata']['labels'] = labels
- KubernetesRequestFactory.extract_labels(pod, self.input_req)
- self.assertEqual(self.input_req, self.expected)
-
- def test_extract_annotations(self):
- # Test when annotations are not empty
- annotations = {'annot_a': 'val_a', 'annot_b': 'val_b'}
- pod = Pod('v3.14', {}, [], annotations=annotations)
- self.expected['metadata']['annotations'] = annotations
- KubernetesRequestFactory.extract_annotations(pod, self.input_req)
- self.assertEqual(self.input_req, self.expected)
-
- def test_extract_affinity(self):
- # Test when affinity is not empty
- affinity = {'podAffinity': 'requiredDuringSchedulingIgnoredDuringExecution'}
- pod = Pod('v3.14', {}, [], affinity=affinity)
- self.expected['spec']['affinity'] = affinity
- KubernetesRequestFactory.extract_affinity(pod, self.input_req)
- self.assertEqual(self.input_req, self.expected)
-
- def test_extract_node_selector(self):
- # Test when affinity is not empty
- node_selectors = {'disktype': 'ssd', 'accelerator': 'nvidia-tesla-p100'}
- pod = Pod('v3.14', {}, [], node_selectors=node_selectors)
- self.expected['spec']['nodeSelector'] = node_selectors
- KubernetesRequestFactory.extract_node_selector(pod, self.input_req)
- self.assertEqual(self.input_req, self.expected)
-
- def test_extract_cmds(self):
- cmds = ['test-cmd.sh']
- pod = Pod('v3.14', {}, cmds)
- KubernetesRequestFactory.extract_cmds(pod, self.input_req)
- self.expected['spec']['containers'][0]['command'] = cmds
- self.assertEqual(self.input_req, self.expected)
-
- def test_extract_args(self):
- args = ['test_arg.sh']
- pod = Pod('v3.14', {}, [], args=args)
- KubernetesRequestFactory.extract_args(pod, self.input_req)
- self.expected['spec']['containers'][0]['args'] = args
- self.assertEqual(self.input_req, self.expected)
-
- def test_attach_volumes(self):
- # Test when volumes is not empty list
- volumes = ['vol_a', 'vol_b']
- pod = Pod('v3.14', {}, [], volumes=volumes)
- self.expected['spec']['volumes'] = volumes
- KubernetesRequestFactory.attach_volumes(pod, self.input_req)
- self.assertEqual(self.input_req, self.expected)
-
- def test_attach_volume_mounts(self):
- # Test when volumes is not empty list
- volume_mounts = ['vol_a', 'vol_b']
- pod = Pod('v3.14', {}, [], volume_mounts=volume_mounts)
- self.expected['spec']['containers'][0]['volumeMounts'] = volume_mounts
- KubernetesRequestFactory.attach_volume_mounts(pod, self.input_req)
- self.assertEqual(self.input_req, self.expected)
-
- def test_extract_name(self):
- name = 'pod-name'
- pod = Pod('v3.14', {}, [], name=name)
- self.expected['metadata']['name'] = name
- KubernetesRequestFactory.extract_name(pod, self.input_req)
- self.assertEqual(self.input_req, self.expected)
-
- def test_extract_volume_secrets(self):
- # Test when secrets is not empty
- secrets = [
- Secret('volume', 'KEY1', 's1', 'key-1'),
- Secret('env', 'KEY2', 's2'),
- Secret('volume', 'KEY3', 's3', 'key-2')
- ]
- pod = Pod('v3.14', {}, [], secrets=secrets)
- self.expected['spec']['containers'][0]['volumeMounts'] = [{
- 'mountPath': 'KEY1',
- 'name': 'secretvol0',
- 'readOnly': True
- }, {
- 'mountPath': 'KEY3',
- 'name': 'secretvol1',
- 'readOnly': True
- }]
- self.expected['spec']['volumes'] = [{
- 'name': 'secretvol0',
- 'secret': {
- 'secretName': 's1'
- }
- }, {
- 'name': 'secretvol1',
- 'secret': {
- 'secretName': 's3'
- }
- }]
- KubernetesRequestFactory.extract_volume_secrets(pod, self.input_req)
- self.assertEqual(self.input_req, self.expected)
-
- def test_extract_env_and_secrets(self):
- # Test when secrets and envs are not empty
- secrets = [
- Secret('env', None, 's1'),
- Secret('volume', 'KEY2', 's2', 'key-2'),
- Secret('env', None, 's3')
- ]
- envs = {
- 'ENV1': 'val1',
- 'ENV2': 'val2'
- }
- configmaps = ['configmap_a', 'configmap_b']
- pod_runtime_envs = [PodRuntimeInfoEnv("ENV3", "status.podIP")]
- pod = Pod(
- image='v3.14',
- envs=envs,
- cmds=[],
- secrets=secrets,
- configmaps=configmaps,
- pod_runtime_info_envs=pod_runtime_envs)
- self.expected['spec']['containers'][0]['env'] = [
- {'name': 'ENV1', 'value': 'val1'},
- {'name': 'ENV2', 'value': 'val2'},
- {
- 'name': 'ENV3',
- 'valueFrom': {
- 'fieldRef': {
- 'fieldPath': 'status.podIP'
- }
- }
- }
- ]
- self.expected['spec']['containers'][0]['envFrom'] = [{
- 'secretRef': {
- 'name': 's1'
- }
- }, {
- 'secretRef': {
- 'name': 's3'
- }
- }, {
- 'configMapRef': {
- 'name': 'configmap_a'
- }
- }, {
- 'configMapRef': {
- 'name': 'configmap_b'
- }
- }]
-
- KubernetesRequestFactory.extract_env_and_secrets(pod, self.input_req)
- self.input_req['spec']['containers'][0]['env'].sort(key=lambda x: x['name'])
- self.assertEqual(self.input_req, self.expected)
-
- def test_extract_env_and_secret_order(self):
- envs = {
- 'ENV': 'val1',
- }
- pod_runtime_envs = [PodRuntimeInfoEnv('RUNTIME_ENV', 'status.podIP')]
- pod = Pod(
- image='v3.14',
- envs=envs,
- cmds=[],
- pod_runtime_info_envs=pod_runtime_envs)
- self.expected['spec']['containers'][0]['env'] = [
- {
- 'name': 'RUNTIME_ENV',
- 'valueFrom': {
- 'fieldRef': {
- 'fieldPath': 'status.podIP'
- }
- }
- },
- {'name': 'ENV', 'value': 'val1'}
- ]
- KubernetesRequestFactory.extract_env_and_secrets(pod, self.input_req)
- self.assertEqual(self.input_req, self.expected)
-
- def test_extract_resources(self):
- # Test when resources is not empty
- resources = Resources(
- request_memory='1Gi',
- request_cpu=1,
- limit_memory='2Gi',
- limit_cpu=2)
-
- pod = Pod('v3.14', {}, [], resources=resources)
- self.expected['spec']['containers'][0]['resources'] = {
- 'requests': {
- 'memory': '1Gi',
- 'cpu': 1
- },
- 'limits': {
- 'memory': '2Gi',
- 'cpu': 2
- },
- }
- KubernetesRequestFactory.extract_resources(pod, self.input_req)
- self.assertEqual(self.input_req, self.expected)
-
- def test_display_resources(self):
- resources_string = str(Resources('1Gi', 1))
- self.assertEqual(
- resources_string,
- "Request: [cpu: 1, memory: 1Gi], Limit: [cpu: None, memory: None, gpu: None]")
-
- def test_extract_limits(self):
- # Test when resources is not empty
- resources = Resources(
- limit_memory='1Gi',
- limit_cpu=1)
-
- pod = Pod('v3.14', {}, [], resources=resources)
- self.expected['spec']['containers'][0]['resources'] = {
- 'limits': {
- 'memory': '1Gi',
- 'cpu': 1
- }
- }
- KubernetesRequestFactory.extract_resources(pod, self.input_req)
- self.assertEqual(self.expected, self.input_req)
-
- def test_extract_all_resources(self):
- # Test when resources is not empty
- resources = Resources(
- request_memory='1Gi',
- request_cpu=1,
- limit_memory='2Gi',
- limit_cpu=2,
- limit_gpu=3)
-
- pod = Pod('v3.14', {}, [], resources=resources)
- self.expected['spec']['containers'][0]['resources'] = {
- 'requests': {
- 'memory': '1Gi',
- 'cpu': 1
- },
- 'limits': {
- 'memory': '2Gi',
- 'cpu': 2,
- 'nvidia.com/gpu': 3
- }
- }
- KubernetesRequestFactory.extract_resources(pod, self.input_req)
- self.assertEqual(self.input_req, self.expected)
-
- def test_extract_init_containers(self):
- init_container = 'init_container'
- pod = Pod('v3.14', {}, [], init_containers=init_container)
- self.expected['spec']['initContainers'] = init_container
- KubernetesRequestFactory.extract_init_containers(pod, self.input_req)
- self.assertEqual(self.input_req, self.expected)
-
- def test_extract_service_account_name(self):
- service_account_name = 'service_account_name'
- pod = Pod('v3.14', {}, [], service_account_name=service_account_name)
- self.expected['spec']['serviceAccountName'] = service_account_name
- KubernetesRequestFactory.extract_service_account_name(pod, self.input_req)
- self.assertEqual(self.input_req, self.expected)
-
- def test_extract_hostnetwork(self):
- hostnetwork = True
- pod = Pod('v3.14', {}, [], hostnetwork=hostnetwork)
- self.expected['spec']['hostNetwork'] = hostnetwork
- KubernetesRequestFactory.extract_hostnetwork(pod, self.input_req)
- self.assertEqual(self.input_req, self.expected)
-
- def test_extract_image_pull_secrets(self):
- image_pull_secrets = 'secret_a,secret_b,secret_c'
- pod = Pod('v3.14', {}, [], image_pull_secrets=image_pull_secrets)
- self.expected['spec']['imagePullSecrets'] = [
- {'name': 'secret_a'},
- {'name': 'secret_b'},
- {'name': 'secret_c'},
- ]
- KubernetesRequestFactory.extract_image_pull_secrets(pod, self.input_req)
- self.assertEqual(self.input_req, self.expected)
-
- def test_extract_tolerations(self):
- tolerations = [{
- 'key': 'key',
- 'operator': 'Equal',
- 'value': 'value',
- 'effect': 'NoSchedule'
- }]
- pod = Pod('v3.14', {}, [], tolerations=tolerations)
- self.expected['spec']['tolerations'] = tolerations
- KubernetesRequestFactory.extract_tolerations(pod, self.input_req)
- self.assertEqual(self.input_req, self.expected)
-
- def test_security_context(self):
- security_context = {
- 'runAsUser': 1000,
- 'fsGroup': 2000
- }
- pod = Pod('v3.14', {}, [], security_context=security_context)
- self.expected['spec']['securityContext'] = security_context
- KubernetesRequestFactory.extract_security_context(pod, self.input_req)
- self.assertEqual(self.input_req, self.expected)
-
- @parameterized.expand([
- 'extract_resources',
- 'extract_init_containers',
- 'extract_service_account_name',
- 'extract_hostnetwork',
- 'extract_image_pull_secrets',
- 'extract_tolerations',
- 'extract_security_context',
- 'extract_volume_secrets'
- ])
- def test_identity(self, name):
- kube_request_factory_func = getattr(KubernetesRequestFactory, name)
- pod = Pod('v3.14', {}, [])
- kube_request_factory_func(pod, self.input_req)
- self.assertEqual(self.input_req, self.expected)
diff --git a/tests/kubernetes/kubernetes_request_factory/test_pod_request_factory.py b/tests/kubernetes/kubernetes_request_factory/test_pod_request_factory.py
deleted file mode 100644
index 68617de..0000000
--- a/tests/kubernetes/kubernetes_request_factory/test_pod_request_factory.py
+++ /dev/null
@@ -1,175 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from airflow.kubernetes.kubernetes_request_factory.pod_request_factory import SimplePodRequestFactory, \
- ExtractXcomPodRequestFactory
-from airflow.kubernetes.pod import Pod, Resources
-from airflow.kubernetes.secret import Secret
-from airflow.exceptions import AirflowConfigException
-import unittest
-
-XCOM_CMD = 'trap "exit 0" INT; while true; do sleep 30; done;'
-
-
-class TestPodRequestFactory(unittest.TestCase):
-
- def setUp(self):
- self.simple_pod_request_factory = SimplePodRequestFactory()
- self.xcom_pod_request_factory = ExtractXcomPodRequestFactory()
- self.pod = Pod(
- image='busybox',
- envs={
- 'ENVIRONMENT': 'prod',
- 'LOG_LEVEL': 'warning'
- },
- name='myapp-pod',
- cmds=['sh', '-c', 'echo Hello Kubernetes!'],
- labels={'app': 'myapp'},
- image_pull_secrets='pull_secret_a,pull_secret_b',
- configmaps=['configmap_a', 'configmap_b'],
- ports=[{'name': 'foo', 'containerPort': 1234}],
- resources=Resources('1Gi', 1, '2Gi', 2, 1),
- secrets=[
- # This should be a secretRef
- Secret('env', None, 'secret_a'),
- # This should be a single secret mounted in volumeMounts
- Secret('volume', '/etc/foo', 'secret_b'),
- # This should produce a single secret mounted in env
- Secret('env', 'TARGET', 'secret_b', 'source_b'),
- ],
- security_context={
- 'runAsUser': 1000,
- 'fsGroup': 2000,
- }
- )
- self.maxDiff = None
- self.expected = {
- 'apiVersion': 'v1',
- 'kind': 'Pod',
- 'metadata': {
- 'name': 'myapp-pod',
- 'labels': {'app': 'myapp'},
- 'annotations': {}},
- 'spec': {
- 'containers': [{
- 'name': 'base',
- 'image': 'busybox',
- 'command': [
- 'sh', '-c', 'echo Hello Kubernetes!'
- ],
- 'imagePullPolicy': 'IfNotPresent',
- 'args': [],
- 'env': [{
- 'name': 'ENVIRONMENT',
- 'value': 'prod'
- }, {
- 'name': 'LOG_LEVEL',
- 'value': 'warning'
- }, {
- 'name': 'TARGET',
- 'valueFrom': {
- 'secretKeyRef': {
- 'name': 'secret_b',
- 'key': 'source_b'
- }
- }
- }],
- 'envFrom': [{
- 'secretRef': {
- 'name': 'secret_a'
- }
- }, {
- 'configMapRef': {
- 'name': 'configmap_a'
- }
- }, {
- 'configMapRef': {
- 'name': 'configmap_b'
- }
- }],
- 'resources': {
- 'requests': {
- 'memory': '1Gi',
- 'cpu': 1
- },
- 'limits': {
- 'memory': '2Gi',
- 'cpu': 2,
- 'nvidia.com/gpu': 1
- },
- },
- 'ports': [{'name': 'foo', 'containerPort': 1234}],
- 'volumeMounts': [{
- 'mountPath': '/etc/foo',
- 'name': 'secretvol0',
- 'readOnly': True
- }]
- }],
- 'restartPolicy': 'Never',
- 'nodeSelector': {},
- 'volumes': [{
- 'name': 'secretvol0',
- 'secret': {
- 'secretName': 'secret_b'
- }
- }],
- 'imagePullSecrets': [
- {'name': 'pull_secret_a'},
- {'name': 'pull_secret_b'}
- ],
- 'affinity': {},
- 'securityContext': {
- 'runAsUser': 1000,
- 'fsGroup': 2000,
- },
- }
- }
-
- def test_secret_throws(self):
- with self.assertRaises(AirflowConfigException):
- Secret('volume', None, 'secret_a', 'key')
-
- def test_simple_pod_request_factory_create(self):
- result = self.simple_pod_request_factory.create(self.pod)
- # sort
- result['spec']['containers'][0]['env'].sort(key=lambda x: x['name'])
- self.assertEqual(result, self.expected)
-
- def test_xcom_pod_request_factory_create(self):
- result = self.xcom_pod_request_factory.create(self.pod)
- container_two = {
- 'name': 'airflow-xcom-sidecar',
- 'image': 'alpine',
- 'command': ['sh', '-c', XCOM_CMD],
- 'volumeMounts': [
- {
- 'name': 'xcom',
- 'mountPath': '/airflow/xcom'
- }
- ],
- 'resources': {'requests': {'cpu': '1m'}},
- }
- self.expected['spec']['containers'].append(container_two)
- self.expected['spec']['containers'][0]['volumeMounts'].insert(0, {
- 'name': 'xcom',
- 'mountPath': '/airflow/xcom'
- })
- self.expected['spec']['volumes'].insert(0, {
- 'name': 'xcom', 'emptyDir': {}
- })
- result['spec']['containers'][0]['env'].sort(key=lambda x: x['name'])
- self.assertEqual(result, self.expected)
diff --git a/tests/kubernetes/kubernetes_request_factory/__init__.py b/tests/kubernetes/models/__init__.py
similarity index 100%
rename from tests/kubernetes/kubernetes_request_factory/__init__.py
rename to tests/kubernetes/models/__init__.py
diff --git a/tests/kubernetes/models/test_pod.py b/tests/kubernetes/models/test_pod.py
new file mode 100644
index 0000000..24314eb
--- /dev/null
+++ b/tests/kubernetes/models/test_pod.py
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+import unittest.mock as mock
+from kubernetes.client import ApiClient
+import kubernetes.client.models as k8s
+from airflow.kubernetes.pod import Port
+from airflow.kubernetes.pod_generator import PodGenerator
+from airflow.kubernetes.k8s_model import append_to_pod
+
+
+class TestPod(unittest.TestCase):
+
+ def test_port_to_k8s_client_obj(self):
+ port = Port('http', 80)
+ self.assertEqual(
+ port.to_k8s_client_obj(),
+ k8s.V1ContainerPort(
+ name='http',
+ container_port=80
+ )
+ )
+
+ @mock.patch('uuid.uuid4')
+ def test_port_attach_to_pod(self, mock_uuid):
+ mock_uuid.return_value = '0'
+ pod = PodGenerator(image='airflow-worker:latest', name='base').gen_pod()
+ ports = [
+ Port('https', 443),
+ Port('http', 80)
+ ]
+ k8s_client = ApiClient()
+ result = append_to_pod(pod, ports)
+ result = k8s_client.sanitize_for_serialization(result)
+ self.assertEqual({
+ 'apiVersion': 'v1',
+ 'kind': 'Pod',
+ 'metadata': {'name': 'base-0'},
+ 'spec': {
+ 'containers': [{
+ 'args': [],
+ 'command': [],
+ 'env': [],
+ 'envFrom': [],
+ 'image': 'airflow-worker:latest',
+ 'imagePullPolicy': 'IfNotPresent',
+ 'name': 'base',
+ 'ports': [{
+ 'name': 'https',
+ 'containerPort': 443
+ }, {
+ 'name': 'http',
+ 'containerPort': 80
+ }],
+ 'volumeMounts': [],
+ }],
+ 'hostNetwork': False,
+ 'imagePullSecrets': [],
+ 'restartPolicy': 'Never',
+ 'volumes': []
+ }
+ }, result)
diff --git a/tests/kubernetes/models/test_secret.py b/tests/kubernetes/models/test_secret.py
new file mode 100644
index 0000000..1cdc87a
--- /dev/null
+++ b/tests/kubernetes/models/test_secret.py
@@ -0,0 +1,115 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+import unittest.mock as mock
+from kubernetes.client import ApiClient
+import kubernetes.client.models as k8s
+from airflow.kubernetes.secret import Secret
+from airflow.kubernetes.pod_generator import PodGenerator
+from airflow.kubernetes.k8s_model import append_to_pod
+
+
+class TestSecret(unittest.TestCase):
+
+ def test_to_env_secret(self):
+ secret = Secret('env', 'name', 'secret', 'key')
+ self.assertEqual(secret.to_env_secret(), k8s.V1EnvVar(
+ name='NAME',
+ value_from=k8s.V1EnvVarSource(
+ secret_key_ref=k8s.V1SecretKeySelector(
+ name='secret',
+ key='key'
+ )
+ )
+ ))
+
+ def test_to_env_from_secret(self):
+ secret = Secret('env', None, 'secret')
+ self.assertEqual(secret.to_env_from_secret(), k8s.V1EnvFromSource(
+ secret_ref=k8s.V1SecretEnvSource(name='secret')
+ ))
+
+ @mock.patch('uuid.uuid4')
+ def test_to_volume_secret(self, mock_uuid):
+ mock_uuid.return_value = '0'
+ secret = Secret('volume', '/etc/foo', 'secret_b')
+ self.assertEqual(secret.to_volume_secret(), (
+ k8s.V1Volume(
+ name='secretvol0',
+ secret=k8s.V1SecretVolumeSource(
+ secret_name='secret_b'
+ )
+ ),
+ k8s.V1VolumeMount(
+ mount_path='/etc/foo',
+ name='secretvol0',
+ read_only=True
+ )
+ ))
+
+ @mock.patch('uuid.uuid4')
+ def test_attach_to_pod(self, mock_uuid):
+ mock_uuid.return_value = '0'
+ pod = PodGenerator(image='airflow-worker:latest',
+ name='base').gen_pod()
+ secrets = [
+ # This should be a secretRef
+ Secret('env', None, 'secret_a'),
+ # This should be a single secret mounted in volumeMounts
+ Secret('volume', '/etc/foo', 'secret_b'),
+ # This should produce a single secret mounted in env
+ Secret('env', 'TARGET', 'secret_b', 'source_b'),
+ ]
+ k8s_client = ApiClient()
+ result = append_to_pod(pod, secrets)
+ result = k8s_client.sanitize_for_serialization(result)
+ self.assertEqual(result, {
+ 'apiVersion': 'v1',
+ 'kind': 'Pod',
+ 'metadata': {'name': 'base-0'},
+ 'spec': {
+ 'containers': [{
+ 'args': [],
+ 'command': [],
+ 'env': [{
+ 'name': 'TARGET',
+ 'valueFrom': {
+ 'secretKeyRef': {
+ 'key': 'source_b',
+ 'name': 'secret_b'
+ }
+ }
+ }],
+ 'envFrom': [{'secretRef': {'name': 'secret_a'}}],
+ 'image': 'airflow-worker:latest',
+ 'imagePullPolicy': 'IfNotPresent',
+ 'name': 'base',
+ 'ports': [],
+ 'volumeMounts': [{
+ 'mountPath': '/etc/foo',
+ 'name': 'secretvol0',
+ 'readOnly': True}]
+ }],
+ 'hostNetwork': False,
+ 'imagePullSecrets': [],
+ 'restartPolicy': 'Never',
+ 'volumes': [{
+ 'name': 'secretvol0',
+ 'secret': {'secretName': 'secret_b'}
+ }]
+ }
+ })
diff --git a/tests/kubernetes/test_pod_generator.py b/tests/kubernetes/test_pod_generator.py
new file mode 100644
index 0000000..afd5abe
--- /dev/null
+++ b/tests/kubernetes/test_pod_generator.py
@@ -0,0 +1,328 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+import unittest.mock as mock
+import kubernetes.client.models as k8s
+from kubernetes.client import ApiClient
+from airflow.kubernetes.secret import Secret
+from airflow.kubernetes.pod_generator import PodGenerator, PodDefaults
+from airflow.kubernetes.pod import Resources
+from airflow.kubernetes.k8s_model import append_to_pod
+
+
+class TestPodGenerator(unittest.TestCase):
+
+ def setUp(self):
+ self.envs = {
+ 'ENVIRONMENT': 'prod',
+ 'LOG_LEVEL': 'warning'
+ }
+ self.secrets = [
+ # This should be a secretRef
+ Secret('env', None, 'secret_a'),
+ # This should be a single secret mounted in volumeMounts
+ Secret('volume', '/etc/foo', 'secret_b'),
+ # This should produce a single secret mounted in env
+ Secret('env', 'TARGET', 'secret_b', 'source_b'),
+ ]
+ self.resources = Resources('1Gi', 1, '2Gi', 2, 1)
+ self.k8s_client = ApiClient()
+ self.expected = {
+ 'apiVersion': 'v1',
+ 'kind': 'Pod',
+ 'metadata': {
+ 'name': 'myapp-pod-0',
+ 'labels': {'app': 'myapp'},
+ 'namespace': 'default'
+ },
+ 'spec': {
+ 'containers': [{
+ 'name': 'base',
+ 'image': 'busybox',
+ 'args': [],
+ 'command': [
+ 'sh', '-c', 'echo Hello Kubernetes!'
+ ],
+ 'imagePullPolicy': 'IfNotPresent',
+ 'env': [{
+ 'name': 'ENVIRONMENT',
+ 'value': 'prod'
+ }, {
+ 'name': 'LOG_LEVEL',
+ 'value': 'warning'
+ }, {
+ 'name': 'TARGET',
+ 'valueFrom': {
+ 'secretKeyRef': {
+ 'name': 'secret_b',
+ 'key': 'source_b'
+ }
+ }
+ }],
+ 'envFrom': [{
+ 'configMapRef': {
+ 'name': 'configmap_a'
+ }
+ }, {
+ 'configMapRef': {
+ 'name': 'configmap_b'
+ }
+ }, {
+ 'secretRef': {
+ 'name': 'secret_a'
+ }
+ }],
+ 'resources': {
+ 'requests': {
+ 'memory': '1Gi',
+ 'cpu': 1
+ },
+ 'limits': {
+ 'memory': '2Gi',
+ 'cpu': 2,
+ 'nvidia.com/gpu': 1
+ },
+ },
+ 'ports': [{'name': 'foo', 'containerPort': 1234}],
+ 'volumeMounts': [{
+ 'mountPath': '/etc/foo',
+ 'name': 'secretvol0',
+ 'readOnly': True
+ }]
+ }],
+ 'restartPolicy': 'Never',
+ 'volumes': [{
+ 'name': 'secretvol0',
+ 'secret': {
+ 'secretName': 'secret_b'
+ }
+ }],
+ 'hostNetwork': False,
+ 'imagePullSecrets': [
+ {'name': 'pull_secret_a'},
+ {'name': 'pull_secret_b'}
+ ],
+ 'securityContext': {
+ 'runAsUser': 1000,
+ 'fsGroup': 2000,
+ },
+ }
+ }
+
+ @mock.patch('uuid.uuid4')
+ def test_gen_pod(self, mock_uuid):
+ mock_uuid.return_value = '0'
+ pod_generator = PodGenerator(
+ labels={'app': 'myapp'},
+ name='myapp-pod',
+ image_pull_secrets='pull_secret_a,pull_secret_b',
+ image='busybox',
+ envs=self.envs,
+ cmds=['sh', '-c', 'echo Hello Kubernetes!'],
+ security_context=k8s.V1PodSecurityContext(
+ run_as_user=1000,
+ fs_group=2000,
+ ),
+ namespace='default',
+ ports=[k8s.V1ContainerPort(name='foo', container_port=1234)],
+ configmaps=['configmap_a', 'configmap_b']
+ )
+ result = pod_generator.gen_pod()
+ result = append_to_pod(result, self.secrets)
+ result = self.resources.attach_to_pod(result)
+ result_dict = self.k8s_client.sanitize_for_serialization(result)
+ # sort
+ result_dict['spec']['containers'][0]['env'].sort(key=lambda x: x['name'])
+ result_dict['spec']['containers'][0]['envFrom'].sort(
+ key=lambda x: list(x.values())[0]['name']
+ )
+ self.assertDictEqual(result_dict, self.expected)
+
+ @mock.patch('uuid.uuid4')
+ def test_gen_pod_extract_xcom(self, mock_uuid):
+ mock_uuid.return_value = '0'
+ pod_generator = PodGenerator(
+ labels={'app': 'myapp'},
+ name='myapp-pod',
+ image_pull_secrets='pull_secret_a,pull_secret_b',
+ image='busybox',
+ envs=self.envs,
+ cmds=['sh', '-c', 'echo Hello Kubernetes!'],
+ namespace='default',
+ security_context=k8s.V1PodSecurityContext(
+ run_as_user=1000,
+ fs_group=2000,
+ ),
+ ports=[k8s.V1ContainerPort(name='foo', container_port=1234)],
+ configmaps=['configmap_a', 'configmap_b']
+ )
+ pod_generator.extract_xcom = True
+ result = pod_generator.gen_pod()
+ result = append_to_pod(result, self.secrets)
+ result = self.resources.attach_to_pod(result)
+ result_dict = self.k8s_client.sanitize_for_serialization(result)
+ container_two = {
+ 'name': 'airflow-xcom-sidecar',
+ 'image': 'python:3.5-alpine',
+ 'command': ['python', '-c', PodDefaults.XCOM_CMD],
+ 'volumeMounts': [
+ {
+ 'name': 'xcom',
+ 'mountPath': '/airflow/xcom'
+ }
+ ]
+ }
+ self.expected['spec']['containers'].append(container_two)
+ self.expected['spec']['containers'][0]['volumeMounts'].insert(0, {
+ 'name': 'xcom',
+ 'mountPath': '/airflow/xcom'
+ })
+ self.expected['spec']['volumes'].insert(0, {
+ 'name': 'xcom', 'emptyDir': {}
+ })
+ result_dict['spec']['containers'][0]['env'].sort(key=lambda x: x['name'])
+ self.assertEqual(result_dict, self.expected)
+
+ @mock.patch('uuid.uuid4')
+ def test_from_obj(self, mock_uuid):
+ mock_uuid.return_value = '0'
+ result = PodGenerator.from_obj({
+ "KubernetesExecutor": {
+ "annotations": {"test": "annotation"},
+ "volumes": [
+ {
+ "name": "example-kubernetes-test-volume",
+ "hostPath": {"path": "/tmp/"},
+ },
+ ],
+ "volume_mounts": [
+ {
+ "mountPath": "/foo/",
+ "name": "example-kubernetes-test-volume",
+ },
+ ],
+ "securityContext": {
+ "runAsUser": 1000
+ }
+ }
+ })
+ result = self.k8s_client.sanitize_for_serialization(result)
+
+ self.assertEqual({
+ 'apiVersion': 'v1',
+ 'kind': 'Pod',
+ 'metadata': {
+ 'annotations': {'test': 'annotation'},
+ },
+ 'spec': {
+ 'containers': [{
+ 'args': [],
+ 'command': [],
+ 'env': [],
+ 'envFrom': [],
+ 'name': 'base',
+ 'ports': [],
+ 'volumeMounts': [{
+ 'mountPath': '/foo/',
+ 'name': 'example-kubernetes-test-volume'
+ }],
+ }],
+ 'imagePullSecrets': [],
+ 'volumes': [{
+ 'hostPath': {'path': '/tmp/'},
+ 'name': 'example-kubernetes-test-volume'
+ }],
+ }
+ }, result)
+
+ def test_reconcile_pods(self):
+ with mock.patch('uuid.uuid4') as mock_uuid:
+ mock_uuid.return_value = '0'
+ base_pod = PodGenerator(
+ image='image1',
+ name='name1',
+ envs={'key1': 'val1'},
+ cmds=['/bin/command1.sh', 'arg1'],
+ ports=k8s.V1ContainerPort(name='port', container_port=2118),
+ volumes=[{
+ 'hostPath': {'path': '/tmp/'},
+ 'name': 'example-kubernetes-test-volume1'
+ }],
+ volume_mounts=[{
+ 'mountPath': '/foo/',
+ 'name': 'example-kubernetes-test-volume1'
+ }],
+ ).gen_pod()
+
+ mutator_pod = PodGenerator(
+ envs={'key2': 'val2'},
+ image='',
+ name='name2',
+ cmds=['/bin/command2.sh', 'arg2'],
+ volumes=[{
+ 'hostPath': {'path': '/tmp/'},
+ 'name': 'example-kubernetes-test-volume2'
+ }],
+ volume_mounts=[{
+ 'mountPath': '/foo/',
+ 'name': 'example-kubernetes-test-volume2'
+ }]
+ ).gen_pod()
+
+ result = PodGenerator.reconcile_pods(base_pod, mutator_pod)
+ result = self.k8s_client.sanitize_for_serialization(result)
+ self.assertEqual(result, {
+ 'apiVersion': 'v1',
+ 'kind': 'Pod',
+ 'metadata': {'name': 'name2-0'},
+ 'spec': {
+ 'containers': [{
+ 'args': [],
+ 'command': ['/bin/command1.sh', 'arg1'],
+ 'env': [
+ {'name': 'key1', 'value': 'val1'},
+ {'name': 'key2', 'value': 'val2'}
+ ],
+ 'envFrom': [],
+ 'image': 'image1',
+ 'imagePullPolicy': 'IfNotPresent',
+ 'name': 'base',
+ 'ports': {
+ 'containerPort': 2118,
+ 'name': 'port',
+ },
+ 'volumeMounts': [{
+ 'mountPath': '/foo/',
+ 'name': 'example-kubernetes-test-volume1'
+ }, {
+ 'mountPath': '/foo/',
+ 'name': 'example-kubernetes-test-volume2'
+ }]
+ }],
+ 'hostNetwork': False,
+ 'imagePullSecrets': [],
+ 'restartPolicy': 'Never',
+ 'volumes': [{
+ 'hostPath': {'path': '/tmp/'},
+ 'name': 'example-kubernetes-test-volume1'
+ }, {
+ 'hostPath': {'path': '/tmp/'},
+ 'name': 'example-kubernetes-test-volume2'
+ }]
+ }
+ })
diff --git a/tests/kubernetes/test_pod_launcher.py b/tests/kubernetes/test_pod_launcher.py
index 422d3db..9e0c288 100644
--- a/tests/kubernetes/test_pod_launcher.py
+++ b/tests/kubernetes/test_pod_launcher.py
@@ -30,11 +30,13 @@ class TestPodLauncher(unittest.TestCase):
self.pod_launcher = PodLauncher(kube_client=self.mock_kube_client)
def test_read_pod_logs_successfully_returns_logs(self):
+ mock.sentinel.metadata = mock.MagicMock()
self.mock_kube_client.read_namespaced_pod_log.return_value = mock.sentinel.logs
logs = self.pod_launcher.read_pod_logs(mock.sentinel)
self.assertEqual(mock.sentinel.logs, logs)
def test_read_pod_logs_retries_successfully(self):
+ mock.sentinel.metadata = mock.MagicMock()
self.mock_kube_client.read_namespaced_pod_log.side_effect = [
BaseHTTPError('Boom'),
mock.sentinel.logs
@@ -46,21 +48,22 @@ class TestPodLauncher(unittest.TestCase):
_preload_content=False,
container='base',
follow=True,
- name=mock.sentinel.name,
- namespace=mock.sentinel.namespace,
+ name=mock.sentinel.metadata.name,
+ namespace=mock.sentinel.metadata.namespace,
tail_lines=10
),
mock.call(
_preload_content=False,
container='base',
follow=True,
- name=mock.sentinel.name,
- namespace=mock.sentinel.namespace,
+ name=mock.sentinel.metadata.name,
+ namespace=mock.sentinel.metadata.namespace,
tail_lines=10
)
])
def test_read_pod_logs_retries_fails(self):
+ mock.sentinel.metadata = mock.MagicMock()
self.mock_kube_client.read_namespaced_pod_log.side_effect = [
BaseHTTPError('Boom'),
BaseHTTPError('Boom'),
@@ -73,11 +76,13 @@ class TestPodLauncher(unittest.TestCase):
)
def test_read_pod_returns_logs(self):
+ mock.sentinel.metadata = mock.MagicMock()
self.mock_kube_client.read_namespaced_pod.return_value = mock.sentinel.pod_info
pod_info = self.pod_launcher.read_pod(mock.sentinel)
self.assertEqual(mock.sentinel.pod_info, pod_info)
def test_read_pod_retries_successfully(self):
+ mock.sentinel.metadata = mock.MagicMock()
self.mock_kube_client.read_namespaced_pod.side_effect = [
BaseHTTPError('Boom'),
mock.sentinel.pod_info
@@ -85,11 +90,12 @@ class TestPodLauncher(unittest.TestCase):
pod_info = self.pod_launcher.read_pod(mock.sentinel)
self.assertEqual(mock.sentinel.pod_info, pod_info)
self.mock_kube_client.read_namespaced_pod.assert_has_calls([
- mock.call(mock.sentinel.name, mock.sentinel.namespace),
- mock.call(mock.sentinel.name, mock.sentinel.namespace)
+ mock.call(mock.sentinel.metadata.name, mock.sentinel.metadata.namespace),
+ mock.call(mock.sentinel.metadata.name, mock.sentinel.metadata.namespace)
])
def test_read_pod_retries_fails(self):
+ mock.sentinel.metadata = mock.MagicMock()
self.mock_kube_client.read_namespaced_pod.side_effect = [
BaseHTTPError('Boom'),
BaseHTTPError('Boom'),
diff --git a/tests/kubernetes/test_worker_configuration.py b/tests/kubernetes/test_worker_configuration.py
new file mode 100644
index 0000000..74433b4
--- /dev/null
+++ b/tests/kubernetes/test_worker_configuration.py
@@ -0,0 +1,619 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+import uuid
+from datetime import datetime
+from tests.compat import mock
+from tests.test_utils.config import conf_vars
+try:
+ from airflow.executors.kubernetes_executor import AirflowKubernetesScheduler
+ from airflow.executors.kubernetes_executor import KubeConfig
+ from airflow.kubernetes.worker_configuration import WorkerConfiguration
+ from airflow.kubernetes.pod_generator import PodGenerator
+ from airflow.exceptions import AirflowConfigException
+ from airflow.kubernetes.secret import Secret
+ import kubernetes.client.models as k8s
+ from kubernetes.client.api_client import ApiClient
+except ImportError:
+ AirflowKubernetesScheduler = None # type: ignore
+
+
+class TestKubernetesWorkerConfiguration(unittest.TestCase):
+ """
+ Tests that if dags_volume_subpath/logs_volume_subpath configuration
+ options are passed to worker pod config
+ """
+
+ affinity_config = {
+ 'podAntiAffinity': {
+ 'requiredDuringSchedulingIgnoredDuringExecution': [
+ {
+ 'topologyKey': 'kubernetes.io/hostname',
+ 'labelSelector': {
+ 'matchExpressions': [
+ {
+ 'key': 'app',
+ 'operator': 'In',
+ 'values': ['airflow']
+ }
+ ]
+ }
+ }
+ ]
+ }
+ }
+
+ tolerations_config = [
+ {
+ 'key': 'dedicated',
+ 'operator': 'Equal',
+ 'value': 'airflow'
+ },
+ {
+ 'key': 'prod',
+ 'operator': 'Exists'
+ }
+ ]
+
+ def setUp(self):
+ if AirflowKubernetesScheduler is None:
+ self.skipTest("kubernetes python package is not installed")
+
+ self.kube_config = mock.MagicMock()
+ self.kube_config.airflow_home = '/'
+ self.kube_config.airflow_dags = 'dags'
+ self.kube_config.airflow_logs = 'logs'
+ self.kube_config.dags_volume_subpath = None
+ self.kube_config.logs_volume_subpath = None
+ self.kube_config.dags_in_image = False
+ self.kube_config.dags_folder = None
+ self.kube_config.git_dags_folder_mount_point = None
+ self.kube_config.kube_labels = {'dag_id': 'original_dag_id', 'my_label': 'label_id'}
+ self.api_client = ApiClient()
+
+ def test_worker_configuration_no_subpaths(self):
+ self.kube_config.dags_volume_claim = 'airflow-dags'
+ self.kube_config.dags_folder = 'dags'
+ worker_config = WorkerConfiguration(self.kube_config)
+ volumes = worker_config._get_volumes()
+ volume_mounts = worker_config._get_volume_mounts()
+ for volume_or_mount in volumes + volume_mounts:
+ if volume_or_mount.name != 'airflow-config':
+ self.assertNotIn(
+ 'subPath', self.api_client.sanitize_for_serialization(volume_or_mount),
+ "subPath shouldn't be defined"
+ )
+
+ @conf_vars({
+ ('kubernetes', 'git_ssh_known_hosts_configmap_name'): 'airflow-configmap',
+ ('kubernetes', 'git_ssh_key_secret_name'): 'airflow-secrets',
+ ('kubernetes', 'git_user'): 'some-user',
+ ('kubernetes', 'git_password'): 'some-password',
+ ('kubernetes', 'git_repo'): 'git@github.com:apache/airflow.git',
+ ('kubernetes', 'git_branch'): 'master',
+ ('kubernetes', 'git_dags_folder_mount_point'): '/usr/local/airflow/dags',
+ ('kubernetes', 'delete_worker_pods'): 'True',
+ ('kubernetes', 'kube_client_request_args'): '{"_request_timeout" : [60,360]}',
+ })
+ def test_worker_configuration_auth_both_ssh_and_user(self):
+ with self.assertRaisesRegex(AirflowConfigException,
+ 'either `git_user` and `git_password`.*'
+ 'or `git_ssh_key_secret_name`.*'
+ 'but not both$'):
+ KubeConfig()
+
+ def test_worker_with_subpaths(self):
+ self.kube_config.dags_volume_subpath = 'dags'
+ self.kube_config.logs_volume_subpath = 'logs'
+ self.kube_config.dags_volume_claim = 'dags'
+ self.kube_config.dags_folder = 'dags'
+ worker_config = WorkerConfiguration(self.kube_config)
+ volumes = worker_config._get_volumes()
+ volume_mounts = worker_config._get_volume_mounts()
+
+ for volume in volumes:
+ self.assertNotIn(
+ 'subPath', self.api_client.sanitize_for_serialization(volume),
+ "subPath isn't valid configuration for a volume"
+ )
+
+ for volume_mount in volume_mounts:
+ if volume_mount.name != 'airflow-config':
+ self.assertIn(
+ 'subPath', self.api_client.sanitize_for_serialization(volume_mount),
+ "subPath should've been passed to volumeMount configuration"
+ )
+
+ def test_worker_generate_dag_volume_mount_path(self):
+ self.kube_config.git_dags_folder_mount_point = '/root/airflow/git/dags'
+ self.kube_config.dags_folder = '/root/airflow/dags'
+ worker_config = WorkerConfiguration(self.kube_config)
+
+ self.kube_config.dags_volume_claim = 'airflow-dags'
+ self.kube_config.dags_volume_host = ''
+ dag_volume_mount_path = worker_config.generate_dag_volume_mount_path()
+ self.assertEqual(dag_volume_mount_path, self.kube_config.dags_folder)
+
+ self.kube_config.dags_volume_claim = ''
+ self.kube_config.dags_volume_host = '/host/airflow/dags'
+ dag_volume_mount_path = worker_config.generate_dag_volume_mount_path()
+ self.assertEqual(dag_volume_mount_path, self.kube_config.dags_folder)
+
+ self.kube_config.dags_volume_claim = ''
+ self.kube_config.dags_volume_host = ''
+ dag_volume_mount_path = worker_config.generate_dag_volume_mount_path()
+ self.assertEqual(dag_volume_mount_path,
+ self.kube_config.git_dags_folder_mount_point)
+
+ def test_worker_environment_no_dags_folder(self):
+ self.kube_config.airflow_configmap = ''
+ self.kube_config.git_dags_folder_mount_point = ''
+ self.kube_config.dags_folder = ''
+ worker_config = WorkerConfiguration(self.kube_config)
+ env = worker_config._get_environment()
+
+ self.assertNotIn('AIRFLOW__CORE__DAGS_FOLDER', env)
+
+ def test_worker_environment_when_dags_folder_specified(self):
+ self.kube_config.airflow_configmap = 'airflow-configmap'
+ self.kube_config.git_dags_folder_mount_point = ''
+ dags_folder = '/workers/path/to/dags'
+ self.kube_config.dags_folder = dags_folder
+
+ worker_config = WorkerConfiguration(self.kube_config)
+ env = worker_config._get_environment()
+
+ self.assertEqual(dags_folder, env['AIRFLOW__CORE__DAGS_FOLDER'])
+
+ def test_worker_environment_dags_folder_using_git_sync(self):
+ self.kube_config.airflow_configmap = 'airflow-configmap'
+ self.kube_config.git_sync_dest = 'repo'
+ self.kube_config.git_subpath = 'dags'
+ self.kube_config.git_dags_folder_mount_point = '/workers/path/to/dags'
+
+ dags_folder = '{}/{}/{}'.format(self.kube_config.git_dags_folder_mount_point,
+ self.kube_config.git_sync_dest,
+ self.kube_config.git_subpath)
+
+ worker_config = WorkerConfiguration(self.kube_config)
+ env = worker_config._get_environment()
+
+ self.assertEqual(dags_folder, env['AIRFLOW__CORE__DAGS_FOLDER'])
+
+ def test_init_environment_using_git_sync_ssh_without_known_hosts(self):
+ # Tests the init environment created with git-sync SSH authentication option is correct
+ # without known hosts file
+ self.kube_config.airflow_configmap = 'airflow-configmap'
+ self.kube_config.git_ssh_secret_name = 'airflow-secrets'
+ self.kube_config.git_ssh_known_hosts_configmap_name = None
+ self.kube_config.dags_volume_claim = None
+ self.kube_config.dags_volume_host = None
+ self.kube_config.dags_in_image = None
+
+ worker_config = WorkerConfiguration(self.kube_config)
+ init_containers = worker_config._get_init_containers()
+
+ self.assertTrue(init_containers) # check not empty
+ env = init_containers[0].env
+
+ self.assertIn(k8s.V1EnvVar(name='GIT_SSH_KEY_FILE', value='/etc/git-secret/ssh'), env)
+ self.assertIn(k8s.V1EnvVar(name='GIT_KNOWN_HOSTS', value='false'), env)
+ self.assertIn(k8s.V1EnvVar(name='GIT_SYNC_SSH', value='true'), env)
+
+ def test_init_environment_using_git_sync_ssh_with_known_hosts(self):
+ # Tests the init environment created with git-sync SSH authentication option is correct
+ # with known hosts file
+ self.kube_config.airflow_configmap = 'airflow-configmap'
+ self.kube_config.git_ssh_key_secret_name = 'airflow-secrets'
+ self.kube_config.git_ssh_known_hosts_configmap_name = 'airflow-configmap'
+ self.kube_config.dags_volume_claim = None
+ self.kube_config.dags_volume_host = None
+ self.kube_config.dags_in_image = None
+
+ worker_config = WorkerConfiguration(self.kube_config)
+ init_containers = worker_config._get_init_containers()
+
+ self.assertTrue(init_containers) # check not empty
+ env = init_containers[0].env
+
+ self.assertIn(k8s.V1EnvVar(name='GIT_SSH_KEY_FILE', value='/etc/git-secret/ssh'), env)
+ self.assertIn(k8s.V1EnvVar(name='GIT_KNOWN_HOSTS', value='true'), env)
+ self.assertIn(k8s.V1EnvVar(
+ name='GIT_SSH_KNOWN_HOSTS_FILE',
+ value='/etc/git-secret/known_hosts'
+ ), env)
+ self.assertIn(k8s.V1EnvVar(name='GIT_SYNC_SSH', value='true'), env)
+
+ def test_init_environment_using_git_sync_user_without_known_hosts(self):
+ # Tests the init environment created with git-sync User authentication option is correct
+ # without known hosts file
+ self.kube_config.airflow_configmap = 'airflow-configmap'
+ self.kube_config.git_user = 'git_user'
+ self.kube_config.git_password = 'git_password'
+ self.kube_config.git_ssh_known_hosts_configmap_name = None
+ self.kube_config.git_ssh_key_secret_name = None
+ self.kube_config.dags_volume_claim = None
+ self.kube_config.dags_volume_host = None
+ self.kube_config.dags_in_image = None
+
+ worker_config = WorkerConfiguration(self.kube_config)
+ init_containers = worker_config._get_init_containers()
+
+ self.assertTrue(init_containers) # check not empty
+ env = init_containers[0].env
+
+ self.assertNotIn(k8s.V1EnvVar(name='GIT_SSH_KEY_FILE', value='/etc/git-secret/ssh'), env)
+ self.assertIn(k8s.V1EnvVar(name='GIT_SYNC_USERNAME', value='git_user'), env)
+ self.assertIn(k8s.V1EnvVar(name='GIT_SYNC_PASSWORD', value='git_password'), env)
+ self.assertIn(k8s.V1EnvVar(name='GIT_KNOWN_HOSTS', value='false'), env)
+ self.assertNotIn(k8s.V1EnvVar(
+ name='GIT_SSH_KNOWN_HOSTS_FILE',
+ value='/etc/git-secret/known_hosts'
+ ), env)
+ self.assertNotIn(k8s.V1EnvVar(name='GIT_SYNC_SSH', value='true'), env)
+
+ def test_init_environment_using_git_sync_user_with_known_hosts(self):
+ # Tests the init environment created with git-sync User authentication option is correct
+ # with known hosts file
+ self.kube_config.airflow_configmap = 'airflow-configmap'
+ self.kube_config.git_user = 'git_user'
+ self.kube_config.git_password = 'git_password'
+ self.kube_config.git_ssh_known_hosts_configmap_name = 'airflow-configmap'
+ self.kube_config.git_ssh_key_secret_name = None
+ self.kube_config.dags_volume_claim = None
+ self.kube_config.dags_volume_host = None
+ self.kube_config.dags_in_image = None
+
+ worker_config = WorkerConfiguration(self.kube_config)
+ init_containers = worker_config._get_init_containers()
+
+ self.assertTrue(init_containers) # check not empty
+ env = init_containers[0].env
+
+ self.assertNotIn(k8s.V1EnvVar(name='GIT_SSH_KEY_FILE', value='/etc/git-secret/ssh'), env)
+ self.assertIn(k8s.V1EnvVar(name='GIT_SYNC_USERNAME', value='git_user'), env)
+ self.assertIn(k8s.V1EnvVar(name='GIT_SYNC_PASSWORD', value='git_password'), env)
+ self.assertIn(k8s.V1EnvVar(name='GIT_KNOWN_HOSTS', value='true'), env)
+ self.assertIn(k8s.V1EnvVar(
+ name='GIT_SSH_KNOWN_HOSTS_FILE',
+ value='/etc/git-secret/known_hosts'
+ ), env)
+ self.assertNotIn(k8s.V1EnvVar(name='GIT_SYNC_SSH', value='true'), env)
+
+ def test_init_environment_using_git_sync_run_as_user_empty(self):
+ # Tests that if git_sync_run_as_user is empty, no securityContext is created in the init container
+
+ self.kube_config.dags_volume_claim = None
+ self.kube_config.dags_volume_host = None
+ self.kube_config.dags_in_image = None
+ self.kube_config.git_sync_run_as_user = ''
+
+ worker_config = WorkerConfiguration(self.kube_config)
+ init_containers = worker_config._get_init_containers()
+ self.assertTrue(init_containers) # check not empty
+
+ self.assertIsNone(init_containers[0].security_context)
+
+ def test_make_pod_run_as_user_0(self):
+ # Tests the pod created with run-as-user 0 actually gets that in its config
+ self.kube_config.worker_run_as_user = 0
+ self.kube_config.dags_volume_claim = None
+ self.kube_config.dags_volume_host = None
+ self.kube_config.dags_in_image = None
+ self.kube_config.worker_fs_group = None
+ self.kube_config.git_dags_folder_mount_point = 'dags'
+ self.kube_config.git_sync_dest = 'repo'
+ self.kube_config.git_subpath = 'path'
+
+ worker_config = WorkerConfiguration(self.kube_config)
+ pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
+ "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'")
+
+ self.assertEqual(0, pod.spec.security_context.run_as_user)
+
+ def test_make_pod_git_sync_ssh_without_known_hosts(self):
+ # Tests the pod created with git-sync SSH authentication option is correct without known hosts
+ self.kube_config.airflow_configmap = 'airflow-configmap'
+ self.kube_config.git_ssh_key_secret_name = 'airflow-secrets'
+ self.kube_config.dags_volume_claim = None
+ self.kube_config.dags_volume_host = None
+ self.kube_config.dags_in_image = None
+ self.kube_config.worker_fs_group = None
+ self.kube_config.git_dags_folder_mount_point = 'dags'
+ self.kube_config.git_sync_dest = 'repo'
+ self.kube_config.git_subpath = 'path'
+
+ worker_config = WorkerConfiguration(self.kube_config)
+
+ pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
+ "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'")
+
+ init_containers = worker_config._get_init_containers()
+ git_ssh_key_file = next((x.value for x in init_containers[0].env
+ if x.name == 'GIT_SSH_KEY_FILE'), None)
+ volume_mount_ssh_key = next((x.mount_path for x in init_containers[0].volume_mounts
+ if x.name == worker_config.git_sync_ssh_secret_volume_name),
+ None)
+ self.assertTrue(git_ssh_key_file)
+ self.assertTrue(volume_mount_ssh_key)
+ self.assertEqual(65533, pod.spec.security_context.fs_group)
+ self.assertEqual(git_ssh_key_file,
+ volume_mount_ssh_key,
+ 'The location where the git ssh secret is mounted'
+ ' needs to be the same as the GIT_SSH_KEY_FILE path')
+
+ def test_make_pod_git_sync_credentials_secret(self):
+ # Tests that the git_sync_credentials_secret env vars get into the init container of the created pod
+ self.kube_config.git_sync_credentials_secret = 'airflow-git-creds-secret'
+ self.kube_config.dags_volume_claim = None
+ self.kube_config.dags_volume_host = None
+ self.kube_config.dags_in_image = None
+ self.kube_config.worker_fs_group = None
+ self.kube_config.git_dags_folder_mount_point = 'dags'
+ self.kube_config.git_sync_dest = 'repo'
+ self.kube_config.git_subpath = 'path'
+
+ worker_config = WorkerConfiguration(self.kube_config)
+
+ pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
+ "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'")
+
+ username_env = k8s.V1EnvVar(
+ name='GIT_SYNC_USERNAME',
+ value_from=k8s.V1EnvVarSource(
+ secret_key_ref=k8s.V1SecretKeySelector(
+ name=self.kube_config.git_sync_credentials_secret,
+ key='GIT_SYNC_USERNAME')
+ )
+ )
+ password_env = k8s.V1EnvVar(
+ name='GIT_SYNC_PASSWORD',
+ value_from=k8s.V1EnvVarSource(
+ secret_key_ref=k8s.V1SecretKeySelector(
+ name=self.kube_config.git_sync_credentials_secret,
+ key='GIT_SYNC_PASSWORD')
+ )
+ )
+
+ self.assertIn(username_env, pod.spec.init_containers[0].env,
+ 'The username env for git credentials did not get into the init container')
+
+ self.assertIn(password_env, pod.spec.init_containers[0].env,
+ 'The password env for git credentials did not get into the init container')
+
+ def test_make_pod_git_sync_ssh_with_known_hosts(self):
+ # Tests the pod created with git-sync SSH authentication option is correct with known hosts
+ self.kube_config.airflow_configmap = 'airflow-configmap'
+ self.kube_config.git_ssh_secret_name = 'airflow-secrets'
+ self.kube_config.dags_volume_claim = None
+ self.kube_config.dags_volume_host = None
+ self.kube_config.dags_in_image = None
+
+ worker_config = WorkerConfiguration(self.kube_config)
+
+ init_containers = worker_config._get_init_containers()
+ git_ssh_known_hosts_file = next((x.value for x in init_containers[0].env
+ if x.name == 'GIT_SSH_KNOWN_HOSTS_FILE'), None)
+
+ volume_mount_ssh_known_hosts_file = next(
+ (x.mount_path for x in init_containers[0].volume_mounts
+ if x.name == worker_config.git_sync_ssh_known_hosts_volume_name),
+ None)
+ self.assertTrue(git_ssh_known_hosts_file)
+ self.assertTrue(volume_mount_ssh_known_hosts_file)
+ self.assertEqual(git_ssh_known_hosts_file,
+ volume_mount_ssh_known_hosts_file,
+ 'The location where the git known hosts file is mounted'
+ ' needs to be the same as the GIT_SSH_KNOWN_HOSTS_FILE path')
+
+ def test_make_pod_with_empty_executor_config(self):
+ self.kube_config.kube_affinity = self.affinity_config
+ self.kube_config.kube_tolerations = self.tolerations_config
+ self.kube_config.dags_folder = 'dags'
+ worker_config = WorkerConfiguration(self.kube_config)
+
+ pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
+ "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'")
+
+ self.assertTrue(pod.spec.affinity['podAntiAffinity'] is not None)
+ self.assertEqual('app',
+ pod.spec.affinity['podAntiAffinity']
+ ['requiredDuringSchedulingIgnoredDuringExecution'][0]
+ ['labelSelector']
+ ['matchExpressions'][0]
+ ['key'])
+
+ self.assertEqual(2, len(pod.spec.tolerations))
+ self.assertEqual('prod', pod.spec.tolerations[1]['key'])
+
+ def test_make_pod_with_executor_config(self):
+ self.kube_config.dags_folder = 'dags'
+ worker_config = WorkerConfiguration(self.kube_config)
+ config_pod = PodGenerator(
+ image='',
+ affinity=self.affinity_config,
+ tolerations=self.tolerations_config,
+ ).gen_pod()
+
+ pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
+ "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'")
+
+ result = PodGenerator.reconcile_pods(pod, config_pod)
+
+ self.assertTrue(result.spec.affinity['podAntiAffinity'] is not None)
+ self.assertEqual('app',
+ result.spec.affinity['podAntiAffinity']
+ ['requiredDuringSchedulingIgnoredDuringExecution'][0]
+ ['labelSelector']
+ ['matchExpressions'][0]
+ ['key'])
+
+ self.assertEqual(2, len(result.spec.tolerations))
+ self.assertEqual('prod', result.spec.tolerations[1]['key'])
+
+ def test_worker_pvc_dags(self):
+ # Tests persistence volume config created when `dags_volume_claim` is set
+ self.kube_config.dags_volume_claim = 'airflow-dags'
+ self.kube_config.dags_folder = 'dags'
+ worker_config = WorkerConfiguration(self.kube_config)
+ volumes = worker_config._get_volumes()
+ volume_mounts = worker_config._get_volume_mounts()
+
+ init_containers = worker_config._get_init_containers()
+
+ dag_volume = [volume for volume in volumes if volume.name == 'airflow-dags']
+ dag_volume_mount = [mount for mount in volume_mounts if mount.name == 'airflow-dags']
+
+ self.assertEqual('airflow-dags', dag_volume[0].persistent_volume_claim.claim_name)
+ self.assertEqual(1, len(dag_volume_mount))
+ self.assertTrue(dag_volume_mount[0].read_only)
+ self.assertEqual(0, len(init_containers))
+
+ def test_worker_git_dags(self):
+ # Tests persistence volume config created when `git_repo` is set
+ self.kube_config.dags_volume_claim = None
+ self.kube_config.dags_volume_host = None
+ self.kube_config.dags_folder = '/usr/local/airflow/dags'
+ self.kube_config.worker_dags_folder = '/usr/local/airflow/dags'
+
+ self.kube_config.git_sync_container_repository = 'gcr.io/google-containers/git-sync-amd64'
+ self.kube_config.git_sync_container_tag = 'v2.0.5'
+ self.kube_config.git_sync_container = 'gcr.io/google-containers/git-sync-amd64:v2.0.5'
+ self.kube_config.git_sync_init_container_name = 'git-sync-clone'
+ self.kube_config.git_subpath = 'dags_folder'
+ self.kube_config.git_sync_root = '/git'
+ self.kube_config.git_sync_run_as_user = 65533
+ self.kube_config.git_dags_folder_mount_point = '/usr/local/airflow/dags/repo/dags_folder'
+
+ worker_config = WorkerConfiguration(self.kube_config)
+ volumes = worker_config._get_volumes()
+ volume_mounts = worker_config._get_volume_mounts()
+
+ dag_volume = [volume for volume in volumes if volume.name == 'airflow-dags']
+ dag_volume_mount = [mount for mount in volume_mounts if mount.name == 'airflow-dags']
+
+ self.assertIsNotNone(dag_volume[0].empty_dir)
+ self.assertEqual(self.kube_config.git_dags_folder_mount_point, dag_volume_mount[0].mount_path)
+ self.assertTrue(dag_volume_mount[0].read_only)
+
+ init_container = worker_config._get_init_containers()[0]
+ init_container_volume_mount = [mount for mount in init_container.volume_mounts
+ if mount.name == 'airflow-dags']
+
+ self.assertEqual('git-sync-clone', init_container.name)
+ self.assertEqual('gcr.io/google-containers/git-sync-amd64:v2.0.5', init_container.image)
+ self.assertEqual(1, len(init_container_volume_mount))
+ self.assertFalse(init_container_volume_mount[0].read_only)
+ self.assertEqual(65533, init_container.security_context.run_as_user)
+
+ def test_worker_container_dags(self):
+ # Tests that the 'airflow-dags' persistence volume is NOT created when `dags_in_image` is set
+ self.kube_config.dags_in_image = True
+ self.kube_config.dags_folder = 'dags'
+ worker_config = WorkerConfiguration(self.kube_config)
+ volumes = worker_config._get_volumes()
+ volume_mounts = worker_config._get_volume_mounts()
+
+ dag_volume = [volume for volume in volumes if volume.name == 'airflow-dags']
+ dag_volume_mount = [mount for mount in volume_mounts if mount.name == 'airflow-dags']
+
+ init_containers = worker_config._get_init_containers()
+
+ self.assertEqual(0, len(dag_volume))
+ self.assertEqual(0, len(dag_volume_mount))
+ self.assertEqual(0, len(init_containers))
+
+ def test_kubernetes_environment_variables(self):
+ # Tests the kubernetes environment variables get copied into the worker pods
+ input_environment = {
+ 'ENVIRONMENT': 'prod',
+ 'LOG_LEVEL': 'warning'
+ }
+ self.kube_config.kube_env_vars = input_environment
+ worker_config = WorkerConfiguration(self.kube_config)
+ env = worker_config._get_environment()
+ for key in input_environment:
+ self.assertIn(key, env)
+ self.assertIn(input_environment[key], env.values())
+
+ core_executor = 'AIRFLOW__CORE__EXECUTOR'
+ input_environment = {
+ core_executor: 'NotLocalExecutor'
+ }
+ self.kube_config.kube_env_vars = input_environment
+ worker_config = WorkerConfiguration(self.kube_config)
+ env = worker_config._get_environment()
+ self.assertEqual(env[core_executor], 'LocalExecutor')
+
+ def test_get_secrets(self):
+ # Test when secretRef is None and kube_secrets is not empty
+ self.kube_config.kube_secrets = {
+ 'AWS_SECRET_KEY': 'airflow-secret=aws_secret_key',
+ 'POSTGRES_PASSWORD': 'airflow-secret=postgres_credentials'
+ }
+ self.kube_config.env_from_secret_ref = None
+ worker_config = WorkerConfiguration(self.kube_config)
+ secrets = worker_config._get_secrets()
+ secrets.sort(key=lambda secret: secret.deploy_target)
+ expected = [
+ Secret('env', 'AWS_SECRET_KEY', 'airflow-secret', 'aws_secret_key'),
+ Secret('env', 'POSTGRES_PASSWORD', 'airflow-secret', 'postgres_credentials')
+ ]
+ self.assertListEqual(expected, secrets)
+
+ # Test when secret is not empty and kube_secrets is empty dict
+ self.kube_config.kube_secrets = {}
+ self.kube_config.env_from_secret_ref = 'secret_a,secret_b'
+ worker_config = WorkerConfiguration(self.kube_config)
+ secrets = worker_config._get_secrets()
+ expected = [
+ Secret('env', None, 'secret_a'),
+ Secret('env', None, 'secret_b')
+ ]
+ self.assertListEqual(expected, secrets)
+
+ def test_get_env_from(self):
+ # Test when configmap is empty
+ self.kube_config.env_from_configmap_ref = ''
+ worker_config = WorkerConfiguration(self.kube_config)
+ configmaps = worker_config._get_env_from()
+ self.assertListEqual([], configmaps)
+
+ # Test when configmap and secret refs are not empty
+ self.kube_config.env_from_configmap_ref = 'configmap_a,configmap_b'
+ self.kube_config.env_from_secret_ref = 'secretref_a,secretref_b'
+ worker_config = WorkerConfiguration(self.kube_config)
+ configmaps = worker_config._get_env_from()
+ self.assertListEqual([
+ k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name='configmap_a')),
+ k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name='configmap_b')),
+ k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource(name='secretref_a')),
+ k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource(name='secretref_b'))
+ ], configmaps)
+
+ def test_get_labels(self):
+ worker_config = WorkerConfiguration(self.kube_config)
+ labels = worker_config._get_labels({'my_kube_executor_label': 'kubernetes'}, {
+ 'dag_id': 'override_dag_id',
+ })
+ self.assertEqual({
+ 'my_label': 'label_id',
+ 'dag_id': 'override_dag_id',
+ 'my_kube_executor_label': 'kubernetes'
+ }, labels)
diff --git a/tests/runtime/kubernetes/test_kubernetes_pod_operator.py b/tests/runtime/kubernetes/test_kubernetes_pod_operator.py
index 96e06fe..cc74f3f 100644
--- a/tests/runtime/kubernetes/test_kubernetes_pod_operator.py
+++ b/tests/runtime/kubernetes/test_kubernetes_pod_operator.py
@@ -19,8 +19,8 @@ import json
import os
import shutil
import unittest
-from tests.compat import mock
+import kubernetes.client.models as k8s
import pytest
from kubernetes.client.api_client import ApiClient
from kubernetes.client.rest import ApiException
@@ -28,11 +28,12 @@ from kubernetes.client.rest import ApiException
from airflow import AirflowException
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.kubernetes.pod import Port
+from airflow.kubernetes.pod_generator import PodDefaults
from airflow.kubernetes.pod_launcher import PodLauncher
from airflow.kubernetes.secret import Secret
-from airflow.kubernetes.volume import Volume
from airflow.kubernetes.volume_mount import VolumeMount
from airflow.version import version as airflow_version
+from tests.compat import mock
@pytest.mark.runtime("kubernetes")
@@ -77,24 +78,6 @@ class TestKubernetesPodOperator(unittest.TestCase):
}
}
- def test_do_xcom_push_defaults_false(self):
- new_config_path = '/tmp/kube_config'
- old_config_path = os.path.expanduser('~/.kube/config')
- shutil.copy(old_config_path, new_config_path)
-
- k = KubernetesPodOperator(
- namespace='default',
- image="ubuntu:16.04",
- cmds=["bash", "-cx"],
- arguments=["echo 10"],
- labels={"foo": "bar"},
- name="test",
- task_id="task",
- in_cluster=False,
- do_xcom_push=False,
- config_file=new_config_path,
- )
- self.assertFalse(k.do_xcom_push)
def test_config_path_move(self):
new_config_path = '/tmp/kube_config'
@@ -114,6 +97,8 @@ class TestKubernetesPodOperator(unittest.TestCase):
config_file=new_config_path,
)
k.execute(None)
+ actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+ self.assertEqual(self.expected_pod, actual_pod)
@mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
@mock.patch("airflow.kubernetes.kube_client.get_kube_client")
@@ -163,8 +148,10 @@ class TestKubernetesPodOperator(unittest.TestCase):
)
mock_launcher.return_value = (State.SUCCESS, None)
k.execute(None)
- self.assertEqual(mock_launcher.call_args[0][0].image_pull_secrets,
- fake_pull_secrets)
+ self.assertEqual(
+ mock_launcher.call_args[0][0].spec.image_pull_secrets,
+ [k8s.V1LocalObjectReference(name=fake_pull_secrets)]
+ )
@mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
@mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.delete_pod")
@@ -201,6 +188,8 @@ class TestKubernetesPodOperator(unittest.TestCase):
do_xcom_push=False,
)
k.execute(None)
+ actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+ self.assertEqual(self.expected_pod, actual_pod)
def test_delete_operator_pod(self):
k = KubernetesPodOperator(
@@ -216,6 +205,8 @@ class TestKubernetesPodOperator(unittest.TestCase):
do_xcom_push=False,
)
k.execute(None)
+ actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+ self.assertEqual(self.expected_pod, actual_pod)
def test_pod_hostnetwork(self):
k = KubernetesPodOperator(
@@ -231,6 +222,9 @@ class TestKubernetesPodOperator(unittest.TestCase):
hostnetwork=True,
)
k.execute(None)
+ actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+ self.expected_pod['spec']['hostNetwork'] = True
+ self.assertEqual(self.expected_pod, actual_pod)
def test_pod_dnspolicy(self):
dns_policy = "ClusterFirstWithHostNet"
@@ -248,6 +242,10 @@ class TestKubernetesPodOperator(unittest.TestCase):
dnspolicy=dns_policy,
)
k.execute(None)
+ actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+ self.expected_pod['spec']['hostNetwork'] = True
+ self.expected_pod['spec']['dnsPolicy'] = dns_policy
+ self.assertEqual(self.expected_pod, actual_pod)
def test_pod_node_selectors(self):
node_selectors = {
@@ -266,6 +264,9 @@ class TestKubernetesPodOperator(unittest.TestCase):
node_selectors=node_selectors,
)
k.execute(None)
+ actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+ self.expected_pod['spec']['nodeSelector'] = node_selectors
+ self.assertEqual(self.expected_pod, actual_pod)
def test_pod_resources(self):
resources = {
@@ -287,6 +288,9 @@ class TestKubernetesPodOperator(unittest.TestCase):
resources=resources,
)
k.execute(None)
+ actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+ self.expected_pod['spec']['containers'][0]['resources'] = resources
+ self.assertEqual(self.expected_pod, actual_pod)
def test_pod_affinity(self):
affinity = {
@@ -319,6 +323,9 @@ class TestKubernetesPodOperator(unittest.TestCase):
affinity=affinity,
)
k.execute(None)
+ actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+ self.expected_pod['spec']['affinity'] = affinity
+ self.assertEqual(self.expected_pod, actual_pod)
def test_port(self):
port = Port('http', 80)
@@ -336,6 +343,12 @@ class TestKubernetesPodOperator(unittest.TestCase):
ports=[port],
)
k.execute(None)
+ actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+ self.expected_pod['spec']['containers'][0]['ports'] = [{
+ 'name': 'http',
+ 'containerPort': 80
+ }]
+ self.assertEqual(self.expected_pod, actual_pod)
def test_volume_mount(self):
with mock.patch.object(PodLauncher, 'log') as mock_logger:
@@ -368,6 +381,20 @@ class TestKubernetesPodOperator(unittest.TestCase):
)
k.execute(None)
mock_logger.info.assert_any_call(b"retrieved from mount\n")
+ actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+ self.expected_pod['spec']['containers'][0]['args'] = args
+ self.expected_pod['spec']['containers'][0]['volumeMounts'] = [{
+ 'name': 'test-volume',
+ 'mountPath': '/root/mount_file',
+ 'readOnly': True
+ }]
+ self.expected_pod['spec']['volumes'] = [{
+ 'name': 'test-volume',
+ 'persistentVolumeClaim': {
+ 'claimName': 'test-volume'
+ }
+ }]
+ self.assertEqual(self.expected_pod, actual_pod)
def test_run_as_user_root(self):
security_context = {
@@ -388,6 +415,9 @@ class TestKubernetesPodOperator(unittest.TestCase):
security_context=security_context,
)
k.execute(None)
+ actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+ self.expected_pod['spec']['securityContext'] = security_context
+ self.assertEqual(self.expected_pod, actual_pod)
def test_run_as_user_non_root(self):
security_context = {
@@ -409,6 +439,9 @@ class TestKubernetesPodOperator(unittest.TestCase):
security_context=security_context,
)
k.execute(None)
+ actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+ self.expected_pod['spec']['securityContext'] = security_context
+ self.assertEqual(self.expected_pod, actual_pod)
def test_fs_group(self):
security_context = {
@@ -430,6 +463,9 @@ class TestKubernetesPodOperator(unittest.TestCase):
security_context=security_context,
)
k.execute(None)
+ actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+ self.expected_pod['spec']['securityContext'] = security_context
+ self.assertEqual(self.expected_pod, actual_pod)
def test_faulty_image(self):
bad_image_name = "foobar"
@@ -447,6 +483,9 @@ class TestKubernetesPodOperator(unittest.TestCase):
)
with self.assertRaises(AirflowException):
k.execute(None)
+ actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+ self.expected_pod['spec']['containers'][0]['image'] = bad_image_name
+ self.assertEqual(self.expected_pod, actual_pod)
def test_faulty_service_account(self):
bad_service_account_name = "foobar"
@@ -465,6 +504,9 @@ class TestKubernetesPodOperator(unittest.TestCase):
)
with self.assertRaises(ApiException):
k.execute(None)
+ actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+ self.expected_pod['spec']['serviceAccountName'] = bad_service_account_name
+ self.assertEqual(self.expected_pod, actual_pod)
def test_pod_failure(self):
"""
@@ -484,6 +526,9 @@ class TestKubernetesPodOperator(unittest.TestCase):
)
with self.assertRaises(AirflowException):
k.execute(None)
+ actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+ self.expected_pod['spec']['containers'][0]['args'] = bad_internal_command
+ self.assertEqual(self.expected_pod, actual_pod)
def test_xcom_push(self):
return_value = '{"foo": "bar"\n, "buzz": 2}'
@@ -500,13 +545,23 @@ class TestKubernetesPodOperator(unittest.TestCase):
do_xcom_push=True,
)
self.assertEqual(k.execute(None), json.loads(return_value))
+ actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+ volume = self.api_client.sanitize_for_serialization(PodDefaults.VOLUME)
+ volume_mount = self.api_client.sanitize_for_serialization(PodDefaults.VOLUME_MOUNT)
+ container = self.api_client.sanitize_for_serialization(PodDefaults.SIDECAR_CONTAINER)
+ self.expected_pod['spec']['containers'][0]['args'] = args
+ self.expected_pod['spec']['containers'][0]['volumeMounts'].insert(0, volume_mount)
+ self.expected_pod['spec']['volumes'].insert(0, volume)
+ self.expected_pod['spec']['containers'].append(container)
+ self.assertEqual(self.expected_pod, actual_pod)
@mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
@mock.patch("airflow.kubernetes.kube_client.get_kube_client")
def test_envs_from_configmaps(self, mock_client, mock_launcher):
# GIVEN
from airflow.utils.state import State
- configmaps = ['test-configmap']
+
+ configmap= 'test-configmap'
# WHEN
k = KubernetesPodOperator(
namespace='default',
@@ -518,12 +573,17 @@ class TestKubernetesPodOperator(unittest.TestCase):
task_id="task",
in_cluster=False,
do_xcom_push=False,
- configmaps=configmaps
+ configmaps=[configmap]
)
# THEN
mock_launcher.return_value = (State.SUCCESS, None)
k.execute(None)
- self.assertEqual(mock_launcher.call_args[0][0].configmaps, configmaps)
+ self.assertEqual(
+ mock_launcher.call_args[0][0].spec.containers[0].env_from,
+ [k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(
+ name=configmap
+ ))]
+ )
@mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
@mock.patch("airflow.kubernetes.kube_client.get_kube_client")
@@ -548,7 +608,12 @@ class TestKubernetesPodOperator(unittest.TestCase):
# THEN
mock_launcher.return_value = (State.SUCCESS, None)
k.execute(None)
- self.assertEqual(mock_launcher.call_args[0][0].secrets, secrets)
+ self.assertEqual(
+ mock_launcher.call_args[0][0].spec.containers[0].env_from,
+ [k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource(
+ name=secret_ref
+ ))]
+ )
# pylint: enable=unused-argument
diff --git a/tests/test_local_settings.py b/tests/test_local_settings.py
index fdc6cb9..3497ee2 100644
--- a/tests/test_local_settings.py
+++ b/tests/test_local_settings.py
@@ -23,8 +23,6 @@ import tempfile
import unittest
from tests.compat import MagicMock, Mock, call, patch
-from airflow.kubernetes.pod import Pod
-
SETTINGS_FILE_POLICY = """
def test_policy(task_instance):
@@ -149,7 +147,7 @@ class LocalSettingsTest(unittest.TestCase):
from airflow import settings
settings.import_local_settings() # pylint: ignore
- pod = Pod(image="ubuntu", envs={}, cmds=['echo "1"'])
+ pod = MagicMock()
settings.pod_mutation_hook(pod)
assert pod.namespace == 'airflow-tests'