You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2018/01/12 18:03:03 UTC
[03/16] incubator-airflow git commit: [AIRFLOW-1517] Restore
authorship of secrets and init container
[AIRFLOW-1517] Restore authorship of secrets and init container
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/361dad95
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/361dad95
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/361dad95
Branch: refs/heads/master
Commit: 361dad957cca17eb37964c5945446bf62d74592f
Parents: c5ced07
Author: Benjamin Goldberg <be...@spothero.com>
Authored: Wed Dec 27 08:30:04 2017 -0600
Committer: Benjamin Goldberg <be...@spothero.com>
Committed: Wed Dec 27 08:44:39 2017 -0600
----------------------------------------------------------------------
.../kubernetes_request_factory.py | 48 +++++++++
.../pod_request_factory.py | 4 +
airflow/contrib/kubernetes/pod.py | 3 +
airflow/contrib/kubernetes/pod_generator.py | 105 +++++++++++++++++++
airflow/contrib/kubernetes/secret.py | 36 +++++++
5 files changed, 196 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/361dad95/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
index 88d3f32..9398bef 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
@@ -89,6 +89,37 @@ class KubernetesRequestFactory:
req['metadata']['name'] = pod.name
@staticmethod
+    def extract_volume_secrets(pod, req):
+        """Mount every volume-type secret of ``pod`` into the first container.
+
+        Each secret whose ``deploy_type`` is ``'volume'`` gets a generated
+        volume name (``secretvol<index>``), a read-only mount at its
+        ``deploy_target`` and a matching secret-backed entry in
+        ``spec.volumes``. Both lists in ``req`` are reset before being filled;
+        ``req`` is left untouched when there is nothing to mount.
+
+        :param pod: object exposing a ``secrets`` iterable
+        :param req: pod request dict, modified in place
+        """
+        volume_secrets = [
+            secret for secret in pod.secrets if secret.deploy_type == 'volume'
+        ]
+        if not any(volume_secrets):
+            return
+
+        mounts = []
+        req['spec']['containers'][0]['volumeMounts'] = mounts
+        volumes = []
+        req['spec']['volumes'] = volumes
+        for index, secret in enumerate(volume_secrets):
+            volume_name = 'secretvol{}'.format(index)
+            mounts.append({
+                'mountPath': secret.deploy_target,
+                'name': volume_name,
+                'readOnly': True
+            })
+            volumes.append({
+                'name': volume_name,
+                'secret': {
+                    'secretName': secret.secret
+                }
+            })
+
+ @staticmethod
+    def extract_env_and_secrets(pod, req):
+        """Populate the first container's ``env`` list from ``pod``.
+
+        Plain variables from ``pod.envs`` come first, as ``name``/``value``
+        pairs. Every secret whose ``deploy_type`` is ``'env'`` is then handed
+        to ``KubernetesRequestFactory.add_secret_to_env`` together with that
+        list. ``req`` is not modified when there are no variables and no env
+        secrets.
+
+        :param pod: object exposing ``envs`` (mapping) and ``secrets`` (iterable)
+        :param req: pod request dict, modified in place
+        """
+        secret_envs = [
+            secret for secret in pod.secrets if secret.deploy_type == 'env'
+        ]
+        if len(pod.envs) == 0 and len(secret_envs) == 0:
+            return
+
+        container_env = [
+            {'name': name, 'value': pod.envs[name]} for name in pod.envs.keys()
+        ]
+        for secret in secret_envs:
+            KubernetesRequestFactory.add_secret_to_env(container_env, secret)
+        req['spec']['containers'][0]['env'] = container_env
+
+ @staticmethod
def extract_resources(pod, req):
if not pod.resources or pod.resources.is_empty_resource_request():
return
@@ -112,3 +143,20 @@ class KubernetesRequestFactory:
if pod.resources.request_cpu:
req['spec']['containers'][0]['resources']['limits'][
'cpu'] = pod.resources.limit_cpu
+
+ @staticmethod
+    def extract_init_containers(pod, req):
+        """Copy ``pod.init_containers`` to ``spec.initContainers`` when set.
+
+        A falsy value (``None`` or an empty list) leaves ``req`` unchanged.
+
+        :param pod: object exposing ``init_containers``
+        :param req: pod request dict, modified in place
+        """
+        init_containers = pod.init_containers
+        if not init_containers:
+            return
+        req['spec']['initContainers'] = init_containers
+
+ @staticmethod
+    def extract_service_account_name(pod, req):
+        """Copy ``pod.service_account_name`` to ``spec.serviceAccountName``.
+
+        Nothing is written when the attribute is falsy (``None`` or ``''``).
+
+        :param pod: object exposing ``service_account_name``
+        :param req: pod request dict, modified in place
+        """
+        account_name = pod.service_account_name
+        if not account_name:
+            return
+        req['spec']['serviceAccountName'] = account_name
+
+ @staticmethod
+    def extract_image_pull_secrets(pod, req):
+        """Set ``spec.imagePullSecrets`` from a comma-separated list of names.
+
+        ``pod.image_pull_secrets`` is split on commas. Whitespace around each
+        name is stripped and empty entries (e.g. from a trailing comma) are
+        dropped, so ``'a, b,'`` yields the two names ``a`` and ``b``. ``req``
+        is left unchanged when the attribute is falsy or holds no names.
+
+        :param pod: object exposing ``image_pull_secrets`` (str or ``None``)
+        :param req: pod request dict, modified in place
+        """
+        if not pod.image_pull_secrets:
+            return
+        names = [name.strip() for name in pod.image_pull_secrets.split(',')]
+        pull_secrets = [{'name': name} for name in names if name]
+        if pull_secrets:
+            req['spec']['imagePullSecrets'] = pull_secrets
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/361dad95/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
index ac9ded1..3be1a13 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
@@ -45,8 +45,12 @@ spec:
self.extract_cmds(pod, req)
self.extract_args(pod, req)
self.extract_node_selector(pod, req)
+ self.extract_env_and_secrets(pod, req)
self.extract_volume_secrets(pod, req)
self.attach_volumes(pod, req)
self.attach_volume_mounts(pod, req)
self.extract_resources(pod, req)
+ self.extract_service_account_name(pod, req)
+ self.extract_init_containers(pod, req)
+ self.extract_image_pull_secrets(pod, req)
return req
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/361dad95/airflow/contrib/kubernetes/pod.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
index f6a0583..6a9f76d 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -86,4 +86,7 @@ class Pod:
self.node_selectors = node_selectors or []
self.namespace = namespace
self.image_pull_policy = image_pull_policy
+ self.image_pull_secrets = image_pull_secrets
+ self.init_containers = init_containers
+ self.service_account_name = service_account_name
self.resources = resources or Resources()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/361dad95/airflow/contrib/kubernetes/pod_generator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod_generator.py b/airflow/contrib/kubernetes/pod_generator.py
index 69c6fcb..685be37 100644
--- a/airflow/contrib/kubernetes/pod_generator.py
+++ b/airflow/contrib/kubernetes/pod_generator.py
@@ -29,6 +29,39 @@ class PodGenerator:
self.init_containers = []
self.secrets = []
+    def add_init_container(self, name, image, securityContext,
+                           init_environment, volume_mounts):
+        """
+        Register an init container to be launched with the pod.
+
+        The arguments are stored as given in one dict, under the keys
+        ``name``, ``image``, ``securityContext``, ``env`` and
+        ``volumeMounts``; that dict is appended to ``self.init_containers``.
+
+        Args:
+            name (str): stored under ``name``
+            image (str): stored under ``image``
+            securityContext (dict): stored under ``securityContext``
+            init_environment: stored under ``env``. Presumably a list of
+                ``name``/``value`` dicts, like the one the git-sync init
+                container in this file builds -- confirm against callers.
+            volume_mounts: stored under ``volumeMounts``. Presumably a list
+                of mount dicts -- confirm against callers.
+
+        Returns:
+            None
+        """
+        container = {
+            'name': name,
+            'image': image,
+            'securityContext': securityContext,
+            'env': init_environment,
+            'volumeMounts': volume_mounts
+        }
+        self.init_containers.append(container)
+
+    def _get_init_containers(self):
+        """Return the list of init container specs held by this generator."""
+        init_containers = self.init_containers
+        return init_containers
def add_volume(self, name):
"""
@@ -76,6 +109,12 @@ class PodGenerator:
def _get_volumes_and_mounts(self):
return self.volumes, self.volume_mounts
+    def _get_image_pull_secrets(self):
+        """Extracts any image pull secrets for fetching container(s)
+
+        Reads the comma-separated ``kube_config.image_pull_secrets`` setting.
+        Whitespace around each name is stripped and empty entries (e.g. from
+        a trailing comma) are dropped.
+
+        Returns:
+            list of str: the secret names, or ``[]`` when the setting is falsy.
+        """
+        configured = self.kube_config.image_pull_secrets
+        if not configured:
+            return []
+        names = (name.strip() for name in configured.split(','))
+        return [name for name in names if name]
+
def make_pod(self, namespace, image, pod_id, cmds,
arguments, labels, kube_executor_config=None):
volumes, volume_mounts = self._get_volumes_and_mounts()
@@ -97,6 +136,9 @@ class PodGenerator:
labels=labels,
envs=self.env_vars,
secrets={},
+ # service_account_name=self.kube_config.worker_service_account_name,
+ # image_pull_secrets=self.kube_config.image_pull_secrets,
+ init_containers=worker_init_container_spec,
volumes=volumes,
volume_mounts=volume_mounts,
resources=None
@@ -128,12 +170,32 @@ class WorkerGenerator(PodGenerator):
'readOnly': True
}]
+ # Mount the airflow.cfg file via a configmap the user has specified
+ if self.kube_config.airflow_configmap:
+ config_volume_name = "airflow-config"
+ config_path = '{}/airflow.cfg'.format(self.kube_config.airflow_home)
+ volumes.append({
+ 'name': config_volume_name,
+ 'configMap': {
+ 'name': self.kube_config.airflow_configmap
+ }
+ })
+ volume_mounts.append({
+ 'name': config_volume_name,
+ 'mountPath': config_path,
+ 'subPath': 'airflow.cfg',
+ 'readOnly': True
+ })
+
# A PV with the DAGs should be mounted
if self.kube_config.dags_volume_claim:
volumes[0]['persistentVolumeClaim'] = {
"claimName": self.kube_config.dags_volume_claim}
if self.kube_config.dags_volume_subpath:
volume_mounts[0]["subPath"] = self.kube_config.dags_volume_subpath
+ else:
+ # Create a Shared Volume for the Git-Sync module to populate
+ volumes[0]["emptyDir"] = {}
return volumes, volume_mounts
def _init_labels(self, dag_id, task_id, execution_date):
@@ -154,6 +216,49 @@ class WorkerGenerator(PodGenerator):
env['AIRFLOW__CORE__AIRFLOW_HOME'] = self.kube_config.airflow_home
return env
+    def _init_init_containers(self, volume_mounts):
+        """Build the git-sync init container spec used to fetch the DAGs.
+
+        Returns an empty list when ``kube_config.dags_volume_claim`` is set,
+        because the DAGs are then mounted from that volume claim. Otherwise
+        the first entry of ``volume_mounts`` is switched to read-write (the
+        caller's list is modified in place) and a one-element list holding
+        the git-sync container definition is returned.
+
+        :param volume_mounts: list of volume mount dicts; the same list object
+            is placed in the returned container spec
+        :return: list with zero or one init container dicts
+        """
+        config = self.kube_config
+        if config.dags_volume_claim:
+            # DAGs come from the volume claim, so there is nothing to sync
+            return []
+
+        git_sync_settings = [
+            ('GIT_SYNC_REPO', config.git_repo),
+            ('GIT_SYNC_BRANCH', config.git_branch),
+            ('GIT_SYNC_ROOT', '/tmp'),
+            ('GIT_SYNC_DEST', 'dags'),
+            ('GIT_SYNC_ONE_TIME', 'true'),
+        ]
+        # Credentials are optional and only passed along when configured
+        if config.git_user:
+            git_sync_settings.append(('GIT_SYNC_USERNAME', config.git_user))
+        if config.git_password:
+            git_sync_settings.append(('GIT_SYNC_PASSWORD', config.git_password))
+        init_environment = [
+            {'name': name, 'value': value} for name, value in git_sync_settings
+        ]
+
+        # The first mount must be writable so git-sync can populate it
+        volume_mounts[0]['readOnly'] = False
+        git_sync_container = {
+            'name': config.git_sync_init_container_name,
+            'image': config.git_sync_container,
+            'securityContext': {'runAsUser': 0},
+            'env': init_environment,
+            'volumeMounts': volume_mounts
+        }
+        return [git_sync_container]
+
def make_worker_pod(self,
namespace,
pod_id,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/361dad95/airflow/contrib/kubernetes/secret.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/secret.py b/airflow/contrib/kubernetes/secret.py
index e69de29..15f070e 100644
--- a/airflow/contrib/kubernetes/secret.py
+++ b/airflow/contrib/kubernetes/secret.py
@@ -0,0 +1,36 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class Secret:
+    """Defines Kubernetes Secret Volume"""
+
+    def __init__(self, deploy_type, deploy_target, secret, key):
+        """Initialize a Kubernetes Secret Object. Used to track requested secrets from
+        the user.
+
+        :param deploy_type: The type of secret deploy in Kubernetes, either `env` or
+            `volume`
+        :type deploy_type: ``str``
+        :param deploy_target: For an `env` secret, the environment variable to be
+            created in the worker (stored upper-cased). For a `volume` secret, the
+            path at which the secret is mounted (stored exactly as given).
+        :type deploy_target: ``str``
+        :param secret: Name of the secrets object in Kubernetes
+        :type secret: ``str``
+        :param key: Key of the secret within the Kubernetes Secret
+        :type key: ``str``
+        """
+        self.deploy_type = deploy_type
+        # Only environment variable names are normalised to upper case. A volume
+        # mount path is case-sensitive and must be kept exactly as given.
+        if deploy_type == 'env':
+            deploy_target = deploy_target.upper()
+        self.deploy_target = deploy_target
+        self.secret = secret
+        self.key = key